Workflows
Stem Workflows let you orchestrate multi-step business processes with durable
state, typed results, automatic retries, and event-driven resumes. The
StemWorkflowApp helper wires together a Stem instance, workflow store,
event bus, and runtime so you can start runs, monitor progress, and interact
with suspended steps from one place.
Runtime Overview
final workflowApp = await StemWorkflowApp.fromUrl(
'redis://127.0.0.1:56379',
adapters: const [StemRedisAdapter(), StemPostgresAdapter()],
overrides: const StemStoreOverrides(
backend: 'redis://127.0.0.1:56379/1',
workflow: 'postgresql://<user>:<password>@127.0.0.1:65432/stem',
),
flows: [ApprovalsFlow.flow],
scripts: [retryScript],
eventBusFactory: WorkflowEventBusFactory.inMemory(),
workerConfig: const StemWorkerConfig(queue: 'workflow'),
);
Start the runtime once the app is constructed:
await workflowApp.start();
StemWorkflowApp exposes:
- runtime – registers Flow/WorkflowScript definitions and dequeues runs.
- store – persists checkpoints, suspension metadata, and results.
- eventBus – emits topics that resume waiting steps.
- app – the underlying StemApp (broker + result backend + worker).
StemClient Entrypoint
StemClient is the shared entrypoint when you want a single object to own the
broker, result backend, and workflow helpers. It creates workflow apps and
workers with consistent configuration so you don't pass broker/backend handles
around.
Future<void> bootstrapWorkflowClient() async {
final client = await StemClient.fromUrl('memory://');
final app = await client.createWorkflowApp(flows: [ApprovalsFlow.flow]);
await app.start();
await app.close();
await client.close();
}
Declaring Typed Flows
Flows use the declarative DSL (FlowBuilder) to capture ordered steps. Specify
Flow<T> to document the completion type; generic metadata is preserved all the
way through WorkflowResult<T>.
class ApprovalsFlow {
static final flow = Flow<String>(
name: 'approvals.flow',
build: (flow) {
flow.step('draft', (ctx) async {
final payload = ctx.params['draft'] as Map<String, Object?>;
return payload['documentId'];
});
flow.step('manager-review', (ctx) async {
final resume = ctx.takeResumeData() as Map<String, Object?>?;
if (resume == null) {
await ctx.awaitEvent('approvals.manager');
return null;
}
return resume['approvedBy'] as String?;
});
flow.step('finalize', (ctx) async {
final approvedBy = ctx.previousResult as String?;
return 'approved-by:$approvedBy';
});
},
);
}
Future<void> registerFlow(StemWorkflowApp workflowApp) async {
workflowApp.runtime.registerWorkflow(ApprovalsFlow.flow.definition);
}
Steps re-run from the top after every suspension, so handlers must be
idempotent and rely on FlowContext helpers: iteration, takeResumeData,
sleep, awaitEvent, idempotencyKey, and persisted step outputs.
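To make the idempotency requirement concrete, here is a minimal pure-Dart sketch of the general replay pattern: a step body runs once, its output is persisted, and later passes read the saved value instead of repeating the side effect. StepCheckpoints and runOnce are hypothetical names used only for illustration; they are not part of the Stem API, and the real runtime may persist outputs differently.

```dart
// Hypothetical stand-in for persisted step outputs; not the Stem store API.
class StepCheckpoints {
  final Map<String, Object?> _outputs = {};

  bool has(String step) => _outputs.containsKey(step);
  Object? read(String step) => _outputs[step];
  void save(String step, Object? value) => _outputs[step] = value;
}

/// Runs [body] at most once per step name; replays return the saved output.
Future<T> runOnce<T>(
  StepCheckpoints checkpoints,
  String step,
  Future<T> Function() body,
) async {
  if (checkpoints.has(step)) {
    return checkpoints.read(step) as T;
  }
  final value = await body();
  checkpoints.save(step, value);
  return value;
}

Future<void> main() async {
  final checkpoints = StepCheckpoints();
  var sideEffects = 0;

  // One "pass" over the workflow; a resume after suspension repeats it.
  Future<String> pass() async {
    final id = await runOnce<String>(checkpoints, 'draft', () async {
      sideEffects++; // Stands in for a non-repeatable side effect.
      return 'doc-42';
    });
    return 'approved:$id';
  }

  final first = await pass();
  final second = await pass();
  print('$first | $second | sideEffects=$sideEffects');
}
```

The second pass returns the same value without incrementing the counter, which is the property a handler needs when it can be re-entered after a suspension.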
Workflow Scripts
WorkflowScript offers a higher-level facade that feels like a regular async
function. You still get typed results and step-level durability, but the DSL
handles ctx.step registration automatically.
final retryScript = WorkflowScript(
name: 'billing.retry-script',
run: (script) async {
final chargeId = await script.step<String>('charge', (ctx) async {
final resume = ctx.takeResumeData() as Map<String, Object?>?;
if (resume == null) {
await ctx.awaitEvent('billing.charge.prepared');
return 'pending';
}
return resume['chargeId'] as String;
});
final receipt = await script.step<String>('confirm', (ctx) async {
ctx.idempotencyKey('confirm-$chargeId');
return 'receipt-$chargeId';
});
return receipt;
},
);
final retryDefinition = retryScript.definition;
Scripts can enable autoVersion: true inside script.step calls to track loop
iterations using the stepName#iteration naming convention.
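As a rough illustration of the stepName#iteration convention, the sketch below builds one checkpoint key per loop iteration. The helper versionedStepName is hypothetical, and starting the counter at 0 is an assumption; the runtime's actual numbering may differ.

```dart
/// Builds a checkpoint name in the `stepName#iteration` form.
/// Hypothetical helper; the zero-based counter is an assumption.
String versionedStepName(String stepName, int iteration) =>
    '$stepName#$iteration';

void main() {
  final checkpoints = <String, String>{};
  for (var attempt = 0; attempt < 3; attempt++) {
    // Each iteration gets its own key, so a replay never overwrites
    // the output of an earlier attempt.
    checkpoints[versionedStepName('charge', attempt)] = 'attempt-$attempt';
  }
  print(checkpoints.keys.toList()); // [charge#0, charge#1, charge#2]
}
```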
Annotated Workflows (stem_builder)
If you prefer decorators over the DSL, annotate workflow classes and tasks with
@WorkflowDefn, @workflow.run, @workflow.step, and @TaskDefn, then generate
the registry with stem_builder.
@WorkflowDefn(name: 'approvals.flow')
class ApprovalsAnnotatedWorkflow {
@workflow.step
Future<String> draft(FlowContext ctx) async {
final payload = ctx.params['draft'] as Map<String, Object?>;
return payload['documentId'] as String;
}
@WorkflowStep(name: 'manager-review')
Future<String?> managerReview(FlowContext ctx) async {
final resume = ctx.takeResumeData() as Map<String, Object?>?;
if (resume == null) {
await ctx.awaitEvent('approvals.manager');
return null;
}
return resume['approvedBy'] as String?;
}
@workflow.step
Future<String> finalize(FlowContext ctx) async {
final approvedBy = ctx.previousResult as String?;
return 'approved-by:$approvedBy';
}
}
@WorkflowDefn(name: 'billing.retry-script', kind: WorkflowKind.script)
class BillingRetryAnnotatedWorkflow {
@workflow.run
Future<String> run(WorkflowScriptContext script) async {
final chargeId = await script.step<String>('charge', (ctx) async {
final resume = ctx.takeResumeData() as Map<String, Object?>?;
if (resume == null) {
await ctx.awaitEvent('billing.charge.prepared');
return 'pending';
}
return resume['chargeId'] as String;
});
return script.step<String>('confirm', (ctx) async {
ctx.idempotencyKey('confirm-$chargeId');
return 'receipt-$chargeId';
});
}
}
@TaskDefn(
name: 'send_email',
options: TaskOptions(maxRetries: 5),
)
Future<void> sendEmail(
TaskInvocationContext ctx,
Map<String, Object?> args,
) async {
// send email
}
Future<void> registerAnnotatedDefinitions(StemWorkflowApp app) async {
// Generated by stem_builder.
registerStemDefinitions(
workflows: app.runtime.registry,
tasks: app.app.registry,
);
}
Build the registry (example):
dart pub add --dev build_runner stem_builder
dart run build_runner build
Starting & Awaiting Workflows
Future<void> runWorkflow(StemWorkflowApp workflowApp) async {
final runId = await workflowApp.startWorkflow(
'approvals.flow',
params: {
'draft': {'documentId': 'doc-42'},
},
cancellationPolicy: const WorkflowCancellationPolicy(
maxRunDuration: Duration(hours: 2),
maxSuspendDuration: Duration(minutes: 30),
),
);
final result = await workflowApp.waitForCompletion<String>(
runId,
timeout: const Duration(minutes: 5),
);
if (result?.isCompleted == true) {
print('Workflow finished with ${result!.value}');
} else {
print('Workflow state: ${result?.status}');
}
}
waitForCompletion<T> returns a WorkflowResult<T> that includes the decoded
value, original RunState, and a timedOut flag so callers can decide whether
to keep polling or surface status upstream.
Cancellation policies
WorkflowCancellationPolicy guards long-running runs. Use it to auto-cancel
workflows that exceed a wall-clock budget or remain suspended longer than
allowed.
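The two budgets can be pictured as a simple check against the current time. The sketch below is a pure-Dart model under stated assumptions: PolicySketch and cancellationReason are hypothetical names, and only the two field names are taken from the example above. How the runtime actually evaluates the policy is not shown in this guide.

```dart
// Hypothetical model; field names mirror WorkflowCancellationPolicy,
// but the evaluation logic here is an assumption.
class PolicySketch {
  const PolicySketch({this.maxRunDuration, this.maxSuspendDuration});

  final Duration? maxRunDuration;
  final Duration? maxSuspendDuration;
}

/// Returns a cancellation reason, or null if the run is within budget.
String? cancellationReason(
  PolicySketch policy, {
  required DateTime startedAt,
  DateTime? suspendedAt,
  required DateTime now,
}) {
  final maxRun = policy.maxRunDuration;
  if (maxRun != null && now.difference(startedAt) > maxRun) {
    return 'maxRunDuration exceeded';
  }
  final maxSuspend = policy.maxSuspendDuration;
  if (maxSuspend != null &&
      suspendedAt != null &&
      now.difference(suspendedAt) > maxSuspend) {
    return 'maxSuspendDuration exceeded';
  }
  return null;
}

void main() {
  const policy = PolicySketch(
    maxRunDuration: Duration(hours: 2),
    maxSuspendDuration: Duration(minutes: 30),
  );
  final start = DateTime.utc(2024, 1, 1, 9);

  // Suspended at 09:10 and still waiting at 09:45: 35 minutes suspended.
  print(cancellationReason(
    policy,
    startedAt: start,
    suspendedAt: start.add(const Duration(minutes: 10)),
    now: start.add(const Duration(minutes: 45)),
  )); // maxSuspendDuration exceeded
}
```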
Suspension, Events, and Groups of Runs
- sleep(duration) stores a wake-up timestamp; the runtime polls dueRuns and resumes those runs by re-enqueuing the internal workflow task.
- awaitEvent(topic, deadline: ...) registers durable watchers so external services can emit(topic, payload). The payload becomes resumeData for the awaiting step.
- runsWaitingOn(topic) exposes all runs suspended on a channel, which is useful for CLI tooling or dashboards. After a topic resumes, the runtime calls markResumed(runId, data: suspensionData) so flows can inspect the payload.
Because watchers and due runs are persisted in the WorkflowStore, you can
operate on groups of workflows (pause, resume, or inspect every run waiting on
a topic) even if no worker is currently online.
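The watcher mechanics described above can be modelled in a few lines of plain Dart. WatcherRegistry is a hypothetical in-memory stand-in, not the WorkflowStore interface; it only borrows the method names awaitEvent, runsWaitingOn, and emit to show how a topic maps to a group of suspended runs and how a payload becomes their resume data.

```dart
// Hypothetical in-memory model of topic watchers; not the Stem store API.
class WatcherRegistry {
  final Map<String, Set<String>> _waiting = {};
  final Map<String, Map<String, Object?>> resumeData = {};

  /// Records that [runId] is suspended until [topic] is emitted.
  void awaitEvent(String runId, String topic) {
    _waiting.putIfAbsent(topic, () => <String>{}).add(runId);
  }

  /// Lists the runs currently suspended on [topic].
  List<String> runsWaitingOn(String topic) =>
      List.unmodifiable(_waiting[topic] ?? const <String>{});

  /// Resumes every waiter on [topic] and hands each one the payload.
  List<String> emit(String topic, Map<String, Object?> payload) {
    final resumed = (_waiting.remove(topic) ?? <String>{}).toList();
    for (final runId in resumed) {
      resumeData[runId] = payload;
    }
    return resumed;
  }
}

void main() {
  final registry = WatcherRegistry()
    ..awaitEvent('run-1', 'approvals.manager')
    ..awaitEvent('run-2', 'approvals.manager');

  // Inspect the group without any worker being involved.
  print(registry.runsWaitingOn('approvals.manager'));

  // A single emit resumes the whole group with the same payload.
  final resumed = registry.emit('approvals.manager', {'approvedBy': 'alice'});
  print('resumed ${resumed.length} runs');
  print(registry.resumeData['run-1']);
}
```

Because the registry is just data, listing or resuming a group does not depend on a worker being online, which mirrors the point about persisted watchers.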
Run Leases & Multi-Worker Recovery
Workflow runs are lease-based: a worker claims a run for a fixed duration, renews the lease while executing, and releases it on completion. This prevents two workers from executing the same run concurrently while still allowing takeover after crashes.
Operational guidance:
- Keep runLeaseDuration >= the broker visibility timeout so redelivered workflow tasks retry instead of being dropped before the lease expires.
- Ensure workers renew leases (leaseExtension) before either the workflow lease or broker visibility timeout expires.
- Keep system clocks in sync (NTP) because lease expiry is time-based across workers and the shared store.
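The claim, renew, and takeover rules can be sketched as a small lease table. This is a pure-Dart model under stated assumptions: Lease and LeaseTable are hypothetical names, and renewal is modelled as the current owner claiming again. It illustrates why a second worker is refused while a lease is live and why it can take over once the lease lapses.

```dart
// Hypothetical lease model; names are illustrative, not the Stem API.
class Lease {
  const Lease(this.owner, this.expiresAt);

  final String owner;
  final DateTime expiresAt;
}

class LeaseTable {
  final Map<String, Lease> _leases = {};

  /// Claims [runId] for [owner] if it is free, expired, or already theirs.
  /// A claim by the current owner acts as a renewal.
  bool claim(String runId, String owner, DateTime now, Duration ttl) {
    final current = _leases[runId];
    final available = current == null ||
        current.owner == owner ||
        !current.expiresAt.isAfter(now);
    if (available) {
      _leases[runId] = Lease(owner, now.add(ttl));
    }
    return available;
  }

  /// Releases the lease on completion, only if [owner] still holds it.
  void release(String runId, String owner) {
    if (_leases[runId]?.owner == owner) {
      _leases.remove(runId);
    }
  }
}

void main() {
  final leases = LeaseTable();
  final t0 = DateTime.utc(2024, 1, 1);
  const ttl = Duration(seconds: 30);

  // worker-a claims the run.
  print(leases.claim('run-1', 'worker-a', t0, ttl)); // true

  // worker-b is refused while the lease is live.
  print(leases.claim(
      'run-1', 'worker-b', t0.add(const Duration(seconds: 10)), ttl)); // false

  // worker-a crashes and never renews; worker-b takes over after expiry.
  print(leases.claim(
      'run-1', 'worker-b', t0.add(const Duration(seconds: 31)), ttl)); // true
}
```

Every comparison uses a caller-supplied timestamp, so workers with skewed clocks would disagree about expiry. That is the reason for the NTP guidance.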
Deterministic Tests with WorkflowClock
Inject a WorkflowClock when you need deterministic timestamps (e.g. for lease
expiry or due run scheduling). The FakeWorkflowClock lets tests advance time
without waiting on real timers.
final clock = FakeWorkflowClock(DateTime.utc(2024, 1, 1));
final store = InMemoryWorkflowStore(clock: clock);
final runtime = WorkflowRuntime(
stem: stem,
store: store,
eventBus: InMemoryEventBus(store: store),
clock: clock,
);
Payload Encoders in Workflow Apps
Workflows execute on top of a Stem worker, so they inherit the same
TaskPayloadEncoder facilities as regular tasks. StemWorkflowApp.create
accepts either a shared TaskPayloadEncoderRegistry or explicit defaults:
final encoders = TaskPayloadEncoderRegistry(
defaultArgsEncoder: const JsonTaskPayloadEncoder(),
defaultResultEncoder: const Base64PayloadEncoder(),
);
Future<void> configureWorkflowEncoders() async {
final app = await StemWorkflowApp.fromUrl(
'memory://',
flows: [ApprovalsFlow.flow],
encoderRegistry: encoders,
additionalEncoders: const [GzipPayloadEncoder()],
);
await app.close();
}
Every workflow run task stores the result encoder id in RunState.resultMeta,
and the internal tasks dispatched by workflows reuse the same registry—so
typed steps can safely emit encrypted/binary payloads while workers decode them
exactly once.
Need per-workflow overrides? Register custom encoders on individual task
handlers (via TaskMetadata) or attach a specialized encoder to a Flow/script
step that persists sensitive data in the workflow store.
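To show why storing the encoder id alongside a result matters, the following pure-Dart sketch round-trips a value through a Base64-wrapped JSON codec and looks the decoder up by id. PayloadCodecSketch, Base64JsonCodec, persist, and restore are hypothetical names; the real TaskPayloadEncoder interface and the layout of RunState.resultMeta may differ.

```dart
import 'dart:convert';

// Hypothetical encoder interface; Stem's TaskPayloadEncoder may differ.
abstract class PayloadCodecSketch {
  String get id;
  Object? encode(Object? value);
  Object? decode(Object? stored);
}

class Base64JsonCodec implements PayloadCodecSketch {
  const Base64JsonCodec();

  @override
  String get id => 'sketch.base64-json';

  @override
  Object? encode(Object? value) =>
      base64Encode(utf8.encode(jsonEncode(value)));

  @override
  Object? decode(Object? stored) =>
      jsonDecode(utf8.decode(base64Decode(stored as String)));
}

/// Stores the encoder id next to the payload so a reader can pick the
/// matching decoder later.
Map<String, Object?> persist(PayloadCodecSketch codec, Object? value) => {
      'encoder': codec.id,
      'payload': codec.encode(value),
    };

/// Decodes a stored row once, using the codec named in the row.
Object? restore(
  Map<String, PayloadCodecSketch> registry,
  Map<String, Object?> row,
) {
  final codec = registry[row['encoder']];
  if (codec == null) {
    throw StateError('Unknown encoder ${row['encoder']}');
  }
  return codec.decode(row['payload']);
}

void main() {
  const codec = Base64JsonCodec();
  final registry = <String, PayloadCodecSketch>{codec.id: codec};

  final row = persist(codec, {'receipt': 'receipt-ch_1'});
  print(row['encoder']);
  print(restore(registry, row));
}
```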
Tooling Tips
- Use workflowApp.store.listRuns(...) to filter by workflow/status when building admin dashboards.
- workflowApp.runtime.emit(topic, payload) is the canonical way to resume batches of runs waiting on external events.
- CLI integrations (see stem workflow ...) rely on the same store APIs, so keeping the store tidy (expired runs, watchers) ensures responsive tooling.