Connect to Infrastructure
Graduate from the in-memory demo to a multi-process setup backed by Redis or Postgres. You will run workers, Beat, and the CLI in separate terminals while exploring routing, broadcast delivery, and canvas composition with persistent storage.
1. Run Redis and Postgres Locally
Docker is the fastest way to spin up dependencies:
# Redis Streams for broker, locks, rate limiting, and schedules.
docker run --rm -p 6379:6379 redis:7-alpine
# Postgres for durable task results or schedule storage (optional now, useful later).
docker run --rm -p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
postgres:14
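Before wiring Stem to these containers, it is worth confirming both services accept connections. This assumes the `redis-cli` and `pg_isready` client tools are installed on the host:

```shell
# Redis should answer PONG.
redis-cli -h localhost -p 6379 ping

# Postgres should report that it is accepting connections.
pg_isready -h localhost -p 5432 -U postgres
```

If either check fails, give the containers a few seconds to finish starting before retrying.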
Export the connection details so producers, workers, and Beat share them:
export STEM_BROKER_URL=redis://localhost:6379
export STEM_RESULT_BACKEND_URL=redis://localhost:6379/1
export STEM_SCHEDULE_STORE_URL=redis://localhost:6379/2
export STEM_CONTROL_NAMESPACE=stem
2. Bootstrap Stem Config
Use StemConfig.fromEnvironment() to hydrate adapters from the environment and
share them across your app. Split the bootstrap into smaller steps so each
piece is easy to scan and reuse:
Load configuration
final config = StemConfig.fromEnvironment(Platform.environment);
Connect adapters
final broker = await RedisStreamsBroker.connect(
  config.brokerUrl,
  tls: config.tls,
);
final backend = await RedisResultBackend.connect(
  _resolveRedisUrl(config.brokerUrl, config.resultBackendUrl, 1),
  tls: config.tls,
);
final revokeStore = await RedisRevokeStore.connect(
  _resolveRedisUrl(config.brokerUrl, config.revokeStoreUrl, 2),
);
final routing = await _loadRoutingRegistry(config);
final rateLimiter = await connectRateLimiter(config);
Create the Stem producer
final stem = Stem(
  broker: broker,
  backend: backend,
  registry: registry,
  routing: routing,
);
Create the worker
final subscription = _buildSubscription(config);
final worker = Worker(
  broker: broker,
  backend: backend,
  registry: registry,
  revokeStore: revokeStore,
  rateLimiter: rateLimiter,
  queue: config.defaultQueue,
  subscription: subscription,
  concurrency: 8,
  autoscale: const WorkerAutoscaleConfig(
    enabled: true,
    minConcurrency: 2,
    maxConcurrency: 16,
    backlogPerIsolate: 2.0,
    idlePeriod: Duration(seconds: 45),
  ),
);
Together, these steps give you access to routing, rate limiting, revoke storage, and queue configuration—all backed by Redis.
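The snippets above call a `_resolveRedisUrl` helper that is part of your own bootstrap code, not the Stem API. A minimal sketch, assuming the convention that each adapter prefers its dedicated URL and otherwise falls back to the broker URL with a numbered Redis database suffix:

```dart
/// Hypothetical helper: prefer [explicitUrl] when set; otherwise derive a
/// URL from [brokerUrl] by swapping in the given Redis database index.
String _resolveRedisUrl(String brokerUrl, String? explicitUrl, int database) {
  if (explicitUrl != null && explicitUrl.isNotEmpty) {
    return explicitUrl;
  }
  final uri = Uri.parse(brokerUrl);
  return uri.replace(path: '/$database').toString();
}
```

With the environment from step 1, `_resolveRedisUrl(config.brokerUrl, config.resultBackendUrl, 1)` returns the configured `redis://localhost:6379/1`, and would fall back to database 1 on the broker host if `STEM_RESULT_BACKEND_URL` were unset.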
3. Launch Workers, Beat, and Producers
With the environment configured, run Stem components from separate terminals:
# Terminal 1 — run a worker process (set STEM_WORKER_COMMAND or pass --command).
export STEM_WORKER_COMMAND="dart run bin/worker.dart"
stem worker multi start alpha --queue default --queue reports --queue emails
# Terminal 2 — apply schedules and run Beat (Dart entrypoint).
stem schedule apply --file config/schedules.json --yes
stem schedule list
dart run packages/stem/example/scheduler_observability/bin/beat.dart
Use a producer entrypoint to enqueue work:
Future<void> enqueueWithRedis() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final broker = await RedisStreamsBroker.connect(brokerUrl);
  final backend = await RedisResultBackend.connect('$brokerUrl/1');
  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'reports.generate',
        // This entrypoint runs on the worker that consumes the task.
        entrypoint: (context, args) async {
          final id = args['reportId'] as String? ?? 'unknown';
          print('Generating report $id');
          return null;
        },
      ),
    );
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue(
    'reports.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports', maxRetries: 3),
    meta: {'requestedBy': 'finance'},
  );
  await backend.close();
  await broker.close();
}
Routing configuration supports default queue aliases, glob-based routing
rules, and broadcast channels. A minimal config/routing.yaml might look like:
default_queue: critical
queues:
  reports:
    routing_key: reports.generate
    priority_range: [2, 7]
  emails:
    routing_key: billing.email-*
broadcasts:
  maintenance:
    delivery: fanout
Stem clamps priorities to queue-defined ranges and publishes broadcast tasks to all subscribed workers exactly once per acknowledgement window.
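The clamping rule is easy to picture with a standalone sketch (an illustration of the behavior, not Stem's actual implementation):

```dart
/// Illustrative only: clamp a requested priority into a queue's
/// configured [min, max] range, mirroring the behavior described above.
int clampPriority(int requested, int min, int max) {
  if (requested < min) return min;
  if (requested > max) return max;
  return requested;
}
```

Given the reports queue's priority_range of [2, 7], a task enqueued with priority 9 would be delivered at priority 7, and one enqueued with priority 0 at priority 2.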
4. Coordinate Work with Canvas and Result Backend
Now that Redis backs the result store, you can orchestrate more complex pipelines and query progress from any process:
Future<void> runCanvasFlows(
  Bootstrap bootstrap,
  SimpleTaskRegistry registry,
) async {
  final canvas = Canvas(
    broker: bootstrap.stem.broker,
    backend: await RedisResultBackend.connect(
      _resolveRedisUrl(
        bootstrap.config.brokerUrl,
        bootstrap.config.resultBackendUrl,
        1,
      ),
    ),
    registry: registry,
  );

  final ids = await canvas.group([
    task('media.resize', args: {'file': 'hero.png'}),
    task('media.resize', args: {'file': 'thumb.png'}),
  ], groupId: 'image-assets');

  final chordId = await canvas.chord(
    body: [
      task('reports.render', args: {'week': '2024-W28'}),
      task('reports.render', args: {'week': '2024-W29'}),
    ],
    callback: task(
      'billing.email-receipt',
      args: {'to': 'finance@example.com'},
      options: const TaskOptions(queue: 'emails'),
    ),
  );

  print('Group dispatched: $ids');
  print('Chord callback task id: $chordId');
}
Later, you can monitor status from any machine:
Future<void> inspectChordStatus(String chordId) async {
  final backend = await RedisResultBackend.connect(
    _resolveRedisUrl(
      Platform.environment['STEM_BROKER_URL']!,
      Platform.environment['STEM_RESULT_BACKEND_URL'],
      1,
    ),
  );
  final status = await backend.get(chordId);
  print('Chord completion state: ${status?.state}');
}
5. Listen to Signals for Cross-Cutting Integrations
Signals surface lifecycle milestones that you can pipe into analytics or incident tooling:
void installSignalHandlers() {
  StemSignals.taskSucceeded.connect((payload, _) {
    if (payload.taskName == 'reports.render') {
      print('Report ${payload.taskId} succeeded');
    }
  });

  StemSignals.workerReady.connect((payload, _) {
    print(
      'Worker ${payload.worker.id} ready '
      '(queues=${payload.worker.queues.join(",")})',
    );
  });
}
Call installSignalHandlers() during bootstrap before workers or producers
start emitting events.
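For example, a process entrypoint might look like the following sketch, where bootstrapStem and the worker field are hypothetical stand-ins for your own setup code from step 2:

```dart
Future<void> main() async {
  // Register signal handlers first, before any component can emit events.
  installSignalHandlers();

  // Then bring up adapters, the producer, and the worker (hypothetical
  // helpers wrapping the bootstrap shown in step 2).
  final bootstrap = await bootstrapStem(Platform.environment);
  await bootstrap.startWorker();
}
```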
6. What’s Next
- Keep the infrastructure running and head to Observe & Operate to enable telemetry, inspect heartbeats, replay DLQs, and issue remote control commands.
- Browse the runnable examples under examples/ for Redis/Postgres, mixed-cluster, autoscaling, scheduler observability, and signing-key rotation drills you can adapt to your environment.