Queue Events
Stem supports queue-scoped custom events, similar to BullMQ's QueueEvents and
other "custom events" patterns.
Use this when you need lightweight event streams for domain notifications
(order.created, invoice.settled) without creating task handlers.
API Surface
- QueueEventsProducer.emit(queue, eventName, payload, headers, meta)
- QueueEvents.start() / QueueEvents.close()
- QueueEvents.events stream (all events for that queue)
- QueueEvents.on(eventName) stream (filtered by name)
All events are delivered as QueueCustomEvent, which implements StemEvent.
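One way to picture how the two streams relate: on(eventName) behaves like events narrowed to a single name. The sketch below models that idea with plain Dart streams. DemoEvent, onName, and collectCreatedIds are hypothetical stand-ins introduced for illustration, not Stem types, and Stem's actual implementation may differ.

```dart
/// Hypothetical stand-in for QueueCustomEvent (not a Stem type).
class DemoEvent {
  const DemoEvent(this.name, this.payload);

  final String name;
  final Map<String, Object?> payload;
}

/// Models on(eventName) as an exact-name filter over the full event stream.
Stream<DemoEvent> onName(Stream<DemoEvent> events, String eventName) {
  return events.where((event) => event.name == eventName);
}

/// Feeds three events through the filter and returns the matching order ids.
Future<List<String>> collectCreatedIds() async {
  final all = Stream<DemoEvent>.fromIterable(const [
    DemoEvent('order.created', {'orderId': 'ord-1001'}),
    DemoEvent('order.created.v2', {'orderId': 'ord-1002'}), // near miss, skipped
    DemoEvent('order.created', {'orderId': 'ord-1003'}),
  ]);
  final created = await onName(all, 'order.created').toList();
  return [for (final event in created) event.payload['orderId'] as String];
}
```

Awaiting collectCreatedIds() returns the ids ord-1001 and ord-1003. The order.created.v2 event is dropped because the comparison is exact rather than prefix-based.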
Producer + Listener
lib/queue_events.dart
Future<void> queueEventsProducerListener(Broker broker) async {
  final producer = QueueEventsProducer(broker: broker);
  final listener = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-events',
  );

  // Start the listener before emitting so it is active when the event arrives.
  await listener.start();
  final subscription = listener.on('order.created').listen((event) {
    print('Order created: ${event.payload['orderId']}');
    print('Trace id: ${event.headers['x-trace-id']}');
  });

  await producer.emit(
    'orders',
    'order.created',
    payload: const {'orderId': 'ord-1001'},
    headers: const {'x-trace-id': 'trace-123'},
    meta: const {'tenant': 'acme'},
  );

  // Give the broker a moment to deliver the event before tearing down.
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await subscription.cancel();
  await listener.close();
}
Fan-out to Multiple Listeners
Multiple listeners on the same queue receive each emitted event.
lib/queue_events.dart
Future<void> queueEventsFanout(Broker broker) async {
  final producer = QueueEventsProducer(broker: broker);

  // Two listeners on the same queue, each with its own consumer name.
  final listenerA = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-a',
  );
  final listenerB = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-b',
  );

  await listenerA.start();
  await listenerB.start();
  final subscriptionA = listenerA.events.listen((event) {
    print('A saw ${event.name}');
  });
  final subscriptionB = listenerB.events.listen((event) {
    print('B saw ${event.name}');
  });

  // A single emit should reach both listeners.
  await producer.emit('orders', 'order.updated', payload: const {'id': 'o-1'});

  // Give the broker a moment to deliver the event before tearing down.
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await subscriptionA.cancel();
  await subscriptionB.cancel();
  await listenerA.close();
  await listenerB.close();
}
Semantics
- Events are queue-scoped: listeners receive only events for their configured queue.
- on(eventName) matches exact event names.
- headers and meta round-trip to listeners.
- Event names and queue names must be non-empty.
- Delivery follows the underlying broker's broadcast behavior: only listeners
that are active when an event is emitted receive it. QueueEvents has no
built-in API for replaying historical events.
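The "active listeners only, no replay" rule can be illustrated with a small stand-in model. In the sketch below, a broadcast StreamController from dart:async plays the role of the broker. This is an assumption made for illustration, not how Stem or any particular broker is implemented, and lateListenerDemo is a hypothetical name.

```dart
import 'dart:async';

/// Stand-in model only: a broadcast StreamController acts as the broker.
/// Returns the event names seen by an early and a late subscriber.
Future<Map<String, List<String>>> lateListenerDemo() async {
  final bus = StreamController<String>.broadcast();
  final earlySeen = <String>[];
  final lateSeen = <String>[];

  // The early listener is active before anything is emitted.
  final earlySub = bus.stream.listen(earlySeen.add);
  bus.add('order.created');
  await Future<void>.delayed(Duration.zero); // let delivery run

  // The late listener subscribes after the first event; nothing is replayed.
  final lateSub = bus.stream.listen(lateSeen.add);
  bus.add('order.updated');
  await Future<void>.delayed(Duration.zero);

  await earlySub.cancel();
  await lateSub.cancel();
  await bus.close();
  return {'early': earlySeen, 'late': lateSeen};
}
```

In this model the early subscriber sees both order.created and order.updated, while the late subscriber sees only order.updated. If a consumer must not miss events, start its listener before the producer emits, or keep the authoritative record somewhere that supports reads after the fact.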
When to Use Queue Events vs Signals
- Use Signals for runtime lifecycle hooks (task/worker/scheduler/control).
- Use Queue Events for application-domain events you publish and consume.