Deep Dive: Worker Heartbeat and Job Recovery
How to handle worker crashes in a distributed job queue—using database heartbeats, startup reconciliation, and distributed stale job detection.
Making sure jobs don’t get lost when workers die
The Code Evolution Analyzer has workers that process long-running analysis jobs (30 seconds to 10+ minutes per repository). Workers sometimes crash. Pods get killed. Network partitions happen. Without careful handling, jobs silently disappear.
This is the story of building a heartbeat system that automatically detects and recovers lost jobs.
The Problem
NATS JetStream provides durable message queues with at-least-once delivery. A worker pulls a message, processes it, and acknowledges completion. Simple enough—until a worker dies mid-job.
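Concretely, a worker's pull-process-ack loop looks roughly like this. This is a minimal sketch assuming the nats.js client's consumer API; the stream name ('ANALYSIS') and durable consumer name ('cloc-workers') are illustrative, not taken from the real project:
import { connect } from 'nats';
// Connect and bind to the durable pull consumer (names are illustrative).
const nc = await connect({ servers: process.env.NATS_URL });
const js = nc.jetstream();
const consumer = await js.consumers.get('ANALYSIS', 'cloc-workers');
// Pull messages and hand each one to processJob, which claims,
// processes, and acknowledges it (shown later in this post).
const messages = await consumer.consume();
for await (const msg of messages) {
  await processJob(msg);
}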
When a worker crashes:
- The in-progress job is half-finished (files cloned, analysis partial)
- The NATS message is unacknowledged but the worker can’t respond
- NATS waits for the ack_wait timeout (10 minutes) before redelivery
- But the job is already marked “processing” in the database
- When NATS redelivers, another worker sees “already processing” and skips it
Result: job stuck forever in “processing” state with no worker actually working on it.
The Solution: Database Heartbeats
The fix is to track worker activity in the database, not just the message queue:
-- Jobs table additions
ALTER TABLE jobs ADD COLUMN worker_id TEXT;
ALTER TABLE jobs ADD COLUMN last_activity_at TIMESTAMP WITH TIME ZONE;
-- Index for heartbeat queries
CREATE INDEX idx_jobs_processing_activity
ON jobs(status, last_activity_at)
WHERE status = 'processing';
Every worker gets a unique identity:
import os from 'os';
import crypto from 'crypto';
// Unique per process: hostname-pid-random
const WORKER_ID = `${os.hostname()}-${process.pid}-${crypto.randomBytes(4).toString('hex')}`;
// Example: "cloc-analyzer-worker-0-12345-ab12cd34"
The identity combines the hostname (stable Kubernetes StatefulSet pod names), the PID (changes when the container restarts within a pod), and random bytes (covers edge cases such as PID reuse).
Claiming Jobs
When a worker pulls a job from NATS, it tries to claim ownership atomically:
async function claimJob(jobId) {
const result = await db.query(`
UPDATE jobs
SET worker_id = $1,
last_activity_at = NOW(),
status = 'processing'
WHERE id = $2
AND status IN ('queued', 'processing')
AND (
worker_id IS NULL -- No one owns it
OR worker_id = $1 -- We already own it (reconnect)
OR last_activity_at < NOW() - INTERVAL '5 minutes' -- Stale owner
)
RETURNING id
`, [WORKER_ID, jobId]);
return result.rows.length > 0; // True if we got it
}
The claim succeeds only if:
- No worker owns the job yet (worker_id IS NULL)
- We’re reclaiming our own job (e.g., NATS redelivery during reconnect)
- The previous owner hasn’t updated activity in 5 minutes (presumed dead)
If another worker just claimed it, our claim fails and we skip the job:
async function processJob(msg) {
const { job_id, repo_url } = JSON.parse(msg.data);
const claimed = await claimJob(job_id);
if (!claimed) {
logger.info({
event: 'job_skipped_active_elsewhere',
jobId: job_id,
}, 'Job is being processed by another worker');
msg.ack(); // Acknowledge to prevent NATS redelivery
return;
}
// Process the job...
}
Heartbeat Updates
During job processing, every progress update also updates the heartbeat:
async function updateProgress(jobId, stage, progress, message) {
await db.query(`
UPDATE jobs
SET stage = $1,
progress = $2,
message = $3,
last_activity_at = NOW() -- Heartbeat!
WHERE id = $4
`, [stage, progress, message, jobId]);
}
As long as the worker is alive and making progress, last_activity_at stays fresh. When it stops updating, we know something’s wrong.
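Some stages (a long clone, a big repository scan) can run for minutes without producing a natural progress update. One way to keep the heartbeat fresh during those stretches is a small timer that touches only last_activity_at. This is a sketch under the same db and WORKER_ID assumptions as above; touchHeartbeat, withHeartbeat, and cloneRepository are illustrative names:
// Refresh only the heartbeat; the worker_id check ensures we never
// touch a job this process no longer owns.
async function touchHeartbeat(jobId) {
  await db.query(`
    UPDATE jobs
    SET last_activity_at = NOW()
    WHERE id = $1 AND worker_id = $2 AND status = 'processing'
  `, [jobId, WORKER_ID]);
}
// Wrap a long-running step so the heartbeat keeps ticking while it runs.
async function withHeartbeat(jobId, fn, intervalMs = 60 * 1000) {
  const timer = setInterval(() => {
    touchHeartbeat(jobId).catch(() => { /* best effort */ });
  }, intervalMs);
  try {
    return await fn();
  } finally {
    clearInterval(timer);
  }
}
// Usage: await withHeartbeat(jobId, () => cloneRepository(repoUrl));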
Startup Reconciliation
When a worker starts (or restarts after a crash), it checks for orphaned jobs:
async function reconcileOrphanedJobs() {
// Find jobs assigned to our identity (from previous process instance)
const orphanedJobs = await db.query(`
SELECT id, repo_url, status, stage
FROM jobs
WHERE worker_id = $1 AND status IN ('queued', 'processing')
`, [WORKER_ID]);
if (orphanedJobs.rows.length === 0) {
logger.info({ event: 'reconciliation_complete', orphanedCount: 0 });
return;
}
logger.warn({
event: 'orphaned_jobs_detected',
count: orphanedJobs.rows.length,
jobs: orphanedJobs.rows.map(j => j.id),
});
// Clear and re-enqueue each orphaned job
for (const job of orphanedJobs.rows) {
await clearAndReenqueueJob(job.id, job.repo_url, 'Worker restarted');
}
}
There’s a subtle bug in the query above. WORKER_ID includes the PID and a random suffix, so a restarted process gets a brand-new identity—querying for jobs assigned to the new identity will never find jobs claimed by the previous incarnation. The real implementation matches on the hostname prefix instead:
// Generate worker ID
const WORKER_ID = `${os.hostname()}-${process.pid}-${crypto.randomBytes(4).toString('hex')}`;
// On startup: look for ANY jobs assigned to THIS hostname pattern
// (previous incarnations would have same hostname, different pid/random)
async function reconcileOrphanedJobs() {
const hostPattern = `${os.hostname()}-%`; // Match any PID/random suffix
const orphanedJobs = await db.query(`
SELECT id, repo_url, status, stage, worker_id
FROM jobs
WHERE worker_id LIKE $1
AND worker_id != $2 -- Not OUR current ID
AND status IN ('queued', 'processing')
`, [hostPattern, WORKER_ID]);
// ... clear and re-enqueue
}
Jobs from previous incarnations of the same pod (same hostname, different PID) get detected and recovered.
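A natural place to run this is during startup, before the consume loop begins, so recovery completes before the worker takes on new work. Here is a sketch of the boot sequence; apart from reconcileOrphanedJobs, the helper names are illustrative:
async function main() {
  await connectToDatabase();   // illustrative: create the pg pool
  await connectToNats();       // illustrative: connect and bind the JetStream consumer
  // Recover anything a previous incarnation of this pod left behind...
  await reconcileOrphanedJobs();
  // ...then start pulling new messages.
  await startConsuming();      // illustrative: the consume loop shown earlier
  logger.info({ event: 'worker_started', workerId: WORKER_ID });
}
main().catch((err) => {
  logger.error({ event: 'worker_startup_failed', error: err.message });
  process.exit(1);
});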
Periodic Stale Job Detection
Startup reconciliation handles worker restarts. But what if a worker crashes and never comes back? Or what if it hangs without dying?
Every 30 seconds, all workers check for stale jobs system-wide:
const HEARTBEAT_INTERVAL_MS = 30 * 1000; // 30 seconds
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
setInterval(async () => {
const staleJobs = await db.query(`
SELECT id, repo_url, worker_id, last_activity_at
FROM jobs
WHERE status = 'processing'
AND (
last_activity_at IS NULL
OR last_activity_at < NOW() - INTERVAL '5 minutes'
)
ORDER BY last_activity_at ASC NULLS FIRST
LIMIT 100
`);
if (staleJobs.rows.length === 0) return;
logger.warn({
event: 'stale_jobs_detected',
count: staleJobs.rows.length,
});
for (const job of staleJobs.rows) {
await clearAndReenqueueJob(job.id, job.repo_url,
`Worker ${job.worker_id} stopped responding`);
}
}, HEARTBEAT_INTERVAL_MS);
Any worker can clear stale jobs, not just the one that owned them. This provides distributed recovery—even if only one worker is healthy, it will eventually clean up all the mess.
Re-enqueue Logic
Clearing a job means resetting it to queued state and re-publishing to NATS:
async function clearAndReenqueueJob(jobId, repoUrl, reason) {
// Check current state
const job = await db.query('SELECT status FROM jobs WHERE id = $1', [jobId]);
// Don't touch completed/failed jobs
if (job.rows[0]?.status === 'completed' || job.rows[0]?.status === 'failed') {
return;
}
// Reset to queued state
await db.query(`
UPDATE jobs SET
status = 'queued',
worker_id = NULL,
last_activity_at = NULL,
started_at = NULL,
message = $1
WHERE id = $2
`, [`Cleared: ${reason}`, jobId]);
// Re-publish to NATS for any worker to pick up
await js.publish('jobs.analyze', JSON.stringify({
job_id: jobId,
repo_url: repoUrl,
cleared: true,
clear_reason: reason,
}));
logger.info({
event: 'job_reenqueued',
jobId,
reason,
});
}
The cleared: true flag tells workers this is a recovery situation, useful for logging and metrics.
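On the consuming side, a worker can check the flag when it picks the job back up and surface it in logs or metrics. A sketch assuming prom-client for the counter; noteRecovery and the metric name are illustrative:
import client from 'prom-client';
// Counts jobs that reached us through recovery rather than a normal submission.
const recoveredJobs = new client.Counter({
  name: 'jobs_recovered_total',
  help: 'Jobs re-enqueued after their worker was presumed dead',
});
// Called near the top of processJob, right after parsing the message payload.
function noteRecovery(payload) {
  if (!payload.cleared) return;
  recoveredJobs.inc();
  logger.warn({
    event: 'job_recovered',
    jobId: payload.job_id,
    reason: payload.clear_reason,
  }, 'Processing a job cleared from an unresponsive worker');
}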
Race Condition Handling
What if two workers detect the same stale job simultaneously? Both try to clear it, both re-publish to NATS, and suddenly we have duplicate jobs.
The fix is in the claim logic: even if a job appears multiple times in NATS, only one worker successfully claims it:
// Worker A and Worker B both get the NATS message
// Worker A tries to claim first:
// UPDATE ... SET worker_id = 'A' WHERE worker_id IS NULL
// → Success! worker_id is now 'A'
// Worker B tries to claim:
// UPDATE ... SET worker_id = 'B' WHERE worker_id IS NULL
// → Fails! worker_id is 'A', not NULL
// Worker B skips the job and acks the message
Duplicate NATS messages are harmless because the database provides the single source of truth for job ownership.
Configuration Trade-offs
The key tuning parameters:
// Stale threshold: how long before we consider a job abandoned
const STALE_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
// Heartbeat interval: how often to check for stale jobs
const HEARTBEAT_INTERVAL_MS = 30 * 1000; // 30 seconds
// NATS ack timeout: how long before NATS redelivers
// Should be > STALE_THRESHOLD_MS to avoid racing
ack_wait: 10 * 60 * 1000 * 1000000 // 10 minutes (nanoseconds)
For faster recovery: Reduce stale threshold to 2-3 minutes. Risk: slow jobs get incorrectly marked as stale.
For fewer false positives: Increase stale threshold to 10 minutes. Risk: slower recovery from actual crashes.
The NATS ack_wait should exceed the stale threshold so that database-based recovery happens before NATS redelivery. Otherwise, you get a race between two recovery mechanisms.
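With the nats.js client, ack_wait is set when the durable consumer is defined, in nanoseconds. A sketch of that definition, reusing the nc connection and the illustrative stream/consumer names from earlier:
import { AckPolicy } from 'nats';
// Durable pull consumer with explicit acks and a 10-minute ack_wait,
// comfortably above the 5-minute stale threshold so database-side
// recovery always wins the race.
const jsm = await nc.jetstreamManager();
await jsm.consumers.add('ANALYSIS', {
  durable_name: 'cloc-workers',
  ack_policy: AckPolicy.Explicit,
  ack_wait: 10 * 60 * 1000 * 1_000_000, // 10 minutes, in nanoseconds
  max_deliver: 5,                       // illustrative retry cap
});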
Recovery Scenarios
Scenario 1: Worker Pod Restart
T+0:00 Worker-0 processing job ABC
T+0:05 Worker-0 pod killed (kubectl delete, OOM, etc.)
T+0:06 New Worker-0 pod starts
T+0:07 Startup reconciliation finds job ABC (same hostname pattern)
T+0:08 Job ABC cleared and re-enqueued
T+0:09 Worker-1 picks up job ABC from NATS
Recovery time: ~3 seconds (immediate on startup)
Scenario 2: Worker Crash, No Restart
T+0:00 Worker-1 processing job XYZ
T+0:30 Worker-1 crashes (segfault, OOM killer)
T+0:30 Pod stays down (CrashLoopBackOff, node failure, etc.)
T+5:30 Worker-0's heartbeat check finds XYZ stale (5 min no activity)
T+5:31 Worker-0 clears and re-enqueues job XYZ
T+5:32 Worker-0 picks up and completes job XYZ
Recovery time: ~5-6 minutes (stale threshold + heartbeat interval)
Scenario 3: Worker Deadlock
T+0:00 Worker-0 processing job DEF
T+1:00 Worker-0 deadlocks (infinite loop, blocked I/O)
T+1:00 Progress updates stop, last_activity_at freezes
T+6:00 Worker-1's heartbeat check finds DEF stale
T+6:01 Worker-1 clears and re-enqueues job DEF
T+6:02 Worker-1 processes job DEF
Recovery time: 5-6 minutes (same as crash scenario)
What This Doesn’t Handle
- Short-lived failures: If processing fails but the worker stays alive, heartbeats continue normally. The job fails through normal error handling, not heartbeat detection.
- Network partitions: If a worker can reach NATS but not PostgreSQL, it can’t update heartbeats. Jobs may appear stale even though the worker is “alive.” This is an acceptable trade-off: incomplete processing should be retried.
- Clock skew: All timestamps use database server time (NOW()), so worker clock drift doesn’t matter. But extreme database clock jumps could cause issues.
- Partial work: Cleared jobs start over from scratch. Incremental recovery (resuming mid-analysis) would need application-level checkpointing.
Lessons Learned
Database as truth, queue for coordination: NATS delivers messages, PostgreSQL tracks ownership. Trying to do both with just the queue is fragile.
Unique identity is hard: PID alone isn’t unique (PIDs get reused after crashes). Hostname alone doesn’t distinguish process incarnations (it persists across restarts). Random bytes alone can’t be matched back to a host during startup reconciliation. The hostname-pid-random combination covers all of these cases.
Recovery mechanisms can race: If NATS redelivery and heartbeat detection both try to recover the same job, you get duplicates. One must clearly win—in our case, database claims are the arbiter.
5 minutes is a reasonable stale threshold: Short enough to recover promptly, long enough to avoid false positives from slow but healthy processing.
See also: Building a Code Evolution Analyzer in a Weekend — the full project story
See also: Deep Dive: Microservices Migration — the emulator’s similar architecture