Programmatic Workers & Enqueuers
Use Stem's Dart APIs to embed task production and processing inside your application services. This guide focuses on the two core roles: producer (enqueuer) and worker.
This page is intentionally about the lower-level embedding surface. If you want
the default happy path, prefer StemClient, StemApp, or StemWorkflowApp
and come here only when you need direct runtime composition.
Producer (Enqueuer)
- Minimal
- Redis Broker
- Payload Signing
Future<void> minimalProducer() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<void>(
name: 'email.send',
entrypoint: (context, args) async {
final to = args['to'] as String? ?? 'friend';
print('Queued email to $to');
return null;
},
),
],
);
final taskId = await app.enqueue(
'email.send',
args: {'to': 'hello@example.com', 'subject': 'Welcome'},
);
print('Enqueued $taskId');
await app.waitForTask<void>(taskId);
await app.close();
}
Future<void> redisProducer() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final client = await StemClient.fromUrl(
brokerUrl,
adapters: const [StemRedisAdapter()],
overrides: StemStoreOverrides(backend: '$brokerUrl/1'),
tasks: [
FunctionTaskHandler<void>(
name: 'report.generate',
entrypoint: (context, args) async {
final id = args['reportId'] as String? ?? 'unknown';
print('Queued report $id');
return null;
},
),
],
);
await client.enqueue(
'report.generate',
args: {'reportId': 'monthly-2025-10'},
options: const TaskOptions(queue: 'reports'),
);
await client.close();
}
Future<void> signedProducer() async {
final config = StemConfig.fromEnvironment();
final signer = PayloadSigner.maybe(config.signing);
final client = await StemClient.fromUrl(
config.brokerUrl,
adapters: const [StemRedisAdapter()],
overrides: const StemStoreOverrides(backend: 'memory://'),
tasks: [
FunctionTaskHandler<void>(
name: 'billing.charge',
entrypoint: (context, args) async {
final customerId = args['customerId'] as String? ?? 'unknown';
print('Queued charge for $customerId');
return null;
},
),
],
signer: signer,
);
await client.enqueue(
'billing.charge',
args: {'customerId': 'cust_123', 'amount': 4200},
);
await client.close();
}
Tips
- Always reuse the producer runtime (
StemClient,StemApp, or rawStem) rather than constructing one per request. - Use
TaskOptionsto set queue, retries, timeouts, and isolation. - Add custom metadata via the
metaargument for observability or downstream processing.
Worker
- Minimal
- Redis Broker
- Retries & Signals
class EmailTask extends TaskHandler<void> {
String get name => 'email.send';
TaskOptions get options => const TaskOptions(maxRetries: 2);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String;
print('Sending to $to (attempt ${context.attempt})');
}
}
Future<void> minimalWorker() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final worker = Worker(
broker: broker,
backend: backend,
tasks: [EmailTask()],
queue: 'default',
);
await worker.start();
}
Future<void> redisWorker() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final worker = Worker(
broker: await RedisStreamsBroker.connect(brokerUrl),
backend: await RedisResultBackend.connect('$brokerUrl/1'),
tasks: [RedisEmailTask()],
queue: 'default',
concurrency: Platform.numberOfProcessors,
);
await worker.start();
}
class RedisEmailTask extends TaskHandler<void> {
String get name => 'email.send';
TaskOptions get options => const TaskOptions(
queue: 'default',
maxRetries: 3,
visibilityTimeout: Duration(seconds: 30),
);
Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
class FlakyTask extends TaskHandler<void> {
String get name => 'demo.flaky';
TaskOptions get options => const TaskOptions(maxRetries: 2);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
if (context.attempt < 2) {
throw StateError('Simulated failure');
}
print('Succeeded on attempt ${context.attempt}');
}
}
Future<void> retryWorker() async {
StemSignals.taskRetry.connect((payload, _) {
print('[retry] next run at: ${payload.nextRetryAt}');
});
final worker = Worker(
broker: InMemoryBroker(),
backend: InMemoryResultBackend(),
tasks: [FlakyTask()],
retryStrategy: ExponentialJitterRetryStrategy(
base: const Duration(milliseconds: 200),
max: const Duration(seconds: 1),
),
);
await worker.start();
}
Lifecycle Tips
- Call
worker.shutdown()on SIGINT/SIGTERM to drain in-flight tasks and emitworkerStopping/workerShutdownsignals. - Monitor heartbeats via
StemSignals.workerHeartbeator the heartbeat backend for liveness checks. - Use
WorkerLifecycleConfigto install signal handlers, configure soft/hard shutdown timeouts, and recycle isolates after N tasks or memory thresholds.
Putting It Together
A lightweight service wires the producer and worker into your application startup:
class StemRuntime {
StemRuntime({required this.tasks, required this.brokerUrl});
final List<TaskHandler<Object?>> tasks;
final String brokerUrl;
final InMemoryBroker _stemBroker = InMemoryBroker();
final InMemoryResultBackend _stemBackend = InMemoryResultBackend();
final InMemoryBroker _workerBroker = InMemoryBroker();
final InMemoryResultBackend _workerBackend = InMemoryResultBackend();
late final Stem stem = Stem(
broker: _stemBroker,
backend: _stemBackend,
tasks: tasks,
);
late final Worker worker = Worker(
broker: _workerBroker,
backend: _workerBackend,
tasks: tasks,
);
Future<void> start() async {
await worker.start();
}
Future<void> stop() async {
await worker.shutdown();
await _workerBackend.close();
await _workerBroker.close();
await _stemBackend.close();
await _stemBroker.close();
}
}
Swap the in-memory adapters for Redis/Postgres when you deploy, keeping the API surface the same.
If your service wants a higher-level bootstrap that owns broker/backend/tasks
in one place, use StemApp or StemClient instead of wiring raw Stem and
Worker instances by hand. This page stays focused on the lower-level
embedding path.
Checklist
- Reuse producer and worker objects—avoid per-request construction.
- Keep a shared
taskslist/module so producers and workers stay in sync. - Reach for a custom
TaskRegistryonly when you need advanced dynamic registration behavior. - Capture task IDs returned by
Stem.enqueuewhen you need to poll results or correlate with your own auditing. - Emit lifecycle signals (
StemSignals) and wire logs/metrics early so production instrumentation is already in place. - For HTTP/GraphQL handlers, wrap enqueues in try/catch to surface validation errors before tasks hit the queue.
Next, continue with the Worker Control CLI or explore Signals for advanced instrumentation.