Skip to main content

Queue Events

Stem supports queue-scoped custom events similar to BullMQ QueueEvents and "custom events" patterns.

Use this when you need lightweight event streams for domain notifications (order.created, invoice.settled) without creating task handlers.

API Surface

  • QueueEventsProducer.emit(queue, eventName, payload, headers, meta)
  • QueueEvents.start() / QueueEvents.close()
  • QueueEvents.events stream (all events for that queue)
  • QueueEvents.on(eventName) stream (filtered by name)

All events are delivered as QueueCustomEvent, which implements StemEvent.

Producer + Listener

lib/queue_events.dart
Future<void> queueEventsProducerListener(Broker broker) async {
final producer = QueueEventsProducer(broker: broker);
final listener = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-events',
);
await listener.start();

final subscription = listener.on('order.created').listen((event) {
print('Order created: ${event.payload['orderId']}');
print('Trace id: ${event.headers['x-trace-id']}');
});

await producer.emit(
'orders',
'order.created',
payload: const {'orderId': 'ord-1001'},
headers: const {'x-trace-id': 'trace-123'},
meta: const {'tenant': 'acme'},
);

await Future<void>.delayed(const Duration(milliseconds: 200));
await subscription.cancel();
await listener.close();
}

Fan-out to Multiple Listeners

Multiple listeners on the same queue receive each emitted event.

lib/queue_events.dart
Future<void> queueEventsFanout(Broker broker) async {
final producer = QueueEventsProducer(broker: broker);
final listenerA = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-a',
);
final listenerB = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-b',
);
await listenerA.start();
await listenerB.start();

final subscriptionA = listenerA.events.listen((event) {
print('A saw ${event.name}');
});
final subscriptionB = listenerB.events.listen((event) {
print('B saw ${event.name}');
});

await producer.emit('orders', 'order.updated', payload: const {'id': 'o-1'});

await Future<void>.delayed(const Duration(milliseconds: 200));
await subscriptionA.cancel();
await subscriptionB.cancel();
await listenerA.close();
await listenerB.close();
}

Semantics

  • Events are queue-scoped: listeners receive only events for their configured queue.
  • on(eventName) matches exact event names.
  • headers and meta round-trip to listeners.
  • Event names and queue names must be non-empty.
  • Delivery follows the underlying broker's broadcast behavior for active listeners (no historical replay API is built into QueueEvents).

When to Use Queue Events vs Signals

  • Use Signals for runtime lifecycle hooks (task/worker/scheduler/control).
  • Use Queue Events for application-domain events you publish and consume.