Producer API
Enqueue tasks from your Dart services using Stem.enqueue. Start with the
in-memory broker, then opt into Redis/Postgres as needed.
Enqueue tasks
- In-memory (bin/producer.dart)
- Redis + Result Backend (bin/producer_redis.dart)
- Signed Payloads (bin/producer_signed.dart)
Future<void> enqueueInMemory() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<void>(
        name: 'hello.print',
        entrypoint: (context, args) async {
          final name = args['name'] as String? ?? 'friend';
          print('Hello $name');
          return null;
        },
      ),
    ],
  );

  final taskId = await app.stem.enqueue(
    'hello.print',
    args: {'name': 'Stem'},
  );
  print('Enqueued $taskId');

  await app.close();
}
// Platform.environment comes from dart:io.
import 'dart:io';

Future<void> enqueueWithRedis() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final broker = await RedisStreamsBroker.connect(brokerUrl);
  // Results are stored in Redis database 1.
  final backend = await RedisResultBackend.connect('$brokerUrl/1');

  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'reports.generate',
        entrypoint: (context, args) async {
          final id = args['reportId'] as String? ?? 'unknown';
          print('Queued report $id');
          return null;
        },
      ),
    );

  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );

  await stem.enqueue(
    'reports.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports', maxRetries: 3),
    meta: {'requestedBy': 'finance'},
  );

  await backend.close();
  await broker.close();
}
Future<void> enqueueWithSigning() async {
  final config = StemConfig.fromEnvironment();
  final broker = await RedisStreamsBroker.connect(
    config.brokerUrl,
    tls: config.tls,
  );
  final backend = InMemoryResultBackend();

  final registry = SimpleTaskRegistry()
    ..register(
      FunctionTaskHandler<void>(
        name: 'billing.charge',
        entrypoint: (context, args) async {
          final customerId = args['customerId'] as String? ?? 'unknown';
          print('Queued charge for $customerId');
          return null;
        },
      ),
    );

  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
    signer: PayloadSigner.maybe(config.signing),
  );

  await stem.enqueue(
    'billing.charge',
    args: {'customerId': 'cust_123', 'amount': 4200},
    notBefore: DateTime.now().add(const Duration(minutes: 5)),
  );

  await backend.close();
  await broker.close();
}
Typed Enqueue Helpers
When you need compile-time guarantees for task arguments and result types, wrap
your handler in a TaskDefinition. The definition knows how to encode args and
decode results, and exposes a fluent builder for overrides (headers, meta,
options, scheduling):
class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

class GenerateReportTask extends TaskHandler<String> {
  static final definition = TaskDefinition<ReportPayload, String>(
    name: 'reports.generate',
    encodeArgs: (payload) => {'reportId': payload.reportId},
    metadata: const TaskMetadata(description: 'Generate PDF reports'),
  );

  @override
  String get name => definition.name;

  @override
  TaskOptions get options => const TaskOptions(queue: 'reports');

  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    final id = args['reportId'] as String;
    // generateReport is assumed to be defined elsewhere in your application.
    return await generateReport(id);
  }
}

Future<void> enqueueTyped() async {
  final app = await StemApp.inMemory(tasks: [GenerateReportTask()]);
  await app.start();

  final call = GenerateReportTask.definition.call(
    const ReportPayload(reportId: 'monthly-2025-10'),
    options: const TaskOptions(priority: 5),
    headers: const {'x-requested-by': 'analytics'},
  );

  final taskId = await app.stem.enqueueCall(call);
  final result = await app.stem.waitForTask<String>(taskId);
  print(result?.value);

  await app.close();
}
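To make the "compile-time guarantees" concrete, here is a minimal, library-free sketch of the idea behind a typed definition. SketchDefinition and its decodeResult field are hypothetical names invented for illustration; they are not Stem's TaskDefinition and only mimic the encode-args/decode-result pairing described above.

```dart
/// Hypothetical, simplified stand-in for a typed task definition.
/// Not part of Stem: it only pairs a task name with typed codecs.
class SketchDefinition<TArgs, TResult> {
  const SketchDefinition({
    required this.name,
    required this.encodeArgs,
    required this.decodeResult,
  });

  final String name;

  /// Turns a typed payload into the untyped map sent over the wire.
  final Map<String, Object?> Function(TArgs args) encodeArgs;

  /// Turns the raw stored result back into a typed value.
  final TResult Function(Object? raw) decodeResult;
}

class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

final reportDefinition = SketchDefinition<ReportPayload, String>(
  name: 'reports.generate',
  encodeArgs: (payload) => {'reportId': payload.reportId},
  decodeResult: (raw) => raw as String,
);

void main() {
  // Passing anything other than a ReportPayload here fails to compile.
  final wireArgs = reportDefinition.encodeArgs(
    const ReportPayload(reportId: 'monthly-2025-10'),
  );
  print('${reportDefinition.name}: $wireArgs');

  // The result type is String, so no cast is needed at the call site.
  final String path = reportDefinition.decodeResult('reports/monthly.pdf');
  print(path);
}
```

The point of the pattern is that the untyped Map<String, Object?> only exists at the boundary; callers and handlers work with ReportPayload and String.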
Typed helpers are also available on Canvas (definition.toSignature), so the
group/chain/chord APIs produce strongly typed TaskResult<T> streams.
To adjust headers, meta, or queue at a call site, wrap the definition in a
TaskEnqueueBuilder and call await builder.enqueueWith(stem).
Enqueue options
Use TaskEnqueueOptions to override scheduling, routing, retry behavior, and
callbacks for a single publish. Common fields include countdown, eta,
expires, queue, exchange, routingKey, priority, serializer,
compression, ignoreResult, taskId, retry, retryPolicy, link, and
linkError.
Adapter support varies; for example, not every broker honors priorities or delayed delivery. Stem falls back to best-effort behavior when a capability is unsupported.
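The scheduling fields are easiest to reason about as timestamps. The sketch below assumes Celery-style semantics, which this page does not confirm: countdown is a delay relative to now, eta is an absolute earliest run time, and expires is a deadline after which the task should be dropped. resolveRunAt and isExpired are hypothetical helpers, not Stem APIs.

```dart
/// Hypothetical helper: resolves the earliest time a task may run.
/// Assumes countdown is relative to [now] and eta is absolute.
DateTime? resolveRunAt(DateTime now, {Duration? countdown, DateTime? eta}) {
  if (countdown != null && eta != null) {
    // This sketch rejects ambiguous input; a real library may pick one.
    throw ArgumentError('Set either countdown or eta, not both.');
  }
  if (countdown != null) return now.add(countdown);
  return eta; // May be null, meaning "run as soon as possible".
}

/// Hypothetical helper: a task is expired once [now] reaches [expires].
bool isExpired(DateTime now, DateTime? expires) =>
    expires != null && !now.isBefore(expires);

void main() {
  final now = DateTime.now().toUtc();
  final runAt = resolveRunAt(now, countdown: const Duration(seconds: 30));
  final expires = now.add(const Duration(minutes: 10));

  print('run at:  $runAt');
  print('expired: ${isExpired(now, expires)}');
}
```

Converting a relative countdown to an absolute time at publish time means the delay is measured from when the producer enqueued the task, not from when a worker first sees it.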
Example:
await stem.enqueue(
  'tasks.email',
  args: {'to': 'ops@example.com'},
  enqueueOptions: TaskEnqueueOptions(
    countdown: const Duration(seconds: 30),
    queue: 'critical',
    retry: true,
    retryPolicy: TaskRetryPolicy(
      backoff: true,
      defaultDelay: const Duration(seconds: 2),
      maxRetries: 5,
    ),
  ),
);
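This page does not specify how backoff: true spaces out retries. As a rough illustration only, the sketch below assumes plain exponential backoff, doubling defaultDelay on each attempt; Stem's actual schedule may differ (for example with jitter or a maximum delay). backoffSchedule is a hypothetical helper, not a Stem API.

```dart
/// Hypothetical helper, not part of Stem: lists the wait before each
/// retry attempt, assuming the delay doubles when backoff is enabled.
List<Duration> backoffSchedule({
  required Duration defaultDelay,
  required int maxRetries,
  bool backoff = true,
}) {
  return [
    for (var attempt = 0; attempt < maxRetries; attempt++)
      // 1 << attempt is 1, 2, 4, ... so the delay doubles per attempt.
      backoff ? defaultDelay * (1 << attempt) : defaultDelay,
  ];
}

void main() {
  // Same values as the retry policy in the example above.
  final delays = backoffSchedule(
    defaultDelay: const Duration(seconds: 2),
    maxRetries: 5,
  );
  for (final delay in delays) {
    print('${delay.inSeconds}s');
  }
}
```

Under that assumption, a policy with a 2 second default delay and 5 retries waits 2, 4, 8, 16, and 32 seconds, so a task can stay in flight for about a minute before it finally fails.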
Tips
- Reuse a single Stem instance; create it during application bootstrap.
- Capture the returned task id when you need to poll status from the result backend.
- Use TaskOptions to set queue, retries, priority, isolation, and visibility timeouts.
- meta is stored with result backend entries, which makes it useful for audit trails.
- headers travel with the envelope and can carry tracing information.
- To schedule tasks in the future, set notBefore.
- For signing configuration, see Payload Signing.
Configuring Payload Encoders
Every Stem, StemApp, StemWorkflowApp, and Canvas now accepts a
TaskPayloadEncoderRegistry or explicit argsEncoder/resultEncoder values.
Encoders run exactly once in each direction: producers encode arguments, workers
decode them before invoking handlers, and handler return values are encoded
before hitting the result backend. Example:
class AesPayloadEncoder extends TaskPayloadEncoder {
  const AesPayloadEncoder();

  // encrypt and decrypt are placeholders for your own crypto helpers.
  @override
  Object? encode(Object? value) => encrypt(value);

  @override
  Object? decode(Object? stored) => decrypt(stored);
}

Future<void> configureProducerEncoders() async {
  final app = await StemApp.inMemory(
    tasks: const [],
    argsEncoder: const AesPayloadEncoder(),
    resultEncoder: const JsonTaskPayloadEncoder(),
    additionalEncoders: const [CustomBinaryEncoder()],
  );

  await app.close();
}
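The AES example above leaves encrypt and decrypt undefined. To show the encode-once, decode-once round trip with something that runs as-is, the sketch below uses Base64-wrapped JSON from dart:convert. Base64 is not encryption and is only a stand-in; SketchPayloadEncoder is a hypothetical interface that mirrors the encode/decode shape shown above rather than Stem's real base class.

```dart
import 'dart:convert';

/// Hypothetical stand-in for the encoder interface shown above.
abstract class SketchPayloadEncoder {
  const SketchPayloadEncoder();
  Object? encode(Object? value);
  Object? decode(Object? stored);
}

/// Wraps any JSON-compatible value as a Base64 string and back.
/// Illustrative only: Base64 obscures nothing and is not encryption.
class Base64JsonEncoder extends SketchPayloadEncoder {
  const Base64JsonEncoder();

  @override
  Object? encode(Object? value) =>
      base64Encode(utf8.encode(jsonEncode(value)));

  @override
  Object? decode(Object? stored) =>
      jsonDecode(utf8.decode(base64Decode(stored as String)));
}

void main() {
  const encoder = Base64JsonEncoder();
  final args = {'reportId': 'monthly-2025-10'};

  final onTheWire = encoder.encode(args); // Producer side: encode once.
  final restored = encoder.decode(onTheWire); // Worker side: decode once.

  print(onTheWire);
  print(restored);
}
```

Because each direction runs exactly once, decode must be the exact inverse of encode; a handler should never see the wire format.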
Handlers needing different encoders can override TaskMetadata.argsEncoder and
TaskMetadata.resultEncoder. The worker automatically stamps every task status
with the encoder id (__stemResultEncoder), so downstream consumers and
adapters always know how to decode stored payloads.
Continue with the Worker guide to consume the tasks you enqueue.