Reliability Guide
This guide summarizes reliability practices for task systems using Stem.
Recovery workflow
- Identify the failing task or queue.
- Inspect recent errors and DLQ entries.
- Fix the root cause before replaying.
- Replay only the affected tasks; a replay sketch follows this list.
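A minimal sketch of the replay step, assuming a programmatic DLQ client with list/remove calls and a broker publish call. These names are assumptions, not confirmed Stem API; `stem dlq list --queue <queue>` (under Operational checks) covers the inspection step from the CLI.

// Hypothetical replay helper: `dlq` and `broker` are assumed shapes.
Future<void> replayAffected(dynamic dlq, dynamic broker, String taskName) async {
  final entries = await dlq.list() as List; // inspect DLQ entries
  for (final entry in entries) {
    if (entry.envelope.name != taskName) continue; // replay only affected tasks
    await broker.publish(entry.envelope); // root cause is already fixed
    await dlq.remove(entry.id); // avoid double replay
  }
}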
Broker fetch notes
- Redis Streams uses consumer groups plus XAUTOCLAIM to reclaim idle deliveries; long-running tasks should emit heartbeats or extend leases (see Heartbeats and progress below).
- Postgres uses polling with locked_until leases; tasks become visible again after the lease expires.
Workflow lease notes
- Workflow runs are lease-based. Workers must renew leases while executing, and other workers can take over after the lease expires.
- Keep runLeaseDuration >= the broker visibility timeout to prevent redelivered workflow tasks from being dropped before takeover is possible.
- Keep leaseExtension renewals ahead of both the workflow lease expiry and the broker visibility timeout. A configuration sketch follows this list.
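A minimal sketch of how these durations relate. runLeaseDuration and leaseExtension are the knobs named above; WorkflowOptions and the exact wiring are assumptions of this sketch, not confirmed Stem API.

// Broker redelivery window (visibility timeout), assumed five minutes here.
const visibilityTimeout = Duration(minutes: 5);

final workflowOptions = WorkflowOptions( // hypothetical options type
  // At least the visibility timeout: a redelivered workflow task must not be
  // dropped before another worker can claim the expired lease.
  runLeaseDuration: visibilityTimeout,
  // Renew well ahead of both the lease expiry and the visibility timeout.
  leaseExtension: Duration(minutes: 1),
);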
Poison-pill handling
- If a task fails repeatedly for the same reason, treat it as a poison pill.
- Move it to the DLQ and add guardrails or validation to prevent repeats.
- Record the failure pattern for future detection; a detection sketch follows this list.
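A sketch of that detection using the signal API from the retry_task example below; the threshold and notifier are illustrative.

// Placeholder notifier for this sketch.
void alertOps(String message) => print(message);

// Flags a poison pill when the same task keeps failing with the same error.
SignalSubscription attachPoisonPillDetector({int threshold = 5}) {
  final failureCounts = <String, int>{};
  return StemSignals.onTaskFailure((payload, _) {
    final key = '${payload.envelope.name}:${payload.error}';
    final count = failureCounts.update(key, (c) => c + 1, ifAbsent: () => 1);
    if (count >= threshold) {
      alertOps('poison pill suspected: $key failed $count times; '
          'park it in the DLQ and add validation before replaying');
    }
  });
}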
Scheduler reliability
- Run multiple Beat instances only when backed by a shared lock store.
- Monitor schedule drift and failures to detect store latency.
- Re-apply schedules after deploys to ensure definitions stay current.
Retries and backoff
- Use bounded retries with jittered backoff to avoid thundering herds.
- Separate transient failures from permanent failures.
- For permanent errors, fail fast and alert (see the classification sketch after the retry_task example).
The retry_task example below demonstrates how to:
- Configure a jittered retry strategy
- Log retry signals and outcomes
- Create a task that exercises retries
retry_task/bin/worker.dart
final worker = Worker(
broker: broker,
registry: registry,
backend: backend,
queue: 'retry-demo',
consumerName: workerName,
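  // Exponential backoff with jitter: delays grow from `base` and cap at
  // `max`, de-correlating retries across workers.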
retryStrategy: ExponentialJitterRetryStrategy(
base: const Duration(milliseconds: 200),
max: const Duration(seconds: 1),
),
);
retry_task/lib/shared.dart
import 'dart:convert';

List<SignalSubscription> attachLogging(String label) {
String prefix(String event) => '[retry][$label][$event]';
void log(String event, Map<String, Object?> data) {
// ignore: avoid_print
print('${prefix(event)} ${jsonEncode(data)}');
}
return <SignalSubscription>[
StemSignals.onBeforeTaskPublish((payload, context) {
log('before_task_publish', {
'task': payload.envelope.name,
'id': payload.envelope.id,
'attempt': payload.attempt,
'sender': context.sender,
});
}),
StemSignals.onTaskPrerun((payload, _) {
log('task_prerun', {
'task': payload.envelope.name,
'id': payload.envelope.id,
'attempt': payload.context.attempt,
'worker': payload.worker.id,
});
}),
StemSignals.onTaskRetry((payload, _) {
log('task_retry', {
'task': payload.envelope.name,
'id': payload.envelope.id,
'attempt': payload.attempt,
'reason': payload.reason.toString(),
'nextRunAt': payload.nextRetryAt.toIso8601String(),
'worker': payload.worker.id,
});
}),
StemSignals.onTaskFailure((payload, _) {
log('task_failed', {
'task': payload.envelope.name,
'id': payload.envelope.id,
'attempt': payload.attempt,
'worker': payload.worker.id,
'error': payload.error.toString(),
});
}),
StemSignals.onTaskPostrun((payload, _) {
log('task_postrun', {
'task': payload.envelope.name,
'id': payload.envelope.id,
'state': payload.state.name,
'worker': payload.worker.id,
});
}),
];
}
retry_task/lib/shared.dart
import 'dart:async';

FutureOr<void> _alwaysFailEntrypoint(
TaskInvocationContext context,
Map<String, Object?> args,
) {
final attempt = context.attempt;
final max = context.meta['maxRetries'] ?? 3;
// ignore: avoid_print
print('[retry][task][attempt] {"attempt":$attempt,"max":$max}');
throw StateError('Simulated failure on attempt $attempt');
}
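The sketch below separates transient from permanent failures inside an entrypoint. FormatException stands in for a permanent, input-shaped error and TimeoutException for a transient one; whether Stem offers a dedicated non-retryable error type is not confirmed, so both paths rethrow and the distinction drives alerting.

import 'dart:async';

// Placeholder for a real remote call.
Future<void> fetchRemote(String url) async =>
    throw TimeoutException('slow upstream');

// Placeholder notifier.
void alertOps(String message) => print(message);

FutureOr<void> _classifyingEntrypoint(
  TaskInvocationContext context,
  Map<String, Object?> args,
) async {
  try {
    await fetchRemote(args['url'] as String);
  } on FormatException catch (error) {
    // Permanent: bad input fails identically on every attempt.
    alertOps('permanent failure on attempt ${context.attempt}: $error');
    rethrow; // fail fast instead of burning retries
  } on TimeoutException {
    rethrow; // transient: let the jittered retry strategy back off
  }
}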
Heartbeats and progress
Use heartbeats and progress updates to prevent long-running tasks from being reclaimed prematurely.
- Configure worker heartbeat intervals
- Emit progress and heartbeats inside a task
- Stream worker events for observability
progress_heartbeat/bin/worker.dart
final worker = Worker(
broker: broker,
registry: registry,
backend: backend,
queue: progressQueue,
subscription: RoutingSubscription.singleQueue(progressQueue),
consumerName: workerName,
heartbeatInterval: const Duration(seconds: 2),
workerHeartbeatInterval: const Duration(seconds: 5),
prefetchMultiplier: 1,
);
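With prefetchMultiplier: 1 the worker keeps at most one message in flight per consumer, a sensible pairing for long-running tasks; as the names suggest, the tighter heartbeatInterval refreshes in-flight task leases while workerHeartbeatInterval advertises overall worker liveness.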
progress_heartbeat/lib/shared.dart
class ProgressTask extends TaskHandler<String> {
  @override
  String get name => 'progress.demo';

  @override
  TaskOptions get options => const TaskOptions(queue: progressQueue);

  @override
  TaskMetadata get metadata => const TaskMetadata(
        description: 'Task that reports progress and heartbeats on a loop.',
        tags: ['progress', 'heartbeat'],
      );

  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
final steps = (args['steps'] as num?)?.toInt() ?? 10;
final delayMs = (args['delayMs'] as num?)?.toInt() ?? 800;
stdout.writeln('[task][start] id=${context.id} steps=$steps');
for (var i = 0; i < steps; i += 1) {
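      // Heartbeat each iteration so the broker does not reclaim the delivery
      // mid-run (see the broker fetch notes above).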
context.heartbeat();
await context.progress(
(i + 1) / steps,
data: {'step': i + 1, 'total': steps},
);
await Future<void>.delayed(Duration(milliseconds: delayMs));
}
stdout.writeln('[task][done] id=${context.id}');
return 'ok';
}
}
progress_heartbeat/lib/shared.dart
void attachWorkerEventLogging(Worker worker) {
worker.events.listen((event) {
switch (event.type) {
case WorkerEventType.progress:
stdout.writeln(
'[event][progress] id=${event.envelope?.id} progress=${event.progress} data=${event.data}',
);
break;
case WorkerEventType.heartbeat:
stdout.writeln('[event][heartbeat] id=${event.envelopeId}');
break;
case WorkerEventType.completed:
stdout.writeln('[event][completed] id=${event.envelope?.id}');
break;
case WorkerEventType.failed:
stdout.writeln('[event][failed] id=${event.envelope?.id}');
break;
case WorkerEventType.revoked:
stdout.writeln('[event][revoked] id=${event.envelope?.id}');
break;
case WorkerEventType.retried:
case WorkerEventType.timeout:
case WorkerEventType.error:
stdout.writeln('[event][${event.type.name}] id=${event.envelope?.id}');
break;
}
});
}
Observability signals
- Track retry rates and DLQ volume as reliability signals.
- Monitor queue backlog and worker heartbeats to detect stalls.
- Tie task IDs to business logs for fast root-cause analysis.
- Use StemSignals.taskRetry / taskFailed to drive notifications when error rates spike, as sketched below.
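A sketch of that wiring, reusing the onTaskRetry signal from the retry_task example; the window and threshold are illustrative.

// Placeholder notifier for this sketch.
void alertOps(String message) => print(message);

SignalSubscription attachRetrySpikeAlert({
  Duration window = const Duration(minutes: 5),
  int threshold = 50,
}) {
  final retryTimes = <DateTime>[];
  return StemSignals.onTaskRetry((payload, _) {
    final now = DateTime.now();
    retryTimes
      ..add(now)
      ..removeWhere((t) => now.difference(t) > window);
    if (retryTimes.length > threshold) {
      alertOps('retry spike: ${retryTimes.length} retries in '
          '${window.inMinutes}m (latest: ${payload.envelope.name})');
    }
  });
}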
Operational checks
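Quick CLI spot checks for broker/backend health, queue backlog, worker liveness, and DLQ contents: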
stem health --broker "$STEM_BROKER_URL" --backend "$STEM_RESULT_BACKEND_URL"
stem observe queues
stem observe workers
stem dlq list --queue <queue>