Skip to main content

Queue Events

Stem supports queue-scoped custom events similar to BullMQ QueueEvents and "custom events" patterns.

Use this when you need lightweight event streams for domain notifications (order.created, invoice.settled) without creating task handlers.

API Surface

  • QueueEventsProducer.emit(queue, eventName, payload, headers, meta)
  • QueueEventsProducer.emitValue(queue, eventName, value, codec, headers, meta)
  • QueueEventsProducer.emitJson(queue, eventName, dto, headers, meta)
  • QueueEventsProducer.emitVersionedJson(queue, eventName, dto, version, headers, meta)
  • QueueEvents.start() / QueueEvents.close()
  • QueueEvents.events stream (all events for that queue)
  • QueueEvents.on(eventName) stream (filtered by name)

All events are delivered as QueueCustomEvent, which implements StemEvent. Use event.payloadValue(...) / event.requiredPayloadValue(...) to read typed payload fields instead of repeating raw payload['key'] casts. If one queue event maps to one DTO, use event.payloadJson(...), event.payloadVersionedJson(...), or event.payloadAs(codec: ...) to decode the whole payload in one step. If the whole queue-event metadata map is one DTO, use event.metaJson(...), event.metaVersionedJson(...), or event.metaAs(codec: ...) instead of manual event.meta[...] casts.

Producer + Listener

lib/queue_events.dart
Future<void> queueEventsProducerListener(Broker broker) async {
final producer = QueueEventsProducer(broker: broker);
final listener = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-events',
);
await listener.start();

final subscription = listener.on('order.created').listen((event) {
final created = event.payloadJson<_OrderCreatedEvent>(
decode: _OrderCreatedEvent.fromJson,
);
print('Order created: ${created.orderId}');
print('Trace id: ${event.headers['x-trace-id']}');
});

await producer.emitJson(
'orders',
'order.created',
const _OrderCreatedEvent(orderId: 'ord-1001'),
headers: const {'x-trace-id': 'trace-123'},
meta: const {'tenant': 'acme'},
);

await Future<void>.delayed(const Duration(milliseconds: 200));
await subscription.cancel();
await listener.close();
}

Fan-out to Multiple Listeners

Multiple listeners on the same queue receive each emitted event.

lib/queue_events.dart
Future<void> queueEventsFanout(Broker broker) async {
final producer = QueueEventsProducer(broker: broker);
final listenerA = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-a',
);
final listenerB = QueueEvents(
broker: broker,
queue: 'orders',
consumerName: 'orders-b',
);
await listenerA.start();
await listenerB.start();

final subscriptionA = listenerA.events.listen((event) {
final updated = event.payloadJson<_OrderUpdatedEvent>(
decode: _OrderUpdatedEvent.fromJson,
);
print('A saw ${event.name} for ${updated.id}');
});
final subscriptionB = listenerB.events.listen((event) {
final updated = event.payloadJson<_OrderUpdatedEvent>(
decode: _OrderUpdatedEvent.fromJson,
);
print('B saw ${event.name} for ${updated.id}');
});

await producer.emitJson(
'orders',
'order.updated',
const _OrderUpdatedEvent(id: 'o-1'),
);

await Future<void>.delayed(const Duration(milliseconds: 200));
await subscriptionA.cancel();
await subscriptionB.cancel();
await listenerA.close();
await listenerB.close();
}

Semantics

  • Events are queue-scoped: listeners receive only events for their configured queue.
  • emitValue(...) is the codec-backed path when the payload should be authored as a typed object but still use a custom map encoder or explicit PayloadCodec<T>.
  • emitJson(...) is the DTO convenience path when the payload already exposes toJson().
  • emitVersionedJson(...) is the same convenience path when the payload schema should persist an explicit __stemPayloadVersion.
  • on(eventName) matches exact event names.
  • headers and meta round-trip to listeners.
  • Event names and queue names must be non-empty.
  • Delivery follows the underlying broker's broadcast behavior for active listeners (no historical replay API is built into QueueEvents).

When to Use Queue Events vs Signals

  • Use Signals for runtime lifecycle hooks (task/worker/scheduler/control).
  • Use Queue Events for application-domain events you publish and consume.