Skip to main content

Workers

Workers pull tasks, manage concurrency, and publish lifecycle signals. Use these guides to embed workers programmatically and operate them in production.

Minimal entrypoints

workers_programmatic.dart
class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(maxRetries: 2);


Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String;
print('Sending to $to (attempt ${context.attempt})');
}
}

Future<void> minimalWorker() async {
final registry = SimpleTaskRegistry()..register(EmailTask());
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();

final worker = Worker(
broker: broker,
registry: registry,
backend: backend,
queue: 'default',
);

await worker.start();
}
workers_programmatic.dart
Future<void> minimalProducer() async {
final registry = SimpleTaskRegistry()
..register(
FunctionTaskHandler<void>(
name: 'email.send',
entrypoint: (context, args) async {
final to = args['to'] as String? ?? 'friend';
print('Queued email to $to');
return null;
},
),
);

final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final stem = Stem(
broker: broker,
registry: registry,
backend: backend,
);

final taskId = await stem.enqueue(
'email.send',
args: {'to': 'hello@example.com', 'subject': 'Welcome'},
);

print('Enqueued $taskId');
await backend.close();
await broker.close();
}

Redis-backed worker

workers_programmatic.dart
Future<void> redisWorker() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final registry = SimpleTaskRegistry()..register(RedisEmailTask());

final worker = Worker(
broker: await RedisStreamsBroker.connect(brokerUrl),
registry: registry,
backend: await RedisResultBackend.connect('$brokerUrl/1'),
queue: 'default',
concurrency: Platform.numberOfProcessors,
);

await worker.start();
}

class RedisEmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(
queue: 'default',
maxRetries: 3,
visibilityTimeout: Duration(seconds: 30),
);


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
workers_programmatic.dart
Future<void> redisProducer() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final broker = await RedisStreamsBroker.connect(brokerUrl);
final backend = await RedisResultBackend.connect('$brokerUrl/1');
final registry = SimpleTaskRegistry()
..register(
FunctionTaskHandler<void>(
name: 'report.generate',
entrypoint: (context, args) async {
final id = args['reportId'] as String? ?? 'unknown';
print('Queued report $id');
return null;
},
),
);

final stem = Stem(
broker: broker,
registry: registry,
backend: backend,
);

await stem.enqueue(
'report.generate',
args: {'reportId': 'monthly-2025-10'},
options: const TaskOptions(queue: 'reports'),
);
await backend.close();
await broker.close();
}

Lifecycle overview

Workers connect to the broker, claim deliveries, execute task handlers, and emit lifecycle signals as they progress (taskReceived, taskPrerun, taskPostrun, taskSucceeded, taskFailed). Worker-level signals announce startup, readiness, heartbeat, and shutdown so dashboards and alerts can track capacity in near real time.

signals.dart
SignalSubscription registerWorkerSignals() {
return StemSignals.onWorkerReady((payload, _) {
print('Worker ready: ${payload.worker.id}');
}, workerId: 'signals-worker');
}

Shutdowns are cooperative: warm stops fetching new work, soft requests termination checkpoints, and hard requeues active deliveries. The Worker Control CLI sends those commands through the same control queues the dashboard uses, so operational tooling stays consistent.

Queue subscriptions

Workers can subscribe to:

  • A single queue (default: default) for straightforward deployments.
  • Multiple queues by configuring a routing subscription (priority queues, fan-out, or dedicated lanes per workload).

Queue subscriptions determine which stream shards the worker polls, so keep queue names stable and document them alongside task registries.

routing.dart
Future<(Stem, Worker)> bootstrapStem() async {
final routing = await loadRouting();
final registry = SimpleTaskRegistry()..register(EmailTask());
final config = StemConfig.fromEnvironment();
final subscription = RoutingSubscription(
queues: config.workerQueues.isEmpty
? [config.defaultQueue]
: config.workerQueues,
broadcastChannels: config.workerBroadcasts,
);

final stem = Stem(
broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
registry: registry,
backend: InMemoryResultBackend(),
routing: routing,
);

final worker = Worker(
broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
registry: registry,
backend: InMemoryResultBackend(),
subscription: subscription,
);

return (stem, worker);
}

class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(queue: 'default');


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}

Concurrency & autoscaling

Workers run multiple tasks in parallel using isolate pools. Configure base concurrency with concurrency, then enable autoscaling to expand/contract within a min/max range based on backlog and inflight counts.

Prefetch controls how aggressively a worker claims work ahead of execution. Use smaller values for fairness and larger values for throughput. If you're using autoscaling, align the prefetch multiplier with your maximum concurrency so scaling does not starve queues.

worker_control.dart
final worker = Worker(
broker: _autoscaleBroker,
registry: SimpleTaskRegistry(),
backend: _autoscaleBackend,
queue: 'critical',
concurrency: 12,
autoscale: const WorkerAutoscaleConfig(
enabled: true,
minConcurrency: 2,
maxConcurrency: 12,
scaleUpStep: 2,
scaleDownStep: 1,
idlePeriod: Duration(seconds: 45),
tick: Duration(milliseconds: 250),
),
);

Key environment variables

  • STEM_BROKER_URL – broker connection string (Redis/Postgres/memory).

  • STEM_RESULT_BACKEND_URL – durable result backend (optional but recommended).

  • STEM_DEFAULT_QUEUE – fallback queue when routing is unset.

  • STEM_PREFETCH_MULTIPLIER – prefetch multiplier applied to concurrency.

  • STEM_WORKER_QUEUES – explicit queue subscriptions (comma separated).

  • STEM_WORKER_BROADCASTS – broadcast channel subscriptions (comma separated).

  • STEM_WORKER_NAMESPACE – worker heartbeat/control namespace (observability).

  • STEM_ROUTING_CONFIG – path to routing config (YAML/JSON).

  • STEM_SIGNING_* – enable payload signing for tamper detection.

  • STEM_TLS_* – TLS settings for broker/backends.

  • Programmatic Integration – Wire producers and workers inside your Dart services (includes in-memory and Redis examples).

  • Worker Control CLI – Inspect, revoke, scale, and shut down workers remotely.

  • Daemonization Guide – Run workers under systemd, launchd, or custom supervisors.

Looking for retry tuning or task registries? See the Core Concepts.