Workers
Workers pull tasks, manage concurrency, and publish lifecycle signals. Use these guides to embed workers programmatically and operate them in production.
Minimal entrypoints
```dart
class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(maxRetries: 2);

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final to = args['to'] as String;
    print('Sending to $to (attempt ${context.attempt})');
  }
}
```
```dart
Future<void> minimalWorker() async {
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final worker = Worker(
    broker: broker,
    backend: backend,
    tasks: [EmailTask()],
    queue: 'default',
  );
  await worker.start();
}
```
```dart
Future<void> minimalProducer() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<void>(
        name: 'email.send',
        entrypoint: (context, args) async {
          final to = args['to'] as String? ?? 'friend';
          print('Queued email to $to');
          return null;
        },
      ),
    ],
  );
  final taskId = await app.enqueue(
    'email.send',
    args: {'to': 'hello@example.com', 'subject': 'Welcome'},
  );
  print('Enqueued $taskId');
  await app.waitForTask<void>(taskId);
  await app.close();
}
```
Redis-backed worker
```dart
import 'dart:io';

Future<void> redisWorker() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final worker = Worker(
    broker: await RedisStreamsBroker.connect(brokerUrl),
    backend: await RedisResultBackend.connect('$brokerUrl/1'),
    tasks: [RedisEmailTask()],
    queue: 'default',
    concurrency: Platform.numberOfProcessors,
  );
  await worker.start();
}
```
```dart
class RedisEmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(
        queue: 'default',
        maxRetries: 3,
        visibilityTimeout: Duration(seconds: 30),
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
```
```dart
import 'dart:io';

Future<void> redisProducer() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final client = await StemClient.fromUrl(
    brokerUrl,
    adapters: const [StemRedisAdapter()],
    overrides: StemStoreOverrides(backend: '$brokerUrl/1'),
    tasks: [
      FunctionTaskHandler<void>(
        name: 'report.generate',
        entrypoint: (context, args) async {
          final id = args['reportId'] as String? ?? 'unknown';
          print('Queued report $id');
          return null;
        },
      ),
    ],
  );
  await client.enqueue(
    'report.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports'),
  );
  await client.close();
}
```
Lifecycle overview
Workers connect to the broker, claim deliveries, execute task handlers, and emit lifecycle signals as they progress (taskReceived, taskPrerun, taskPostrun, taskSucceeded, taskFailed). Worker-level signals announce startup, readiness, heartbeat, and shutdown so dashboards and alerts can track capacity in near real time.
```dart
SignalSubscription registerWorkerSignals() {
  return StemSignals.onWorkerReady((payload, _) {
    print('Worker ready: ${payload.worker.id}');
  }, workerId: 'signals-worker');
}
```
Shutdowns are cooperative: warm stops fetching new work, soft requests termination checkpoints, and hard requeues active deliveries. The Worker Control CLI sends those commands through the same control queues the dashboard uses, so operational tooling stays consistent.
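When embedding a worker, a common pattern is to translate process signals into a cooperative stop. The sketch below rests on an assumption: it presumes the Worker exposes a stop() method that performs a warm shutdown, so substitute whatever cooperative-shutdown entrypoint your Worker API actually provides.

```dart
import 'dart:io';

// Sketch: map SIGTERM/SIGINT to a warm stop. Worker.stop() is a
// hypothetical name; replace it with the real shutdown entrypoint.
void installShutdownHooks(Worker worker) {
  for (final signal in [ProcessSignal.sigterm, ProcessSignal.sigint]) {
    signal.watch().listen((_) async {
      await worker.stop(); // hypothetical warm shutdown
      exit(0);
    });
  }
}
```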
Queue subscriptions
Workers can subscribe to:

- A single queue (default: default) for straightforward deployments.
- Multiple queues by configuring a routing subscription (priority queues, fan-out, or dedicated lanes per workload).
Queue subscriptions determine which stream shards the worker polls, so keep queue names stable and document them alongside the shared task definitions your service uses.
```dart
Future<(StemClient, Worker)> bootstrapStem() async {
  // loadRouting() is app-specific, e.g. parsing the file named by
  // STEM_ROUTING_CONFIG into a routing table.
  final routing = await loadRouting();
  final tasks = [EmailTask()];
  final config = StemConfig.fromEnvironment();
  final subscription = RoutingSubscription(
    queues: config.workerQueues.isEmpty
        ? [config.defaultQueue]
        : config.workerQueues,
    broadcastChannels: config.workerBroadcasts,
  );
  final client = await StemClient.create(
    broker: StemBrokerFactory(
      create: () => RedisStreamsBroker.connect('redis://localhost:6379'),
      dispose: (broker) => broker.close(),
    ),
    backend: StemBackendFactory.inMemory(),
    tasks: tasks,
    routing: routing,
  );
  final worker = await client.createWorker(
    workerConfig: StemWorkerConfig(
      subscription: subscription,
    ),
  );
  return (client, worker);
}
```
```dart
class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(queue: 'default');

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
```
Concurrency & autoscaling
Workers run multiple tasks in parallel using isolate pools. Configure base concurrency with concurrency, then enable autoscaling to expand and contract within a min/max range based on backlog and inflight counts.
Prefetch controls how aggressively a worker claims work ahead of execution. Use smaller values for fairness and larger values for throughput. If you're using autoscaling, align the prefetch multiplier with your maximum concurrency so scaling does not starve queues.
```dart
// _autoscaleBroker and _autoscaleBackend are assumed to be configured elsewhere.
final worker = Worker(
  broker: _autoscaleBroker,
  backend: _autoscaleBackend,
  tasks: const [],
  queue: 'critical',
  concurrency: 12,
  autoscale: const WorkerAutoscaleConfig(
    enabled: true,
    minConcurrency: 2,
    maxConcurrency: 12,
    scaleUpStep: 2,
    scaleDownStep: 1,
    idlePeriod: Duration(seconds: 45),
    tick: Duration(milliseconds: 250),
  ),
);
```
Key environment variables
- STEM_BROKER_URL – broker connection string (Redis/Postgres/memory).
- STEM_RESULT_BACKEND_URL – durable result backend (optional but recommended).
- STEM_DEFAULT_QUEUE – fallback queue when routing is unset.
- STEM_PREFETCH_MULTIPLIER – prefetch multiplier applied to concurrency.
- STEM_WORKER_QUEUES – explicit queue subscriptions (comma separated).
- STEM_WORKER_BROADCASTS – broadcast channel subscriptions (comma separated).
- STEM_WORKER_NAMESPACE – worker heartbeat/control namespace (observability).
- STEM_ROUTING_CONFIG – path to routing config (YAML/JSON).
- STEM_SIGNING_* – enable payload signing for tamper detection.
- STEM_TLS_* – TLS settings for broker/backends.
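As a sketch, a typical Redis-backed worker environment might look like the following (all values are illustrative, not defaults; align queue and broadcast names with your routing config):

```shell
# Illustrative values only; adjust for your deployment.
export STEM_BROKER_URL="redis://localhost:6379"
export STEM_RESULT_BACKEND_URL="redis://localhost:6379/1"
export STEM_DEFAULT_QUEUE="default"
export STEM_PREFETCH_MULTIPLIER="4"
export STEM_WORKER_QUEUES="default,reports"
export STEM_WORKER_BROADCASTS="ops.broadcast"
export STEM_WORKER_NAMESPACE="stem"
```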
Related guides

- Programmatic Integration – Wire producers and workers inside your Dart services (includes in-memory and Redis examples).
- Worker Control CLI – Inspect, revoke, scale, and shut down workers remotely.
- Daemonization Guide – Run workers under systemd, launchd, or custom supervisors.
Looking for retry tuning or task-definition guidance? See the Core Concepts guide.