SyncWatchdog
SyncWatchdog es un supervisor en memoria que detecta cuándo un replicador
PouchDB deja de avanzar de forma silenciosa y lo expone como un evento de
telemetría sync.state='stalled' y como un Observable de estado. Vive en
tables-sdk (src/core/brokers/SyncWatchdog.ts) y lo instancia Database
como database.syncWatchdog.
Complementa al broker de sincronización y al modelo general descrito en sincronización: no reemplaza la replicación, solo la observa.
El problema que resuelve
Sección titulada «El problema que resuelve»La API de replicación de PouchDB emite eventos paused, active, error y
complete, pero paused cubre dos situaciones muy distintas:
- Idle saludable: el long-poll está abierto, el servidor no tiene cambios
y el replicador espera. Se emite
pausedy, en cuanto llega un cambio, vuelve a emitirseactive. - Atascado: el socket TCP murió (cambio de red móvil, app en background,
corte del proxy) pero PouchDB no se rinde porque
retry: truereintenta la request de forma transparente. No hay eventoerror, no hay log: el replicador simplemente queda congelado.
El síntoma reportado en producción es del tipo “Android dejó de sincronizar y
volvió, no sabemos por qué”. SyncWatchdog hace observable ese hueco.
Entradas (heartbeat)
Sección titulada «Entradas (heartbeat)»Database conecta los handlers de replicación con el watchdog. Cada dirección
(pull, push) reporta su actividad mediante estos métodos:
| Método | Origen en Database | Efecto |
|---|---|---|
notePaused(direction, error?) | .on('paused') | Marca la dirección como pausada y registra pausedSinceMs si aún no estaba pausada. |
noteActive(direction) | .on('active') | Limpia el estado pausado/estancado. Si venía de stalled, emite telemetría de recuperación. |
noteChange(direction) | .on('change') | Registra lastChangeAtMs: progreso real del replicador. |
noteError(direction, message) | .on('error') | Guarda lastErrorMessage para anexarlo a un futuro evento stalled. |
SyncDirection es el tipo 'pull' | 'push'.
Lógica de detección
Sección titulada «Lógica de detección»start() programa un setInterval que ejecuta tick() cada checkIntervalMs
(por defecto 30 s). En cada ciclo, para cada dirección, evalúa:
const pausedFor = now - dir.pausedSinceMs;const ageOfLastChangeMs = dir.lastChangeAtMs ? now - dir.lastChangeAtMs : null;const noProgress = ageOfLastChangeMs === null || ageOfLastChangeMs >= this.stalledAfterMs;
if (!dir.isStalled && pausedFor >= this.stalledAfterMs && noProgress) { // marcar stalled + emitir telemetría}Una dirección se considera estancada cuando se cumplen todas estas condiciones:
-
La dirección está en estado
paused(isPaused === trueypausedSinceMsdefinido). -
Lleva pausada de forma continua más de
stalledAfterMs(por defecto 3 minutos):pausedFor >= stalledAfterMs. -
No hubo progreso en esa ventana: o nunca llegó un
change(lastChangeAtMs === null), o el último cambio es más antiguo questalledAfterMs. -
Aún no estaba marcada como
stalled(la telemetríastalledse emite una sola vez por episodio, no en cadatick).
Cuando se marca como estancada, emite un evento de telemetría con:
{ "sync.type": "pull", "sync.direction": "pull", "sync.state": "stalled", "sync.paused_for_seconds": 200, "sync.last_change_age_seconds": 240, "sync.error": "…último mensaje de error, si lo hubo"}Cómo noteChange evita falsos positivos
Sección titulada «Cómo noteChange evita falsos positivos»Durante un long-poll sano PouchDB alterna paused → active → paused muy
rápido. Si llega un change mientras la dirección sigue marcada como pausada,
noteChange adelanta pausedSinceMs al momento del cambio. Así, un
estancamiento posterior se mide desde el último progreso real y
sync.paused_for_seconds no queda inflado en el siguiente evento stalled.
Recuperación y limpieza
Sección titulada «Recuperación y limpieza»Cuando vuelve a llegar active o change, el flag stalled se limpia:
noteActiveponeisActive=true,isPaused=false,isStalled=falsey borrapausedSinceMs. Si la dirección venía destalled, emite un evento de telemetríasync.state='active'anotado consync.paused_for_seconds, que es la métrica que responde “cuánto duró el hueco” y es consultable en el observabilidad.noteChangelimpiaisStalledy vuelve a emitir un snapshot si la dirección estaba estancada.
API pública
Sección titulada «API pública»class SyncWatchdog { constructor(config?: SyncWatchdogConfig);
observe(): Observable<SyncStatus>; // snapshot reactivo (BehaviorSubject) getStatus(): SyncStatus; // lectura síncrona del estado actual
start(): void; // arranca el chequeo periódico (idempotente) stop(): void; // detiene el chequeo periódico (idempotente)
noteActive(direction: SyncDirection): void; notePaused(direction: SyncDirection, error?: string): void; noteChange(direction: SyncDirection): void; noteError(direction: SyncDirection, message: string): void;
tick(now?: number): void; // hook de test: una evaluación síncrona}Configuración
Sección titulada «Configuración»interface SyncWatchdogConfig { checkIntervalMs?: number; // cada cuánto re-evalúa. Default 30_000 (30 s) stalledAfterMs?: number; // umbral de pausa sin progreso. Default 180_000 (3 min)}Estado observable
Sección titulada «Estado observable»observe() emite un SyncStatus cada vez que cambia cualquier dirección.
getStatus() devuelve el valor actual de forma síncrona.
interface SyncStatus { pull: SyncDirectionStatus; push: SyncDirectionStatus; updatedAt: number;}
interface SyncDirectionStatus { isActive: boolean; isPaused: boolean; isStalled: boolean; pausedSinceMs: number | null; lastChangeAtMs: number | null; lastErrorMessage: string | null;}Telemetría emitida
Sección titulada «Telemetría emitida»Todos los eventos pasan por telemetry.recordSyncEvent(...) y las llamadas
están envueltas en try/catch: la telemetría nunca puede romper los handlers
de sincronización. Los estados que produce este watchdog son:
sync.state='stalled'— al detectar el estancamiento (una vez por episodio).sync.state='active'— al recuperarse desdestalled, consync.paused_for_seconds.
Otros estados (paused, active normal, complete, errores) los emite
directamente Database desde sus handlers de replicación.