Tasks & Retries
Tasks are the units of work executed by Stem workers. Each task is represented by
a handler registered in a TaskRegistry. Handlers expose metadata through
TaskOptions, which control routing, retry behavior, timeouts, and isolation.
Registering Handlers
- In-memory (tasks/email_task.dart)
- Redis (tasks/email_task.dart)
class EmailTask extends TaskHandler<void> {
String get name => 'email.send';
TaskOptions get options => const TaskOptions(maxRetries: 2);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final to = args['to'] as String? ?? 'anonymous';
print('Emailing $to (attempt ${context.attempt})');
}
}
final registry = SimpleTaskRegistry()..register(EmailTask());
class RedisEmailTask extends TaskHandler<void> {
String get name => 'email.send';
TaskOptions get options => const TaskOptions(
queue: 'email',
maxRetries: 4,
visibilityTimeout: Duration(seconds: 30),
unique: true,
uniqueFor: Duration(minutes: 5),
);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
await sendEmailRemote(args);
}
}
final redisRegistry = SimpleTaskRegistry()..register(RedisEmailTask());
Typed Task Definitions
Stem ships with TaskDefinition<TArgs, TResult> so producers get compile-time
checks for required arguments and result types. A definition bundles the task
name, argument encoder, optional metadata, and default TaskOptions. Build a
call with .call(args) or TaskEnqueueBuilder and hand it to Stem.enqueueCall
or Canvas helpers:
class InvoicePayload {
const InvoicePayload({required this.invoiceId});
final String invoiceId;
}
class PublishInvoiceTask extends TaskHandler<void> {
static final definition = TaskDefinition<InvoicePayload, bool>(
name: 'invoice.publish',
encodeArgs: (payload) => {'invoiceId': payload.invoiceId},
metadata: const TaskMetadata(description: 'Publishes invoices downstream'),
defaultOptions: const TaskOptions(queue: 'billing'),
);
String get name => definition.name;
TaskOptions get options => definition.defaultOptions;
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final invoiceId = args['invoiceId'] as String;
await publishInvoice(invoiceId);
}
}
Future<void> runTypedDefinitionExample() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final stem = Stem(
broker: broker,
registry: SimpleTaskRegistry()..register(PublishInvoiceTask()),
backend: backend,
);
final taskId = await stem.enqueueCall(
PublishInvoiceTask.definition(const InvoicePayload(invoiceId: 'inv_42')),
);
final result = await stem.waitForTask<bool>(taskId);
if (result?.isSucceeded == true) {
print('Invoice published');
}
await backend.close();
await broker.close();
}
Typed results flow through TaskResult<TResult> when you call
Stem.waitForTask<TResult>, Canvas.group<T>, Canvas.chain<T>, or
Canvas.chord<T>. Supplying a custom decode callback on the task signature
lets you deserialize complex objects before they reach application code.
Configuring Retries
Workers apply an ExponentialJitterRetryStrategy by default. Each retry is
scheduled by publishing a new envelope with an updated notBefore. Control
retry cadence by:
- Setting
TaskOptions.maxRetries(initial attempt +maxRetries). - Supplying a custom
RetryStrategyto the worker. - Tuning the broker connection (e.g. Redis
blockTime,claimInterval,defaultVisibilityTimeout) so delayed messages are drained quickly.
See the examples/retry_task Compose demo for a runnable setup that prints
every retry signal and shows how the strategy interacts with broker timings.
final RetryStrategy retryStrategy = ExponentialJitterRetryStrategy(
base: const Duration(milliseconds: 200),
max: const Duration(seconds: 2),
);
final workerConfig = StemWorkerConfig(retryStrategy: retryStrategy);
Task Context
TaskContext provides metadata and control helpers:
context.attempt– current attempt number (0-based).context.heartbeat()– extend the lease to avoid timeouts.context.extendLease(Duration by)– request additional processing time.context.progress(percent, data: {...})– emit progress signals for UI hooks.
Use the context to build idempotent handlers. Re-enqueue work, cancel jobs, or
store audit details in context.meta.
See the example/task_context_mixed demo for a runnable sample that exercises
inline + isolate enqueue, TaskRetryPolicy overrides, and enqueue options.
The example/task_usage_patterns.dart sample shows in-memory TaskContext and
TaskInvocationContext patterns without external dependencies.
Enqueue from a running task
Use TaskContext.enqueue/spawn to schedule follow-up work with the same
defaults as Stem.enqueue. For isolate entrypoints, TaskInvocationContext
exposes the same API plus the fluent builder.
Future<void> enqueueFromContext(TaskContext context) async {
await context.enqueue(
'tasks.child',
args: {'id': '123'},
enqueueOptions: TaskEnqueueOptions(
countdown: const Duration(seconds: 30),
queue: 'critical',
retry: true,
retryPolicy: const TaskRetryPolicy(
backoff: true,
defaultDelay: Duration(seconds: 2),
maxRetries: 5,
),
),
);
// Alias for enqueue.
await context.spawn('tasks.child', args: {'id': '456'});
}
Inside isolate entrypoints:
Future<void> enqueueWithBuilder(TaskInvocationContext invocation) async {
final call = invocation
.enqueueBuilder(
definition: childDefinition,
args: const ChildArgs('value'),
)
.queue('critical')
.priority(9)
.delay(const Duration(seconds: 5))
.enqueueOptions(
const TaskEnqueueOptions(
retry: true,
retryPolicy: TaskRetryPolicy(
backoff: true,
defaultDelay: Duration(seconds: 1),
),
),
)
.build();
await invocation.enqueueCall(call);
}
Retry from a running task
Handlers can request a retry directly from the context:
await context.retry(countdown: const Duration(seconds: 10));
Retries respect TaskOptions.retryPolicy unless you override it with
TaskEnqueueOptions.retryPolicy or context.retry(retryPolicy: ...).
Retry policy overrides
TaskRetryPolicy captures backoff controls and can be applied per handler or
per enqueue:
final options = TaskOptions(
maxRetries: 3,
retryPolicy: TaskRetryPolicy(
backoff: true,
defaultDelay: const Duration(seconds: 1),
backoffMax: const Duration(seconds: 30),
),
);
Isolation & Timeouts
Set soft/hard timeouts to guard against runaway tasks:
const emailTimeoutOptions = TaskOptions(
softTimeLimit: Duration(seconds: 15),
hardTimeLimit: Duration(seconds: 30),
acksLate: true,
);
- Soft timeouts trigger
WorkerEventType.timeoutso you can log or notify. - Hard timeouts raise
TimeoutExceptionto force retries or failure. - Provide an
isolateEntrypointto run the task in a dedicated isolate when enforcing hard limits or dealing with CPU-intensive code.
Idempotency Checklist
- Make task inputs explicit (
args,headers,meta). - Guard external calls with idempotency keys.
- Store state transitions atomically (e.g. using Postgres or Redis transactions).
- Set
TaskOptions.unique/uniqueForfor naturally unique jobs (see Uniqueness). - Use
TaskOptions.rateLimitwith a workerRateLimiterto throttle hot tasks (see Rate Limiting).
With these practices in place, tasks can be retried safely and composed via chains, groups, and chords (see Canvas Patterns).
Task Payload Encoders
Handlers often need to encrypt, compress, or otherwise transform arguments and
results before they leave the process. Stem exposes TaskPayloadEncoder so you
can swap out the default JSON pass-through behavior:
class Base64PayloadEncoder extends TaskPayloadEncoder {
const Base64PayloadEncoder();
Object? encode(Object? value) =>
value is String ? base64Encode(utf8.encode(value)) : value;
Object? decode(Object? stored) =>
stored is String ? utf8.decode(base64Decode(stored)) : stored;
}
Future<void> configureEncoders() async {
final app = await StemApp.inMemory(
tasks: [EmailTask()],
argsEncoder: const Base64PayloadEncoder(),
resultEncoder: const Base64PayloadEncoder(),
additionalEncoders: const [MyOtherEncoder()],
);
await app.close();
}
Workers automatically decode arguments once (stem-args-encoder header /
__stemArgsEncoder meta) and encode results once (__stemResultEncoder meta)
before writing to the backend. When you need task-specific behavior, set the
metadata overrides:
class EncodedTask extends TaskHandler<void> {
String get name => 'encoded.task';
TaskMetadata get metadata => const TaskMetadata(
argsEncoder: Base64PayloadEncoder(),
resultEncoder: Base64PayloadEncoder(),
);
TaskOptions get options => const TaskOptions();
Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
Because encoders are centrally registered inside the
TaskPayloadEncoderRegistry, every producer/worker instance that shares the
registry can resolve encoder ids reliably—even across processes or languages.