Best Practices
These guidelines help keep task systems reliable and observable as you scale. They are framework-agnostic and apply directly to Stem.
Task design
- Keep task arguments small and serializable.
- Store large payloads in object storage and pass references instead.
- Make tasks idempotent; assume retries can happen.
- Wrap enqueue + state changes in a transaction or outbox pattern when interacting with databases.
- Prefer deterministic task names and queues for routing clarity.
- Surface structured metadata for tracing and auditing.
- Define a task
- Typed enqueue
lib/best_practices.dart
class IdempotentTask extends TaskHandler<void> {
String get name => 'orders.sync';
TaskOptions get options => const TaskOptions(maxRetries: 3);
TaskMetadata get metadata => const TaskMetadata(idempotent: true);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final orderId = args['orderId'] as String? ?? 'unknown';
print('Sync order $orderId');
}
}
lib/best_practices.dart
Future<void> enqueueTyped(Stem stem) async {
await stem.enqueue(
'orders.sync',
args: {'orderId': 'order-42'},
meta: {'requestId': 'req-001'},
);
}
Error handling
- Treat transient failures as retryable; use explicit backoff policies.
- Fail fast on validation errors to avoid wasted retries.
- Send poison-pill tasks to a DLQ and fix root causes before replaying.
lib/workers_programmatic.dart
class FlakyTask extends TaskHandler<void> {
String get name => 'demo.flaky';
TaskOptions get options => const TaskOptions(maxRetries: 2);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
if (context.attempt < 2) {
throw StateError('Simulated failure');
}
print('Succeeded on attempt ${context.attempt}');
}
}
Future<void> retryWorker() async {
StemSignals.taskRetry.connect((payload, _) {
print('[retry] next run at: ${payload.nextRetryAt}');
});
final registry = SimpleTaskRegistry()..register(FlakyTask());
final worker = Worker(
broker: InMemoryBroker(),
registry: registry,
backend: InMemoryResultBackend(),
retryStrategy: ExponentialJitterRetryStrategy(
base: const Duration(milliseconds: 200),
max: const Duration(seconds: 1),
),
);
await worker.start();
}
Concurrency & load
- Start with conservative concurrency and scale up with metrics.
- Embrace concurrency by running more worker processes instead of single hot loops.
- Use rate limits for hot handlers or fragile downstreams.
- Avoid long-running inline loops without heartbeats or progress signals.
lib/rate_limiting.dart
// #region rate-limit-task-options
class RateLimitedTask extends TaskHandler<void> {
String get name => 'demo.rateLimited';
TaskOptions get options => const TaskOptions(
rateLimit: '10/s',
maxRetries: 3,
);
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final actor = args['actor'] as String? ?? 'anonymous';
print('Handled rate-limited task for $actor');
}
}
// #endregion rate-limit-task-options
Observability
- Emit lifecycle signals early so you can build dashboards from day one.
- Track queue depth, retry rates, and DLQ volume as leading indicators.
- Correlate task IDs with business logs for easier incident response.
lib/signals.dart
void configureSignals() {
StemSignals.configure(
configuration: const StemSignalConfiguration(
enabled: true,
enabledSignals: {'worker-heartbeat': false},
),
);
}
Operations
- Separate environments with namespaces and credentials.
- Bake health checks into deploy pipelines.
- Automate rotation of signing keys and TLS certificates.
lib/production_checklist.dart
Future<void> configureSigning() async {
final config = StemConfig.fromEnvironment();
Terminology clarity
- Task: a unit of work executed by a worker.
- Queue: the channel tasks are routed through.
- Worker: the process that consumes tasks and executes handlers.
- Backend: the store for task results and group state.