Ir al contenido

SyncWatchdog

SyncWatchdog es un supervisor en memoria que detecta cuándo un replicador PouchDB deja de avanzar de forma silenciosa y lo expone como un evento de telemetría sync.state='stalled' y como un Observable de estado. Vive en tables-sdk (src/core/brokers/SyncWatchdog.ts) y lo instancia Database como database.syncWatchdog.

Complementa al broker de sincronización y al modelo general descrito en sincronización: no reemplaza la replicación, solo la observa.

La API de replicación de PouchDB emite eventos paused, active, error y complete, pero paused cubre dos situaciones muy distintas:

  • Idle saludable: el long-poll está abierto, el servidor no tiene cambios y el replicador espera. Se emite paused y, en cuanto llega un cambio, vuelve a emitirse active.
  • Atascado: el socket TCP murió (cambio de red móvil, app en background, corte del proxy) pero PouchDB no se rinde porque retry: true reintenta la request de forma transparente. No hay evento error, no hay log: el replicador simplemente queda congelado.

El síntoma reportado en producción es del tipo “Android dejó de sincronizar y volvió, no sabemos por qué”. SyncWatchdog hace observable ese hueco.

Database conecta los handlers de replicación con el watchdog. Cada dirección (pull, push) reporta su actividad mediante estos métodos:

MétodoOrigen en DatabaseEfecto
notePaused(direction, error?).on('paused')Marca la dirección como pausada y registra pausedSinceMs si aún no estaba pausada.
noteActive(direction).on('active')Limpia el estado pausado/estancado. Si venía de stalled, emite telemetría de recuperación.
noteChange(direction).on('change')Registra lastChangeAtMs: progreso real del replicador.
noteError(direction, message).on('error')Guarda lastErrorMessage para anexarlo a un futuro evento stalled.

SyncDirection es el tipo 'pull' | 'push'.

start() programa un setInterval que ejecuta tick() cada checkIntervalMs (por defecto 30 s). En cada ciclo, para cada dirección, evalúa:

const pausedFor = now - dir.pausedSinceMs;
const ageOfLastChangeMs = dir.lastChangeAtMs ? now - dir.lastChangeAtMs : null;
const noProgress = ageOfLastChangeMs === null || ageOfLastChangeMs >= this.stalledAfterMs;
if (!dir.isStalled && pausedFor >= this.stalledAfterMs && noProgress) {
// marcar stalled + emitir telemetría
}

Una dirección se considera estancada cuando se cumplen todas estas condiciones:

  1. La dirección está en estado paused (isPaused === true y pausedSinceMs definido).

  2. Lleva pausada de forma continua más de stalledAfterMs (por defecto 3 minutos): pausedFor >= stalledAfterMs.

  3. No hubo progreso en esa ventana: o nunca llegó un change (lastChangeAtMs === null), o el último cambio es más antiguo que stalledAfterMs.

  4. Aún no estaba marcada como stalled (la telemetría stalled se emite una sola vez por episodio, no en cada tick).

Cuando se marca como estancada, emite un evento de telemetría con:

{
"sync.type": "pull",
"sync.direction": "pull",
"sync.state": "stalled",
"sync.paused_for_seconds": 200,
"sync.last_change_age_seconds": 240,
"sync.error": "…último mensaje de error, si lo hubo"
}

Durante un long-poll sano PouchDB alterna pausedactivepaused muy rápido. Si llega un change mientras la dirección sigue marcada como pausada, noteChange adelanta pausedSinceMs al momento del cambio. Así, un estancamiento posterior se mide desde el último progreso real y sync.paused_for_seconds no queda inflado en el siguiente evento stalled.

Cuando vuelve a llegar active o change, el flag stalled se limpia:

  • noteActive pone isActive=true, isPaused=false, isStalled=false y borra pausedSinceMs. Si la dirección venía de stalled, emite un evento de telemetría sync.state='active' anotado con sync.paused_for_seconds, que es la métrica que responde “cuánto duró el hueco” y es consultable en el observabilidad.
  • noteChange limpia isStalled y vuelve a emitir un snapshot si la dirección estaba estancada.
class SyncWatchdog {
constructor(config?: SyncWatchdogConfig);
observe(): Observable<SyncStatus>; // snapshot reactivo (BehaviorSubject)
getStatus(): SyncStatus; // lectura síncrona del estado actual
start(): void; // arranca el chequeo periódico (idempotente)
stop(): void; // detiene el chequeo periódico (idempotente)
noteActive(direction: SyncDirection): void;
notePaused(direction: SyncDirection, error?: string): void;
noteChange(direction: SyncDirection): void;
noteError(direction: SyncDirection, message: string): void;
tick(now?: number): void; // hook de test: una evaluación síncrona
}
interface SyncWatchdogConfig {
checkIntervalMs?: number; // cada cuánto re-evalúa. Default 30_000 (30 s)
stalledAfterMs?: number; // umbral de pausa sin progreso. Default 180_000 (3 min)
}

observe() emite un SyncStatus cada vez que cambia cualquier dirección. getStatus() devuelve el valor actual de forma síncrona.

interface SyncStatus {
pull: SyncDirectionStatus;
push: SyncDirectionStatus;
updatedAt: number;
}
interface SyncDirectionStatus {
isActive: boolean;
isPaused: boolean;
isStalled: boolean;
pausedSinceMs: number | null;
lastChangeAtMs: number | null;
lastErrorMessage: string | null;
}

Todos los eventos pasan por telemetry.recordSyncEvent(...) y las llamadas están envueltas en try/catch: la telemetría nunca puede romper los handlers de sincronización. Los estados que produce este watchdog son:

  • sync.state='stalled' — al detectar el estancamiento (una vez por episodio).
  • sync.state='active' — al recuperarse desde stalled, con sync.paused_for_seconds.

Otros estados (paused, active normal, complete, errores) los emite directamente Database desde sus handlers de replicación.