Workflows

Stem Workflows let you orchestrate multi-step business processes with durable state, typed results, automatic retries, and event-driven resumes. The StemWorkflowApp helper wires together a Stem instance, workflow store, event bus, and runtime so you can start runs, monitor progress, and interact with suspended steps from one place.

Runtime Overview

bin/workflows.dart
final workflowApp = await StemWorkflowApp.fromUrl(
  'redis://127.0.0.1:56379',
  adapters: const [StemRedisAdapter(), StemPostgresAdapter()],
  overrides: const StemStoreOverrides(
    backend: 'redis://127.0.0.1:56379/1',
    workflow: 'postgresql://<user>:<password>@127.0.0.1:65432/stem',
  ),
  flows: [ApprovalsFlow.flow],
  scripts: [retryScript],
  eventBusFactory: WorkflowEventBusFactory.inMemory(),
  workerConfig: const StemWorkerConfig(queue: 'workflow'),
);

Start the runtime once the app is constructed:

  await workflowApp.start();

StemWorkflowApp exposes:

  • runtime – registers Flow/WorkflowScript definitions and dequeues runs.
  • store – persists checkpoints, suspension metadata, and results.
  • eventBus – emits topics that resume waiting steps.
  • app – the underlying StemApp (broker + result backend + worker).

StemClient Entrypoint

StemClient is the shared entrypoint when you want a single object to own the broker, result backend, and workflow helpers. It creates workflow apps and workers with consistent configuration so you don't pass broker/backend handles around.

bin/workflows_client.dart
Future<void> bootstrapWorkflowClient() async {
  final client = await StemClient.fromUrl('memory://');
  final app = await client.createWorkflowApp(flows: [ApprovalsFlow.flow]);
  await app.start();
  await app.close();
  await client.close();
}

Declaring Typed Flows

Flows use the declarative DSL (FlowBuilder) to capture ordered steps. Specify Flow<T> to document the completion type; generic metadata is preserved all the way through WorkflowResult<T>.

lib/workflows/approvals_flow.dart
class ApprovalsFlow {
  static final flow = Flow<String>(
    name: 'approvals.flow',
    build: (flow) {
      flow.step('draft', (ctx) async {
        final payload = ctx.params['draft'] as Map<String, Object?>;
        return payload['documentId'];
      });

      flow.step('manager-review', (ctx) async {
        final resume = ctx.takeResumeData() as Map<String, Object?>?;
        if (resume == null) {
          await ctx.awaitEvent('approvals.manager');
          return null;
        }
        return resume['approvedBy'] as String?;
      });

      flow.step('finalize', (ctx) async {
        final approvedBy = ctx.previousResult as String?;
        return 'approved-by:$approvedBy';
      });
    },
  );
}

Future<void> registerFlow(StemWorkflowApp workflowApp) async {
  workflowApp.runtime.registerWorkflow(ApprovalsFlow.flow.definition);
}

Steps re-run from the top after every suspension, so handlers must be idempotent and rely on FlowContext helpers: iteration, takeResumeData, sleep, awaitEvent, idempotencyKey, and persisted step outputs.
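To make the replay behaviour concrete, the following is a minimal sketch in plain Dart that does not use the Stem package. MiniContext and Suspend are hypothetical stand-ins invented for this illustration; they only mimic the takeResumeData / awaitEvent pattern from the flow above and say nothing about how FlowContext is implemented.

```dart
// Hypothetical stand-ins for illustration only; not the Stem API.
class Suspend {
  const Suspend(this.topic);
  final String topic;
}

class MiniContext {
  MiniContext({Object? resumeData}) : _resumeData = resumeData;

  Object? _resumeData;
  Suspend? suspension;

  /// Returns the resume payload once, then clears it.
  Object? takeResumeData() {
    final data = _resumeData;
    _resumeData = null;
    return data;
  }

  /// Records that the step wants to wait for [topic].
  void awaitEvent(String topic) => suspension = Suspend(topic);
}

int handlerCalls = 0;

/// The handler body runs from the top on every attempt.
String? managerReview(MiniContext ctx) {
  handlerCalls++;
  final resume = ctx.takeResumeData() as Map<String, Object?>?;
  if (resume == null) {
    ctx.awaitEvent('approvals.manager');
    return null;
  }
  return resume['approvedBy'] as String?;
}

void main() {
  // First attempt: no resume data, so the step suspends.
  final first = MiniContext();
  final firstResult = managerReview(first);
  print('suspended on: ${first.suspension?.topic}, result: $firstResult');

  // Second attempt: the same handler re-runs with the event payload.
  final second = MiniContext(resumeData: {'approvedBy': 'alice'});
  final secondResult = managerReview(second);
  print('result: $secondResult after $handlerCalls calls');
}
```

Because the handler body executes twice, any side effect placed before the takeResumeData check would also happen twice, which is why such work should be idempotent or guarded.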

Workflow Scripts

WorkflowScript offers a higher-level facade that feels like a regular async function. You still get typed results and step-level durability, but the DSL handles ctx.step registration automatically.

lib/workflows/retry_script.dart
final retryScript = WorkflowScript(
  name: 'billing.retry-script',
  run: (script) async {
    final chargeId = await script.step<String>('charge', (ctx) async {
      final resume = ctx.takeResumeData() as Map<String, Object?>?;
      if (resume == null) {
        await ctx.awaitEvent('billing.charge.prepared');
        return 'pending';
      }
      return resume['chargeId'] as String;
    });

    final receipt = await script.step<String>('confirm', (ctx) async {
      ctx.idempotencyKey('confirm-$chargeId');
      return 'receipt-$chargeId';
    });

    return receipt;
  },
);

final retryDefinition = retryScript.definition;

Scripts can enable autoVersion: true inside script.step calls to track loop iterations using the stepName#iteration naming convention.
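As a rough illustration of the stepName#iteration convention, the sketch below builds per-iteration checkpoint keys in plain Dart. The checkpointName helper is hypothetical, and zero-based numbering is an assumption; the runtime's actual counter may start elsewhere.

```dart
// Hypothetical helper: builds the checkpoint name for one loop iteration.
// Zero-based numbering is an assumption, not something the docs confirm.
String checkpointName(String stepName, int iteration) =>
    '$stepName#$iteration';

void main() {
  final checkpoints = <String, String>{};
  for (var i = 0; i < 3; i++) {
    // Each pass stores its output under a distinct key, so a replay can
    // tell iterations apart instead of overwriting a single 'charge' entry.
    checkpoints[checkpointName('charge', i)] = 'attempt-$i';
  }
  print(checkpoints.keys.toList());
}
```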

Annotated Workflows (stem_builder)

If you prefer decorators over the DSL, annotate workflow classes and tasks with @WorkflowDefn, @workflow.run, @workflow.step, and @TaskDefn, then generate the registry with stem_builder.

lib/workflows/annotated.dart
@WorkflowDefn(name: 'approvals.flow')
class ApprovalsAnnotatedWorkflow {
  @workflow.step
  Future<String> draft(FlowContext ctx) async {
    final payload = ctx.params['draft'] as Map<String, Object?>;
    return payload['documentId'] as String;
  }

  @WorkflowStep(name: 'manager-review')
  Future<String?> managerReview(FlowContext ctx) async {
    final resume = ctx.takeResumeData() as Map<String, Object?>?;
    if (resume == null) {
      await ctx.awaitEvent('approvals.manager');
      return null;
    }
    return resume['approvedBy'] as String?;
  }

  @workflow.step
  Future<String> finalize(FlowContext ctx) async {
    final approvedBy = ctx.previousResult as String?;
    return 'approved-by:$approvedBy';
  }
}

@WorkflowDefn(name: 'billing.retry-script', kind: WorkflowKind.script)
class BillingRetryAnnotatedWorkflow {
  @workflow.run
  Future<String> run(WorkflowScriptContext script) async {
    final chargeId = await script.step<String>('charge', (ctx) async {
      final resume = ctx.takeResumeData() as Map<String, Object?>?;
      if (resume == null) {
        await ctx.awaitEvent('billing.charge.prepared');
        return 'pending';
      }
      return resume['chargeId'] as String;
    });

    return script.step<String>('confirm', (ctx) async {
      ctx.idempotencyKey('confirm-$chargeId');
      return 'receipt-$chargeId';
    });
  }
}

@TaskDefn(
  name: 'send_email',
  options: TaskOptions(maxRetries: 5),
)
Future<void> sendEmail(
  TaskInvocationContext ctx,
  Map<String, Object?> args,
) async {
  // send email
}

Future<void> registerAnnotatedDefinitions(StemWorkflowApp app) async {
  // Generated by stem_builder.
  registerStemDefinitions(
    workflows: app.runtime.registry,
    tasks: app.app.registry,
  );
}

Build the registry (example):

dart pub add --dev build_runner stem_builder
dart run build_runner build

Starting & Awaiting Workflows

bin/run_workflow.dart
Future<void> runWorkflow(StemWorkflowApp workflowApp) async {
  final runId = await workflowApp.startWorkflow(
    'approvals.flow',
    params: {
      'draft': {'documentId': 'doc-42'},
    },
    cancellationPolicy: const WorkflowCancellationPolicy(
      maxRunDuration: Duration(hours: 2),
      maxSuspendDuration: Duration(minutes: 30),
    ),
  );

  final result = await workflowApp.waitForCompletion<String>(
    runId,
    timeout: const Duration(minutes: 5),
  );

  if (result?.isCompleted == true) {
    print('Workflow finished with ${result!.value}');
  } else {
    print('Workflow state: ${result?.status}');
  }
}

waitForCompletion<T> returns a WorkflowResult<T> that includes the decoded value, original RunState, and a timedOut flag so callers can decide whether to keep polling or surface status upstream.
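The "keep polling or surface status" decision can be sketched with nothing but dart:async. MiniResult and waitFor below are hypothetical models of the shape described above (a value, a status, and a timedOut flag); they are not the WorkflowResult type, and the string statuses are placeholders.

```dart
import 'dart:async';

// Hypothetical result wrapper mirroring the fields described above.
class MiniResult<T> {
  const MiniResult({this.value, required this.status, this.timedOut = false});

  final T? value;
  final String status;
  final bool timedOut;

  bool get isCompleted => status == 'completed';
}

/// Waits for [run]; on timeout returns a result flagged as timedOut
/// instead of throwing, so the caller decides what to do next.
Future<MiniResult<T>> waitFor<T>(Future<T> run, Duration timeout) async {
  try {
    final value = await run.timeout(timeout);
    return MiniResult(value: value, status: 'completed');
  } on TimeoutException {
    return MiniResult(status: 'running', timedOut: true);
  }
}

Future<void> main() async {
  // Simulated run that finishes after 50 ms.
  final run = Future<String>.delayed(
    const Duration(milliseconds: 50),
    () => 'approved-by:alice',
  );

  // The first wait is too short, so the caller polls again.
  var result = await waitFor(run, const Duration(milliseconds: 10));
  while (result.timedOut) {
    result = await waitFor(run, const Duration(milliseconds: 100));
  }
  print('${result.status}: ${result.value}');
}
```

A real caller would usually cap the number of polls or return the intermediate status upstream rather than loop indefinitely.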

Cancellation policies

WorkflowCancellationPolicy guards long-running runs. Use it to auto-cancel workflows that exceed a wall-clock budget or remain suspended longer than allowed.
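The two budgets can be pictured as a pair of elapsed-time checks. The sketch below is a hypothetical model in plain Dart (CancellationBudget and violation are invented names); it shows the kind of comparison such a policy implies, not how the runtime actually enforces it.

```dart
// Hypothetical model of the two budgets; not the Stem implementation.
class CancellationBudget {
  const CancellationBudget({this.maxRunDuration, this.maxSuspendDuration});

  final Duration? maxRunDuration;
  final Duration? maxSuspendDuration;

  /// Returns a reason when the run should be cancelled, otherwise null.
  String? violation({
    required DateTime startedAt,
    required DateTime now,
    DateTime? suspendedAt,
  }) {
    final runLimit = maxRunDuration;
    if (runLimit != null && now.difference(startedAt) > runLimit) {
      return 'maxRunDuration exceeded';
    }
    final suspendLimit = maxSuspendDuration;
    if (suspendLimit != null &&
        suspendedAt != null &&
        now.difference(suspendedAt) > suspendLimit) {
      return 'maxSuspendDuration exceeded';
    }
    return null;
  }
}

void main() {
  const budget = CancellationBudget(
    maxRunDuration: Duration(hours: 2),
    maxSuspendDuration: Duration(minutes: 30),
  );
  final startedAt = DateTime.utc(2024, 1, 1, 9);

  // One hour into the run, suspended for the last 45 minutes.
  print(budget.violation(
    startedAt: startedAt,
    suspendedAt: DateTime.utc(2024, 1, 1, 9, 15),
    now: DateTime.utc(2024, 1, 1, 10),
  ));
}
```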

Suspension, Events, and Groups of Runs

  • sleep(duration) stores a wake-up timestamp; the runtime polls dueRuns and resumes those runs by re-enqueuing the internal workflow task.
  • awaitEvent(topic, deadline: ...) registers durable watchers so external services can emit(topic, payload). The payload becomes resumeData for the awaiting step.
  • runsWaitingOn(topic) exposes all runs suspended on a topic, which is useful for CLI tooling or dashboards. After a topic resumes, the runtime calls markResumed(runId, data: suspensionData) so flows can inspect the payload.

Because watchers and due runs are persisted in the WorkflowStore, you can operate on groups of workflows (pause, resume, or inspect every run waiting on a topic) even if no worker is currently online.
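The watcher bookkeeping described above can be modelled in a few lines. WatcherStore below is a hypothetical in-memory stand-in written for illustration; its method names echo runsWaitingOn and emit, but it is not the WorkflowStore interface and persists nothing.

```dart
// Hypothetical in-memory model of topic watchers; not the Stem store.
class WatcherStore {
  final Map<String, Set<String>> _waiting = {};

  /// Payload delivered to each resumed run, keyed by run id.
  final Map<String, Object?> resumeData = {};

  /// Records that [runId] is suspended until [topic] is emitted.
  void registerWatcher(String runId, String topic) {
    _waiting.putIfAbsent(topic, () => <String>{}).add(runId);
  }

  /// Lists the runs currently suspended on [topic].
  List<String> runsWaitingOn(String topic) =>
      List.unmodifiable(_waiting[topic] ?? const <String>{});

  /// Delivers [payload] to every waiting run and returns the resumed ids.
  List<String> emit(String topic, Object? payload) {
    final resumed = (_waiting.remove(topic) ?? <String>{}).toList();
    for (final runId in resumed) {
      resumeData[runId] = payload;
    }
    return resumed;
  }
}

void main() {
  final store = WatcherStore()
    ..registerWatcher('run-1', 'approvals.manager')
    ..registerWatcher('run-2', 'approvals.manager');

  print(store.runsWaitingOn('approvals.manager'));

  // One emit resumes the whole group waiting on the topic.
  final resumed = store.emit('approvals.manager', {'approvedBy': 'alice'});
  print('resumed: $resumed, payload: ${store.resumeData['run-1']}');
  print(store.runsWaitingOn('approvals.manager'));
}
```

Since the waiting set lives in the store rather than in a worker's memory, it can be queried or resumed without any worker being online, which is the property the paragraph above relies on.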

Run Leases & Multi-Worker Recovery

Workflow runs are lease-based: a worker claims a run for a fixed duration, renews the lease while executing, and releases it on completion. This prevents two workers from executing the same run concurrently while still allowing takeover after crashes.

Operational guidance:

  • Keep runLeaseDuration >= the broker visibility timeout so redelivered workflow tasks retry instead of being dropped before the lease expires.
  • Ensure workers renew leases (leaseExtension) before either the workflow lease or broker visibility timeout expires.
  • Keep system clocks in sync (NTP) because lease expiry is time-based across workers and the shared store.
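The claim, renew, and takeover cycle can be sketched with a small lease table. LeaseTable and its methods are hypothetical names for this illustration and say nothing about how the workflow store records leases; timestamps are passed in explicitly so the example stays deterministic.

```dart
// Hypothetical lease table for illustration; not the Stem store API.
class Lease {
  Lease(this.owner, this.expiresAt);
  final String owner;
  DateTime expiresAt;
}

class LeaseTable {
  final Map<String, Lease> _leases = {};

  /// Claims [runId] unless another worker holds an unexpired lease.
  bool claim(String runId, String worker, DateTime now, Duration ttl) {
    final current = _leases[runId];
    if (current != null &&
        current.expiresAt.isAfter(now) &&
        current.owner != worker) {
      return false;
    }
    _leases[runId] = Lease(worker, now.add(ttl));
    return true;
  }

  /// Extends the lease if [worker] still owns it and it has not expired.
  bool renew(String runId, String worker, DateTime now, Duration ttl) {
    final current = _leases[runId];
    if (current == null ||
        current.owner != worker ||
        !current.expiresAt.isAfter(now)) {
      return false;
    }
    current.expiresAt = now.add(ttl);
    return true;
  }

  /// Releases the lease on completion, if [worker] owns it.
  void release(String runId, String worker) {
    if (_leases[runId]?.owner == worker) _leases.remove(runId);
  }
}

void main() {
  final table = LeaseTable();
  final t0 = DateTime.utc(2024, 1, 1);
  const ttl = Duration(seconds: 30);
  DateTime at(int seconds) => t0.add(Duration(seconds: seconds));

  print(table.claim('run-1', 'worker-a', at(0), ttl)); // a claims the run
  print(table.claim('run-1', 'worker-b', at(10), ttl)); // b is blocked
  print(table.renew('run-1', 'worker-a', at(20), ttl)); // a renews in time
  // worker-a crashes here and never renews again.
  print(table.claim('run-1', 'worker-b', at(60), ttl)); // b takes over
}
```

The takeover at the end depends entirely on comparing timestamps, which is why clock skew between workers can either delay recovery or let two workers believe they own the same run.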

Deterministic Tests with WorkflowClock

Inject a WorkflowClock when you need deterministic timestamps (e.g. for lease expiry or due run scheduling). The FakeWorkflowClock lets tests advance time without waiting on real timers.

final clock = FakeWorkflowClock(DateTime.utc(2024, 1, 1));
final store = InMemoryWorkflowStore(clock: clock);
final runtime = WorkflowRuntime(
  stem: stem,
  store: store,
  eventBus: InMemoryEventBus(store: store),
  clock: clock,
);
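To show what an injected clock buys in a test, here is a self-contained sketch of due-run scheduling driven by a manually advanced clock. Clock, ManualClock, and SleepTable are hypothetical stand-ins; the real FakeWorkflowClock and store interfaces may look different.

```dart
// Hypothetical stand-ins; FakeWorkflowClock's real interface may differ.
abstract class Clock {
  DateTime now();
}

class ManualClock implements Clock {
  ManualClock(this._now);
  DateTime _now;

  @override
  DateTime now() => _now;

  /// Moves time forward without waiting on a real timer.
  void advance(Duration by) => _now = _now.add(by);
}

class SleepTable {
  SleepTable(this.clock);
  final Clock clock;
  final Map<String, DateTime> _wakeAt = {};

  /// Stores the wake-up timestamp for a sleeping run.
  void sleep(String runId, Duration duration) {
    _wakeAt[runId] = clock.now().add(duration);
  }

  /// Returns the runs whose wake-up time has been reached.
  List<String> dueRuns() {
    final now = clock.now();
    return [
      for (final entry in _wakeAt.entries)
        if (!entry.value.isAfter(now)) entry.key,
    ];
  }
}

void main() {
  final clock = ManualClock(DateTime.utc(2024, 1, 1));
  final table = SleepTable(clock)..sleep('run-1', const Duration(minutes: 5));

  print(table.dueRuns()); // nothing is due yet

  clock.advance(const Duration(minutes: 5));
  print(table.dueRuns()); // the run is due without any real waiting
}
```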

Payload Encoders in Workflow Apps

Workflows execute on top of a Stem worker, so they inherit the same TaskPayloadEncoder facilities as regular tasks. StemWorkflowApp.create accepts either a shared TaskPayloadEncoderRegistry or explicit defaults:

lib/workflows/bootstrap.dart
final encoders = TaskPayloadEncoderRegistry(
  defaultArgsEncoder: const JsonTaskPayloadEncoder(),
  defaultResultEncoder: const Base64PayloadEncoder(),
);

Future<void> configureWorkflowEncoders() async {
  final app = await StemWorkflowApp.fromUrl(
    'memory://',
    flows: [ApprovalsFlow.flow],
    encoderRegistry: encoders,
    additionalEncoders: const [GzipPayloadEncoder()],
  );

  await app.close();
}

Every workflow run task stores the result encoder id in RunState.resultMeta, and the internal tasks dispatched by workflows reuse the same registry, so typed steps can safely emit encrypted/binary payloads while workers decode them exactly once.
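The idea of storing an encoder id next to the payload can be illustrated with dart:convert alone. PayloadCodec, CodecRegistry, and StoredResult are hypothetical names for this sketch; the real TaskPayloadEncoder interface and the layout of RunState.resultMeta are not shown in this page and may differ.

```dart
import 'dart:convert';

// Hypothetical encoder interface; the real TaskPayloadEncoder may differ.
abstract class PayloadCodec {
  String get id;
  String encode(Object? value);
  Object? decode(String stored);
}

class JsonCodec implements PayloadCodec {
  @override
  String get id => 'json';
  @override
  String encode(Object? value) => jsonEncode(value);
  @override
  Object? decode(String stored) => jsonDecode(stored);
}

class Base64JsonCodec implements PayloadCodec {
  @override
  String get id => 'base64+json';
  @override
  String encode(Object? value) =>
      base64Encode(utf8.encode(jsonEncode(value)));
  @override
  Object? decode(String stored) =>
      jsonDecode(utf8.decode(base64Decode(stored)));
}

/// What gets persisted: the payload plus the id of the codec that wrote it.
class StoredResult {
  const StoredResult(this.encoderId, this.payload);
  final String encoderId;
  final String payload;
}

class CodecRegistry {
  CodecRegistry(Iterable<PayloadCodec> codecs)
      : _byId = {for (final codec in codecs) codec.id: codec};

  final Map<String, PayloadCodec> _byId;

  PayloadCodec _lookup(String id) =>
      _byId[id] ?? (throw ArgumentError('Unknown encoder: $id'));

  StoredResult store(String encoderId, Object? value) =>
      StoredResult(encoderId, _lookup(encoderId).encode(value));

  /// Decodes once, using the id recorded at write time.
  Object? load(StoredResult result) =>
      _lookup(result.encoderId).decode(result.payload);
}

void main() {
  final registry = CodecRegistry([JsonCodec(), Base64JsonCodec()]);
  final stored = registry.store('base64+json', {'receipt': 'receipt-42'});
  print('${stored.encoderId}: ${stored.payload}');
  print(registry.load(stored));
}
```

Recording the id at write time means the reader never has to guess which encoder produced a payload, even when different steps use different encoders.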

Need per-workflow overrides? Register custom encoders on individual task handlers (via TaskMetadata) or attach a specialized encoder to a Flow/script step that persists sensitive data in the workflow store.

Tooling Tips

  • Use workflowApp.store.listRuns(...) to filter by workflow/status when building admin dashboards.
  • workflowApp.runtime.emit(topic, payload) is the canonical way to resume batches of runs waiting on external events.
  • CLI integrations (see stem workflow ...) rely on the same store APIs, so keeping the store tidy (expired runs, watchers) ensures responsive tooling.