Skip to main content

Core Concepts

Understand the building blocks that power Stem. These pages explain how tasks, workers, routing, signals, and canvases fit together so you can reason about behavior before touching production.

Feature Highlights

  • Queueing and retries with Stem.enqueue, TaskOptions, and retry strategies.
  • StemClient entrypoint to share broker/backend configuration across workers and workflow apps.
  • Worker lifecycle management, concurrency controls, and graceful shutdown.
  • Beat scheduler for interval/cron/solar/clocked jobs.
  • Canvas primitives (chains, groups, chords) for task composition.
  • First-class batch submissions with durable aggregate status inspection.
  • Lifecycle signals for instrumentation and integrations.
  • Queue-scoped custom events via QueueEventsProducer/QueueEvents.
  • Declarative routing across queues and broadcast channels.
  • Result backends and progress reporting via TaskContext.

Core Concept Snippets

lib/tasks.dart
class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(maxRetries: 2);


Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String? ?? 'anonymous';
print('Emailing $to (attempt ${context.attempt})');
}
}

final registry = SimpleTaskRegistry()..register(EmailTask());
lib/routing.dart
final inlineRegistry = RoutingRegistry(
RoutingConfig(
defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'primary'),
queues: {'primary': QueueDefinition(name: 'primary')},
routes: [
RouteDefinition(
match: RouteMatch.fromJson(const {'task': 'reports.*'}),
target: RouteTarget(type: 'queue', name: 'primary'),
),
],
),
);
lib/signals.dart
void configureSignals() {
StemSignals.configure(
configuration: const StemSignalConfiguration(
enabled: true,
enabledSignals: {'worker-heartbeat': false},
),
);
}
lib/canvas_chain.dart
Future<void> main() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<String>(
name: 'fetch.user',
entrypoint: (context, args) async => 'Ada',
),
FunctionTaskHandler<String>(
name: 'enrich.user',
entrypoint: (context, args) async {
final prev = context.meta['chainPrevResult'] as String? ?? 'Friend';
return '$prev Lovelace';
},
),
FunctionTaskHandler<Object?>(
name: 'send.email',
entrypoint: (context, args) async {
final fullName =
context.meta['chainPrevResult'] as String? ?? 'Friend';
print('Sending email to $fullName');
return null;
},
),
],
workerConfig: const StemWorkerConfig(
consumerName: 'chain-worker',
concurrency: 1,
prefetchMultiplier: 1,
),
);
await app.start();

final canvas = app.canvas;
final chainResult = await canvas.chain([
task('fetch.user'),
task('enrich.user'),
task('send.email'),
]);

print(
'Chain completed with state: ${chainResult.finalStatus?.state} '
'value=${chainResult.value}',
);

await app.close();
}
  • Tasks & Retries – Task handlers, options, retries, and idempotency guidelines.
  • Producer API – Enqueue tasks with args, metadata, signing, and delays.
  • Payload Signing – Sign envelopes, rotate keys, and verify signatures.
  • Rate Limiting – Throttle hot handlers with shared limits.
  • Uniqueness – Deduplicate naturally unique tasks.
  • Namespaces – Isolate environments and tenants.
  • Routing – Queue aliases, priorities, and broadcast channels.
  • Signals – Lifecycle hooks for instrumentation and integrations.
  • Queue Events – Publish/listen to queue-scoped custom events.
  • Canvas Patterns – Chains, groups, and chords for composing work.
  • Observability – Metrics, traces, logging, and lifecycle signals.
  • Persistence & Stores – Result backends, schedule/lock stores, and revocation storage.
  • Workflows – Durable Flow/Script runtimes with typed results, suspensions, and event watchers.
  • CLI & Control – Quickly inspect queues, workers, and health from the command line.

Continue with the Workers guide for operational details.