Routing Configuration
Stem workers and publishers resolve queue and broadcast targets from the routing
file referenced by STEM_ROUTING_CONFIG. The file is parsed into a typed
RoutingConfig, validated by the RoutingRegistry, and used by helpers such as
buildWorkerSubscription. The loader helpers (RoutingConfigLoader,
WorkerSubscriptionBuilder, buildWorkerSubscription) live in the stem_cli
package and are used by the CLI to produce friendly diagnostics. In application
code you can use the core stem types directly (see “Loading subscriptions”).
When no file is supplied, Stem falls back to a legacy single-queue configuration.
config/routing.yaml:

default_queue:
  alias: default
  queue: primary
  fallbacks:
    - secondary
queues:
  primary:
    exchange: jobs
    routing_key: jobs.default
    priority_range: [0, 9]
  secondary: {}
broadcasts:
  control:
    delivery: at-least-once
  updates:
    delivery: at-most-once
routes:
  - match:
      task: reports.*
    target:
      type: queue
      name: primary
  - match:
      task: control.*
    target:
      type: broadcast
      name: control

lib/routing.dart:

import 'dart:io';

// Core Stem types (library path assumed from the package name).
import 'package:stem/stem.dart';

// Reads the YAML file from disk and builds a registry from its contents.
Future<RoutingRegistry> loadRouting() async {
  final source = await File('config/routing.yaml').readAsString();
  return RoutingRegistry.fromYaml(source);
}

// A minimal registry built in code: one queue and one route.
final registry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'primary'),
    queues: {'primary': QueueDefinition(name: 'primary')},
    routes: [
      RouteDefinition(
        match: RouteMatch.fromJson(const {'task': 'reports.*'}),
        target: RouteTarget(type: 'queue', name: 'primary'),
      ),
    ],
  ),
);
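
The routes section pairs task-name patterns such as reports.* with a queue or broadcast target. As a rough illustration of that idea, the following plain-Dart sketch resolves a task name against a list of routes. It does not use Stem's own types: Route, globToRegExp, and resolve are hypothetical names, and the matching rules (a * wildcard matching any run of characters, first matching route wins, unmatched tasks go to the default queue) are assumptions rather than documented Stem behaviour.

```dart
/// Hypothetical route entry: a glob pattern plus the target it resolves to.
/// This is an illustration only, not Stem's RouteDefinition.
class Route {
  const Route(this.pattern, this.targetType, this.targetName);

  final String pattern;
  final String targetType; // 'queue' or 'broadcast'
  final String targetName;
}

/// Converts a glob such as `reports.*` into an anchored regular expression.
/// Assumption: `*` matches any run of characters; everything else is literal.
RegExp globToRegExp(String glob) {
  final escaped = glob.split('*').map(RegExp.escape).join('.*');
  return RegExp('^$escaped\$');
}

/// Returns "type:name" for the first matching route, or the default queue
/// when no route matches (assumed fallback behaviour).
String resolve(
  String task,
  List<Route> routes, {
  String defaultQueue = 'primary',
}) {
  for (final route in routes) {
    if (globToRegExp(route.pattern).hasMatch(task)) {
      return '${route.targetType}:${route.targetName}';
    }
  }
  return 'queue:$defaultQueue';
}
```

With the two routes from the YAML file, a task named control.shutdown would resolve to the control broadcast under these assumptions, while a task that matches no pattern would land on the default queue.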
Queue priority ranges
- Each queue definition accepts an optional priority_range, either as a two-element list ([min, max]) or as an object ({ min: 0, max: 9 }).
- When omitted, the queue defaults to min: 0, max: 9. Values outside the range trigger a FormatException during load.
- The routing registry clamps any priority assigned via RoutingInfo.priority or route overrides into the configured range, guaranteeing that both Redis and Postgres brokers store buckets within [min, max].
- Publishers may continue to set Envelope.priority; the registry will respect that hint when resolving a route. When no range is defined the value is clamped to [0, 9].
final priorityQueue = QueueDefinition(
  name: 'priority',
  priorityRange: const QueuePriorityRange(min: 0, max: 5),
);

final priorityRegistry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'priority'),
    queues: {'priority': priorityQueue},
  ),
);
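
The parsing and clamping rules described in this section can be sketched in plain Dart. This is a minimal illustration under stated assumptions, not Stem's implementation: PriorityRange is a hypothetical stand-in for QueuePriorityRange, and the exact validation Stem performs at load time may differ.

```dart
/// Hypothetical stand-in for a queue priority range; not Stem's own type.
class PriorityRange {
  const PriorityRange(this.min, this.max);

  /// Accepts `[min, max]`, `{min: .., max: ..}`, or null (defaults to 0..9).
  factory PriorityRange.parse(Object? raw) {
    if (raw == null) return const PriorityRange(0, 9);

    final Object? lo;
    final Object? hi;
    if (raw is List && raw.length == 2) {
      lo = raw[0];
      hi = raw[1];
    } else if (raw is Map) {
      lo = raw['min'];
      hi = raw['max'];
    } else {
      throw FormatException(
        'priority_range must be [min, max] or {min, max}: $raw',
      );
    }

    // Assumed validation: both bounds are integers and min <= max.
    if (lo is! int || hi is! int || lo > hi) {
      throw FormatException('invalid priority_range bounds: $raw');
    }
    return PriorityRange(lo, hi);
  }

  final int min;
  final int max;

  /// Pulls a requested priority into the inclusive range [min, max].
  int clamp(int priority) {
    if (priority < min) return min;
    if (priority > max) return max;
    return priority;
  }
}
```

In this sketch, a range parsed from [0, 5] clamps a requested priority of 7 down to 5 and a priority of -1 up to 0, while a missing range falls back to 0 through 9.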
Broadcast channels
- Add broadcast channels under broadcasts; each entry is keyed by the logical channel name and may declare:
  - delivery: semantic hint for consumers. at-least-once is the default and matches the behaviour of both Redis Streams and Postgres fan-out tables.
  - durability: optional metadata surfaced in RoutingInfo.meta. Current brokers treat it as an advisory flag.
- Routes may target broadcasts via target: { type: broadcast, name: channel }, and workers subscribe by listing the channel under STEM_WORKER_BROADCASTS or via the CLI --broadcast flag.
- Broadcast deliveries reuse the same envelope payload; brokers set RoutingInfo.broadcastChannel to the logical channel and ensure each subscriber receives the message exactly once when acked.
Loading subscriptions
- RoutingConfigLoader, WorkerSubscriptionBuilder, and buildWorkerSubscription live in the stem_cli package. The CLI uses them to provide friendly diagnostics and to honor --queue/--broadcast when building worker subscriptions.
- In application code you can either import package:stem_cli/stem_cli.dart (see packages/stem/example/email_service) or build the registry directly: load YAML with RoutingConfig.fromYaml, then construct a RoutingRegistry and RoutingSubscription yourself.
Using the config in Dart
Load the routing file once during service start-up and reuse the registry across producers and workers:
Future<(Stem, Worker)> bootstrapStem() async {
  // Load the routing registry once and share it for the life of the service.
  final routing = await loadRouting();
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final config = StemConfig.fromEnvironment();

  // Use the configured worker queues, or the default queue when none are set.
  final subscription = RoutingSubscription(
    queues: config.workerQueues.isEmpty
        ? [config.defaultQueue]
        : config.workerQueues,
    broadcastChannels: config.workerBroadcasts,
  );

  // Producer side: resolves targets through the routing registry.
  final stem = Stem(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    routing: routing,
  );

  // Consumer side: listens on the queues and channels in the subscription.
  final worker = Worker(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    subscription: subscription,
  );

  return (stem, worker);
}

class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(queue: 'default');

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
For lightweight services or tests, you can construct the registry inline:
final inlineRegistry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'primary'),
    queues: {'primary': QueueDefinition(name: 'primary')},
    routes: [
      RouteDefinition(
        match: RouteMatch.fromJson(const {'task': 'reports.*'}),
        target: RouteTarget(type: 'queue', name: 'primary'),
      ),
    ],
  ),
);
Both approaches keep routing logic declarative while letting you evolve queue topology without editing code.