Queue Events
Stem supports queue-scoped custom events, similar to BullMQ's QueueEvents and
"custom events" patterns.
Use this when you need lightweight event streams for domain notifications
(order.created, invoice.settled) without creating task handlers.
API Surface
- QueueEventsProducer.emit(queue, eventName, payload, headers, meta)
- QueueEventsProducer.emitValue(queue, eventName, value, codec, headers, meta)
- QueueEventsProducer.emitJson(queue, eventName, dto, headers, meta)
- QueueEventsProducer.emitVersionedJson(queue, eventName, dto, version, headers, meta)
- QueueEvents.start() / QueueEvents.close()
- QueueEvents.events stream (all events for that queue)
- QueueEvents.on(eventName) stream (filtered by name)
All events are delivered as QueueCustomEvent, which implements StemEvent.
Use event.payloadValue(...) / event.requiredPayloadValue(...) to read typed
payload fields instead of repeating raw payload['key'] casts.
If one queue event maps to one DTO, use event.payloadJson(...),
event.payloadVersionedJson(...), or event.payloadAs(codec: ...) to decode
the whole payload in one step.
If the whole queue-event metadata map is one DTO, use event.metaJson(...),
event.metaVersionedJson(...), or event.metaAs(codec: ...) instead of
manual event.meta[...] casts.
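To illustrate why typed accessors are preferable to repeated raw casts, here is a minimal sketch in plain Dart. The extension `TypedMapAccess` and its methods `valueOrNull` and `requiredValue` are hypothetical names invented for this illustration; they are not Stem's implementation of payloadValue(...) or requiredPayloadValue(...), whose exact signatures are not shown here.

```dart
// Hypothetical helpers on a plain map, for illustration only.
// These are NOT Stem's payloadValue/requiredPayloadValue implementations.
extension TypedMapAccess on Map<String, Object?> {
  /// Returns the value under [key] if it is a [T], otherwise null.
  T? valueOrNull<T>(String key) {
    final value = this[key];
    return value is T ? value : null;
  }

  /// Returns the value under [key], throwing if it is missing or mistyped.
  T requiredValue<T>(String key) {
    final value = this[key];
    if (value is T) return value;
    throw FormatException('Expected $T for "$key", got ${value.runtimeType}');
  }
}

void main() {
  final payload = <String, Object?>{'orderId': 'ord-1001', 'total': 42};

  // Raw cast: fails with a generic TypeError if the key is missing or mistyped.
  final raw = payload['orderId'] as String;

  // Typed access: the intent (required vs optional) is explicit at the call site.
  final orderId = payload.requiredValue<String>('orderId');
  final coupon = payload.valueOrNull<String>('coupon');

  print('$raw $orderId $coupon');
}
```

The required variant reports which key failed and which type was expected, which is the main practical gain over an inline `as` cast.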
Producer + Listener
Future<void> queueEventsProducerListener(Broker broker) async {
  final producer = QueueEventsProducer(broker: broker);
  final listener = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-events',
  );

  // Start the listener before emitting; QueueEvents has no historical replay.
  await listener.start();

  final subscription = listener.on('order.created').listen((event) {
    // Decode the whole payload into a DTO in one step.
    final created = event.payloadJson<_OrderCreatedEvent>(
      decode: _OrderCreatedEvent.fromJson,
    );
    print('Order created: ${created.orderId}');
    print('Trace id: ${event.headers['x-trace-id']}');
  });

  await producer.emitJson(
    'orders',
    'order.created',
    const _OrderCreatedEvent(orderId: 'ord-1001'),
    headers: const {'x-trace-id': 'trace-123'},
    meta: const {'tenant': 'acme'},
  );

  // Allow time for delivery before tearing down.
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await subscription.cancel();
  await listener.close();
}
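The example references `_OrderCreatedEvent` without showing its definition. A minimal sketch of such a DTO might look like the following. The field name, the `Map<String, dynamic>` shape accepted by `fromJson`, and the JSON key are assumptions made for illustration; the real class and the decode signature Stem expects may differ.

```dart
import 'dart:convert';

/// Hypothetical DTO matching how the example uses it.
class _OrderCreatedEvent {
  const _OrderCreatedEvent({required this.orderId});

  /// Used as the `decode:` tear-off. The parameter type is an assumption.
  factory _OrderCreatedEvent.fromJson(Map<String, dynamic> json) {
    return _OrderCreatedEvent(orderId: json['orderId'] as String);
  }

  final String orderId;

  /// emitJson(...) is described as the path for payloads that expose toJson().
  Map<String, dynamic> toJson() => {'orderId': orderId};
}

void main() {
  const event = _OrderCreatedEvent(orderId: 'ord-1001');

  // Round-trip through JSON text, as a broker transport might.
  final wire = jsonEncode(event.toJson());
  final decoded = _OrderCreatedEvent.fromJson(
    jsonDecode(wire) as Map<String, dynamic>,
  );

  print(decoded.orderId); // ord-1001
}
```

Keeping `toJson()` and `fromJson` symmetric means the same class can serve both the producer and the listener side.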
Fan-out to Multiple Listeners
Multiple listeners on the same queue receive each emitted event.
Future<void> queueEventsFanout(Broker broker) async {
  final producer = QueueEventsProducer(broker: broker);

  // Two listeners on the same queue, each with its own consumer name.
  final listenerA = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-a',
  );
  final listenerB = QueueEvents(
    broker: broker,
    queue: 'orders',
    consumerName: 'orders-b',
  );

  await listenerA.start();
  await listenerB.start();

  // `events` is unfiltered: each listener sees every event on the queue.
  final subscriptionA = listenerA.events.listen((event) {
    final updated = event.payloadJson<_OrderUpdatedEvent>(
      decode: _OrderUpdatedEvent.fromJson,
    );
    print('A saw ${event.name} for ${updated.id}');
  });
  final subscriptionB = listenerB.events.listen((event) {
    final updated = event.payloadJson<_OrderUpdatedEvent>(
      decode: _OrderUpdatedEvent.fromJson,
    );
    print('B saw ${event.name} for ${updated.id}');
  });

  await producer.emitJson(
    'orders',
    'order.updated',
    const _OrderUpdatedEvent(id: 'o-1'),
  );

  // Allow time for delivery before tearing down.
  await Future<void>.delayed(const Duration(milliseconds: 200));
  await subscriptionA.cancel();
  await subscriptionB.cancel();
  await listenerA.close();
  await listenerB.close();
}
Semantics
- Events are queue-scoped: listeners receive only events for their configured queue.
- emitValue(...) is the codec-backed path when the payload should be authored as a typed object but still use a custom map encoder or explicit PayloadCodec<T>.
- emitJson(...) is the DTO convenience path when the payload already exposes toJson().
- emitVersionedJson(...) is the same convenience path when the payload schema should persist an explicit __stemPayloadVersion.
- on(eventName) matches exact event names.
- headers and meta round-trip to listeners.
- Event names and queue names must be non-empty.
- Delivery follows the underlying broker's broadcast behavior for active
listeners (no historical replay API is built into
QueueEvents).
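Three of the semantics above (queue scoping, exact-name matching, and no replay for listeners that attach late) can be pictured with Dart's own broadcast streams. This is only an analogy: `DemoEvent` and `runDemo` are hypothetical names, the code does not use Stem or a broker, and a real broker's delivery behavior may differ in detail.

```dart
import 'dart:async';

// Hypothetical stand-in for a queue event; not Stem's QueueCustomEvent.
class DemoEvent {
  const DemoEvent(this.queue, this.name);
  final String queue;
  final String name;
}

Future<List<String>> runDemo() async {
  final log = <String>[];

  // A broadcast controller delivers to every active listener and does not
  // buffer events for listeners that subscribe later.
  final bus = StreamController<DemoEvent>.broadcast();

  // Emitted before anyone listens: dropped, with no replay.
  bus.add(const DemoEvent('orders', 'order.created'));

  // Listener A: every event on the 'orders' queue.
  final subA = bus.stream
      .where((e) => e.queue == 'orders')
      .listen((e) => log.add('A:${e.name}'));

  // Listener B: only events whose name is exactly 'order.updated'.
  final subB = bus.stream
      .where((e) => e.queue == 'orders' && e.name == 'order.updated')
      .listen((e) => log.add('B:${e.name}'));

  bus.add(const DemoEvent('orders', 'order.updated'));
  bus.add(const DemoEvent('orders', 'order.updated.v2')); // no exact match for B
  bus.add(const DemoEvent('billing', 'order.updated')); // different queue

  // Let pending deliveries run before tearing down.
  await Future<void>.delayed(Duration.zero);
  await subA.cancel();
  await subB.cancel();
  await bus.close();
  return log;
}

Future<void> main() async {
  final log = await runDemo();
  print(log.where((entry) => entry.startsWith('A:')).toList());
  print(log.where((entry) => entry.startsWith('B:')).toList());
}
```

Listener A records two events and listener B records one. Neither sees order.created, which was emitted before they subscribed, and neither sees the billing event.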
When to Use Queue Events vs Signals
- Use Signals for runtime lifecycle hooks (task/worker/scheduler/control).
- Use Queue Events for application-domain events you publish and consume.