# Observe & Operate
With Redis/Postgres in place, it's time to watch the system run. This guide covers telemetry, worker heartbeats, DLQ tooling, and the remote-control channel: all the pieces you need to operate Stem confidently.
## 1. Enable OpenTelemetry Export
Stem emits metrics, traces, and structured logs out of the box. Point it at an OTLP endpoint (the repo ships a ready-made stack under `examples/otel_metrics/`):
```bash
# Start the example collector, Prometheus, and Grafana stack.
docker compose -f examples/otel_metrics/docker-compose.yml up

# Export OTLP details for producers and workers.
export STEM_OTLP_ENDPOINT=http://localhost:4318
export STEM_OTLP_HEADERS="authorization=Basic c3RlbTpwYXNz"
export STEM_OTLP_METRICS_INTERVAL=10s
export STEM_LOG_FORMAT=json
```
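The `STEM_OTLP_HEADERS` value above is an HTTP Basic credential: the base64 encoding of `user:password`. To generate one for your own collector (here using the `stem:pass` pair that matches the header shown above):

```bash
# Base64-encode user:password for the OTLP authorization header.
encoded="$(printf 'stem:pass' | base64)"
echo "authorization=Basic ${encoded}"
# -> authorization=Basic c3RlbTpwYXNz
```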
In Dart, no extra code is required; the env vars activate the exporters. Metrics include queue depth, retry counts, lease renewals, and worker concurrency; traces connect `Stem.enqueue` spans with worker execution spans so you can follow a task end-to-end.
If you want an explicit in-process configuration, wire metrics and tracing directly:
```dart
void configureMetrics() {
  StemMetrics.instance.configure(exporters: [ConsoleMetricsExporter()]);
}

Stem buildTracedStem(
  Broker broker,
  ResultBackend backend,
  TaskRegistry registry,
) {
  // Configure OpenTelemetry globally; StemTracer.instance reads from it.
  final _ = StemTracer.instance;
  return Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
}
```
## 2. Inspect Worker Heartbeats & Status
Workers publish detailed heartbeats (in-flight counts, leases, queues) to the result backend. Use the CLI to view them live:
```bash
# Snapshot the latest heartbeat for every worker.
stem worker status \
  --backend "$STEM_RESULT_BACKEND_URL"

# Stream live updates (press Ctrl+C to stop).
stem worker status \
  --broker "$STEM_BROKER_URL" \
  --follow
```
From Dart you can pull the same data:
```dart
import 'dart:io';

Future<void> listWorkerHeartbeats() async {
  final backend = await RedisResultBackend.connect(
    Platform.environment['STEM_RESULT_BACKEND_URL']!,
  );
  final heartbeats = await backend.listWorkerHeartbeats();
  for (final hb in heartbeats) {
    print('${hb.workerId} -> queues=${hb.queues} inflight=${hb.inflight}');
  }
  await backend.close();
}
```
## 3. Operate Workers via the Control Channel
Stem exposes a built-in control bus so you can interact with workers without SSH or custom wiring.
```bash
# Discover workers and latency.
stem worker ping

# Collect stats (queues, concurrency, runtimes) as JSON.
stem worker stats --json

# Revoke a problematic task globally (optionally terminate in-flight).
stem worker revoke \
  --task 1f23c6a1-... \
  --terminate

# Issue a warm shutdown to drain work gracefully.
stem worker shutdown \
  --worker default@host-a
```
Need to manage multiple instances on one host? Deploy the bundled daemonization templates or lean on the multi-wrapper:
```bash
# Launch and supervise multiple workers with templated PID/log files.
stem worker multi start alpha beta \
  --pidfile /var/run/stem/%n.pid \
  --logfile /var/log/stem/%n.log \
  --command "/usr/bin/dart run bin/worker.dart"

# Drain and stop the same fleet.
stem worker multi stop alpha beta
```
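The `%n` placeholder in the templates expands to each node name, so `alpha` and `beta` get separate PID and log files. A minimal sketch of that substitution (illustrative only; the real wrapper also daemonizes and supervises each process):

```bash
# Expand the %n placeholder the way `stem worker multi` does per node.
template="/var/log/stem/%n.log"
for node in alpha beta; do
  printf '%s\n' "$template" | sed "s/%n/${node}/"
done
# -> /var/log/stem/alpha.log
# -> /var/log/stem/beta.log
```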
## 4. Manage Queues, Retries, and DLQ
The CLI exposes queues, retries, and dead letters so operators can recover quickly.
```bash
# Inspect queue depth and inflight counts.
stem observe queues

# Inspect worker snapshots from the result backend.
stem observe workers

# Inspect the dead-letter queue with pagination.
stem dlq list --queue default --limit 20

# Replay failed tasks back onto their original queues.
stem dlq replay --queue default --limit 10 --confirm
```
Behind the scenes the CLI talks to the same Redis data structures used by workers, so you see the exact state the runtime is using.
## 5. Alert on Scheduler Drift & Schedules
Beat records run history, drift, and errors. Keep an eye on it with:
```bash
stem observe schedules \
  --file config/schedules.yaml

stem schedule dry-run --spec "every:5m" --count 5
```
These commands surface the same drift metrics your Grafana dashboards chart and help confirm schedule definitions before they go live.
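Drift is the gap between when a run was due and when it actually fired; if a reported number looks surprising, the arithmetic is easy to check by hand (a toy illustration with epoch seconds, not Beat's actual code):

```bash
# A run due at `scheduled` that fired at `actual` drifted by the difference.
scheduled=1700000000   # when the entry was due (epoch seconds)
actual=1700000007      # when it actually fired
drift=$((actual - scheduled))
echo "drift=${drift}s"
# -> drift=7s
```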
## 6. React to Signals for Custom Integrations
Signals fire for task, worker, and scheduler lifecycle events. Wire them into chat, incident tooling, or analytics:
```dart
void installAnalytics() {
  StemSignals.taskRetry.connect((payload, _) {
    print('Task ${payload.envelope.name} retry ${payload.attempt}');
  });
  StemSignals.workerHeartbeat.connect((payload, _) {
    if (payload.worker.queues.length > 100) {
      // Send to your alerting system.
    }
  });
  StemSignals.scheduleEntryFailed.connect((payload, _) {
    print('Scheduler entry ${payload.entry.id} failed: ${payload.error}');
  });
}

// `metrics` and `heartbeatGauge` below stand in for your own instruments.
void registerSignals() {
  StemSignals.taskRetry.connect((payload, _) {
    metrics.recordRetry(delay: payload.nextRetryAt.difference(DateTime.now()));
  });
  StemSignals.workerHeartbeat.connect((payload, _) {
    heartbeatGauge.set(1, tags: {'worker': payload.worker.id});
  });
}

void logTaskStart(Envelope envelope) {
  stemLogger.info(
    'Task started',
    Context({'task': envelope.name, 'id': envelope.id}),
  );
}
```
Combine signal handlers with telemetry to build rich observability without scattering logic across the codebase.
## 7. Next Stop
You now have dashboards, CLI tooling, and remote control over workers. Finish the onboarding journey by applying security hardening, TLS, and production checklists in Prepare for Production.
If you want more hands-on drills:
- Run `example/ops_health_suite` to practice `stem health` and `stem observe` flows.
- Run `example/scheduler_observability` to watch drift metrics and schedule signals.