Canvas Patterns
This guide walks through Stem's task composition primitives—chains, groups, and
chords—using in-memory brokers and backends. Each snippet references a runnable
file under packages/stem/example/docs_snippets/ so you can experiment locally
with dart run. If you bootstrap with StemApp, use app.canvas to reuse the
same broker, backend, registry, and encoder registry.
Chains
Chains execute tasks serially. Each step receives the previous result via
context.meta['chainPrevResult'].
Future<void> main() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<String>(
name: 'fetch.user',
entrypoint: (context, args) async => 'Ada',
),
FunctionTaskHandler<String>(
name: 'enrich.user',
entrypoint: (context, args) async {
final prev = context.meta['chainPrevResult'] as String? ?? 'Friend';
return '$prev Lovelace';
},
),
FunctionTaskHandler<Object?>(
name: 'send.email',
entrypoint: (context, args) async {
final fullName =
context.meta['chainPrevResult'] as String? ?? 'Friend';
print('Sending email to $fullName');
return null;
},
),
],
workerConfig: const StemWorkerConfig(
consumerName: 'chain-worker',
concurrency: 1,
prefetchMultiplier: 1,
),
);
await app.start();
final canvas = app.canvas;
final chainResult = await canvas.chain([
task('fetch.user'),
task('enrich.user'),
task('send.email'),
]);
print(
'Chain completed with state: ${chainResult.finalStatus?.state} '
'value=${chainResult.value}',
);
await app.close();
}
If any step fails, the chain stops immediately. Retry by invoking canvas.chain
again with the same signatures.
Groups
Groups fan out work and persist each branch in the result backend.
Future<void> main() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<int>(
name: 'square',
entrypoint: (context, args) async {
final value = args['value'] as int;
await Future<void>.delayed(const Duration(milliseconds: 50));
return value * value;
},
),
],
workerConfig: const StemWorkerConfig(
consumerName: 'group-worker',
concurrency: 2,
prefetchMultiplier: 1,
),
);
await app.start();
final canvas = app.canvas;
const groupHandle = 'squares-demo';
await canvas.group([
task('square', args: <String, Object?>{'value': 2}),
task('square', args: <String, Object?>{'value': 3}),
task('square', args: <String, Object?>{'value': 4}),
], groupId: groupHandle);
await _waitFor(() async {
final status = await app.backend.getGroup(groupHandle);
return status?.results.length == 3;
});
final groupStatus = await app.backend.getGroup(groupHandle);
final values = groupStatus?.results.values.map((s) => s.payload).toList();
print('Group results: $values');
await app.close();
}
Future<void> _waitFor(
Future<bool> Function() predicate, {
Duration timeout = const Duration(seconds: 5),
Duration pollInterval = const Duration(milliseconds: 50),
}) async {
final deadline = DateTime.now().add(timeout);
while (DateTime.now().isBefore(deadline)) {
if (await predicate()) return;
await Future<void>.delayed(pollInterval);
}
throw TimeoutException('Timed out waiting for group completion', timeout);
}
Batches
Batches provide a first-class immutable submission API on top of durable group state:
canvas.submitBatch(signatures)returns a stablebatchIdand task ids.canvas.inspectBatch(batchId)returns aggregate lifecycle status (pending,running,succeeded,failed,cancelled,partial).
Future<void> main() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<int>(
name: 'batch.double',
entrypoint: (context, args) async {
final value = args['value'] as int? ?? 0;
return value * 2;
},
),
],
workerConfig: const StemWorkerConfig(
consumerName: 'batch-worker',
concurrency: 1,
prefetchMultiplier: 1,
),
);
await app.start();
final submission = await app.canvas.submitBatch<int>([
task('batch.double', args: {'value': 1}),
task('batch.double', args: {'value': 2}),
task('batch.double', args: {'value': 3}),
]);
// Batches may still be running immediately after submission.
BatchStatus? status;
for (var i = 0; i < 20; i += 1) {
status = await app.canvas.inspectBatch(submission.batchId);
if (status?.isTerminal == true) {
break;
}
await Future<void>.delayed(const Duration(milliseconds: 50));
}
print(
'Batch ${submission.batchId} state=${status?.state} '
'completed=${status?.completed}/${status?.expected}',
);
await app.close();
}
Chords
Chords combine a group with a callback. Once all body tasks succeed, the callback
runs with context.meta['chordResults'] populated.
Future<void> main() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<int>(
name: 'fetch.metric',
entrypoint: (context, args) async {
await Future<void>.delayed(const Duration(milliseconds: 40));
return args['value'] as int;
},
),
FunctionTaskHandler<Object?>(
name: 'aggregate.metric',
entrypoint: (context, args) async {
final values =
(context.meta['chordResults'] as List?)
?.whereType<int>()
.toList() ??
const [];
final sum = values.fold<int>(0, (a, b) => a + b);
print('Aggregated result: $sum');
return null;
},
),
],
workerConfig: const StemWorkerConfig(
consumerName: 'chord-worker',
concurrency: 3,
prefetchMultiplier: 1,
),
);
await app.start();
final canvas = app.canvas;
final chordResult = await canvas.chord(
body: [
task('fetch.metric', args: <String, Object?>{'value': 5}),
task('fetch.metric', args: <String, Object?>{'value': 7}),
task('fetch.metric', args: <String, Object?>{'value': 11}),
],
callback: task('aggregate.metric'),
);
print('Callback task id: ${chordResult.callbackTaskId}');
print('Chord values: ${chordResult.values}');
await app.close();
}
If any branch fails, the callback is skipped and the chord group is marked as
failed. Inspect backend.getGroup(chordId) to see which branch failed before
retrying.
Dependency semantics
- Chains model parent → child dependencies: each step is enqueued only after the previous one succeeds.
- Groups model fan-out dependencies: a group is “complete” once all child tasks finish. The expected count is stored in the backend.
- Chords combine both: a callback depends on the entire group finishing successfully.
Child result retrieval
Canvas.groupreturns aGroupDispatchwith a result stream for each child.Canvas.chordpreserves the original signature order when buildingchordResults, so you can map results back to inputs deterministically.backend.getGroup(groupId)returns the latest status for each child task.
Removal semantics
Group and chord metadata live in the result backend. Set backend TTLs or explicitly expire group records to avoid unbounded storage growth.
Running the examples
From the repository root:
cd packages/stem/example/docs_snippets
dart run lib/canvas_chain.dart
dart run lib/canvas_group.dart
dart run lib/canvas_chord.dart
Each script bootstraps a StemApp in-memory runtime, starts a worker, and then
uses app.canvas for composition.
Best practices
- Keep callbacks idempotent; chords can be retried manually.
- Polling is fine for examples—production deployments should rely on notifications or shorter intervals.
- Expire group records via backend TTLs to avoid unbounded storage.