Programmatic Workers & Enqueuers
Use Stem's Dart APIs to embed task production and processing inside your application services. This guide focuses on the two core roles: producer (enqueuer) and worker.
Producer (Enqueuer)
- Minimal
- Redis Broker
- Payload Signing
lib/producer.dart
Future<void> minimalProducer() async {
  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'email.send',
        entrypoint: (context, args) async {
          final to = args['to'] as String? ?? 'friend';
          print('Queued email to $to');
          return null;
        },
      ),
    );

  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );

  final taskId = await stem.enqueue(
    'email.send',
    args: {'to': 'hello@example.com', 'subject': 'Welcome'},
  );
  print('Enqueued $taskId');

  await backend.close();
  await broker.close();
}
lib/producer_redis.dart
import 'dart:io'; // Platform.environment

Future<void> redisProducer() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final broker = await RedisStreamsBroker.connect(brokerUrl);
  // Results go to Redis database 1 on the same server.
  final backend = await RedisResultBackend.connect('$brokerUrl/1');

  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'report.generate',
        entrypoint: (context, args) async {
          final id = args['reportId'] as String? ?? 'unknown';
          print('Queued report $id');
          return null;
        },
      ),
    );

  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );

  // Route the task to the 'reports' queue instead of the default one.
  await stem.enqueue(
    'report.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports'),
  );

  await backend.close();
  await broker.close();
}
lib/producer_signed.dart
Future<void> signedProducer() async {
  final config = StemConfig.fromEnvironment();
  final signer = PayloadSigner.maybe(config.signing);

  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'billing.charge',
        entrypoint: (context, args) async {
          final customerId = args['customerId'] as String? ?? 'unknown';
          print('Queued charge for $customerId');
          return null;
        },
      ),
    );

  final broker = await RedisStreamsBroker.connect(
    config.brokerUrl,
    tls: config.tls,
  );
  final backend = InMemoryResultBackend();
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
    signer: signer,
  );

  await stem.enqueue(
    'billing.charge',
    args: {'customerId': 'cust_123', 'amount': 4200},
  );

  await backend.close();
  await broker.close();
}
Tips
- Always reuse a Stem instance rather than creating one per request.
- Use TaskOptions to set queue, retries, timeouts, and isolation.
- Add custom metadata via the meta argument for observability or downstream processing.
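The first tip can be sketched without any Stem types. The example below is a minimal illustration, assuming a hypothetical `Shared` holder and a `FakeProducer` that stands in for an expensive client such as a Stem instance; neither name comes from the library. It relies only on Dart's `late final` fields, which run their initializer once on first access.

```dart
/// Builds a value on first access and returns the same instance afterwards.
/// `Shared` is a hypothetical helper for this sketch, not part of Stem.
class Shared<T> {
  Shared(T Function() create) : _create = create;

  final T Function() _create;

  // A late final field with an initializer is evaluated lazily, exactly once.
  late final T value = _create();
}

/// Hypothetical stand-in for a client that is costly to construct.
class FakeProducer {
  FakeProducer() {
    constructed++;
  }

  static int constructed = 0;
}

void main() {
  final producer = Shared<FakeProducer>(() => FakeProducer());

  // Simulate three request handlers asking for the producer.
  final a = producer.value;
  final b = producer.value;
  final c = producer.value;

  print(identical(a, b) && identical(b, c)); // true
  print(FakeProducer.constructed); // 1
}
```

In a real service the factory closure would build the broker, backend, and producer once at startup, and request handlers would only read `value`.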
Worker
- Minimal
- Redis Broker
- Retries & Signals
bin/worker.dart
class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(maxRetries: 2);

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final to = args['to'] as String;
    print('Sending to $to (attempt ${context.attempt})');
  }
}

Future<void> minimalWorker() async {
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();

  final worker = Worker(
    broker: broker,
    registry: registry,
    backend: backend,
    queue: 'default',
  );

  await worker.start();
}
bin/worker_redis.dart
import 'dart:io'; // Platform.environment, Platform.numberOfProcessors

Future<void> redisWorker() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final registry = SimpleTaskRegistry()..register(RedisEmailTask());

  final worker = Worker(
    broker: await RedisStreamsBroker.connect(brokerUrl),
    registry: registry,
    backend: await RedisResultBackend.connect('$brokerUrl/1'),
    queue: 'default',
    // One concurrent task slot per CPU core.
    concurrency: Platform.numberOfProcessors,
  );

  await worker.start();
}

class RedisEmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(
        queue: 'default',
        maxRetries: 3,
        visibilityTimeout: Duration(seconds: 30),
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
bin/worker_retry.dart
class FlakyTask extends TaskHandler<void> {
  @override
  String get name => 'demo.flaky';

  @override
  TaskOptions get options => const TaskOptions(maxRetries: 2);

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    // Fail the early attempts so the retry path is exercised.
    if (context.attempt < 2) {
      throw StateError('Simulated failure');
    }
    print('Succeeded on attempt ${context.attempt}');
  }
}

Future<void> retryWorker() async {
  // Log every scheduled retry before the worker starts consuming.
  StemSignals.taskRetry.connect((payload, _) {
    print('[retry] next run at: ${payload.nextRetryAt}');
  });

  final registry = SimpleTaskRegistry()..register(FlakyTask());
  final worker = Worker(
    broker: InMemoryBroker(),
    registry: registry,
    backend: InMemoryResultBackend(),
    retryStrategy: ExponentialJitterRetryStrategy(
      base: const Duration(milliseconds: 200),
      max: const Duration(seconds: 1),
    ),
  );

  await worker.start();
}
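To build intuition for the `base` and `max` values passed to the retry strategy above, here is a rough sketch of exponential backoff with full jitter. It is not Stem's implementation of ExponentialJitterRetryStrategy; the `backoffDelay` function, its 0-based `attempt` parameter, and the choice of full jitter are all assumptions made for illustration.

```dart
import 'dart:math' as math;

/// Illustrative exponential backoff with full jitter.
/// Hypothetical helper: this is NOT how Stem necessarily computes delays.
Duration backoffDelay({
  required int attempt, // 0-based retry attempt (an assumption of this sketch)
  required Duration base,
  required Duration max,
  math.Random? random,
}) {
  final rng = random ?? math.Random();

  // Bound the exponent so the shift cannot overflow for large attempt counts.
  final exponent = math.min(math.max(attempt, 0), 30);

  // The ceiling doubles with each attempt until it reaches `max`.
  final uncapped = base.inMicroseconds * (1 << exponent);
  final ceiling = math.min(uncapped, max.inMicroseconds);

  // Full jitter: pick uniformly between zero and the ceiling.
  return Duration(microseconds: (rng.nextDouble() * ceiling).floor());
}

void main() {
  const base = Duration(milliseconds: 200);
  const max = Duration(seconds: 1);
  final rng = math.Random(42); // Seeded so repeated runs are comparable.

  for (var attempt = 0; attempt < 5; attempt++) {
    final delay =
        backoffDelay(attempt: attempt, base: base, max: max, random: rng);
    print('attempt $attempt -> wait ${delay.inMilliseconds} ms');
  }
}
```

With these inputs the ceiling grows 200 ms, 400 ms, 800 ms, and then stays at 1 s, while the jitter spreads retries from many workers so they do not all fire at the same instant.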
Lifecycle Tips
- Call worker.shutdown() on SIGINT/SIGTERM to drain in-flight tasks and emit workerStopping/workerShutdown signals.
- Monitor heartbeats via StemSignals.workerHeartbeat or the heartbeat backend for liveness checks.
- Use WorkerLifecycleConfig to install signal handlers, configure soft/hard shutdown timeouts, and recycle isolates after N tasks or memory thresholds.
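If you prefer to wire shutdown by hand rather than through WorkerLifecycleConfig, the first tip can be sketched with `dart:io` alone. The `shutdownOnFirstSignal` helper below is hypothetical and deliberately takes plain streams so it can be exercised without sending real signals; in a service, the callback is where a call such as `await worker.shutdown()` would go.

```dart
import 'dart:async';
import 'dart:io';

/// Waits for the first event on any of [signals], then runs [onShutdown] once.
/// Hypothetical helper for this sketch; it is not part of Stem.
Future<void> shutdownOnFirstSignal(
  Iterable<Stream<Object?>> signals,
  Future<void> Function() onShutdown,
) async {
  final first = Completer<void>();
  final subscriptions = [
    for (final signal in signals)
      signal.listen((_) {
        // Ignore repeated signals once shutdown has been triggered.
        if (!first.isCompleted) first.complete();
      }),
  ];

  await first.future;
  for (final subscription in subscriptions) {
    await subscription.cancel();
  }
  await onShutdown();
}

Future<void> main() async {
  await shutdownOnFirstSignal(
    [
      ProcessSignal.sigint.watch(),
      // Watching SIGTERM is not supported on Windows.
      if (!Platform.isWindows) ProcessSignal.sigterm.watch(),
    ],
    () async {
      // Placeholder for draining the worker in a real service.
      print('Draining in-flight tasks...');
    },
  );
}
```

Running this program blocks until Ctrl+C (or SIGTERM on POSIX systems) arrives, executes the callback, and then exits because no listeners remain.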
Putting It Together
A lightweight service wires the producer and worker into your application startup:
lib/bootstrap.dart
class StemRuntime {
  StemRuntime({required this.registry, required this.brokerUrl});

  final TaskRegistry registry;

  // Not used by the in-memory adapters; kept for when a networked broker
  // is swapped in.
  final String brokerUrl;

  // The producer and the worker share one broker and one backend. In-memory
  // adapters are assumed to keep their state per instance, so separate
  // instances would never see each other's tasks or results.
  final InMemoryBroker _broker = InMemoryBroker();
  final InMemoryResultBackend _backend = InMemoryResultBackend();

  late final Stem stem = Stem(
    broker: _broker,
    registry: registry,
    backend: _backend,
  );

  late final Worker worker = Worker(
    broker: _broker,
    registry: registry,
    backend: _backend,
  );

  Future<void> start() async {
    await worker.start();
  }

  Future<void> stop() async {
    // Drain the worker first, then release the shared adapters.
    await worker.shutdown();
    await _backend.close();
    await _broker.close();
  }
}
Swap the in-memory adapters for Redis/Postgres when you deploy, keeping the API surface the same.
Checklist
- Reuse producer and worker objects; avoid per-request construction.
- Inject the TaskRegistry from a central module so producers and workers stay in sync.
- Capture task IDs returned by Stem.enqueue when you need to poll results or correlate with your own auditing.
- Emit lifecycle signals (StemSignals) and wire logs/metrics early so production instrumentation is already in place.
- For HTTP/GraphQL handlers, wrap enqueues in try/catch to surface validation errors before tasks hit the queue.
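The last checklist item might look roughly like the sketch below. Everything here is illustrative: the `Enqueue` typedef is a hypothetical stand-in for a producer call (its `Future<String>` return type is an assumption), and `EnqueueResponse` and `handleSendEmail` are invented for the example rather than taken from Stem or any HTTP framework.

```dart
/// Hypothetical signature standing in for a producer's enqueue call.
typedef Enqueue = Future<String> Function(
  String name,
  Map<String, Object?> args,
);

/// Minimal response shape handed back to the HTTP layer (hypothetical).
class EnqueueResponse {
  const EnqueueResponse(this.status, this.body);

  final int status;
  final String body;
}

Future<EnqueueResponse> handleSendEmail(
  Map<String, Object?> request,
  Enqueue enqueue,
) async {
  try {
    // Validate before anything reaches the queue.
    final to = request['to'];
    if (to is! String || !to.contains('@')) {
      throw const FormatException('"to" must be an email address');
    }

    final taskId = await enqueue('email.send', {'to': to});
    return EnqueueResponse(202, taskId);
  } on FormatException catch (e) {
    // Caller error: report it without touching the broker.
    return EnqueueResponse(400, e.message);
  } catch (_) {
    // Broker or serialization failure: do not leak internals to the client.
    return const EnqueueResponse(503, 'Could not enqueue task');
  }
}

Future<void> main() async {
  Future<String> fakeEnqueue(String name, Map<String, Object?> args) async =>
      'task-1';

  final ok = await handleSendEmail({'to': 'hello@example.com'}, fakeEnqueue);
  print('${ok.status} ${ok.body}'); // 202 task-1

  final bad = await handleSendEmail({'to': 42}, fakeEnqueue);
  print('${bad.status} ${bad.body}'); // 400 "to" must be an email address
}
```

Returning the task ID in the 202 response also covers the earlier item about capturing IDs for later polling or auditing.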
Next, continue with the Worker Control CLI or explore Signals for advanced instrumentation.