Skip to main content

Producer API

Enqueue tasks from your Dart services through a TaskEnqueuer surface such as StemClient, StemApp, or StemWorkflowApp. Start with the in-memory broker, then opt into Redis/Postgres as needed.

For adapter-backed deployments, prefer StemClient.fromUrl(...) or StemStack.fromUrl(...).createClient(...). Keep StemClient.create(...) for the rarer case where you must provide custom factories directly.

Raw Enqueue

Future<void> enqueueInMemory() async {
final client = await StemClient.inMemory(
tasks: [
FunctionTaskHandler<void>(
name: 'hello.print',
entrypoint: (context, args) async {
final name = args['name'] as String? ?? 'friend';
print('Hello $name');
return null;
},
),
],
);
final app = await client.createApp();

final taskId = await app.enqueue(
'hello.print',
args: {'name': 'Stem'},
);
await app.waitForTask<void>(taskId);

print('Enqueued $taskId');
await app.close();
await client.close();
}

Typed Enqueue Helpers

For the common producer path, prefer TaskDefinition<TArgs, TResult>. The definition owns argument encoding, result decoding, and default publish metadata, while exposing direct helpers and a fluent builder for overrides (headers, meta, options, scheduling):

bin/producer_typed.dart
class ReportPayload {
const ReportPayload({required this.reportId});
final String reportId;
}

class GenerateReportTask extends TaskHandler<String> {
static final definition = TaskDefinition<ReportPayload, String>(
name: 'reports.generate',
encodeArgs: (payload) => {'reportId': payload.reportId},
metadata: const TaskMetadata(description: 'Generate PDF reports'),
);


String get name => definition.name;


TaskOptions get options => const TaskOptions(queue: 'reports');


Future<String> call(TaskContext context, Map<String, Object?> args) async {
final id = args['reportId'] as String?;
return generateReport(id!);
}
}

Future<void> enqueueTyped() async {
final client = await StemClient.inMemory(tasks: [GenerateReportTask()]);
final app = await client.createApp();

final result = await GenerateReportTask.definition.enqueueAndWait(
app,
const ReportPayload(reportId: 'monthly-2025-10'),
options: const TaskOptions(priority: 5),
headers: const {'x-requested-by': 'analytics'},
);
print(result?.value);
await app.close();
await client.close();
}

Typed helpers are also available on Canvas (definition.toSignature) so group/chain/chord APIs produce strongly typed TaskResult<T> streams. Need to tweak headers/meta/queue at call sites? Start from definition.buildCall(args, ...) when you need the explicit advanced transport path.

Raw task-name strings still work, but they are the lower-level interop path. Reach for them when the task name is truly dynamic or you are crossing a boundary that does not have the generated/manual TaskDefinition. When those calls already have typed DTO args, prefer enqueuer.enqueueValue(name, dto, codec: ...) over hand-building an args map. If you later inspect the raw Envelope, prefer envelope.argsJson(...), envelope.argsVersionedJson(...), envelope.metaJson(...), or envelope.metaVersionedJson(...) over manual map casts.

Enqueue options

Use TaskEnqueueOptions to override scheduling, routing, retry behavior, and callbacks for a single publish. Common fields include countdown, eta, expires, queue, exchange, routingKey, priority, serializer, compression, ignoreResult, taskId, retry, retryPolicy, link, and linkError.

Adapter support varies; for example, not every broker honors priorities or delayed delivery. Stem falls back to best-effort behavior when a capability is unsupported.

Example:

await enqueuer.enqueue(
'tasks.email',
args: {'to': 'ops@example.com'},
enqueueOptions: TaskEnqueueOptions(
countdown: const Duration(seconds: 30),
queue: 'critical',
retry: true,
retryPolicy: TaskRetryPolicy(
backoff: true,
defaultDelay: const Duration(seconds: 2),
maxRetries: 5,
),
),
);

Tips

  • Reuse a single TaskEnqueuer implementation; in most apps that means StemClient, StemApp, or StemWorkflowApp.
  • Capture the returned task id when you need to poll status from the result backend.
  • Use TaskOptions to set queue, retries, priority, isolation, and visibility timeouts.
  • meta is stored with result backend entries—great for audit trails.
  • headers travel with the envelope and can carry tracing information.
  • To schedule tasks in the future, set notBefore.
  • For signing configuration, see Payload Signing.

Configuring Payload Encoders

Every Stem, StemApp, StemWorkflowApp, and Canvas now accepts a TaskPayloadEncoderRegistry or explicit argsEncoder/resultEncoder values. Encoders run exactly once in each direction: producers encode arguments, workers decode them before invoking handlers, and handler return values are encoded before hitting the result backend. Example:

lib/bootstrap_typed_encoders.dart
class AesPayloadEncoder extends TaskPayloadEncoder {
const AesPayloadEncoder();

Object? encode(Object? value) => encrypt(value);

Object? decode(Object? stored) => decrypt(stored);
}

Future<void> configureProducerEncoders() async {
final client = await StemClient.inMemory(
tasks: const [],
argsEncoder: const AesPayloadEncoder(),
resultEncoder: const JsonTaskPayloadEncoder(),
additionalEncoders: const [CustomBinaryEncoder()],
);
final app = await client.createApp();

await app.close();
await client.close();
}

Handlers needing different encoders can override TaskMetadata.argsEncoder and TaskMetadata.resultEncoder. The worker automatically stamps every task status with the encoder id (__stemResultEncoder), so downstream consumers and adapters always know how to decode stored payloads.

Continue with the Worker guide to consume the tasks you enqueue.