Stem Signals
Stem exposes lifecycle signals so instrumentation can react to publish, worker, scheduler, workflow, and control-plane events without modifying runtime code.
All signal payloads implement StemEvent and dispatch through
Signal<T extends StemEvent>, giving every handler a shared event shape:
- eventName
- occurredAt
- attributes
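To illustrate why a shared shape is useful, here is a minimal sketch in which one formatter handles any event. `EventShape`, `DemoEvent`, and `describe` are hypothetical stand-ins written for this example, not Stem types. The `Map<String, Object?>` type for `attributes` and the `task-failed` name are assumptions.

```dart
/// Local stand-in for the three fields listed above; not Stem's StemEvent.
abstract class EventShape {
  String get eventName;
  DateTime get occurredAt;
  Map<String, Object?> get attributes; // Value type is an assumption.
}

/// Hypothetical concrete event used only for this demo.
class DemoEvent implements EventShape {
  DemoEvent(this.eventName, this.occurredAt, this.attributes);

  @override
  final String eventName;
  @override
  final DateTime occurredAt;
  @override
  final Map<String, Object?> attributes;
}

/// Formats any event using only the shared fields.
String describe(EventShape event) {
  // Sort keys so the output does not depend on map insertion order.
  final keys = event.attributes.keys.toList()..sort();
  return '${event.eventName} @ '
      '${event.occurredAt.toUtc().toIso8601String()} '
      '[${keys.join(', ')}]';
}

void main() {
  final event = DemoEvent(
    'task-failed', // Illustrative name only.
    DateTime.utc(2024, 1, 2, 3, 4, 5),
    {'taskId': 'abc', 'attempt': 2},
  );
  print(describe(event));
  // task-failed @ 2024-01-02T03:04:05.000Z [attempt, taskId]
}
```

Because `describe` depends only on the common fields, the same function could in principle back a logger or metrics exporter for every signal category.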
Signal Catalog
| Category | Stem Signal | Payload Highlights | Celery Equivalent |
|---|---|---|---|
| Publish | beforeTaskPublish, afterTaskPublish | Envelope, attempt metadata, task id | before_task_publish, after_task_publish |
| Worker lifecycle | workerInit, workerReady, workerStopping, workerShutdown, workerHeartbeat, workerChildInit, workerChildShutdown | WorkerInfo, optional reason/timestamps | worker_init, worker_ready, worker_shutting_down, worker_shutdown, heartbeat_sent, worker_process_init/shutdown |
| Task lifecycle | taskReceived, taskPrerun, taskPostrun, taskRetry, taskSucceeded, taskFailed, taskRevoked | Envelope, WorkerInfo, attempt, result/error context | task_received, task_prerun, task_postrun, task_retry, task_success, task_failure, task_revoked |
| Workflow lifecycle | workflowRunStarted, workflowRunSuspended, workflowRunResumed, workflowRunCompleted, workflowRunFailed, workflowRunCancelled | run id, workflow name, status, optional step metadata | n/a |
| Scheduler | scheduleEntryDue, scheduleEntryDispatched, scheduleEntryFailed | ScheduleEntry, tick timestamp, drift, error stack | beat_scheduler_ready, beat_schedule |
| Control plane | controlCommandReceived, controlCommandCompleted | ControlCommandMessage, reply status, payload/error maps | control_command_sent, control_command_received |
Ordering & Dispatch Semantics
- beforeTaskPublish fires immediately before broker IO; afterTaskPublish runs once persistence succeeds.
- taskReceived is emitted when a worker claims/dequeues a task.
- taskPrerun fires immediately before handler invocation.
- Execution ordering is taskReceived -> taskPrerun -> handler -> taskPostrun.
- Worker lifecycle follows workerInit -> workerReady -> optional workerStopping -> workerShutdown.
- Scheduler signals emit due -> dispatched/failed.
- Dispatch is sequential and priority-aware; async callbacks are awaited.
- Listener errors are routed to StemSignals.configure(onError: ...) and do not crash the worker loop.
- SignalContext.cancel() stops lower-priority listeners for the current emit.
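The dispatch rules above (sequential, priority-ordered, awaited, error-isolated, cancellable) can be modelled with a small toy dispatcher. This is a sketch of the described behaviour, not Stem's implementation. `ToySignal`, `ToyContext`, and the `priority` parameter are hypothetical names, and it assumes a higher number means higher priority.

```dart
import 'dart:async';

/// Hypothetical per-emit context; cancel() plays the role the text gives
/// to SignalContext.cancel(), but this class is not Stem's.
class ToyContext {
  bool cancelled = false;
  void cancel() => cancelled = true;
}

typedef ToyListener<T> = FutureOr<void> Function(T payload, ToyContext context);

class _Entry<T> {
  _Entry(this.listener, this.priority);
  final ToyListener<T> listener;
  final int priority;
}

/// Toy signal: higher priority runs first, one listener at a time.
class ToySignal<T> {
  ToySignal({this.onError});

  final void Function(Object error, StackTrace stackTrace)? onError;
  final List<_Entry<T>> _entries = [];

  void connect(ToyListener<T> listener, {int priority = 0}) {
    _entries.add(_Entry(listener, priority));
  }

  Future<void> emit(T payload) async {
    // Sort a copy so listeners may connect during an emit. List.sort is
    // not guaranteed stable, so equal priorities have no defined order.
    final ordered = [..._entries]
      ..sort((a, b) => b.priority.compareTo(a.priority));
    final context = ToyContext();
    for (final entry in ordered) {
      if (context.cancelled) break; // Skip remaining lower priorities.
      try {
        await entry.listener(payload, context); // Async listeners are awaited.
      } catch (error, stackTrace) {
        onError?.call(error, stackTrace); // Errors never escape emit().
      }
    }
  }
}

Future<void> main() async {
  final log = <String>[];
  final signal = ToySignal<String>(
    onError: (error, _) => log.add('error handled'),
  );

  signal.connect((name, _) {
    throw StateError('boom');
  }, priority: 30);
  signal.connect((name, context) async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    log.add('audit $name');
    context.cancel();
  }, priority: 20);
  signal.connect((name, _) {
    log.add('never runs');
  }, priority: 10);

  await signal.emit('taskPrerun');
  print(log); // [error handled, audit taskPrerun]
}
```

The failing listener is reported through `onError` and the loop continues. The second listener then cancels the emit, so the lowest-priority listener is skipped.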
Configuration
Use StemSignals.configure or supply environment variables consumed by
ObservabilityConfig.fromEnvironment:
void configureSignals() {
  StemSignals.configure(
    configuration: const StemSignalConfiguration(
      enabled: true,
      enabledSignals: {'worker-heartbeat': false},
    ),
  );
}
Environment knobs:
- STEM_SIGNALS_ENABLED=false disables all signals.
- STEM_SIGNALS_DISABLED=worker-heartbeat,task-prerun disables selected ones.
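As a rough illustration of how these two variables combine, the sketch below interprets them from a plain map. It is not the logic of ObservabilityConfig.fromEnvironment. `SignalToggles` and `readSignalToggles` are hypothetical, and the parsing rules (case-insensitive `false`, whitespace trimmed around names) are assumptions.

```dart
/// Hypothetical result type; not part of Stem.
class SignalToggles {
  const SignalToggles({required this.enabled, required this.disabled});

  final bool enabled;
  final Set<String> disabled;

  /// A signal fires only if signals are on globally and it is not listed.
  bool allows(String signalName) => enabled && !disabled.contains(signalName);
}

SignalToggles readSignalToggles(Map<String, String> env) {
  // Assumption: any value other than "false" leaves signals enabled.
  final enabled = env['STEM_SIGNALS_ENABLED']?.trim().toLowerCase() != 'false';
  // Assumption: a comma-separated list, with blanks and spacing ignored.
  final disabled = (env['STEM_SIGNALS_DISABLED'] ?? '')
      .split(',')
      .map((name) => name.trim())
      .where((name) => name.isNotEmpty)
      .toSet();
  return SignalToggles(enabled: enabled, disabled: disabled);
}

void main() {
  final toggles = readSignalToggles({
    'STEM_SIGNALS_DISABLED': 'worker-heartbeat, task-prerun',
  });
  print(toggles.allows('worker-heartbeat')); // false
  print(toggles.allows('task-retry')); // true
}
```

In a real process the map would typically come from `Platform.environment` in `dart:io`. Passing it in as a parameter keeps the sketch easy to test.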
Listening for Signals
The snippets below cover the following cases, in order:
- Publish
- Task lifecycle
- Worker lifecycle
- Worker-scoped filters
- Scheduler
- Control plane
- StemEvent view
lib/signals.dart
List<SignalSubscription> registerPublishSignals() {
  return [
    StemSignals.beforeTaskPublish.connect((payload, _) {
      print('Publishing ${payload.envelope.name}');
    }),
    StemSignals.afterTaskPublish.connect((payload, _) {
      print('Published ${payload.envelope.id}');
    }),
  ];
}
lib/signals.dart
List<SignalSubscription> registerTaskSignals() {
  return [
    StemSignals.taskRetry.connect((payload, _) {
      print('Retrying ${payload.envelope.name} at ${payload.nextRetryAt}');
    }),
    StemSignals.taskFailed.connect((payload, _) {
      print('Task failed: ${payload.envelope.id}');
    }),
  ];
}
lib/signals.dart
SignalSubscription registerWorkerSignals() {
  return StemSignals.onWorkerReady((payload, _) {
    print('Worker ready: ${payload.worker.id}');
  }, workerId: 'signals-worker');
}
lib/signals.dart
List<SignalSubscription> registerWorkerScopedSignals() {
  return [
    StemSignals.onTaskFailure(
      (payload, _) {
        print(
          'Task failed on worker ${payload.worker.id}: ${payload.taskName}',
        );
      },
      taskName: 'signals.demo',
      workerId: 'signals-worker',
    ),
    StemSignals.onControlCommandCompleted(
      (payload, _) {
        print(
          'Control ${payload.command.type} -> ${payload.status} on ${payload.worker.id}',
        );
      },
      workerId: 'signals-worker',
      commandType: 'ping',
    ),
  ];
}
lib/signals.dart
SignalSubscription registerSchedulerSignals() {
  return StemSignals.scheduleEntryDispatched.connect((payload, _) {
    print('Schedule dispatched: ${payload.entry.id}');
  });
}
lib/signals.dart
SignalSubscription registerControlSignals() {
  return StemSignals.onControlCommandCompleted((payload, _) {
    print('Control command: ${payload.command.type}');
  });
}
lib/signals.dart
SignalSubscription registerStemEventView() {
  return StemSignals.onTaskSuccess((payload, context) {
    final event = context.event;
    if (event != null) {
      print(
        'Event ${event.eventName} at ${event.occurredAt.toIso8601String()}',
      );
    }
  });
}
Worker-scoped filtering is available on these convenience helpers:
- onWorkerInit, onWorkerReady, onWorkerStopping, onWorkerShutdown
- onWorkerHeartbeat, onWorkerChildInit, onWorkerChildShutdown
- onTaskReceived, onTaskPrerun, onTaskPostrun, onTaskSuccess, onTaskFailure, onTaskRetry, onTaskRevoked
- onControlCommandReceived, onControlCommandCompleted
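Conceptually, a scoped helper behaves like a guard placed in front of the listener. The sketch below models that idea with hypothetical names (`DemoPayload`, `scoped`) and assumes that an omitted filter matches everything. How Stem applies these filters internally is not stated here.

```dart
typedef Listener<T> = void Function(T payload);

/// Hypothetical payload carrying only the fields the filters inspect.
class DemoPayload {
  const DemoPayload(this.workerId, this.taskName);
  final String workerId;
  final String taskName;
}

/// Wraps [listener] so it only runs for the given worker and/or task.
/// A null filter matches every payload (an assumption of this sketch).
Listener<DemoPayload> scoped(
  Listener<DemoPayload> listener, {
  String? workerId,
  String? taskName,
}) {
  return (payload) {
    if (workerId != null && payload.workerId != workerId) return;
    if (taskName != null && payload.taskName != taskName) return;
    listener(payload);
  };
}

void main() {
  final seen = <String>[];
  final listener = scoped(
    (payload) => seen.add('${payload.workerId}:${payload.taskName}'),
    workerId: 'signals-worker',
  );

  listener(const DemoPayload('signals-worker', 'signals.demo'));
  listener(const DemoPayload('other-worker', 'signals.demo'));
  print(seen); // [signals-worker:signals.demo]
}
```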
Custom Queue Events
Signals cover runtime lifecycle hooks. For application-domain events (BullMQ
QueueEvents style), use QueueEventsProducer and QueueEvents.
Adapters & Middleware
- StemSignalEmitter builds payloads and emits signals; Stem runtime uses this same emitter internally.
- SignalMiddleware.coordinator() forwards enqueue middleware to publish signals.
- SignalMiddleware.worker() emits receive/prerun/failure hooks from existing worker middleware chains.
lib/signals.dart
List<Middleware> buildSignalMiddlewareForProducer() {
  return [SignalMiddleware.coordinator()];
}
lib/signals.dart
List<Middleware> buildSignalMiddlewareForWorker() {
  return [
    SignalMiddleware.worker(
      workerInfo: () => const WorkerInfo(
        id: 'signals-worker',
        queues: ['default'],
        broadcasts: [],
      ),
    ),
  ];
}
Celery Comparison
| Celery | Stem | Notes |
|---|---|---|
| task_prerun / task_postrun | taskPrerun / taskPostrun | Payload includes TaskContext and worker metadata. |
| worker_ready | workerReady | Worker-scoped filters available via onWorkerReady(workerId: ...). |
| worker_process_init/shutdown | workerChildInit / workerChildShutdown | Mirrors isolate pool spawn/recycle notifications. |
| before_task_publish | beforeTaskPublish | Fires before broker writes. |
| beat_schedule | scheduleEntryDispatched | Carries scheduled vs executed timestamps plus drift duration. |
Signals tied to Celery-specific pools remain out of scope.