import { prisma } from '../../config/database.js';

/** Summary of one watchdog pass over `sat_sync_jobs`. */
export interface SweepResult {
  /** Number of stale `pending` jobs matched by the query. */
  pendingFound: number;
  /** Number of stale `running` jobs matched after applying per-type thresholds. */
  runningFound: number;
  /**
   * Pending jobs actually transitioned to `failed` (always 0 on a dry-run).
   * May be lower than `pendingFound` if a job changed state between read and write.
   */
  pendingMarked: number;
  /**
   * Running jobs actually transitioned to `failed` (always 0 on a dry-run).
   * May be lower than `runningFound` if a job changed state between read and write.
   */
  runningMarked: number;
  /** One entry per stale job found: all pending entries first, then running. */
  entries: Array<{
    id: string;
    tenantId: string;
    kind: 'pending-stale' | 'running-stale';
    /** Hours elapsed since the job's reference timestamp, rounded to the nearest hour. */
    ageHours: number;
  }>;
}

/**
 * Default "running" staleness thresholds, in hours, keyed by sync job `type`.
 * Callers can override individual entries via `runningHoursByType`.
 */
const DEFAULT_RUNNING_HOURS_BY_TYPE: Record<string, number> = {
  initial: 24,
  daily: 4,
  incremental: 2,
  custom: 24,
};

/**
 * Watchdog for stale `sat_sync_jobs` rows.
 *
 * Categories:
 *  1. `pending` jobs whose `nextRetryAt` is more than `pendingHours` in the past.
 *     The hourly `retryTimedOutJobs` cron normally picks these up. If it does not run
 *     (dev, outage, long restart), the job stays stuck and keeps holding the lock that
 *     prevents new syncs for the same (tenant, taxpayer). Pending jobs with a null
 *     `nextRetryAt` are not matched by this query.
 *  2. `running` jobs whose `startedAt` (or `createdAt` when `startedAt` is null) is older
 *     than the threshold for their `type`. Defaults: initial 24h, daily 4h, incremental 2h,
 *     custom 24h. A job past its threshold is almost certainly an orphan of a dead process.
 *
 * Both categories are marked `failed` with a descriptive `errorMessage` and `completedAt = now`.
 * Re-running is safe: only `pending`/`running` rows are selected, so already-failed jobs are
 * never reopened. Each write is guarded by the same predicate used to select the job, so a job
 * that completed or was re-picked between the read and the write is left untouched.
 *
 * @param params.apply `false` (default) = dry-run; nothing is written to the database.
 * @param params.pendingHours Pending threshold in hours (default 12).
 * @param params.runningHours Fallback running threshold, used ONLY for job types absent from
 *   both the built-in defaults and `runningHoursByType`. When that is also unset, 4h is used.
 * @param params.runningHoursByType Per-type running thresholds, merged over the defaults.
 * @returns Counts of stale jobs found/marked plus one entry per stale job.
 */
export async function sweepStaleSatJobs(params: {
  apply: boolean;
  pendingHours?: number;
  runningHours?: number;
  runningHoursByType?: Record<string, number>;
} = { apply: false }): Promise<SweepResult> {
  const MS_PER_HOUR = 3_600_000;

  const pendingHours = params.pendingHours ?? 12;
  const runningHoursByType: Record<string, number> = {
    ...DEFAULT_RUNNING_HOURS_BY_TYPE,
    ...(params.runningHoursByType ?? {}),
  };
  // Known types always resolve through the merged map; the `runningHours` fallback
  // (then 4h) only kicks in for types that appear in neither map.
  const runningThresholdHours = (type: string): number =>
    runningHoursByType[type] ?? params.runningHours ?? 4;

  const now = new Date();
  const nowMs = now.getTime();
  const ageHoursSince = (from: Date): number => Math.round((nowMs - from.getTime()) / MS_PER_HOUR);

  const pendingCutoff = new Date(nowMs - pendingHours * MS_PER_HOUR);
  const stalePending = await prisma.satSyncJob.findMany({
    where: { status: 'pending', nextRetryAt: { lt: pendingCutoff } },
    orderBy: { createdAt: 'asc' },
  });

  // Running jobs: thresholds differ by type, so filter in memory.
  const allRunning = await prisma.satSyncJob.findMany({
    where: { status: 'running' },
    orderBy: { createdAt: 'asc' },
  });
  const staleRunning = allRunning.filter((j) => {
    const cutoffMs = nowMs - runningThresholdHours(j.type) * MS_PER_HOUR;
    return (j.startedAt ?? j.createdAt).getTime() < cutoffMs;
  });

  const result: SweepResult = {
    pendingFound: stalePending.length,
    runningFound: staleRunning.length,
    pendingMarked: 0,
    runningMarked: 0,
    entries: [],
  };

  for (const j of stalePending) {
    result.entries.push({
      id: j.id,
      tenantId: j.tenantId,
      kind: 'pending-stale',
      ageHours: ageHoursSince(j.nextRetryAt ?? j.createdAt),
    });
  }
  for (const j of staleRunning) {
    result.entries.push({
      id: j.id,
      tenantId: j.tenantId,
      kind: 'running-stale',
      ageHours: ageHoursSince(j.startedAt ?? j.createdAt),
    });
  }

  if (!params.apply) return result;

  for (const j of stalePending) {
    // Re-check the selection predicate at write time. If the retry cron or a worker
    // touched the job after our read, no row matches and nothing is overwritten.
    const { count } = await prisma.satSyncJob.updateMany({
      where: { id: j.id, status: 'pending', nextRetryAt: { lt: pendingCutoff } },
      data: {
        status: 'failed',
        completedAt: now,
        errorMessage: `Abandoned by watchdog: pending with nextRetryAt ${j.nextRetryAt?.toISOString()} > ${pendingHours}h in the past. Retry cron didn't pick it up.`,
      },
    });
    result.pendingMarked += count;
  }

  for (const j of staleRunning) {
    const thresholdHours = runningThresholdHours(j.type);
    // Staleness fell back to createdAt when startedAt is null; say so instead of "undefined".
    const since = j.startedAt
      ? `startedAt ${j.startedAt.toISOString()}`
      : `no startedAt (createdAt ${j.createdAt.toISOString()})`;
    // Guard on status and on the exact startedAt we observed, so a job that finished
    // or was restarted between read and write is not clobbered.
    const { count } = await prisma.satSyncJob.updateMany({
      where: { id: j.id, status: 'running', startedAt: j.startedAt },
      data: {
        status: 'failed',
        completedAt: now,
        errorMessage: `Abandoned by watchdog: running ${j.type} with ${since} > ${thresholdHours}h (process crash / orphan). SAT request is lost; re-launch manually.`,
      },
    });
    result.runningMarked += count;
  }

  return result;
}