Routing Configuration

Stem workers and publishers resolve queue and broadcast targets from the routing file referenced by STEM_ROUTING_CONFIG. The file is parsed into a typed RoutingConfig, validated by the RoutingRegistry, and consumed by helpers such as buildWorkerSubscription. The loader helpers (RoutingConfigLoader, WorkerSubscriptionBuilder, buildWorkerSubscription) live in the stem_cli package, where the CLI uses them to produce friendly diagnostics. In application code you can use the core stem types directly (see “Loading subscriptions”). When no file is supplied, Stem falls back to a legacy single-queue configuration.

default_queue:
  alias: default
  queue: primary
  fallbacks:
    - secondary
queues:
  primary:
    exchange: jobs
    routing_key: jobs.default
    priority_range: [0, 9]
  secondary: {}
broadcasts:
  control:
    delivery: at-least-once
  updates:
    delivery: at-most-once
routes:
  - match:
      task: reports.*
    target:
      type: queue
      name: primary
  - match:
      task: control.*
    target:
      type: broadcast
      name: control
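
To make the route table above easier to reason about, here is a minimal, self-contained sketch of how a task name might be resolved against it. This is not Stem's implementation: matchesTaskPattern and resolveTarget are hypothetical helpers, and the sketch assumes that `*` matches any run of characters, that the first matching route wins, and that unmatched tasks fall through to the default queue.

```dart
/// Hypothetical glob matcher (not part of Stem).
/// Assumes `*` matches any run of characters, including none.
bool matchesTaskPattern(String pattern, String taskName) {
  // Escape every literal segment, then rejoin the segments with `.*`.
  final escaped = pattern.split('*').map(RegExp.escape).join('.*');
  return RegExp('^$escaped\$').hasMatch(taskName);
}

/// Hypothetical resolver mirroring the two routes in the sample file.
/// Assumes routes are checked in declaration order and the first match wins.
String resolveTarget(String taskName) {
  const routes = {
    'reports.*': 'queue:primary',
    'control.*': 'broadcast:control',
  };
  for (final route in routes.entries) {
    if (matchesTaskPattern(route.key, taskName)) {
      return route.value;
    }
  }
  // No route matched: fall back to the queue named by default_queue.
  return 'queue:primary (default)';
}
```

Under these assumptions, a task named reports.daily resolves to the primary queue, control.shutdown resolves to the control broadcast, and email.send falls through to the default queue.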

Queue priority ranges

  • Each queue definition accepts an optional priority_range, written either as a two-element list ([min, max]) or as an object ({ min: 0, max: 9 }).
  • When omitted, the queue defaults to min: 0, max: 9. Values outside the range trigger a FormatException during load.
  • The routing registry clamps any priority assigned via RoutingInfo.priority or route overrides into the configured range, guaranteeing that both Redis and Postgres brokers store buckets within [min, max].
  • Publishers may continue to set Envelope.priority; the registry will respect that hint when resolving a route. When no range is defined the value is clamped to [0, 9].
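
As an illustration of the two accepted forms, the fragment below declares one queue with each, plus one that relies on the default. The queue names bulk, urgent, and plain are hypothetical and chosen only for this example.

```yaml
queues:
  bulk:
    # List form: [min, max]
    priority_range: [0, 3]
  urgent:
    # Object form with explicit keys
    priority_range: { min: 4, max: 9 }
  # priority_range omitted, so the default range 0..9 applies
  plain: {}
```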
lib/routing.dart
final priorityQueue = QueueDefinition(
  name: 'priority',
  priorityRange: const QueuePriorityRange(min: 0, max: 5),
);

final priorityRegistry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'priority'),
    queues: {'priority': priorityQueue},
  ),
);
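
The clamping rule can be sketched without any Stem types. The helper below is hypothetical (clampPriority is not part of the Stem API) and only illustrates the arithmetic: a requested priority is pulled to the nearest bound of the configured range, with [0, 9] used when no range is given.

```dart
/// Hypothetical helper illustrating the clamping rule; not part of Stem.
/// Defaults mirror the documented fallback range of [0, 9].
int clampPriority(int requested, {int min = 0, int max = 9}) {
  if (min > max) {
    throw ArgumentError('min ($min) must not exceed max ($max)');
  }
  if (requested < min) return min; // below the range: raise to min
  if (requested > max) return max; // above the range: lower to max
  return requested; // already inside the range
}
```

With the priority queue defined above (min: 0, max: 5), a requested priority of 7 would be stored as 5, while a request of 3 would pass through unchanged.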

Broadcast channels

  • Add broadcast channels under broadcasts; each entry is keyed by the logical channel name and may declare:
    • delivery: semantic hint for consumers. at-least-once is the default and matches the behaviour of both Redis Streams and Postgres fan-out tables.
    • durability: optional metadata surfaced in RoutingInfo.meta. Current brokers treat it as an advisory flag.
  • Routes may target broadcasts via target: { type: broadcast, name: channel }. Workers subscribe by listing the channel under STEM_WORKER_BROADCASTS or by passing the CLI --broadcast flag.
  • Broadcast deliveries reuse the same envelope payload; brokers set RoutingInfo.broadcastChannel to the logical channel and ensure each subscriber receives the message exactly once when acked.

Loading subscriptions

  • RoutingConfigLoader, WorkerSubscriptionBuilder, and buildWorkerSubscription live in the stem_cli package. The CLI uses them to provide friendly diagnostics and to honor --queue/--broadcast when building worker subscriptions.
  • In application code you can either import package:stem_cli/stem_cli.dart (see packages/stem/example/email_service) or build the registry directly: load YAML with RoutingConfig.fromYaml, then construct a RoutingRegistry and RoutingSubscription yourself.
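
The second option, building the registry directly, might look roughly like the sketch below. Treat it as an outline under stated assumptions and not as the library's documented API: it assumes that RoutingConfig.fromYaml accepts the raw YAML text as a String, that the core types are exported from package:stem/stem.dart, and that a hand-built single-queue config is an acceptable stand-in for the legacy fallback. The function name loadRouting is chosen for illustration.

```dart
import 'dart:io';

import 'package:stem/stem.dart'; // assumed export location of the core types

/// Sketch of a loader that reads the file named by STEM_ROUTING_CONFIG.
Future<RoutingRegistry> loadRouting() async {
  final path = Platform.environment['STEM_ROUTING_CONFIG'];
  if (path == null || path.trim().isEmpty) {
    // No routing file supplied: use a stand-in single-queue config.
    // Stem's real legacy fallback may be built differently.
    return RoutingRegistry(
      RoutingConfig(
        defaultQueue:
            const DefaultQueueConfig(alias: 'default', queue: 'default'),
        queues: {'default': QueueDefinition(name: 'default')},
      ),
    );
  }
  // Assumption: fromYaml takes the YAML document as a String.
  final yaml = await File(path).readAsString();
  return RoutingRegistry(RoutingConfig.fromYaml(yaml));
}
```

Check the signatures against the version of stem you have installed before relying on this shape.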

Using the config in Dart

Load the routing file once during service start-up and reuse the registry across producers and workers:

lib/routing.dart
Future<(Stem, Worker)> bootstrapStem() async {
  // loadRouting() is assumed to be defined elsewhere and to return the
  // routing registry built from the routing file.
  final routing = await loadRouting();
  final registry = SimpleTaskRegistry()..register(EmailTask());
  final config = StemConfig.fromEnvironment();
  // Fall back to the default queue when no worker queues are configured.
  final subscription = RoutingSubscription(
    queues: config.workerQueues.isEmpty
        ? [config.defaultQueue]
        : config.workerQueues,
    broadcastChannels: config.workerBroadcasts,
  );

  // Producer side: resolves targets through the routing registry.
  final stem = Stem(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    routing: routing,
  );

  // Consumer side: listens on the queues and channels in the subscription.
  final worker = Worker(
    broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
    registry: registry,
    backend: InMemoryResultBackend(),
    subscription: subscription,
  );

  return (stem, worker);
}

class EmailTask extends TaskHandler<void> {
  @override
  String get name => 'email.send';

  @override
  TaskOptions get options => const TaskOptions(queue: 'default');

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}

For lightweight services or tests, you can construct the registry inline:

final inlineRegistry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'primary'),
    queues: {'primary': QueueDefinition(name: 'primary')},
    routes: [
      RouteDefinition(
        match: RouteMatch.fromJson(const {'task': 'reports.*'}),
        target: RouteTarget(type: 'queue', name: 'primary'),
      ),
    ],
  ),
);

Both approaches keep routing logic declarative while letting you evolve queue topology without editing code.