Canvas Patterns

This guide walks through Stem's task composition primitives (chains, groups, and chords), plus the batch API built on top of groups, using in-memory brokers and backends. Each snippet references a runnable file under packages/stem/example/docs_snippets/ so you can experiment locally with dart run. If you bootstrap with StemApp, use app.canvas so that composition reuses the app's broker, backend, registry, and encoder registry.

Chains

Chains execute tasks serially. Each step receives the previous result via context.meta['chainPrevResult'].

Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<String>(
        name: 'fetch.user',
        entrypoint: (context, args) async => 'Ada',
      ),
      FunctionTaskHandler<String>(
        name: 'enrich.user',
        entrypoint: (context, args) async {
          final prev = context.meta['chainPrevResult'] as String? ?? 'Friend';
          return '$prev Lovelace';
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'send.email',
        entrypoint: (context, args) async {
          final fullName =
              context.meta['chainPrevResult'] as String? ?? 'Friend';
          print('Sending email to $fullName');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chain-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );
  await app.start();

  final canvas = app.canvas;
  final chainResult = await canvas.chain([
    task('fetch.user'),
    task('enrich.user'),
    task('send.email'),
  ]);

  print(
    'Chain completed with state: ${chainResult.finalStatus?.state} '
    'value=${chainResult.value}',
  );

  await app.close();
}

If any step fails, the chain stops immediately and the remaining steps are never enqueued. To retry, invoke canvas.chain again with the same signatures.
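The manual retry described above can be wrapped in a small helper. The following is a minimal sketch in plain Dart, not part of Stem: retryUntil, its parameters, and the string states are hypothetical stand-ins, and it assumes the caller can tell success from failure by inspecting the returned result.

```dart
/// Hypothetical helper (not a Stem API): runs [attempt] up to [maxAttempts]
/// times and returns the first result accepted by [isSuccess]. If no result
/// is accepted, the result of the final attempt is returned.
Future<T> retryUntil<T>(
  Future<T> Function() attempt, {
  required bool Function(T result) isSuccess,
  int maxAttempts = 3,
  Duration delay = const Duration(milliseconds: 10),
}) async {
  if (maxAttempts < 1) {
    throw ArgumentError.value(maxAttempts, 'maxAttempts', 'must be >= 1');
  }
  late T result;
  for (var i = 1; i <= maxAttempts; i += 1) {
    result = await attempt();
    if (isSuccess(result) || i == maxAttempts) break;
    // Pause briefly before running the whole operation again.
    await Future<void>.delayed(delay);
  }
  return result;
}

Future<void> main() async {
  var calls = 0;
  // Stand-in for a chain run: reports 'failed' twice, then 'succeeded'.
  final outcome = await retryUntil<String>(
    () async => (calls += 1) < 3 ? 'failed' : 'succeeded',
    isSuccess: (state) => state == 'succeeded',
  );
  print('$outcome after $calls attempts');
}
```

In a real application the attempt closure would call canvas.chain with the same signatures, and isSuccess would inspect the returned result's final state. Because each retry runs the composition again, the steps themselves should be safe to repeat.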

Groups

Groups fan out independent tasks and persist each branch's status in the result backend, where you can read it back by group id.

// TimeoutException, used by _waitFor below, lives in dart:async.
import 'dart:async';

Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'square',
        entrypoint: (context, args) async {
          final value = args['value'] as int;
          await Future<void>.delayed(const Duration(milliseconds: 50));
          return value * value;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'group-worker',
      concurrency: 2,
      prefetchMultiplier: 1,
    ),
  );
  await app.start();

  final canvas = app.canvas;
  const groupHandle = 'squares-demo';
  await canvas.group([
    task('square', args: <String, Object?>{'value': 2}),
    task('square', args: <String, Object?>{'value': 3}),
    task('square', args: <String, Object?>{'value': 4}),
  ], groupId: groupHandle);

  // Poll the backend until all three branches have reported a result.
  await _waitFor(() async {
    final status = await app.backend.getGroup(groupHandle);
    return status?.results.length == 3;
  });

  final groupStatus = await app.backend.getGroup(groupHandle);
  final values = groupStatus?.results.values.map((s) => s.payload).toList();
  print('Group results: $values');

  await app.close();
}

/// Polls [predicate] every [pollInterval] until it returns true, or throws
/// a TimeoutException once [timeout] has elapsed.
Future<void> _waitFor(
  Future<bool> Function() predicate, {
  Duration timeout = const Duration(seconds: 5),
  Duration pollInterval = const Duration(milliseconds: 50),
}) async {
  final deadline = DateTime.now().add(timeout);
  while (DateTime.now().isBefore(deadline)) {
    if (await predicate()) return;
    await Future<void>.delayed(pollInterval);
  }
  throw TimeoutException('Timed out waiting for group completion', timeout);
}

Batches

Batches provide a first-class immutable submission API on top of durable group state:

  • canvas.submitBatch(signatures) returns a stable batchId and task ids.
  • canvas.inspectBatch(batchId) returns aggregate lifecycle status (pending, running, succeeded, failed, cancelled, partial).

Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'batch.double',
        entrypoint: (context, args) async {
          final value = args['value'] as int? ?? 0;
          return value * 2;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'batch-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );
  await app.start();

  final submission = await app.canvas.submitBatch<int>([
    task('batch.double', args: {'value': 1}),
    task('batch.double', args: {'value': 2}),
    task('batch.double', args: {'value': 3}),
  ]);

  // Batches may still be running immediately after submission.
  BatchStatus? status;
  for (var i = 0; i < 20; i += 1) {
    status = await app.canvas.inspectBatch(submission.batchId);
    if (status?.isTerminal == true) {
      break;
    }
    await Future<void>.delayed(const Duration(milliseconds: 50));
  }
  print(
    'Batch ${submission.batchId} state=${status?.state} '
    'completed=${status?.completed}/${status?.expected}',
  );

  await app.close();
}

Chords

Chords combine a group with a callback. Once all body tasks succeed, the callback runs with context.meta['chordResults'] populated.

Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<int>(
        name: 'fetch.metric',
        entrypoint: (context, args) async {
          await Future<void>.delayed(const Duration(milliseconds: 40));
          return args['value'] as int;
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'aggregate.metric',
        entrypoint: (context, args) async {
          final values =
              (context.meta['chordResults'] as List?)
                  ?.whereType<int>()
                  .toList() ??
              const [];
          final sum = values.fold<int>(0, (a, b) => a + b);
          print('Aggregated result: $sum');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chord-worker',
      concurrency: 3,
      prefetchMultiplier: 1,
    ),
  );
  await app.start();

  final canvas = app.canvas;
  final chordResult = await canvas.chord(
    body: [
      task('fetch.metric', args: <String, Object?>{'value': 5}),
      task('fetch.metric', args: <String, Object?>{'value': 7}),
      task('fetch.metric', args: <String, Object?>{'value': 11}),
    ],
    callback: task('aggregate.metric'),
  );

  print('Callback task id: ${chordResult.callbackTaskId}');
  print('Chord values: ${chordResult.values}');

  await app.close();
}

If any branch fails, the callback is skipped and the chord group is marked as failed. Inspect backend.getGroup(chordId) to see which branch failed before retrying.

Dependency semantics

  • Chains model parent → child dependencies: each step is enqueued only after the previous one succeeds.
  • Groups model fan-out dependencies: a group is “complete” once all child tasks finish. The expected count is stored in the backend.
  • Chords combine both: a callback depends on the entire group finishing successfully.
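As a mental model only, the three dependency shapes above can be imitated with plain Dart futures. This sketch does not use Stem and involves no broker or backend; runChain, runGroup, runChord, and Step are hypothetical names chosen for illustration.

```dart
import 'dart:async';

typedef Step = Future<Object?> Function(Object? previous);

/// Chain: each step starts only after the previous one completes without
/// throwing, and receives the previous result.
Future<Object?> runChain(List<Step> steps) async {
  Object? previous;
  for (final step in steps) {
    previous = await step(previous); // an exception here stops the chain
  }
  return previous;
}

/// Group: every branch is started up front; results keep the input order.
Future<List<T>> runGroup<T>(List<Future<T> Function()> branches) =>
    Future.wait(branches.map((branch) => branch()));

/// Chord: the callback runs only if every branch of the group succeeds.
Future<R> runChord<T, R>(
  List<Future<T> Function()> body,
  FutureOr<R> Function(List<T> results) callback,
) async {
  final results = await runGroup(body); // throws if any branch fails
  return await callback(results);
}

Future<void> main() async {
  final name = await runChain([
    (_) async => 'Ada',
    (prev) async => '$prev Lovelace',
  ]);
  print(name); // Ada Lovelace

  final sum = await runChord<int, int>(
    [() async => 5, () async => 7, () async => 11],
    (values) => values.fold<int>(0, (a, b) => a + b),
  );
  print(sum); // 23
}
```

The real primitives differ in that each step is a task enqueued through the broker and tracked in the result backend, but the ordering guarantees follow the same pattern.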

Child result retrieval

  • Canvas.group returns a GroupDispatch with a result stream for each child.
  • Canvas.chord preserves the original signature order when building chordResults, so you can map results back to inputs deterministically.
  • backend.getGroup(groupId) returns the latest status for each child task.
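Because chord results keep the order of the original signatures, they can be paired with their inputs by index. The sketch below is plain Dart with hypothetical data and a hypothetical helper name (zipByOrder). It assumes the inputs are unique, since duplicate keys would overwrite each other in the map.

```dart
/// Hypothetical helper: pairs each input with the result at the same index.
Map<K, V> zipByOrder<K, V>(List<K> inputs, List<V> results) {
  if (inputs.length != results.length) {
    throw ArgumentError('inputs and results must have the same length');
  }
  return Map<K, V>.fromIterables(inputs, results);
}

void main() {
  // Hypothetical data: labels for three body tasks and the ordered results
  // a chord callback might receive for them.
  final inputs = ['cpu', 'memory', 'disk'];
  final chordResults = [5, 7, 11];
  print(zipByOrder(inputs, chordResults)); // {cpu: 5, memory: 7, disk: 11}
}
```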

Removal semantics

Group and chord metadata live in the result backend. Set backend TTLs or explicitly expire group records to avoid unbounded storage growth.

Running the examples

From the repository root:

cd packages/stem/example/docs_snippets
dart run lib/canvas_chain.dart
dart run lib/canvas_group.dart
dart run lib/canvas_chord.dart

Each script bootstraps a StemApp in-memory runtime, starts a worker, and then uses app.canvas for composition.

Best practices

  • Keep callbacks idempotent; chords can be retried manually.
  • Polling is fine for examples—production deployments should rely on notifications or shorter intervals.
  • Expire group records via backend TTLs to avoid unbounded storage.
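To illustrate the first practice in the list above, here is one way a callback's side effect could be made idempotent: remember which chord ids have already been processed and return the stored outcome on a repeat. This is a minimal in-memory sketch in plain Dart with hypothetical names (IdempotentAggregator, aggregate). A real deployment would presumably need a durable store shared by all workers.

```dart
/// Hypothetical in-memory guard: remembers which chord ids were already
/// aggregated so a repeated callback does not recompute or re-apply work.
class IdempotentAggregator {
  final Map<String, int> _totals = {};

  /// Returns the stored total if [chordId] was seen before; otherwise sums
  /// [values], stores the total under [chordId], and returns it.
  int aggregate(String chordId, List<int> values) {
    return _totals.putIfAbsent(
      chordId,
      () => values.fold<int>(0, (a, b) => a + b),
    );
  }

  /// Number of distinct chord ids processed so far.
  int get processedCount => _totals.length;
}

void main() {
  final aggregator = IdempotentAggregator();
  final first = aggregator.aggregate('chord-1', [5, 7, 11]);
  // A manual retry delivers the same chord id again; the stored total wins.
  final second = aggregator.aggregate('chord-1', [5, 7, 11]);
  print('$first $second ${aggregator.processedCount}'); // 23 23 1
}
```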