# Workers
Workers pull tasks, manage concurrency, and publish lifecycle signals. Use these guides to embed workers programmatically and operate them in production.
## Minimal entrypoints

```dart
class EmailTask extends TaskHandler<void> {
  String get name => 'email.send';

  TaskOptions get options => const TaskOptions(maxRetries: 2);

  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final to = args['to'] as String;
    print('Sending to $to (attempt ${context.attempt})');
  }
}

Future<void> minimalWorker() async {
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final worker = Worker(
    broker: broker,
    registry: registry,
    backend: backend,
    queue: 'default',
  );
  await worker.start();
}
```
```dart
Future<void> minimalProducer() async {
  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'email.send',
        entrypoint: (context, args) async {
          final to = args['to'] as String? ?? 'friend';
          print('Queued email to $to');
          return null;
        },
      ),
    );
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  final taskId = await stem.enqueue(
    'email.send',
    args: {'to': 'hello@example.com', 'subject': 'Welcome'},
  );
  print('Enqueued $taskId');
  await backend.close();
  await broker.close();
}
```
## Redis-backed worker

```dart
Future<void> redisWorker() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final registry = SimpleTaskRegistry()..register(RedisEmailTask());
  final worker = Worker(
    broker: await RedisStreamsBroker.connect(brokerUrl),
    registry: registry,
    backend: await RedisResultBackend.connect('$brokerUrl/1'),
    queue: 'default',
    concurrency: Platform.numberOfProcessors,
  );
  await worker.start();
}

class RedisEmailTask extends TaskHandler<void> {
  String get name => 'email.send';

  TaskOptions get options => const TaskOptions(
        queue: 'default',
        maxRetries: 3,
        visibilityTimeout: Duration(seconds: 30),
      );

  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
```
```dart
Future<void> redisProducer() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final broker = await RedisStreamsBroker.connect(brokerUrl);
  final backend = await RedisResultBackend.connect('$brokerUrl/1');
  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'report.generate',
        entrypoint: (context, args) async {
          final id = args['reportId'] as String? ?? 'unknown';
          print('Queued report $id');
          return null;
        },
      ),
    );
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue(
    'report.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports'),
  );
  await backend.close();
  await broker.close();
}
```
## Lifecycle overview

Workers connect to the broker, claim deliveries, execute task handlers, and emit lifecycle signals as they progress (`taskReceived`, `taskPrerun`, `taskPostrun`, `taskSucceeded`, `taskFailed`). Worker-level signals announce startup, readiness, heartbeat, and shutdown so dashboards and alerts can track capacity in near real time.
```dart
SignalSubscription registerWorkerSignals() {
  return StemSignals.onWorkerReady((payload, _) {
    print('Worker ready: ${payload.worker.id}');
  }, workerId: 'signals-worker');
}
```
Shutdowns are cooperative: a warm shutdown stops fetching new work, a soft shutdown requests termination at checkpoints, and a hard shutdown requeues active deliveries. The Worker Control CLI sends those commands through the same control queues the dashboard uses, so operational tooling stays consistent.
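The escalation between the three modes can be sketched as a small model. This is illustrative only: `ShutdownMode` and `DrainPlan` are hypothetical names, not Stem APIs, and the assumption that soft and hard stops also stop fetching new work is an inference from the description above.

```dart
// Illustrative model of cooperative shutdown semantics. These types are
// hypothetical and do not exist in Stem's public API.
enum ShutdownMode { warm, soft, hard }

class DrainPlan {
  const DrainPlan({
    required this.stopFetching,
    required this.signalTasks,
    required this.requeueActive,
  });

  final bool stopFetching;  // stop claiming new deliveries
  final bool signalTasks;   // ask running tasks to stop at a checkpoint
  final bool requeueActive; // return in-flight deliveries to the broker

  static DrainPlan forMode(ShutdownMode mode) => switch (mode) {
        // Warm: drain gracefully; only stop pulling new work.
        ShutdownMode.warm => const DrainPlan(
            stopFetching: true, signalTasks: false, requeueActive: false),
        // Soft: additionally request termination at the next checkpoint.
        ShutdownMode.soft => const DrainPlan(
            stopFetching: true, signalTasks: true, requeueActive: false),
        // Hard: additionally hand active deliveries back to the broker.
        ShutdownMode.hard => const DrainPlan(
            stopFetching: true, signalTasks: true, requeueActive: true),
      };
}
```

Each mode strictly escalates the previous one, which is why operational runbooks typically try warm first and reach for hard only when a worker must vacate immediately.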
## Queue subscriptions

Workers can subscribe to:

- A single queue (default: `default`) for straightforward deployments.
- Multiple queues by configuring a routing subscription (priority queues, fan-out, or dedicated lanes per workload).
Queue subscriptions determine which stream shards the worker polls, so keep queue names stable and document them alongside task registries.
```dart
Future<(Stem, Worker)> bootstrapStem() async {
  final routing = await loadRouting();
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final config = StemConfig.fromEnvironment();
  final subscription = RoutingSubscription(
    queues: config.workerQueues.isEmpty
        ? [config.defaultQueue]
        : config.workerQueues,
    broadcastChannels: config.workerBroadcasts,
  );
  final stem = Stem(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    routing: routing,
  );
  final worker = Worker(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    subscription: subscription,
  );
  return (stem, worker);
}

class EmailTask extends TaskHandler<void> {
  String get name => 'email.send';

  TaskOptions get options => const TaskOptions(queue: 'default');

  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
```
## Concurrency & autoscaling

Workers run multiple tasks in parallel using isolate pools. Configure base concurrency with `concurrency`, then enable autoscaling to expand and contract within a min/max range based on backlog and inflight counts.
Prefetch controls how aggressively a worker claims work ahead of execution. Use smaller values for fairness and larger values for throughput. If you're using autoscaling, align the prefetch multiplier with your maximum concurrency so scaling does not starve queues.
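As a worked example, assume the claim budget is derived as concurrency × `STEM_PREFETCH_MULTIPLIER`; treat that formula as an assumption for illustration, not a guarantee about Stem's internals.

```dart
// Sketch: a worker's claim budget under the assumed formula
// prefetch = concurrency * multiplier.
int effectivePrefetch({required int concurrency, required int multiplier}) =>
    concurrency * multiplier;

void main() {
  // At concurrency 4 with multiplier 2, up to 8 deliveries are claimed.
  assert(effectivePrefetch(concurrency: 4, multiplier: 2) == 8);
  // If autoscaling can reach maxConcurrency 12, the same multiplier keeps
  // 24 deliveries claimed, so scaled-up isolates are not starved of work.
  assert(effectivePrefetch(concurrency: 12, multiplier: 2) == 24);
  print('prefetch sketch ok');
}
```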
```dart
final worker = Worker(
  broker: _autoscaleBroker,
  registry: SimpleTaskRegistry(),
  backend: _autoscaleBackend,
  queue: 'critical',
  concurrency: 12,
  autoscale: const WorkerAutoscaleConfig(
    enabled: true,
    minConcurrency: 2,
    maxConcurrency: 12,
    scaleUpStep: 2,
    scaleDownStep: 1,
    idlePeriod: Duration(seconds: 45),
    tick: Duration(milliseconds: 250),
  ),
);
```
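As a mental model, one autoscaler tick could make its sizing decision along these lines. This is a sketch mirroring the `WorkerAutoscaleConfig` fields above, not Stem's actual controller; the exact growth and idle conditions are assumptions.

```dart
// Sketch of one autoscaler tick: grow when backlog exceeds spare capacity,
// shrink toward the minimum when the worker has been idle long enough.
// Not Stem's implementation; field names mirror WorkerAutoscaleConfig.
int clampInt(int v, int lo, int hi) => v < lo ? lo : (v > hi ? hi : v);

int nextConcurrency({
  required int current,
  required int backlog,         // queued tasks not yet claimed
  required int inflight,        // tasks currently executing
  required bool idleLongEnough, // idle for at least idlePeriod
  int min = 2,
  int max = 12,
  int scaleUpStep = 2,
  int scaleDownStep = 1,
}) {
  // Backlog exceeds spare capacity: grow by scaleUpStep, capped at max.
  if (backlog > current - inflight) {
    return clampInt(current + scaleUpStep, min, max);
  }
  // Nothing running and idle long enough: shrink toward min.
  if (inflight == 0 && idleLongEnough) {
    return clampInt(current - scaleDownStep, min, max);
  }
  return current;
}

void main() {
  // Saturated worker with backlog grows by the step.
  assert(nextConcurrency(
          current: 4, backlog: 10, inflight: 4, idleLongEnough: false) ==
      6);
  // Already at max: stays clamped.
  assert(nextConcurrency(
          current: 12, backlog: 50, inflight: 12, idleLongEnough: false) ==
      12);
  // Idle worker shrinks by the down-step.
  assert(nextConcurrency(
          current: 4, backlog: 0, inflight: 0, idleLongEnough: true) ==
      3);
  print('autoscale sketch ok');
}
```

The asymmetric steps (scale up by 2, down by 1) bias the worker toward absorbing bursts quickly while releasing capacity slowly, which avoids oscillation.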
## Key environment variables

- `STEM_BROKER_URL` – broker connection string (Redis/Postgres/memory).
- `STEM_RESULT_BACKEND_URL` – durable result backend (optional but recommended).
- `STEM_DEFAULT_QUEUE` – fallback queue when routing is unset.
- `STEM_PREFETCH_MULTIPLIER` – prefetch multiplier applied to concurrency.
- `STEM_WORKER_QUEUES` – explicit queue subscriptions (comma separated).
- `STEM_WORKER_BROADCASTS` – broadcast channel subscriptions (comma separated).
- `STEM_WORKER_NAMESPACE` – worker heartbeat/control namespace (observability).
- `STEM_ROUTING_CONFIG` – path to routing config (YAML/JSON).
- `STEM_SIGNING_*` – enable payload signing for tamper detection.
- `STEM_TLS_*` – TLS settings for broker/backends.
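For reference, the list-valued variables are plain comma-separated strings. A loader along these lines shows the expected shapes; it is illustrative, not Stem's `StemConfig.fromEnvironment()`, and the fallback values (localhost broker, `default` queue, multiplier 1) are assumptions.

```dart
import 'dart:io' show Platform;

// Illustrative parsing of the worker-related variables listed above.
// Fallbacks here are assumptions, not Stem's documented defaults.
({
  String brokerUrl,
  String defaultQueue,
  int prefetchMultiplier,
  List<String> queues,
  List<String> broadcasts,
}) loadWorkerEnv([Map<String, String>? env]) {
  final e = env ?? Platform.environment;
  // Split a comma-separated value, trimming whitespace around each entry.
  List<String> csv(String? raw) => (raw == null || raw.trim().isEmpty)
      ? const []
      : raw.split(',').map((s) => s.trim()).toList();
  return (
    brokerUrl: e['STEM_BROKER_URL'] ?? 'redis://localhost:6379',
    defaultQueue: e['STEM_DEFAULT_QUEUE'] ?? 'default',
    prefetchMultiplier:
        int.tryParse(e['STEM_PREFETCH_MULTIPLIER'] ?? '') ?? 1,
    queues: csv(e['STEM_WORKER_QUEUES']),
    broadcasts: csv(e['STEM_WORKER_BROADCASTS']),
  );
}

void main() {
  final cfg = loadWorkerEnv({'STEM_WORKER_QUEUES': 'default, reports'});
  assert(cfg.queues.length == 2);
  assert(cfg.queues[1] == 'reports');
  assert(cfg.defaultQueue == 'default');
  print('env sketch ok');
}
```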
- Programmatic Integration – Wire producers and workers inside your Dart services (includes in-memory and Redis examples).
- Worker Control CLI – Inspect, revoke, scale, and shut down workers remotely.
- Daemonization Guide – Run workers under systemd, launchd, or custom supervisors.
Looking for retry tuning or task registries? See the Core Concepts guide.