Producer API
Enqueue tasks from your Dart services through a TaskEnqueuer surface such as
StemClient, StemApp, or StemWorkflowApp. Start with the in-memory broker,
then opt into Redis/Postgres as needed.
For adapter-backed deployments, prefer StemClient.fromUrl(...) or
StemStack.fromUrl(...).createClient(...). Keep StemClient.create(...) for
the rarer case where you must provide custom factories directly.
Raw Enqueue
- In-memory (bin/producer.dart)
- Redis + Result Backend (bin/producer_redis.dart)
- Signed Payloads (bin/producer_signed.dart)
// bin/producer.dart
Future<void> enqueueInMemory() async {
  final client = await StemClient.inMemory(
    tasks: [
      FunctionTaskHandler<void>(
        name: 'hello.print',
        entrypoint: (context, args) async {
          final name = args['name'] as String? ?? 'friend';
          print('Hello $name');
          return null;
        },
      ),
    ],
  );
  final app = await client.createApp();
  final taskId = await app.enqueue(
    'hello.print',
    args: {'name': 'Stem'},
  );
  await app.waitForTask<void>(taskId);
  print('Enqueued $taskId');
  await app.close();
  await client.close();
}
// bin/producer_redis.dart
import 'dart:io'; // Provides Platform.environment.

Future<void> enqueueWithRedis() async {
  // Fall back to a local Redis instance when STEM_BROKER_URL is unset.
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
  final tasks = [
    FunctionTaskHandler<void>(
      name: 'reports.generate',
      entrypoint: (context, args) async {
        final id = args['reportId'] as String? ?? 'unknown';
        print('Queued report $id');
        return null;
      },
    ),
  ];
  final client = await StemClient.fromUrl(
    brokerUrl,
    adapters: const [StemRedisAdapter()],
    // Store results in Redis database 1 on the same server as the broker.
    overrides: StemStoreOverrides(backend: '$brokerUrl/1'),
    tasks: tasks,
  );
  await client.enqueue(
    'reports.generate',
    args: {'reportId': 'monthly-2025-10'},
    options: const TaskOptions(queue: 'reports', maxRetries: 3),
    meta: {'requestedBy': 'finance'},
  );
  await client.close();
}
// bin/producer_signed.dart
Future<void> enqueueWithSigning() async {
  final config = StemConfig.fromEnvironment();
  final tasks = [
    FunctionTaskHandler<void>(
      name: 'billing.charge',
      entrypoint: (context, args) async {
        final customerId = args['customerId'] as String? ?? 'unknown';
        print('Queued charge for $customerId');
        return null;
      },
    ),
  ];
  final client = await StemClient.fromUrl(
    config.brokerUrl,
    adapters: const [StemRedisAdapter()],
    overrides: const StemStoreOverrides(backend: 'memory://'),
    tasks: tasks,
    signer: PayloadSigner.maybe(config.signing),
  );
  await client.enqueue(
    'billing.charge',
    args: {'customerId': 'cust_123', 'amount': 4200},
    notBefore: DateTime.now().add(const Duration(minutes: 5)),
  );
  await client.close();
}
Typed Enqueue Helpers
For the common producer path, prefer TaskDefinition<TArgs, TResult>. The
definition owns argument encoding, result decoding, and default publish
metadata, while exposing direct helpers and a fluent builder for overrides
(headers, meta, options, scheduling):
class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

class GenerateReportTask extends TaskHandler<String> {
  static final definition = TaskDefinition<ReportPayload, String>(
    name: 'reports.generate',
    encodeArgs: (payload) => {'reportId': payload.reportId},
    metadata: const TaskMetadata(description: 'Generate PDF reports'),
  );

  @override
  String get name => definition.name;

  @override
  TaskOptions get options => const TaskOptions(queue: 'reports');

  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    final id = args['reportId'] as String?;
    // generateReport is an application helper defined elsewhere.
    return generateReport(id!);
  }
}

Future<void> enqueueTyped() async {
  final client = await StemClient.inMemory(tasks: [GenerateReportTask()]);
  final app = await client.createApp();
  final result = await GenerateReportTask.definition.enqueueAndWait(
    app,
    const ReportPayload(reportId: 'monthly-2025-10'),
    options: const TaskOptions(priority: 5),
    headers: const {'x-requested-by': 'analytics'},
  );
  print(result?.value);
  await app.close();
  await client.close();
}
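To make that division of labor concrete, here is a minimal, Stem-free sketch of a definition object that owns argument encoding and result decoding. SketchDefinition, toMessage, and reportDefinition are hypothetical names invented for illustration; Stem's actual TaskDefinition has a richer surface (metadata, builders, enqueue helpers) that this sketch does not attempt to reproduce.

```dart
// Hypothetical stand-in for a typed task definition; NOT Stem's real class.
class SketchDefinition<TArgs, TResult> {
  const SketchDefinition({
    required this.name,
    required this.encodeArgs,
    required this.decodeResult,
  });

  final String name;
  final Map<String, Object?> Function(TArgs args) encodeArgs;
  final TResult Function(Object? raw) decodeResult;

  // Builds the untyped message that a raw enqueue call would otherwise
  // assemble by hand at every call site.
  Map<String, Object?> toMessage(TArgs args) => {
        'task': name,
        'args': encodeArgs(args),
      };
}

class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

// One definition shared by every producer that enqueues this task.
final reportDefinition = SketchDefinition<ReportPayload, String>(
  name: 'reports.generate',
  encodeArgs: (payload) => {'reportId': payload.reportId},
  decodeResult: (raw) => raw as String,
);

void main() {
  final message = reportDefinition.toMessage(
    const ReportPayload(reportId: 'monthly-2025-10'),
  );
  print(message);
  print(reportDefinition.decodeResult('report.pdf'));
}
```

Because the task name and the wire format live in one place, call sites pass a ReportPayload and never touch map keys directly.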
Typed helpers are also available on Canvas (definition.toSignature), so
group/chain/chord APIs produce strongly typed TaskResult<T> streams. When you
need to tweak headers, meta, or queue at a call site through the explicit
advanced transport path, start from definition.buildCall(args, ...).
Raw task-name strings still work, but they are the lower-level interop path.
Reach for them when the task name is truly dynamic or you are crossing a
boundary that does not have the generated/manual TaskDefinition. When those
calls already have typed DTO args, prefer
enqueuer.enqueueValue(name, dto, codec: ...) over hand-building an args
map.
If you later inspect the raw Envelope, prefer envelope.argsJson(...),
envelope.argsVersionedJson(...), envelope.metaJson(...), or
envelope.metaVersionedJson(...) over manual map casts.
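The advice against manual map casts can be illustrated without any Stem types. The sketch below assumes a decode function of the shape Map<String, Object?> to DTO, which is the kind of function such JSON helpers typically accept; ReportArgs and its fromJson factory are hypothetical and exist only for this example.

```dart
// Hypothetical DTO; the field name matches the earlier report examples.
class ReportArgs {
  const ReportArgs({required this.reportId});

  // Validates once, in one place, instead of casting at every call site.
  factory ReportArgs.fromJson(Map<String, Object?> json) {
    final id = json['reportId'];
    if (id is! String) {
      throw FormatException('reportId must be a String, got $id');
    }
    return ReportArgs(reportId: id);
  }

  final String reportId;
}

void main() {
  final args = ReportArgs.fromJson({'reportId': 'monthly-2025-10'});
  print(args.reportId);

  try {
    // A bare `as String` cast would fail here with a less helpful TypeError.
    ReportArgs.fromJson({'reportId': 42});
  } on FormatException catch (error) {
    print(error.message);
  }
}
```

Centralizing the decode step means a malformed payload fails with a descriptive error rather than a cast failure deep inside handler code.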
Enqueue options
Use TaskEnqueueOptions to override scheduling, routing, retry behavior, and
callbacks for a single publish. Common fields include countdown, eta,
expires, queue, exchange, routingKey, priority, serializer,
compression, ignoreResult, taskId, retry, retryPolicy, link, and
linkError.
Adapter support varies; for example, not every broker honors priorities or delayed delivery. Stem falls back to best-effort behavior when a capability is unsupported.
Example:
await enqueuer.enqueue(
  'tasks.email',
  args: {'to': 'ops@example.com'},
  enqueueOptions: TaskEnqueueOptions(
    countdown: const Duration(seconds: 30),
    queue: 'critical',
    retry: true,
    retryPolicy: TaskRetryPolicy(
      backoff: true,
      defaultDelay: const Duration(seconds: 2),
      maxRetries: 5,
    ),
  ),
);
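To build intuition for what a policy with backoff enabled, a 2-second default delay, and 5 retries might look like over time, here is a small standalone sketch. It assumes plain exponential doubling with a cap; that formula, the retryDelay helper, and the maxDelay parameter are assumptions for illustration and are not taken from Stem's documented behavior.

```dart
// Hypothetical helper: NOT Stem's retry formula.
// Assumes the delay doubles after every failed attempt, up to maxDelay.
Duration retryDelay({
  required Duration defaultDelay,
  required int attempt, // 0-based index of the retry being scheduled
  Duration maxDelay = const Duration(minutes: 5),
}) {
  final delay = defaultDelay * (1 << attempt);
  return delay > maxDelay ? maxDelay : delay;
}

void main() {
  const defaultDelay = Duration(seconds: 2);
  const maxRetries = 5;

  // Print the wait before each retry under the doubling assumption.
  for (var attempt = 0; attempt < maxRetries; attempt++) {
    final delay = retryDelay(defaultDelay: defaultDelay, attempt: attempt);
    print('retry ${attempt + 1}: wait ${delay.inSeconds}s');
  }
}
```

Under this assumption the five retries wait 2, 4, 8, 16, and 32 seconds, so a task that keeps failing is abandoned after roughly a minute of cumulative delay.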
Tips
- Reuse a single TaskEnqueuer implementation; in most apps that means StemClient, StemApp, or StemWorkflowApp.
- Capture the returned task id when you need to poll status from the result backend.
- Use TaskOptions to set queue, retries, priority, isolation, and visibility timeouts.
- meta is stored with result backend entries, which makes it useful for audit trails.
- headers travel with the envelope and can carry tracing information.
- To schedule tasks in the future, set notBefore.
- For signing configuration, see Payload Signing.
Configuring Payload Encoders
Every Stem, StemApp, StemWorkflowApp, and Canvas now accepts a
TaskPayloadEncoderRegistry or explicit argsEncoder/resultEncoder values.
Encoders run exactly once in each direction: producers encode arguments, workers
decode them before invoking handlers, and handler return values are encoded
before hitting the result backend. Example:
class AesPayloadEncoder extends TaskPayloadEncoder {
  const AesPayloadEncoder();

  // encrypt and decrypt are application helpers defined elsewhere.
  @override
  Object? encode(Object? value) => encrypt(value);

  @override
  Object? decode(Object? stored) => decrypt(stored);
}

Future<void> configureProducerEncoders() async {
  final client = await StemClient.inMemory(
    tasks: const [],
    argsEncoder: const AesPayloadEncoder(),
    resultEncoder: const JsonTaskPayloadEncoder(),
    additionalEncoders: const [CustomBinaryEncoder()],
  );
  final app = await client.createApp();
  await app.close();
  await client.close();
}
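The encode-once, decode-once round trip can be tried in isolation with nothing but dart:convert. In the sketch below, SketchEncoder and Base64JsonEncoder are hypothetical stand-ins that only mirror the encode/decode pair shown above; Base64 is used purely as a visible transformation and provides no confidentiality, unlike a real encrypting encoder.

```dart
import 'dart:convert';

// Hypothetical interface mirroring the encode/decode pair; not a Stem type.
abstract class SketchEncoder {
  Object? encode(Object? value);
  Object? decode(Object? stored);
}

// Serializes a value to JSON and wraps it in Base64 (NOT encryption).
class Base64JsonEncoder implements SketchEncoder {
  const Base64JsonEncoder();

  @override
  Object? encode(Object? value) =>
      base64.encode(utf8.encode(jsonEncode(value)));

  @override
  Object? decode(Object? stored) =>
      jsonDecode(utf8.decode(base64.decode(stored as String)));
}

void main() {
  const encoder = Base64JsonEncoder();
  final args = {'reportId': 'monthly-2025-10'};

  final wire = encoder.encode(args); // producer side: runs once
  final restored = encoder.decode(wire); // worker side: runs once

  print(wire);
  print(restored);
}
```

The handler only ever sees the restored value, which is why applying an encoder twice, or skipping it on one side, would corrupt the payload.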
Handlers needing different encoders can override TaskMetadata.argsEncoder and
TaskMetadata.resultEncoder. The worker automatically stamps every task status
with the encoder id (__stemResultEncoder), so downstream consumers and
adapters always know how to decode stored payloads.
Continue with the Worker guide to consume the tasks you enqueue.