Suspensions and Events
Suspension is where workflows differ from normal queue consumers. A workflow can stop executing, persist its state, and resume later on the same worker or a different worker.
Sleep
sleep(duration) records a wake-up time in the workflow store. The runtime
periodically scans for due runs and re-enqueues the internal workflow task
when the sleep expires.
For the common "sleep once, continue on resume" case, prefer the higher-level helper:
await ctx.sleepFor(duration: const Duration(milliseconds: 200));
Await external events
awaitEvent(topic, deadline: ...) records a durable watcher. External code can
resume those runs through the runtime API by emitting a payload for the topic.
When you inspect watcher entries directly, use watcher.dataJson(...) or
watcher.dataAs(codec: ...) when the full watcher metadata maps to one DTO.
If only the nested watcher payload is a DTO, use watcher.payloadJson(...) or
watcher.payloadAs(codec: ...) instead of manual raw-map casts.
Typical flow:
- a step calls awaitEvent('orders.payment.confirmed'); the run is marked suspended in the store
- another process calls WorkflowRuntime.emit(...) / WorkflowRuntime.emitValue(...) (or an app/service wrapper around it) with a payload
- the runtime resumes the run and exposes the payload through waitForEvent(...), event.wait(ctx), or the lower-level takeResumeData() / takeResumeValue<T>(codec: ...)
For the common "wait for one event and continue" case, prefer:
final payload = await ctx.waitForEventJson<PaymentConfirmed>(
topic: 'orders.payment.confirmed',
decode: PaymentConfirmed.fromJson,
);
Emit resume events
Use WorkflowRuntime.emit(...) / WorkflowRuntime.emitJson(...) /
WorkflowRuntime.emitVersionedJson(...) / WorkflowRuntime.emitValue(...)
(or the app wrappers workflowApp.emitJson(...) /
workflowApp.emitVersionedJson(...) / workflowApp.emitValue(...)) instead
of hand-editing store state:
await workflowApp.emitJson(
'orders.payment.confirmed',
const PaymentConfirmed(paymentId: 'pay_42', approvedBy: 'gateway'),
);
Typed event payloads still serialize to a string-keyed JSON-like map.
emitJson(...), emitVersionedJson(...), and emitValue(...) are
DTO/codec convenience layers, not a new transport shape.
When the topic and codec travel together in your codebase, prefer:
- WorkflowEventRef<T>.json(...) for normal DTO payloads
- WorkflowEventRef<T>.versionedJson(...) when the payload schema should carry an explicit __stemPayloadVersion
- WorkflowEventRef<T>.versionedMap(...) when the payload needs a custom map encoder plus a stored schema version
Keep event.emit(emitter, dto) as the happy path.
Pair that with await event.wait(ctx). If you are writing a flow and
deliberately want the lower-level FlowStepControl path, use
event.awaitOn(step) instead of dropping back to a raw topic string.
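Putting those pieces together, a minimal sketch; the constructor's named parameters and the PaymentConfirmed wiring are assumptions, only the helper names come from the API described above:

```dart
// One shared definition: topic and codec travel together.
// Named parameters here are illustrative, not confirmed signatures.
final paymentConfirmed = WorkflowEventRef<PaymentConfirmed>.json(
  'orders.payment.confirmed',
  decode: PaymentConfirmed.fromJson,
);

// Producer side: happy-path emit through the shared reference.
await paymentConfirmed.emit(
  workflowApp,
  const PaymentConfirmed(paymentId: 'pay_42', approvedBy: 'gateway'),
);

// Consumer side, inside a workflow body: suspend until the event arrives.
final payload = await paymentConfirmed.wait(ctx);
```

Because the topic string and codec live in one place, producer and consumer cannot drift apart on either the topic name or the payload shape.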
For low-level sleep/event directives that still need DTO metadata, use
step.sleepJson(...), step.sleepVersionedJson(...),
step.awaitEventJson(...), step.awaitEventVersionedJson(...), or
FlowStepControl.awaitTopicJson(...) instead of hand-built maps.
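For contrast, the lower-level directive path inside a flow step might look like this sketch; the parameter names are assumptions, only the helper name appears in the API list above:

```dart
// Inside a flow step handler with a FlowStepControl, suspend on a topic
// while still recording DTO metadata. Parameter names are illustrative.
return step.awaitEventJson<PaymentConfirmed>(
  'orders.payment.confirmed',
  decode: PaymentConfirmed.fromJson,
);
```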
Inspect waiting runs
The workflow store can tell you which runs are waiting on a topic:
runsWaitingOn(topic)listRuns(...)
That is the foundation for dashboards, operational tooling, and bulk inspection.
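As a sketch of such tooling, assuming a store handle and illustrative field names on the returned run records (only runsWaitingOn comes from the store API above):

```dart
// List every run suspended on a topic, even with no worker running.
final waiting = await store.runsWaitingOn('orders.payment.confirmed');
for (final run in waiting) {
  // Field names on the run record are illustrative.
  print('run ${run.id} waiting since ${run.suspendedAt}');
}
```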
Group operations
Because due runs and event watchers are persisted, you can:
- resume batches of runs waiting on one topic
- inspect all suspended runs even with no active worker
- rebuild dashboard views after process restarts
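The operations above can be sketched as a small ops routine, assuming the store and workflowApp handles from the earlier sections:

```dart
// Inspect the backlog after a process restart: watchers are persisted,
// so this works even before any worker comes back up.
final suspended = await store.runsWaitingOn('orders.payment.confirmed');
print('${suspended.length} run(s) waiting on payment confirmation');

// Resume the whole batch by emitting one event for the topic.
await workflowApp.emitJson(
  'orders.payment.confirmed',
  const PaymentConfirmed(paymentId: 'pay_42', approvedBy: 'ops_console'),
);
```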