Skip to main content

Tasks & Retries

Tasks are the units of work executed by Stem workers. Each task is represented by a handler registered in a TaskRegistry. Handlers expose metadata through TaskOptions, which control routing, retry behavior, timeouts, and isolation.

Registering Handlers

class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(maxRetries: 2);


Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String? ?? 'anonymous';
print('Emailing $to (attempt ${context.attempt})');
}
}

final registry = SimpleTaskRegistry()..register(EmailTask());

Typed Task Definitions

Stem ships with TaskDefinition<TArgs, TResult> so producers get compile-time checks for required arguments and result types. A definition bundles the task name, argument encoder, optional metadata, and default TaskOptions. Build a call with .call(args) or TaskEnqueueBuilder and hand it to Stem.enqueueCall or Canvas helpers:

class InvoicePayload {
const InvoicePayload({required this.invoiceId});
final String invoiceId;
}

class PublishInvoiceTask extends TaskHandler<void> {
static final definition = TaskDefinition<InvoicePayload, bool>(
name: 'invoice.publish',
encodeArgs: (payload) => {'invoiceId': payload.invoiceId},
metadata: const TaskMetadata(description: 'Publishes invoices downstream'),
defaultOptions: const TaskOptions(queue: 'billing'),
);


String get name => definition.name;


TaskOptions get options => definition.defaultOptions;


Future<void> call(TaskContext context, Map<String, Object?> args) async {
final invoiceId = args['invoiceId'] as String;
await publishInvoice(invoiceId);
}
}

Future<void> runTypedDefinitionExample() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final stem = Stem(
broker: broker,
registry: SimpleTaskRegistry()..register(PublishInvoiceTask()),
backend: backend,
);

final taskId = await stem.enqueueCall(
PublishInvoiceTask.definition(const InvoicePayload(invoiceId: 'inv_42')),
);
final result = await stem.waitForTask<bool>(taskId);
if (result?.isSucceeded == true) {
print('Invoice published');
}
await backend.close();
await broker.close();
}

Typed results flow through TaskResult<TResult> when you call Stem.waitForTask<TResult>, Canvas.group<T>, Canvas.chain<T>, or Canvas.chord<T>. Supplying a custom decode callback on the task signature lets you deserialize complex objects before they reach application code.

Configuring Retries

Workers apply an ExponentialJitterRetryStrategy by default. Each retry is scheduled by publishing a new envelope with an updated notBefore. Control retry cadence by:

  • Setting TaskOptions.maxRetries (initial attempt + maxRetries).
  • Supplying a custom RetryStrategy to the worker.
  • Tuning the broker connection (e.g. Redis blockTime, claimInterval, defaultVisibilityTimeout) so delayed messages are drained quickly.

See the examples/retry_task Compose demo for a runnable setup that prints every retry signal and shows how the strategy interacts with broker timings.

lib/retry_backoff.dart
final RetryStrategy retryStrategy = ExponentialJitterRetryStrategy(
base: const Duration(milliseconds: 200),
max: const Duration(seconds: 2),
);
lib/retry_backoff.dart
  final workerConfig = StemWorkerConfig(retryStrategy: retryStrategy);

Task Context

TaskContext provides metadata and control helpers:

  • context.attempt – current attempt number (0-based).
  • context.heartbeat() – extend the lease to avoid timeouts.
  • context.extendLease(Duration by) – request additional processing time.
  • context.progress(percent, data: {...}) – emit progress signals for UI hooks.

Use the context to build idempotent handlers. Re-enqueue work, cancel jobs, or store audit details in context.meta.

See the example/task_context_mixed demo for a runnable sample that exercises inline + isolate enqueue, TaskRetryPolicy overrides, and enqueue options. The example/task_usage_patterns.dart sample shows in-memory TaskContext and TaskInvocationContext patterns without external dependencies.

Enqueue from a running task

Use TaskContext.enqueue/spawn to schedule follow-up work with the same defaults as Stem.enqueue. For isolate entrypoints, TaskInvocationContext exposes the same API plus the fluent builder.

