Skip to main content

Workers

Workers pull tasks, manage concurrency, and publish lifecycle signals. Use these guides to embed workers programmatically and operate them in production.

Minimal entrypoints

workers_programmatic.dart
class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(maxRetries: 2);


Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String;
print('Sending to $to (attempt ${context.attempt})');
}
}

Future<void> minimalWorker() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();

final worker = Worker(
broker: broker,
backend: backend,
tasks: [EmailTask()],
queue: 'default',
);

await worker.start();
}
workers_programmatic.dart
Future<void> minimalProducer() async {
final app = await StemApp.inMemory(
tasks: [
FunctionTaskHandler<void>(
name: 'email.send',
entrypoint: (context, args) async {
final to = args['to'] as String? ?? 'friend';
print('Queued email to $to');
return null;
},
),
],
);

final taskId = await app.enqueue(
'email.send',
args: {'to': 'hello@example.com', 'subject': 'Welcome'},
);

print('Enqueued $taskId');
await app.waitForTask<void>(taskId);
await app.close();
}

Redis-backed worker

workers_programmatic.dart
Future<void> redisWorker() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';

final worker = Worker(
broker: await RedisStreamsBroker.connect(brokerUrl),
backend: await RedisResultBackend.connect('$brokerUrl/1'),
tasks: [RedisEmailTask()],
queue: 'default',
concurrency: Platform.numberOfProcessors,
);

await worker.start();
}

class RedisEmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(
queue: 'default',
maxRetries: 3,
visibilityTimeout: Duration(seconds: 30),
);


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
workers_programmatic.dart
Future<void> redisProducer() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final client = await StemClient.fromUrl(
brokerUrl,
adapters: const [StemRedisAdapter()],
overrides: StemStoreOverrides(backend: '$brokerUrl/1'),
tasks: [
FunctionTaskHandler<void>(
name: 'report.generate',
entrypoint: (context, args) async {
final id = args['reportId'] as String? ?? 'unknown';
print('Queued report $id');
return null;
},
),
],
);

await client.enqueue(
'report.generate',
args: {'reportId': 'monthly-2025-10'},
options: const TaskOptions(queue: 'reports'),
);
await client.close();
}

Lifecycle overview

Workers connect to the broker, claim deliveries, execute task handlers, and emit lifecycle signals as they progress (taskReceived, taskPrerun, taskPostrun, taskSucceeded, taskFailed). Worker-level signals announce startup, readiness, heartbeat, and shutdown so dashboards and alerts can track capacity in near real time.

signals.dart
SignalSubscription registerWorkerSignals() {
return StemSignals.onWorkerReady((payload, _) {
print('Worker ready: ${payload.worker.id}');
}, workerId: 'signals-worker');
}

Shutdowns are cooperative: warm stops fetching new work, soft requests termination checkpoints, and hard requeues active deliveries. The Worker Control CLI sends those commands through the same control queues the dashboard uses, so operational tooling stays consistent.

Queue subscriptions

Workers can subscribe to:

  • A single queue (default: default) for straightforward deployments.
  • Multiple queues by configuring a routing subscription (priority queues, fan-out, or dedicated lanes per workload).

Queue subscriptions determine which stream shards the worker polls, so keep queue names stable and document them alongside the shared task definitions your service uses.

routing.dart
Future<(StemClient, Worker)> bootstrapStem() async {
final routing = await loadRouting();
final tasks = [EmailTask()];
final config = StemConfig.fromEnvironment();
final subscription = RoutingSubscription(
queues: config.workerQueues.isEmpty
? [config.defaultQueue]
: config.workerQueues,
broadcastChannels: config.workerBroadcasts,
);

final client = await StemClient.create(
broker: StemBrokerFactory(
create: () => RedisStreamsBroker.connect('redis://localhost:6379'),
dispose: (broker) => broker.close(),
),
backend: StemBackendFactory.inMemory(),
tasks: tasks,
routing: routing,
);

final worker = await client.createWorker(
workerConfig: StemWorkerConfig(
subscription: subscription,
),
);

return (client, worker);
}

class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(queue: 'default');


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}

Concurrency & autoscaling

Workers run multiple tasks in parallel using isolate pools. Configure base concurrency with concurrency, then enable autoscaling to expand/contract within a min/max range based on backlog and inflight counts.

Prefetch controls how aggressively a worker claims work ahead of execution. Use smaller values for fairness and larger values for throughput. If you're using autoscaling, align the prefetch multiplier with your maximum concurrency so scaling does not starve queues.

worker_control.dart
final worker = Worker(
broker: _autoscaleBroker,
backend: _autoscaleBackend,
tasks: const [],
queue: 'critical',
concurrency: 12,
autoscale: const WorkerAutoscaleConfig(
enabled: true,
minConcurrency: 2,
maxConcurrency: 12,
scaleUpStep: 2,
scaleDownStep: 1,
idlePeriod: Duration(seconds: 45),
tick: Duration(milliseconds: 250),
),
);

Key environment variables

  • STEM_BROKER_URL – broker connection string (Redis/Postgres/memory).

  • STEM_RESULT_BACKEND_URL – durable result backend (optional but recommended).

  • STEM_DEFAULT_QUEUE – fallback queue when routing is unset.

  • STEM_PREFETCH_MULTIPLIER – prefetch multiplier applied to concurrency.

  • STEM_WORKER_QUEUES – explicit queue subscriptions (comma separated).

  • STEM_WORKER_BROADCASTS – broadcast channel subscriptions (comma separated).

  • STEM_WORKER_NAMESPACE – worker heartbeat/control namespace (observability).

  • STEM_ROUTING_CONFIG – path to routing config (YAML/JSON).

  • STEM_SIGNING_* – enable payload signing for tamper detection.

  • STEM_TLS_* – TLS settings for broker/backends.

  • Programmatic Integration – Wire producers and workers inside your Dart services (includes in-memory and Redis examples).

  • Worker Control CLI – Inspect, revoke, scale, and shut down workers remotely.

  • Daemonization Guide – Run workers under systemd, launchd, or custom supervisors.

Looking for retry tuning or task-definition guidance? See the Core Concepts.