# Canvas Patterns
This guide walks through Stem's task composition primitives (chains, groups, and
chords) using in-memory brokers and backends. Each snippet references a runnable
file under `packages/stem/example/docs_snippets/` so you can experiment locally
with `dart run`. If you bootstrap with `StemApp`, use `app.canvas` to reuse the
same broker, backend, task handlers, and encoder registry. `StemApp` lazy-starts
its managed worker for canvas dispatch too, so the common path does not need an
explicit `await app.start()`.
## Chains
Chains execute tasks serially. Each step receives the previous result via
`context.meta`, so prefer typed reads like
`context.meta.valueOr<String>('chainPrevResult', 'fallback')` over raw casts.
```dart
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<String>(
        name: 'fetch.user',
        entrypoint: (context, args) async => 'Ada',
      ),
      FunctionTaskHandler<String>(
        name: 'enrich.user',
        entrypoint: (context, args) async {
          final prev = context.meta.valueOr<String>(
            'chainPrevResult',
            'Friend',
          );
          return '$prev Lovelace';
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'send.email',
        entrypoint: (context, args) async {
          final fullName = context.meta.valueOr<String>(
            'chainPrevResult',
            'Friend',
          );
          print('Sending email to $fullName');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chain-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );

  final canvas = app.canvas;
  final chainResult = await canvas.chain([
    task('fetch.user'),
    task('enrich.user'),
    task('send.email'),
  ]);
  print(
    'Chain completed with state: ${chainResult.finalStatus?.state} '
    'value=${chainResult.value}',
  );

  await app.close();
}
```
If any step fails, the chain stops immediately. Retry by invoking `canvas.chain`
again with the same signatures.
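A minimal sketch of that retry path, assuming the `app` and handlers from the
chain example above. The success check against `finalStatus?.state` is
illustrative only; match it to the state values your Stem version actually
exposes. There is no retry-specific API implied here.

```dart
final canvas = app.canvas;
// Keep the chain definition in one place so a retry is a plain re-dispatch.
final signatures = [
  task('fetch.user'),
  task('enrich.user'),
  task('send.email'),
];

var result = await canvas.chain(signatures);
// Illustrative success check: compare the final state's string form.
// Substitute the real state enum/value names from your Stem version.
if ('${result.finalStatus?.state}'.endsWith('succeeded') == false) {
  // A failed chain stops at the failing step; retrying is simply
  // dispatching the same signatures again.
  result = await canvas.chain(signatures);
}
```

Because a retry re-runs every step from the beginning, keep chain steps
idempotent if you plan to retry this way.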
## Groups
Groups fan out work and persist each branch in the result backend.
```dart
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'square',
        entrypoint: (context, args) async {
          final value = args.requiredValue<int>('value');
          await Future<void>.delayed(const Duration(milliseconds: 50));
          return value * value;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'group-worker',
      concurrency: 2,
      prefetchMultiplier: 1,
    ),
  );

  final canvas = app.canvas;
  final dispatch = await canvas.group([
    task('square', args: <String, Object?>{'value': 2}),
    task('square', args: <String, Object?>{'value': 3}),
    task('square', args: <String, Object?>{'value': 4}),
  ]);

  await _waitFor(() async {
    final status = await app.getGroupStatus(dispatch.groupId);
    return status?.results.length == 3;
  });

  final groupStatus = await app.getGroupStatus(dispatch.groupId);
  final values = groupStatus?.resultValues<int>().values.toList();
  print('Group results: $values');

  await app.close();
}

Future<void> _waitFor(
  Future<bool> Function() predicate, {
  Duration timeout = const Duration(seconds: 5),
  Duration pollInterval = const Duration(milliseconds: 50),
}) async {
  final deadline = DateTime.now().add(timeout);
  while (DateTime.now().isBefore(deadline)) {
    if (await predicate()) return;
    await Future<void>.delayed(pollInterval);
  }
  throw TimeoutException('Timed out waiting for group completion', timeout);
}
```
## Batches
Batches provide a first-class immutable submission API on top of durable group state:

- `canvas.submitBatch(signatures)` returns a stable `batchId` and task ids.
- `canvas.inspectBatch(batchId)` returns aggregate lifecycle status (`pending`,
  `running`, `succeeded`, `failed`, `cancelled`, `partial`).
```dart
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'batch.double',
        entrypoint: (context, args) async {
          final value = args['value'] as int? ?? 0;
          return value * 2;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'batch-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );

  final submission = await app.canvas.submitBatch<int>([
    task('batch.double', args: {'value': 1}),
    task('batch.double', args: {'value': 2}),
    task('batch.double', args: {'value': 3}),
  ]);

  // Batches may still be running immediately after submission.
  BatchStatus? status;
  for (var i = 0; i < 20; i += 1) {
    status = await app.canvas.inspectBatch(submission.batchId);
    if (status?.isTerminal == true) {
      break;
    }
    await Future<void>.delayed(const Duration(milliseconds: 50));
  }

  print(
    'Batch ${submission.batchId} state=${status?.state} '
    'completed=${status?.completed}/${status?.expected}',
  );

  await app.close();
}
```
## Chords
Chords combine a group with a callback. Once all body tasks succeed, the
callback runs with `context.meta['chordResults']` populated. Prefer
`context.meta.valueListOr<T>('chordResults', const [])` over manual list casts
when reading those results.
```dart
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'fetch.metric',
        entrypoint: (context, args) async {
          await Future<void>.delayed(const Duration(milliseconds: 40));
          return args.requiredValue<int>('value');
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'aggregate.metric',
        entrypoint: (context, args) async {
          final values = context.meta.valueListOr<int>(
            'chordResults',
            const [],
          );
          final sum = values.fold<int>(0, (a, b) => a + b);
          print('Aggregated result: $sum');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chord-worker',
      concurrency: 3,
      prefetchMultiplier: 1,
    ),
  );

  final canvas = app.canvas;
  final chordResult = await canvas.chord(
    body: [
      task('fetch.metric', args: <String, Object?>{'value': 5}),
      task('fetch.metric', args: <String, Object?>{'value': 7}),
      task('fetch.metric', args: <String, Object?>{'value': 11}),
    ],
    callback: task('aggregate.metric'),
  );
  print('Callback task id: ${chordResult.callbackTaskId}');
  print('Chord values: ${chordResult.values}');

  await app.close();
}
```
If any branch fails, the callback is skipped and the chord group is marked as
failed. Inspect the latest group status via `StemApp.getGroupStatus(...)` or
`StemClient.getGroupStatus(...)` before retrying. If you are operating below
the runtime layer, read the raw backend directly.
## Dependency semantics
- Chains model parent → child dependencies: each step is enqueued only after the previous one succeeds.
- Groups model fan-out dependencies: a group is “complete” once all child tasks finish. The expected count is stored in the backend.
- Chords combine both: a callback depends on the entire group finishing successfully.
## Child result retrieval
- `Canvas.group` returns a `GroupDispatch` with a result stream for each child.
- `Canvas.chord` preserves the original signature order when building
  `chordResults`, so you can map results back to inputs deterministically.
- `StemApp.getGroupStatus(...)` and `StemClient.getGroupStatus(...)` return the
  latest status for each child task. Use `status.resultValues<T>()` for scalar
  child results or `status.resultJson(...)`/`status.resultAs(codec: ...)` for
  DTO payloads before dropping down to raw backend reads.
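As a sketch of the status-based path, the helper below reads typed scalar
results for a finished group. It reuses only calls shown in the group example
(`getGroupStatus`, `resultValues<T>()`); `printGroupResults` itself is a
hypothetical convenience, not part of the Stem API, and assumes a bootstrapped
`StemApp` plus a `groupId` from a prior `Canvas.group` dispatch.

```dart
// Hypothetical helper: fetch the latest group status and print each child's
// scalar result. `app` is a running StemApp; `groupId` comes from a
// GroupDispatch as in the group example above.
Future<void> printGroupResults(StemApp app, String groupId) async {
  final status = await app.getGroupStatus(groupId);
  if (status == null) {
    print('No status recorded for group $groupId');
    return;
  }
  // resultValues<T>() yields a map from child task id to typed payload.
  final values = status.resultValues<int>();
  values.forEach((taskId, value) {
    print('child $taskId -> $value');
  });
}
```

For non-scalar payloads, swap `resultValues<int>()` for the
`resultJson(...)`/`resultAs(codec: ...)` helpers listed above.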
## Removal semantics
Group and chord metadata live in the result backend. Set backend TTLs or explicitly expire group records to avoid unbounded storage growth.
## Running the examples
From the repository root:
```shell
cd packages/stem/example/docs_snippets
dart run lib/canvas_chain.dart
dart run lib/canvas_group.dart
dart run lib/canvas_chord.dart
```
Each script bootstraps a `StemApp` in-memory runtime and then uses `app.canvas`
for composition.
## Best practices
- Keep callbacks idempotent; chords can be retried manually.
- Polling is fine for examples—production deployments should rely on notifications or shorter intervals.
- Expire group records via backend TTLs to avoid unbounded storage.
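To make the idempotency point concrete, here is one way to write a chord
callback that tolerates manual retries. It uses only the handler and
`context.meta` APIs from the chord example; the `processedKeys` set is a
placeholder for a durable dedupe store (for example a database table keyed by
the chord's group id), not a Stem feature.

```dart
// Sketch of an idempotent chord callback: derive a deterministic key from the
// chord results and skip side effects when that key was already processed.
// In production, back `processedKeys` with durable storage keyed by the
// chord/group id rather than an in-memory set.
final processedKeys = <String>{};

final aggregateOnce = FunctionTaskHandler<Object?>(
  name: 'aggregate.metric.once',
  entrypoint: (context, args) async {
    final values = context.meta.valueListOr<int>('chordResults', const []);
    final key = values.join(',');
    if (!processedKeys.add(key)) {
      // Already handled: a manual chord retry re-runs the callback safely.
      return null;
    }
    final sum = values.fold<int>(0, (a, b) => a + b);
    print('Aggregated result: $sum');
    return null;
  },
);
```

The side effect (here, the `print`) runs at most once per distinct result set,
so retrying the chord by hand does not duplicate work.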