Programmatic Workers & Enqueuers

Use Stem's Dart APIs to embed task production and processing inside your application services. This guide focuses on the two core roles: producer (enqueuer) and worker.

Producer (Enqueuer)

lib/producer.dart
import 'package:stem/stem.dart';

Future<void> minimalProducer() async {
  // The producer registers the handler so the task name is known at enqueue time.
  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'email.send',
        entrypoint: (context, args) async {
          final to = args['to'] as String? ?? 'friend';
          print('Queued email to $to');
          return null;
        },
      ),
    );

  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );

  // enqueue returns the task ID, which can be kept for polling or auditing.
  final taskId = await stem.enqueue(
    'email.send',
    args: {'to': 'hello@example.com', 'subject': 'Welcome'},
  );

  print('Enqueued $taskId');
  await backend.close();
  await broker.close();
}

Tips

  • Always reuse a Stem instance rather than creating one per request.
  • Use TaskOptions to set queue, retries, timeouts, and isolation.
  • Add custom metadata via the meta argument for observability or downstream processing.
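The tips above can be sketched as a small helper that takes a shared Stem instance instead of building one per call. This is a minimal sketch under assumptions, not the library's documented signature: the `options:` named parameter, the `queue` field on TaskOptions, and the shape of the `meta` map are inferred from the tips, and `enqueueWelcomeEmail`, the `emails` queue, and the `requestId` key are hypothetical names chosen for illustration.

```dart
import 'package:stem/stem.dart'; // assumed import path for the package

/// Enqueues a welcome email using a long-lived [stem] instance.
/// Hypothetical helper; parameter names on `enqueue` are assumptions.
Future<void> enqueueWelcomeEmail(
  Stem stem,
  String to,
  String requestId,
) async {
  final taskId = await stem.enqueue(
    'email.send',
    args: {'to': to, 'subject': 'Welcome'},
    // Assumption: `options` accepts a TaskOptions, and `queue` selects the
    // target queue. `maxRetries` appears in this guide's worker example.
    options: const TaskOptions(queue: 'emails', maxRetries: 3),
    // Assumption: `meta` accepts a map of arbitrary key/value pairs.
    meta: {'requestId': requestId, 'source': 'signup'},
  );
  print('Enqueued $taskId for request $requestId');
}
```

If a task is routed to a non-default queue like this, a worker has to consume that queue (the `queue:` argument in the worker example below) or the task will sit unprocessed.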

Worker

bin/worker.dart
import 'package:stem/stem.dart';

class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  // Retry a failed send up to two more times.
  @override
  TaskOptions get options => const TaskOptions(maxRetries: 2);

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final to = args['to'] as String;
    print('Sending to $to (attempt ${context.attempt})');
  }
}

Future<void> minimalWorker() async {
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();

  final worker = Worker(
    broker: broker,
    registry: registry,
    backend: backend,
    queue: 'default',
  );

  await worker.start();
}

Lifecycle Tips

  • Call worker.shutdown() on SIGINT/SIGTERM to drain in-flight tasks and emit workerStopping/workerShutdown signals.
  • Monitor heartbeats via StemSignals.workerHeartbeat or the heartbeat backend for liveness checks.
  • Use WorkerLifecycleConfig to install signal handlers, configure soft/hard shutdown timeouts, and recycle isolates after N tasks or memory thresholds.
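When signal handling is wired by hand rather than through WorkerLifecycleConfig, the first tip can be sketched with `dart:io` alone. The helper below is a sketch, not part of Stem: `shutdownOnSignal` is a hypothetical name, and the `shutdown` callback stands in for whatever drains the worker (for example `worker.shutdown`).

```dart
import 'dart:async';
import 'dart:io';

/// Runs [shutdown] once when the first SIGINT or SIGTERM arrives.
/// The returned future completes when [shutdown] has finished.
Future<void> shutdownOnSignal(Future<void> Function() shutdown) {
  final done = Completer<void>();
  final subscriptions = <StreamSubscription<ProcessSignal>>[];
  var triggered = false;

  Future<void> onSignal(ProcessSignal signal) async {
    if (triggered) return; // ignore repeated signals while draining
    triggered = true;
    stderr.writeln('Received $signal, shutting down');
    try {
      await shutdown();
      done.complete();
    } catch (error, stackTrace) {
      done.completeError(error, stackTrace);
    } finally {
      // Stop watching so the process is free to exit.
      for (final subscription in subscriptions) {
        await subscription.cancel();
      }
    }
  }

  final signals = [
    ProcessSignal.sigint,
    // Watching SIGTERM is not supported on Windows.
    if (!Platform.isWindows) ProcessSignal.sigterm,
  ];
  for (final signal in signals) {
    subscriptions.add(signal.watch().listen(onSignal));
  }
  return done.future;
}
```

A worker entrypoint could start the worker and then `await shutdownOnSignal(worker.shutdown)`, closing the broker and backend once that future completes. Whether `worker.start()` returns before the worker stops is not stated in this guide, so the ordering may need adjusting.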

Putting It Together

A lightweight service wires the producer and worker into your application startup:

lib/bootstrap.dart
import 'package:stem/stem.dart';

class StemRuntime {
  StemRuntime({required this.registry, required this.brokerUrl});

  final TaskRegistry registry;

  // Not used by the in-memory adapters; kept for a networked broker.
  final String brokerUrl;

  // Producer and worker share one in-memory broker and backend so that
  // tasks enqueued through `stem` are visible to `worker`.
  final InMemoryBroker _broker = InMemoryBroker();
  final InMemoryResultBackend _backend = InMemoryResultBackend();

  late final Stem stem = Stem(
    broker: _broker,
    registry: registry,
    backend: _backend,
  );

  late final Worker worker = Worker(
    broker: _broker,
    registry: registry,
    backend: _backend,
  );

  Future<void> start() async {
    await worker.start();
  }

  Future<void> stop() async {
    // Drain the worker first, then release the shared adapters.
    await worker.shutdown();
    await _backend.close();
    await _broker.close();
  }
}

When you deploy, swap the in-memory adapters for Redis or Postgres; the Stem and Worker API surface stays the same.

Checklist

  • Reuse producer and worker objects; avoid per-request construction.
  • Inject the TaskRegistry from a central module so producers and workers stay in sync.
  • Capture task IDs returned by Stem.enqueue when you need to poll results or correlate with your own auditing.
  • Emit lifecycle signals (StemSignals) and wire logs/metrics early so production instrumentation is already in place.
  • For HTTP/GraphQL handlers, wrap enqueues in try/catch to surface validation errors before tasks hit the queue.
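The last two checklist items can be sketched without a web framework. To keep the example self-contained, it does not import Stem. Everything here is hypothetical: the `Enqueue` typedef stands in for `Stem.enqueue` (assuming it returns the task ID as a String), and `handleSignup`, `HandlerResponse`, the status codes, and the use of `ArgumentError` for validation failures are illustrative choices, not the library's behavior.

```dart
/// Stand-in for `Stem.enqueue` so the sketch runs without the package.
/// Assumption: the real method returns the task ID as a String.
typedef Enqueue = Future<String> Function(
  String name, {
  Map<String, Object?> args,
});

/// Minimal response shape; a real handler would use its framework's type.
typedef HandlerResponse = ({int status, Map<String, Object?> body});

Future<HandlerResponse> handleSignup(
  Map<String, Object?> request,
  Enqueue enqueue,
) async {
  final to = request['email'];
  // Reject bad input before anything reaches the queue.
  if (to is! String || !to.contains('@')) {
    return (status: 400, body: {'error': 'a valid email is required'});
  }
  try {
    final taskId = await enqueue('email.send', args: {'to': to});
    // Hand the task ID back so the caller can poll or correlate later.
    return (status: 202, body: {'taskId': taskId});
  } on ArgumentError catch (error) {
    // Hypothetical: a validation failure raised while enqueuing.
    return (status: 422, body: {'error': '${error.message}'});
  } catch (_) {
    // Broker unavailable or any other failure.
    return (status: 503, body: {'error': 'could not enqueue task'});
  }
}
```

Passing the enqueue function in, rather than constructing a Stem inside the handler, also keeps the handler consistent with the first checklist item: the long-lived instance is created once at startup and only its `enqueue` method is handed to request code.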

Next, continue with the Worker Control CLI or explore Signals for advanced instrumentation.