Future<void> enqueueFromContext(TaskContext context) async {
await context.enqueue(
'tasks.child',
args: {'id': '123'},
enqueueOptions: TaskEnqueueOptions(
countdown: const Duration(seconds: 30),
queue: 'critical',
retry: true,
retryPolicy: const TaskRetryPolicy(
backoff: true,
defaultDelay: Duration(seconds: 2),
maxRetries: 5,
),
),
);

// Alias for enqueue.
await context.spawn('tasks.child', args: {'id': '456'});
}

Inside isolate entrypoints:

Future<void> enqueueWithBuilder(TaskInvocationContext invocation) async {
final call = invocation
.enqueueBuilder(
definition: childDefinition,
args: const ChildArgs('value'),
)
.queue('critical')
.priority(9)
.delay(const Duration(seconds: 5))
.enqueueOptions(
const TaskEnqueueOptions(
retry: true,
retryPolicy: TaskRetryPolicy(
backoff: true,
defaultDelay: Duration(seconds: 1),
),
),
)
.build();

await invocation.enqueueCall(call);
}

Retry from a running task

Handlers can request a retry directly from the context:

await context.retry(countdown: const Duration(seconds: 10));

Retries respect TaskOptions.retryPolicy unless you override it with TaskEnqueueOptions.retryPolicy or context.retry(retryPolicy: ...).

Retry policy overrides

TaskRetryPolicy captures backoff controls and can be applied per handler or per enqueue:

final options = TaskOptions(
maxRetries: 3,
retryPolicy: TaskRetryPolicy(
backoff: true,
defaultDelay: const Duration(seconds: 1),
backoffMax: const Duration(seconds: 30),
),
);

Isolation & Timeouts

Set soft/hard timeouts to guard against runaway tasks:

const emailTimeoutOptions = TaskOptions(
softTimeLimit: Duration(seconds: 15),
hardTimeLimit: Duration(seconds: 30),
acksLate: true,
);
  • Soft timeouts trigger WorkerEventType.timeout so you can log or notify.
  • Hard timeouts raise TimeoutException to force retries or failure.
  • Provide an isolateEntrypoint to run the task in a dedicated isolate when enforcing hard limits or dealing with CPU-intensive code.

Idempotency Checklist

  • Make task inputs explicit (args, headers, meta).
  • Guard external calls with idempotency keys.
  • Store state transitions atomically (e.g. using Postgres or Redis transactions).
  • Set TaskOptions.unique/uniqueFor for naturally unique jobs (see Uniqueness).
  • Use TaskOptions.rateLimit with a worker RateLimiter to throttle hot tasks (see Rate Limiting).

With these practices in place, tasks can be retried safely and composed via chains, groups, and chords (see Canvas Patterns).

Task Payload Encoders

Handlers often need to encrypt, compress, or otherwise transform arguments and results before they leave the process. Stem exposes TaskPayloadEncoder so you can swap out the default JSON pass-through behavior:

Encoders/global.dart
class Base64PayloadEncoder extends TaskPayloadEncoder {
const Base64PayloadEncoder();


Object? encode(Object? value) =>
value is String ? base64Encode(utf8.encode(value)) : value;


Object? decode(Object? stored) =>
stored is String ? utf8.decode(base64Decode(stored)) : stored;
}

Future<void> configureEncoders() async {
final app = await StemApp.inMemory(
tasks: [EmailTask()],
argsEncoder: const Base64PayloadEncoder(),
resultEncoder: const Base64PayloadEncoder(),
additionalEncoders: const [MyOtherEncoder()],
);
await app.close();
}

Workers automatically decode arguments once (stem-args-encoder header / __stemArgsEncoder meta) and encode results once (__stemResultEncoder meta) before writing to the backend. When you need task-specific behavior, set the metadata overrides:

class EncodedTask extends TaskHandler<void> {

String get name => 'encoded.task';


TaskMetadata get metadata => const TaskMetadata(
argsEncoder: Base64PayloadEncoder(),
resultEncoder: Base64PayloadEncoder(),
);


TaskOptions get options => const TaskOptions();


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}

Because encoders are centrally registered inside the TaskPayloadEncoderRegistry, every producer/worker instance that shares the registry can resolve encoder ids reliably—even across processes or languages.