Observability

Instrument Stem applications with built-in metrics, traces, and lifecycle signals.

Metrics

Stem exports OpenTelemetry metrics via StemMetrics. Enable exporters with environment variables or programmatically.

export STEM_METRIC_EXPORTERS=otlp:http://localhost:4318/v1/metrics
export STEM_OTLP_ENDPOINT=http://localhost:4318

void configureMetrics() {
  StemMetrics.instance.configure(exporters: [ConsoleMetricsExporter()]);
}
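
The exporter value above has a `kind:endpoint` shape. As a rough illustration of how such a value could be split, here is a minimal sketch in plain Dart. `ExporterSpec` and `parseExporterSpecs` are hypothetical names, and the comma-separated list and the bare `console` entry are assumptions; Stem's own parsing may differ.

```dart
/// Hypothetical parsed form of one exporter entry (not a Stem type).
class ExporterSpec {
  const ExporterSpec(this.kind, [this.endpoint]);

  final String kind; // e.g. "otlp"
  final Uri? endpoint; // null when the entry carries no endpoint
}

/// Splits a list of `kind` or `kind:endpoint` entries.
/// The comma separator is an assumption, not documented Stem behavior.
List<ExporterSpec> parseExporterSpecs(String raw) {
  final specs = <ExporterSpec>[];
  for (final entry in raw.split(',')) {
    final trimmed = entry.trim();
    if (trimmed.isEmpty) continue;
    // Split on the first colon only so the URL after it stays intact.
    final colon = trimmed.indexOf(':');
    if (colon < 0) {
      specs.add(ExporterSpec(trimmed));
    } else {
      specs.add(ExporterSpec(
        trimmed.substring(0, colon),
        Uri.parse(trimmed.substring(colon + 1)),
      ));
    }
  }
  return specs;
}

void main() {
  final specs = parseExporterSpecs(
    'console, otlp:http://localhost:4318/v1/metrics',
  );
  for (final spec in specs) {
    print('${spec.kind} -> ${spec.endpoint ?? '(no endpoint)'}');
  }
}
```

Splitting on the first colon matters because the endpoint itself contains colons (scheme and port).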

Common metric names:

| Metric | Type | Description |
| --- | --- | --- |
| stem.tasks.started | Counter | Incremented when a worker begins executing a task. |
| stem.tasks.succeeded / stem.tasks.failed | Counter | Outcome counters per task/queue. |
| stem.tasks.retried | Counter | Number of retries scheduled. |
| stem.task.duration | Histogram | Task execution time in milliseconds. |
| stem.worker.concurrency | Gauge | Current active isolates vs configured concurrency. |
| stem.worker.inflight | Gauge | Messages currently reserved by the worker. |
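
The two outcome counters combine into a failure ratio, which is often easier to alert on than raw counts. The sketch below assumes you already have two cumulative readings of stem.tasks.succeeded and stem.tasks.failed; `OutcomeSample` and `failureRatio` are hypothetical helpers, not part of Stem.

```dart
/// Cumulative counter readings taken at one point in time (hypothetical shape).
class OutcomeSample {
  const OutcomeSample({required this.succeeded, required this.failed});

  final int succeeded;
  final int failed;
}

/// Failure ratio for the window between two samples.
/// Returns null when no task finished or when a counter went backwards
/// (for example after a process restart).
double? failureRatio(OutcomeSample earlier, OutcomeSample later) {
  final ok = later.succeeded - earlier.succeeded;
  final bad = later.failed - earlier.failed;
  if (ok < 0 || bad < 0) return null;
  final total = ok + bad;
  if (total == 0) return null;
  return bad / total;
}

void main() {
  const before = OutcomeSample(succeeded: 100, failed: 5);
  const after = OutcomeSample(succeeded: 190, failed: 15);
  // 10 failures out of 100 finished tasks in the window.
  print(failureRatio(before, after));
}
```

In practice a metrics backend usually computes this from rate queries; the function only spells out the arithmetic.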

Tracing

Wrap enqueue and task handlers with traces by enabling the built-in tracer.

export STEM_TRACE_EXPORTER=otlp:http://localhost:4318/v1/traces

Stem buildTracedStem(
  Broker broker,
  ResultBackend backend,
  TaskRegistry registry,
) {
  // Configure OpenTelemetry globally; StemTracer.instance reads from it.
  final _ = StemTracer.instance;
  return Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
}

Traces include spans for stem.enqueue, stem.consume, and task execution. Use attributes (stem.task, stem.queue, stem.retry.attempt) to filter in your tracing backend.

Signals

StemSignals fire lifecycle hooks for tasks, workers, scheduler events, and control-plane commands.

void registerSignals() {
  StemSignals.taskRetry.connect((payload, _) {
    metrics.recordRetry(delay: payload.nextRetryAt.difference(DateTime.now()));
  });

  StemSignals.workerHeartbeat.connect((payload, _) {
    heartbeatGauge.set(1, tags: {'worker': payload.worker.id});
  });
}
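
One detail in the retry handler is worth noting: `nextRetryAt.difference(DateTime.now())` becomes negative if the handler runs after the retry time has already passed. A minimal sketch of a clamped variant follows; `retryDelay` is a hypothetical helper, and the optional `now` parameter exists only to make it testable.

```dart
/// Time remaining until [nextRetryAt], clamped so a handler that runs late
/// never reports a negative delay.
Duration retryDelay(DateTime nextRetryAt, {DateTime? now}) {
  final delay = nextRetryAt.difference(now ?? DateTime.now());
  return delay.isNegative ? Duration.zero : delay;
}

void main() {
  final now = DateTime.utc(2024, 1, 1, 12);
  // Retry is 30 seconds in the future.
  print(retryDelay(now.add(const Duration(seconds: 30)), now: now));
  // Retry time already passed 5 seconds ago, so the delay is clamped to zero.
  print(retryDelay(now.subtract(const Duration(seconds: 5)), now: now));
}
```

Clamping keeps negative samples out of a delay histogram, where they would otherwise distort the lower buckets.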

Workflow Introspection

Workflow runtimes can emit step-level events (started/completed/failed/retrying) through a WorkflowIntrospectionSink. Use it to publish step telemetry or bridge to your own tracing/logging systems.

class LoggingWorkflowIntrospectionSink implements WorkflowIntrospectionSink {
  @override
  Future<void> recordStepEvent(WorkflowStepEvent event) async {
    stemLogger.info('workflow.step', {
      'run': event.runId,
      'workflow': event.workflow,
      'step': event.stepId,
      'type': event.type.name,
      'iteration': event.iteration,
    });
  }
}

Logging

Use stemLogger (Contextual logger) for structured logs.

void logTaskStart(Envelope envelope) {
  stemLogger.info(
    'Task started',
    Context({'task': envelope.name, 'id': envelope.id}),
  );
}

Workers automatically include attempt, queue, and worker id in log contexts when StemSignals are enabled.

Health checks

Run the CLI to verify connectivity before deployments:

stem health --broker "$STEM_BROKER_URL" --backend "$STEM_RESULT_BACKEND_URL"

The command checks broker and result backend reachability, TLS certificates, and signing configuration, and returns a non-zero exit code on failure.
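
Because failure is signalled through the exit code, the check can gate a deployment script. The following is a sketch of a small Dart wrapper, assuming the `stem` executable is on PATH; `healthArgs` is a hypothetical helper, while the flags and environment variable names are the ones shown in the command above.

```dart
import 'dart:io';

/// Builds the argument list for `stem health` from environment-style values.
/// Throws if either URL is missing so the gate fails before running the CLI.
List<String> healthArgs(Map<String, String> env) {
  final broker = env['STEM_BROKER_URL'];
  final backend = env['STEM_RESULT_BACKEND_URL'];
  if (broker == null || backend == null) {
    throw StateError(
      'STEM_BROKER_URL and STEM_RESULT_BACKEND_URL must be set',
    );
  }
  return ['health', '--broker', broker, '--backend', backend];
}

Future<void> main() async {
  // Assumption: `stem` resolves on PATH in the deploy environment.
  final result = await Process.run('stem', healthArgs(Platform.environment));
  stdout.write(result.stdout);
  stderr.write(result.stderr);
  // Propagate the CLI's exit code so the pipeline stops on failure.
  exit(result.exitCode);
}
```

Passing the arguments as a list avoids shell quoting issues with URLs that contain special characters.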

Dashboards

A minimal dashboard typically charts:

  • Task throughput (stem.tasks.started, stem.tasks.succeeded, stem.tasks.failed).
  • Retry volume and task duration distribution (stem.tasks.retried, stem.task.duration).
  • Worker heartbeats and concurrency (stem.worker.concurrency, stem.worker.inflight).
  • Scheduler drift (StemSignals.onScheduleEntryDispatched drift metrics).
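
Scheduler drift in the last item is the gap between when an entry was due and when it was actually dispatched. The sketch below shows that arithmetic plus a nearest-rank percentile for charting. `scheduleDrift`, `percentile`, and the `scheduledFor` / `dispatchedAt` parameter names are hypothetical; the actual payload fields of the dispatch signal are not shown here and may be named differently.

```dart
/// Drift between when a schedule entry was due and when it was dispatched.
/// Positive values mean the scheduler ran late.
Duration scheduleDrift({
  required DateTime scheduledFor,
  required DateTime dispatchedAt,
}) =>
    dispatchedAt.difference(scheduledFor);

/// Nearest-rank percentile (p in 0..100) over a non-empty list of drifts.
Duration percentile(List<Duration> drifts, int p) {
  if (drifts.isEmpty) {
    throw ArgumentError('drifts must not be empty');
  }
  final sorted = [...drifts]..sort();
  var rank = (p * sorted.length / 100).ceil();
  if (rank < 1) rank = 1;
  if (rank > sorted.length) rank = sorted.length;
  return sorted[rank - 1];
}

void main() {
  final due = DateTime.utc(2024, 1, 1, 12);
  final drifts = [
    for (final lateMs in [120, 40, 900, 75])
      scheduleDrift(
        scheduledFor: due,
        dispatchedAt: due.add(Duration(milliseconds: lateMs)),
      ),
  ];
  print('p50 drift: ${percentile(drifts, 50).inMilliseconds} ms');
  print('p95 drift: ${percentile(drifts, 95).inMilliseconds} ms');
}
```

A high percentile is usually more informative than the mean here, since a few slow dispatches dominate perceived lateness.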

Exporters can be mixed: enable console during development and OTLP in staging/production. For local exploration, run the examples/otel_metrics stack to see metrics in a collector + Jaeger pipeline.