Producer API

Enqueue tasks from your Dart services using Stem.enqueue. Start with the in-memory broker, then opt into Redis/Postgres as needed.

Enqueue tasks

Future<void> enqueueInMemory() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<void>(
        name: 'hello.print',
        entrypoint: (context, args) async {
          final name = args['name'] as String? ?? 'friend';
          print('Hello $name');
          return null;
        },
      ),
    ],
  );

  final taskId = await app.stem.enqueue(
    'hello.print',
    args: {'name': 'Stem'},
  );

  print('Enqueued $taskId');
  await app.close();
}

Typed Enqueue Helpers

When you need compile-time guarantees for task arguments and result types, wrap your handler in a TaskDefinition. The definition knows how to encode args and decode results, and exposes a fluent builder for overrides (headers, meta, options, scheduling):

bin/producer_typed.dart
class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

class GenerateReportTask extends TaskHandler<String> {
  static final definition = TaskDefinition<ReportPayload, String>(
    name: 'reports.generate',
    encodeArgs: (payload) => {'reportId': payload.reportId},
    metadata: const TaskMetadata(description: 'Generate PDF reports'),
  );

  @override
  String get name => definition.name;

  @override
  TaskOptions get options => const TaskOptions(queue: 'reports');

  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    final id = args['reportId'] as String;
    // generateReport is your own report-rendering function (not shown here).
    return await generateReport(id);
  }
}

Future<void> enqueueTyped() async {
  final app = await StemApp.inMemory(tasks: [GenerateReportTask()]);
  await app.start();

  final call = GenerateReportTask.definition.call(
    const ReportPayload(reportId: 'monthly-2025-10'),
    options: const TaskOptions(priority: 5),
    headers: const {'x-requested-by': 'analytics'},
  );

  final taskId = await app.stem.enqueueCall(call);
  final result = await app.stem.waitForTask<String>(taskId);
  print(result?.value);
  await app.close();
}
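To make the role of a definition more concrete, here is a library-independent sketch of the general idea: a typed wrapper owns the task name and the argument encoder, so call sites pass a ReportPayload instead of a raw map. TypedTask, envelope, and reportTask are hypothetical names used only for illustration; they are not Stem's TaskDefinition API.

```dart
/// Hypothetical stand-in for a typed task definition; not Stem's class.
class TypedTask<A, R> {
  const TypedTask({
    required this.name,
    required this.encodeArgs,
    required this.decodeResult,
  });

  final String name;
  final Map<String, Object?> Function(A payload) encodeArgs;
  final R Function(Object? raw) decodeResult;

  /// Builds the untyped map that a broker would carry for this call.
  Map<String, Object?> envelope(A payload) => {
        'task': name,
        'args': encodeArgs(payload),
      };
}

class ReportPayload {
  const ReportPayload({required this.reportId});
  final String reportId;
}

final reportTask = TypedTask<ReportPayload, String>(
  name: 'reports.generate',
  encodeArgs: (payload) => {'reportId': payload.reportId},
  decodeResult: (raw) => raw as String,
);
```

Calling reportTask.envelope(const ReportPayload(reportId: 'monthly-2025-10')) produces a map whose args entry contains the reportId, while passing any other payload type fails at compile time.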

Typed helpers are also available on Canvas (definition.toSignature), so the group, chain, and chord APIs produce strongly typed TaskResult<T> streams. To tweak headers, meta, or the queue at a call site, wrap the definition in a TaskEnqueueBuilder and call await builder.enqueueWith(stem).

Enqueue options

Use TaskEnqueueOptions to override scheduling, routing, retry behavior, and callbacks for a single publish. Common fields include countdown, eta, expires, queue, exchange, routingKey, priority, serializer, compression, ignoreResult, taskId, retry, retryPolicy, link, and linkError.

Adapter support varies; for example, not every broker honors priorities or delayed delivery. Stem falls back to best-effort behavior when a capability is unsupported.

Example:

await stem.enqueue(
  'tasks.email',
  args: {'to': 'ops@example.com'},
  enqueueOptions: TaskEnqueueOptions(
    countdown: const Duration(seconds: 30),
    queue: 'critical',
    retry: true,
    retryPolicy: TaskRetryPolicy(
      backoff: true,
      defaultDelay: const Duration(seconds: 2),
      maxRetries: 5,
    ),
  ),
);
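This page does not spell out how backoff spaces out retries. Assuming a plain doubling scheme (a common choice, not confirmed Stem behavior, and ignoring any jitter or caps), the policy above would wait roughly as sketched here. backoffSchedule is a hypothetical helper, not part of Stem.

```dart
/// Hypothetical helper, not part of Stem: lists the wait before each retry
/// under a simple doubling scheme.
List<Duration> backoffSchedule({
  required Duration defaultDelay,
  required int maxRetries,
  bool backoff = true,
}) {
  return [
    for (var attempt = 0; attempt < maxRetries; attempt++)
      // With backoff the delay doubles on every attempt; without it the
      // delay stays at defaultDelay.
      backoff ? defaultDelay * (1 << attempt) : defaultDelay,
  ];
}
```

With defaultDelay set to 2 seconds and maxRetries set to 5, this sketch yields waits of 2, 4, 8, 16, and 32 seconds, which helps when estimating how long a failing task can stay in flight.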

Tips

  • Reuse a single Stem instance; create it during application bootstrap.
  • Capture the returned task id when you need to poll status from the result backend.
  • Use TaskOptions to set queue, retries, priority, isolation, and visibility timeouts.
  • meta is stored with result backend entries, which makes it useful for audit trails.
  • headers travel with the envelope and can carry tracing information.
  • To schedule a task for the future, set notBefore, or use countdown or eta in TaskEnqueueOptions for a single publish.
  • For signing configuration, see Payload Signing.
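The scheduling tip and the countdown and eta options both come down to an earliest-run instant. This page does not define the two options, so the sketch below assumes the usual task-queue meaning: countdown is a delay relative to now and eta is an absolute timestamp. earliestRun is a hypothetical helper, not part of Stem, and rejecting both values at once is a choice made for this sketch.

```dart
/// Hypothetical helper, not part of Stem: turns a relative countdown or an
/// absolute eta into a single earliest-run instant.
DateTime? earliestRun({
  required DateTime now,
  Duration? countdown,
  DateTime? eta,
}) {
  if (countdown != null && eta != null) {
    throw ArgumentError('Provide countdown or eta, not both.');
  }
  if (countdown != null) return now.add(countdown);
  return eta; // May be null, meaning "run as soon as possible".
}
```

Working in UTC (for example DateTime.now().toUtc()) avoids surprises when producers and workers run in different time zones.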

Configuring Payload Encoders

Every Stem, StemApp, StemWorkflowApp, and Canvas now accepts a TaskPayloadEncoderRegistry or explicit argsEncoder/resultEncoder values. Encoders run exactly once in each direction: producers encode arguments, workers decode them before invoking handlers, and handler return values are encoded before hitting the result backend. Example:

lib/bootstrap_typed_encoders.dart
class AesPayloadEncoder extends TaskPayloadEncoder {
  const AesPayloadEncoder();

  // encrypt and decrypt are your own helpers (not shown here).
  @override
  Object? encode(Object? value) => encrypt(value);

  @override
  Object? decode(Object? stored) => decrypt(stored);
}

Future<void> configureProducerEncoders() async {
  final app = await StemApp.inMemory(
    tasks: const [],
    argsEncoder: const AesPayloadEncoder(),
    resultEncoder: const JsonTaskPayloadEncoder(),
    additionalEncoders: const [CustomBinaryEncoder()],
  );

  await app.close();
}

Handlers needing different encoders can override TaskMetadata.argsEncoder and TaskMetadata.resultEncoder. The worker automatically stamps every task status with the encoder id (__stemResultEncoder), so downstream consumers and adapters always know how to decode stored payloads.
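The "exactly once in each direction" rule is easier to see with a small round trip. The sketch below uses only dart:convert and a hypothetical PayloadCodec interface that mirrors the encode/decode shape shown above; it is not Stem's TaskPayloadEncoder, and Base64JsonCodec is an illustrative name.

```dart
import 'dart:convert';

/// Hypothetical interface mirroring the encode/decode shape used above;
/// it is not Stem's TaskPayloadEncoder.
abstract class PayloadCodec {
  Object? encode(Object? value);
  Object? decode(Object? stored);
}

/// Stores any JSON-compatible value as a Base64 string.
class Base64JsonCodec implements PayloadCodec {
  const Base64JsonCodec();

  @override
  Object? encode(Object? value) =>
      base64Encode(utf8.encode(jsonEncode(value)));

  @override
  Object? decode(Object? stored) =>
      jsonDecode(utf8.decode(base64Decode(stored as String)));
}
```

Encoding {'name': 'Stem'} on the producer side and decoding it once on the worker side returns an equal map. If a value were encoded twice, a single decode would return the intermediate Base64 string instead of the map, which is why each direction should run only once.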

Continue with the Worker guide to consume the tasks you enqueue.