# Stem Documentation > Spec-driven background jobs for Dart This file contains all documentation content in a single document following the llmstxt.org standard. ## Broker Caveats This page highlights broker-specific constraints that affect routing, priorities, and control-plane behavior. These caveats are based on the adapter implementations. ## In-memory broker - **No priority buckets**: `supportsPriority` is false, so priorities are not enforced. - **Single-queue consumption**: only one queue can be consumed per subscription. - **Not durable**: data is lost when the process exits. ## SQLite broker - **Broadcast scope is in-process**: fan-out works for subscribers running in the same process, but cross-process worker control broadcasts are not supported. - **Single-queue consumption**: only one queue can be consumed per subscription. - **Polling-based delivery**: tasks are polled on `pollInterval` and claimed via row locks; latency depends on the poll interval. - **Single-writer constraint**: SQLite allows one writer at a time. Use separate broker/backend files and avoid producer writes to the backend. - **Native assets**: build CLI bundles (`dart build cli`) when using `sqlite3` to ensure the native library is packaged reliably. - **Local disk only**: avoid network filesystems for WAL-backed SQLite files. ## Redis Streams broker - **Single-queue consumption**: only one queue can be consumed per subscription. - **Priority uses per-queue streams**: each priority bucket maps to a dedicated stream key. - **Delayed delivery**: delayed tasks are stored in a sorted set and re-enqueued when due. - **Broadcast channels**: broadcasts are stored in per-channel streams and consumed via dedicated consumer groups. - **Visibility timeouts**: the broker reclaims idle deliveries via `XAUTOCLAIM`. Extending a lease requeues the task into the delayed set (it does not update the original stream entry). 
- **Key eviction risk**: Redis eviction policies can drop stream, delayed, or dead-letter keys. Use a maxmemory policy that avoids evicting Stem keys, or isolate Stem data in a dedicated Redis instance. ## Postgres broker - **Single-queue consumption**: only one queue can be consumed per subscription. - **Polling-based delivery**: workers poll for due jobs on an interval. - **Visibility timeouts**: tasks are locked with a `locked_until` lease; if a worker dies or stops heartbeating, jobs become visible again after the lease expires. - **Dead letter retention**: dead letters are retained for a default window (7 days) unless configured otherwise. - **Broadcast channels**: broadcasts are stored in a separate table and read alongside queue deliveries. ## Result backend caveat (ordering) - **Group result ordering**: group/chord results are stored as maps (Redis hashes / Postgres tables) and returned without ordering guarantees. If you need stable ordering, sort results by task id or track ordering in group metadata. ## Shutdown semantics (broker impact) - **Soft shutdowns are cooperative**: brokers only see acknowledgements (or requeues). If a worker stops without acking a delivery, the task becomes visible again after the visibility lease expires (Redis reclaim interval / Postgres `locked_until`). - **Long-running tasks** should emit heartbeats or extend leases so the broker does not re-deliver them mid-execution. ## Tips - Use routing subscriptions to pin workers to a single queue when using Redis or Postgres. - Prefer Redis when you need low-latency delivery and high throughput. - Prefer Postgres when you need SQL visibility and a single durable store. 
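The group-result ordering caveat above can be handled client-side by sorting on task id. A minimal sketch in plain Dart (the map shape and task ids are illustrative; real entries come from the result backend):

```dart
// Sketch: impose a stable order on group results returned as a map.
// Task ids here are hypothetical stand-ins for real task ids.
void main() {
  final results = <String, Object?>{
    'task-0003': 'gamma',
    'task-0001': 'alpha',
    'task-0002': 'beta',
  };

  // Sort keys lexicographically to get a deterministic order.
  final orderedIds = results.keys.toList()..sort();
  final ordered = [for (final id in orderedIds) results[id]];

  print(ordered); // [alpha, beta, gamma]
}
```

If your task ids are not lexicographically ordered, store an explicit index in group metadata instead and sort on that.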
## Example entrypoints ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-in-memory ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-redis ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-postgres ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-sqlite ``` --- ## Broker Comparison Stem ships with multiple broker adapters and planned integrations. This page explains what a broker does, how it differs from result storage, and how to choose the right transport for your deployment. ## What is a broker? A **broker** is the message transport between producers and workers. When you call `Stem.enqueue`, the broker persists the task envelope and makes it available to one or more workers. The broker is not where results live; it only handles delivery. In Stem: - **Producer → Broker**: publishes a task envelope. - **Worker ← Broker**: consumes and acknowledges deliveries. - **Result backend**: stores the task result/state (separate component). ## Broker vs result backend Use the broker for **delivery**, and a result backend for **history**: - **Broker**: queues, leases, priority, delayed delivery, broadcast channels. - **Result backend**: task status, result payloads, heartbeats, group/chord metadata. You can mix and match (e.g. Redis broker + Postgres backend) depending on durability and performance needs. ## What a Stem broker must provide Every broker adapter implements the same `Broker` contract and should support: - **At-least-once delivery** with acknowledgements. - **Visibility/leases** so workers can extend or retry tasks safely. - **Priority buckets** so higher-priority tasks are dispatched first. - **Delayed tasks** via `notBefore` and retry scheduling. - **Broadcast channels** (used for worker control commands). 
If the broker does not support a feature, it should document the limitation. Planned adapters may not support full control-plane tooling until release.

## Broker feature matrix

| Broker | Status | Delivery | Delays | Priority | Broadcast/Control | Notes |
| ----------------- | ------------ | -------- | ------ | -------- | ----------------- | ----- |
| Redis Streams | ✅ Supported | At-least-once | ✅ | ✅ | ✅ | Lowest latency, great default. |
| Postgres | ✅ Supported | At-least-once | ✅ | ✅ | ✅ | Durable, SQL-friendly; higher latency than Redis. |
| SQLite | ✅ Supported | At-least-once | ✅ | ✅ | ⚠️ | Single-host file broker; broadcast fan-out is in-process only. |
| In-memory | ✅ Supported | At-least-once | ✅ | ❌ | ✅ | Single-process only; priorities are not enforced; testing/dev. |
| RabbitMQ | 🔜 Planned | AMQP acks | ✅ | ✅ | ✅ | Mature routing; requires AMQP infra. |
| Amazon SQS | 🔜 Planned | Visibility timeout | ✅ | Limited | ⚠️ | Fully managed; no native fanout. |

## Broker summaries

### Redis Streams

Best default for most deployments: low latency, strong support for delayed tasks and leases, and mature operations tooling. Tune persistence (AOF or RDB) based on durability needs.

### Postgres

A good fit when you prefer a single database dependency or want SQL visibility into queue state. Slightly higher latency than Redis; use a connection pool aligned with worker concurrency.

### SQLite

Best for single-host development and demos. The SQLite broker uses polling-based delivery and supports broadcast fan-out only for subscribers in the same process. Use separate SQLite files for broker vs. backend to avoid WAL contention. See the [SQLite adapter guide](./sqlite.md) for setup and operational notes.

### In-memory

Perfect for tests and local demos. Not durable and only works inside a single process.

### RabbitMQ (planned)

Ideal for teams already invested in AMQP routing and tooling. Stem will map its queue/broadcast model onto topic and fanout exchanges.
### Amazon SQS (planned) Great for managed AWS deployments and automatic scaling. Expect higher latency and limited fanout without SNS. ### Adapter Guidance - **Redis Streams** is the default. Enable persistence (AOF) and replicate to a hot standby for fault tolerance. Configure namespaces per environment with ACLs. The `examples/redis_postgres_worker` sample pairs Redis with Postgres for result storage. - **Postgres** integrates tightly with the existing result backend for teams already running Postgres. Leases are implemented via advisory locks; ensure the connection pool matches expected concurrency. - **SQLite** is ideal for single-host development and demos. Use separate DB files for broker and backend; avoid producer writes to the backend. - **In-memory** adapters mirror the Redis API and are safe for smoke tests. - **RabbitMQ & SQS** bindings follow the same `Broker` contract. Keep tasks idempotent—visibility/lease guarantees differ slightly (documented in the adapter guides once released). ### Selecting a Broker 1. Start with Redis Streams unless your platform mandates a specific broker. 2. Consider Postgres when you need transactional enqueue/dequeue or prefer a single database dependency with built-in durability. 3. Use in-memory during development to simplify onboarding. 4. If you already operate RabbitMQ/SQS at scale, wire the respective adapter and monitor the lease/ack semantics described in the spec. 
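The selection steps above can be sketched as a scheme switch on `STEM_BROKER_URL`. This is a hedged sketch: `RedisStreamsBroker.connect` and `SqliteBroker.open` appear elsewhere in this guide, but the import paths and the SQLite path handling are assumptions, and the Postgres adapter is omitted because its constructor is not shown here.

```dart
// Sketch: resolve a broker adapter from STEM_BROKER_URL by URL scheme.
import 'dart:io';

import 'package:stem/stem.dart'; // assumed import path
import 'package:stem_sqlite/stem_sqlite.dart'; // assumed import path

Future<Broker> brokerFromEnvironment() async {
  final url = Platform.environment['STEM_BROKER_URL']!;
  final uri = Uri.parse(url);
  switch (uri.scheme) {
    case 'redis':
    case 'rediss':
      // Namespace gives logical separation per environment.
      return RedisStreamsBroker.connect(url, namespace: 'stem');
    case 'sqlite':
      // Assumption: the file path is carried in the URL path component.
      return SqliteBroker.open(uri.path, namespace: 'stem');
    default:
      throw ArgumentError('Unsupported broker scheme: ${uri.scheme}');
  }
}
```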
## Broker configuration quick start Set the broker URL: ```bash export STEM_BROKER_URL=redis://localhost:6379 ``` Then bootstrap (pass a namespace if you want logical separation per env): ```dart final broker = await RedisStreamsBroker.connect( Platform.environment['STEM_BROKER_URL']!, namespace: 'stem', ); ``` Or use the broker snippet entrypoints: ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-redis ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-postgres ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-sqlite ``` ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-in-memory ``` For result storage, see the [Persistence](../core-concepts/persistence.md) guide. Need adapter limitations and behavior details? See [Broker Caveats](./caveats.md). --- ## SQLite Adapter Stem ships a SQLite adapter in `stem_sqlite` that implements broker, result backend, and revoke store contracts. It is designed for local development, demo environments, and single-node deployments that want a zero-infra dependency. ## When to use SQLite Use SQLite when you: - Need a **single-process** or **single-host** deployment. - Want a **zero-infrastructure** dev/test broker + backend. - Prefer a local file-backed queue for demos or smoke tests. Avoid SQLite when you need multi-host scaling, cross-process broadcast control, or high-throughput workloads. Redis or Postgres are better fits in production. 
## Install Add the adapter package: ```yaml dependencies: stem_sqlite: ^0.1.1 ``` ## Quick start (broker) ```dart title="brokers.dart" file=/../packages/stem/example/docs_snippets/lib/brokers.dart#brokers-sqlite ``` ## Quick start (result backend) ```dart title="persistence.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-sqlite ``` ## Quick start (revoke store) ```dart title="persistence.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-revoke-store-sqlite ``` ## Configuration knobs SQLite adapters expose the same tuning hooks as other brokers/backends: **Broker options** - `namespace`: logical namespace for queue rows. - `defaultVisibilityTimeout`: lease duration before re-delivery. - `pollInterval`: how often workers poll for due jobs. - `sweeperInterval`: how often to clear expired locks. - `deadLetterRetention`: how long to keep dead letter rows. **Result backend options** - `namespace`: logical namespace for task result rows. - `defaultTtl`: how long task results are retained by default. - `groupDefaultTtl`: TTL for group/chord metadata. - `heartbeatTtl`: TTL for worker heartbeat rows. - `cleanupInterval`: how frequently expired rows are cleaned up. These options are passed to `SqliteBroker.open(...)` and `SqliteResultBackend.open(...)`. Migrations run automatically on first open; keep the database file on local disk and allow the process to create the file if it does not exist. ## Recommended layout (separate DB files) SQLite uses WAL and only allows **one writer at a time**. To avoid lock contention: - **Use separate DB files** for the broker and backend. - **Keep producers off the backend** (let workers be the only writers). - **Do not share a single SQLite file** between broker and backend. 
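Putting the configuration knobs and the separate-file layout together, a hedged sketch (option names follow the lists above; exact signatures, defaults, and import paths are assumptions):

```dart
// Sketch: open broker and backend on separate local SQLite files so each
// component has its own writer. All durations are illustrative.
import 'package:stem_sqlite/stem_sqlite.dart'; // assumed import path

Future<void> bootstrap() async {
  final broker = await SqliteBroker.open(
    'stem_broker.sqlite', // broker-only file
    namespace: 'stem',
    defaultVisibilityTimeout: const Duration(minutes: 5),
    pollInterval: const Duration(milliseconds: 250),
    sweeperInterval: const Duration(seconds: 30),
    deadLetterRetention: const Duration(days: 7),
  );
  final backend = await SqliteResultBackend.open(
    'stem_backend.sqlite', // backend-only file: workers are the only writers
    namespace: 'stem',
    defaultTtl: const Duration(days: 1),
    groupDefaultTtl: const Duration(days: 1),
    heartbeatTtl: const Duration(minutes: 2),
    cleanupInterval: const Duration(minutes: 5),
  );
  // Wire broker + backend into your worker or StemApp from here.
}
```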
A simple layout: ``` ./stem_broker.sqlite # broker only ./stem_backend.sqlite # result backend only ``` The `task_context_mixed` example defaults to separate files and exposes: - `STEM_SQLITE_BROKER_PATH` - `STEM_SQLITE_BACKEND_PATH` ## Running with native assets The `sqlite3` package uses native assets. For stable behavior, build CLI bundles and run the compiled binary: ```bash cd packages/stem/example/task_context_mixed dart build cli -t bin/worker.dart -o build/worker dart build cli -t bin/enqueue.dart -o build/enqueue build/worker/bundle/bin/worker build/enqueue/bundle/bin/enqueue ``` ## Adapter limitations SQLite brokers are intentionally minimal: - **Broadcast fan-out is in-process only** (worker control commands across processes are not supported). - **Single-queue subscriptions only** (one queue per worker subscription). - **Polling-based delivery** (latency depends on `pollInterval`). - **Single-writer constraint** (plan your processes and DB files accordingly). If you need cross-process broadcast control, multi-queue consumption, or multi-host scaling, use Redis or Postgres instead. ## Examples - `packages/stem/example/task_context_mixed` – TaskContext/TaskInvocationContext enqueue patterns on SQLite. - `packages/stem/example/workflows/sqlite_store.dart` – workflow state stored in SQLite. - `packages/stem_sqlite/example/stem_sqlite_example.dart` – adapter smoke test. --- ## Stem vs BullMQ This page is the canonical Stem comparison matrix for BullMQ-style features. It focuses on capability parity, not API-level compatibility. **As of:** February 24, 2026 ## Status semantics | Status | Meaning | | --- | --- | | `✓` | Functionally equivalent built-in capability exists in Stem. | | `~` | Partial or non-isomorphic capability exists, but semantics differ from BullMQ/BullMQ-Pro. | | `✗` | No built-in capability in Stem today. 
| ## Feature matrix | BullMQ row | Stem | Rationale (with evidence) | | --- | --- | --- | | Backend | `✓` | Stem supports multiple backends/adapters (Redis, Postgres, SQLite, in-memory). See [Broker Overview](../brokers/overview.md) and [Developer Environment](../getting-started/developer-environment.md). | | Observables | `✓` | Stem has built-in metrics, tracing, and lifecycle signals. See [Observability](../core-concepts/observability.md) and [Signals](../core-concepts/signals.md). | | Group Rate Limit | `✓` | Stem supports group-scoped rate limiting via `TaskOptions.groupRateLimit`, `groupRateKey`, and `groupRateKeyHeader`. See [Rate Limiting](../core-concepts/rate-limiting.md). | | Group Support | `✓` | Stem provides `Canvas.group` and `Canvas.chord` primitives. See [Canvas Patterns](../core-concepts/canvas.md). | | Batches Support | `✓` | Stem exposes first-class batch APIs (`submitBatch`, `inspectBatch`) with durable batch lifecycle status. See [Canvas Patterns](../core-concepts/canvas.md). | | Parent/Child Dependencies | `✓` | Stem supports dependency composition through chains, groups/chords, and workflow steps. See [Canvas Patterns](../core-concepts/canvas.md) and [Workflows](../core-concepts/workflows.md). | | Deduplication (Debouncing) | `~` | `TaskOptions.unique` prevents duplicate enqueue claims, but semantics are lock/TTL-based rather than BullMQ-native dedupe APIs. See [Uniqueness](../core-concepts/uniqueness.md). | | Deduplication (Throttling) | `~` | `uniqueFor` and lock TTL windows approximate throttling behavior, but are not a direct BullMQ equivalent. See [Uniqueness](../core-concepts/uniqueness.md). | | Priorities | `✓` | Stem supports task priority and queue priority ranges. See [Tasks](../core-concepts/tasks.md) and [Routing](../core-concepts/routing.md). | | Concurrency | `✓` | Workers support configurable concurrency and isolate pools. See [Workers](../workers/index.md) and [Worker Control](../workers/worker-control.md). 
| | Delayed jobs | `✓` | Delayed execution is supported via enqueue options and broker scheduling. See [Quick Start](../getting-started/quick-start.md) and [Broker Overview](../brokers/overview.md). | | Global events | `✓` | Stem exposes global lifecycle events through `StemSignals`, plus queue-scoped custom events through `QueueEvents`. See [Signals](../core-concepts/signals.md) and [Queue Events](../core-concepts/queue-events.md). | | Rate Limiter | `✓` | Stem supports per-task rate limits with pluggable limiter backends. See [Rate Limiting](../core-concepts/rate-limiting.md). | | Pause/Resume | `✓` | Stem provides queue pause/resume commands (`stem worker pause`, `stem worker resume`) and persistent pause state when a revoke store is configured. See [Worker Control](../workers/worker-control.md). | | Sandboxed worker | `~` | Stem supports isolate-based execution boundaries, but this is not equivalent to BullMQ's Node child-process sandbox model. See [Worker Control](../workers/worker-control.md). | | Repeatable jobs | `✓` | Stem Beat supports interval, cron, solar, and clocked schedules. See [Scheduler](../scheduler/index.md) and [Beat Scheduler Guide](../scheduler/beat-guide.md). | | Atomic ops | `~` | Stem includes atomic behavior in specific stores/flows, but end-to-end transactional guarantees (for all enqueue/ack/result paths) are not universally built-in. See [Tasks idempotency guidance](../core-concepts/tasks.md#idempotency-checklist) and [Best Practices](../getting-started/best-practices.md). | | Persistence | `✓` | Stem persists task/workflow/schedule state through pluggable backends/stores. See [Persistence & Stores](../core-concepts/persistence.md). | | UI | `~` | Stem includes an experimental dashboard, not a fully mature operator UI parity target yet. See [Dashboard](../core-concepts/dashboard.md). | | Optimized for | `~` | Stem is optimized for jobs/messages plus durable workflow orchestration, not only queue semantics. 
See [Core Concepts](../core-concepts/index.md) and [Workflows](../core-concepts/workflows.md). | ## Update policy When this matrix changes: 1. Update the **As of** date. 2. Keep row names aligned with BullMQ terminology. 3. Update rationale links so every status remains auditable. ## BullMQ events parity notes Stem supports the two common BullMQ event-listening styles: | BullMQ concept | Stem equivalent | | --- | --- | | `QueueEvents` listeners | `QueueEvents` + `QueueEventsProducer` (queue-scoped custom events) | | Custom queue events | `producer.emit(queue, eventName, payload, headers, meta)` | | Worker-specific event listeners | `StemSignals` convenience APIs with `workerId` filters (`onWorkerReady`, `onWorkerInit`, `onTaskFailure`, `onControlCommandCompleted`, etc.) | --- ## Canvas Patterns This guide walks through Stem's task composition primitives—chains, groups, and chords—using in-memory brokers and backends. Each snippet references a runnable file under `packages/stem/example/docs_snippets/` so you can experiment locally with `dart run`. If you bootstrap with `StemApp`, use `app.canvas` to reuse the same broker, backend, registry, and encoder registry. ## Chains Chains execute tasks serially. Each step receives the previous result via `context.meta['chainPrevResult']`. ```dart file=/../packages/stem/example/docs_snippets/lib/canvas_chain.dart#canvas-chain ``` If any step fails, the chain stops immediately. Retry by invoking `canvas.chain` again with the same signatures. ## Groups Groups fan out work and persist each branch in the result backend. ```dart file=/../packages/stem/example/docs_snippets/lib/canvas_group.dart#canvas-group ``` ## Batches Batches provide a first-class immutable submission API on top of durable group state: - `canvas.submitBatch(signatures)` returns a stable `batchId` and task ids. - `canvas.inspectBatch(batchId)` returns aggregate lifecycle status (`pending`, `running`, `succeeded`, `failed`, `cancelled`, `partial`). 
```dart file=/../packages/stem/example/docs_snippets/lib/canvas_batch.dart#canvas-batch ``` ## Chords Chords combine a group with a callback. Once all body tasks succeed, the callback runs with `context.meta['chordResults']` populated. ```dart file=/../packages/stem/example/docs_snippets/lib/canvas_chord.dart#canvas-chord ``` If any branch fails, the callback is skipped and the chord group is marked as failed. Inspect `backend.getGroup(chordId)` to see which branch failed before retrying. ## Dependency semantics - **Chains** model parent → child dependencies: each step is enqueued only after the previous one succeeds. - **Groups** model fan-out dependencies: a group is “complete” once all child tasks finish. The expected count is stored in the backend. - **Chords** combine both: a callback depends on the entire group finishing successfully. ## Child result retrieval - `Canvas.group` returns a `GroupDispatch` with a result stream for each child. - `Canvas.chord` preserves the original signature order when building `chordResults`, so you can map results back to inputs deterministically. - `backend.getGroup(groupId)` returns the latest status for each child task. ## Removal semantics Group and chord metadata live in the result backend. Set backend TTLs or explicitly expire group records to avoid unbounded storage growth. ## Running the examples From the repository root: ```bash cd packages/stem/example/docs_snippets dart run lib/canvas_chain.dart dart run lib/canvas_group.dart dart run lib/canvas_chord.dart ``` Each script bootstraps a `StemApp` in-memory runtime, starts a worker, and then uses `app.canvas` for composition. ## Best practices - Keep callbacks idempotent; chords can be retried manually. - Polling is fine for examples—production deployments should rely on notifications or shorter intervals. - Expire group records via backend TTLs to avoid unbounded storage. 
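Because `chordResults` preserves signature order, a chord callback can zip results back to its inputs deterministically. A sketch of a callback body (`TaskContext` and the `chordResults` meta key are from this guide; the handler signature and registration are assumptions):

```dart
// Sketch of a chord callback handler. context.meta['chordResults'] is
// populated in original signature order once every branch succeeds.
Future<Object?> sumChordResults(TaskContext context) async {
  final results = (context.meta['chordResults'] as List).cast<num>();
  var total = 0;
  for (final value in results) {
    total += value.toInt();
  }
  // Keep this idempotent: chords can be retried manually.
  return total;
}
```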
--- ## CLI & Control Plane The `stem` CLI complements the programmatic APIs—use it to inspect state, manage workers, and operate schedules and routing. ## Inspect vs control - **Inspect** commands read state (`stem observe ...`, `stem worker stats`) and are safe to run frequently. - **Control** commands mutate worker state (`stem worker revoke`, `stem worker shutdown`) and should be restricted to operators. - Broadcast commands (like `stem worker ping`) fan out to all workers unless you target a specific worker ID. - Use `--namespace` to ensure the CLI targets the same namespace as your workers. ## Remote control primer The worker control commands (`ping`, `stats`, `inspect`, `revoke`, `shutdown`, `pause`, `resume`) publish control messages into the broker. Each command uses a request id and waits for replies on a per-request reply queue. Targeting rules: - **Broadcast** (default): omit `--worker` to broadcast to every worker in the namespace. - **Targeted**: pass one or more `--worker` values to address specific worker ids only. Inspect vs control semantics: - **Inspect** (`ping`, `stats`, `inspect`) returns snapshots and does not mutate worker state. - **Control** (`revoke`, `shutdown`, `pause`, `resume`) persists intent and asks workers to change behavior (terminate tasks, shut down, or pause queue consumption). Payload highlights (as sent by the CLI): - `ping`/`stats`: command `type` with a `targets` list (defaults to `*`). - `inspect`: includes `includeRevoked` and the control `namespace`. - `revoke`: includes `revocations` plus `requester` and `namespace`. - `shutdown`: includes `mode` (`warm`, `soft`, `hard`). 
Example usage: ```bash # Broadcast to all workers stem worker ping # Target a single worker by id stem worker stats --worker worker-a ``` ```dart title="Observe queues" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-observe-queues ``` ```dart title="Observe workers" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-observe-workers ``` ```dart title="Observe DLQ" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-observe-dlq ``` ```dart title="Observe schedules" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-observe-schedules ``` ```dart title="Ping workers" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-ping ``` ```dart title="Worker stats" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-stats ``` ```dart title="Revoke a task" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-revoke ``` ```dart title="Shutdown workers" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-shutdown ``` ```dart title="Pause queues" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-pause ``` ```dart title="Resume queues" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-worker-resume ``` ```dart title="Apply schedules" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-schedule-apply ``` ```dart title="List schedules" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-schedule-list ``` ```dart title="Dry-run a schedule" file=/../packages/stem/example/docs_snippets/lib/cli_control.dart#cli-control-schedule-dry-run ``` ## Registry resolution Many CLI commands that reference task names need a registry. 
The default CLI context does not load one automatically, so wire it via `runStemCli` with a `contextBuilder` that sets `CliContext.registry`. For multi-binary deployments, ensure the CLI and workers share the same registry entrypoint so task names, encoders, and routing rules stay consistent. If a command needs a registry and none is available, it will exit with an error or fall back to raw task metadata (depending on the subcommand).

## List registered tasks

```bash
stem tasks ls
```

Expected output:

```text
NAME        DESCRIPTION     IDEMPOTENT  TAGS
email.send  Sends an email  yes         notifications
```

## Observe queues, workers, and schedules

```bash
stem observe queues
stem observe workers
stem observe dlq
stem observe schedules
```

Expected output:

```text
Queue | Pending | Inflight
Worker | Active | Last Heartbeat
Queue | Task ID | Reason
Summary: due=... overdue=...
```

Requirements:

- `stem observe queues` and `stem observe dlq` need a broker.
- `stem observe workers` requires a result backend.
- `stem observe schedules` reads from `STEM_SCHEDULE_STORE_URL` or a schedule file.

## Worker control commands

```bash
stem worker ping
stem worker stats
stem worker revoke --task <task-id>
stem worker shutdown --mode warm
stem worker pause --queue default
stem worker resume --queue default
```

Expected output:

```text
ping: ok
stats: workers=... queues=...
```

Combine with programmatic signals for richer telemetry (see [Worker Control CLI](../workers/worker-control.md)).

## Scheduler

```bash
stem schedule apply \
  --file config/schedules.yaml \
  --yes
stem schedule list
stem schedule dry-run --spec "every:5m"
```

Run Beat from a Dart entrypoint wired to your schedule store:

```dart title="lib/scheduler.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-redis
```

Requirements:

- `stem schedule apply/list/dry-run` use `STEM_SCHEDULE_STORE_URL` when set, otherwise they operate on local schedule files.
- Beat needs a broker, schedule store, and (for HA) a lock store.
Expected output (schedule list): ```text ID | Task | Queue | Spec | Next Run | Last Run | Jitter | Enabled ----------+----------------+----------+------------------+------------------------+------------------------+---------+--------- cleanup | maintenance... | default | every:5m | 2025-01-01T00:05:00Z | 2025-01-01T00:00:00Z | 0ms | yes ``` ## Health checks ```bash stem health \ --broker "$STEM_BROKER_URL" \ --backend "$STEM_RESULT_BACKEND_URL" ``` The command exits non-zero when connectivity, TLS, or signing checks fail—ideal for CI/CD gates. ## Backend requirements by command Use this table to sanity-check which connection strings are required: | Command | Broker | Result backend | Schedule store | Revoke store | Registry | | --- | --- | --- | --- | --- | --- | | `stem tasks ls` | ❌ | ❌ | ❌ | ❌ | ✅ | | `stem observe queues` | ✅ | ❌ | ❌ | ❌ | ❌ | | `stem observe workers` | ❌ | ✅ | ❌ | ❌ | ❌ | | `stem observe dlq` | ✅ | ❌ | ❌ | ❌ | ❌ | | `stem observe schedules` | ❌ | ❌ | ✅ | ❌ | ❌ | | `stem worker ping/stats/inspect` | ✅ | ❌ | ❌ | ❌ | ❌ | | `stem worker status` | optional (follow) | optional (snapshot) | ❌ | ❌ | ❌ | | `stem worker revoke` | ✅ | optional | ❌ | optional | ❌ | | `stem worker shutdown` | ✅ | ❌ | ❌ | ❌ | ❌ | | `stem worker pause/resume` | ✅ | ❌ | ❌ | optional | ❌ | | `stem schedule apply/list/dry-run` | ❌ | ❌ | ✅ | ❌ | ❌ | | `stem health` | ✅ | optional | ❌ | ❌ | ❌ | Notes: - The CLI resolves URLs from `STEM_BROKER_URL`, `STEM_RESULT_BACKEND_URL`, `STEM_SCHEDULE_STORE_URL`, and `STEM_REVOKE_STORE_URL`. - `STEM_REVOKE_STORE_URL` supports `redis://`, `postgres://`, `sqlite:///`, `file:///`, and `memory://` targets. - When a backend is “optional”, the command still runs but will skip that slice of data (for example, worker heartbeats without a result backend). - Schedule commands fall back to local schedule files when no schedule store is configured. Keep the CLI handy when iterating on the code examples in the feature guides. 
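The registry wiring described under "Registry resolution" can be sketched as a CLI entrypoint. `runStemCli`, `contextBuilder`, and `CliContext.registry` are named in this guide; the builder signature and the `buildRegistry` helper are assumptions:

```dart
// Sketch: give the CLI the same registry your workers use so task names,
// encoders, and routing rules stay consistent across binaries.
Future<void> main(List<String> args) => runStemCli(
      args,
      contextBuilder: (CliContext context) {
        // buildRegistry() is a hypothetical shared entrypoint that both
        // worker binaries and this CLI import.
        context.registry = buildRegistry();
        return context;
      },
    );
```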
---

## Dashboard

Stem ships an experimental Hotwire + Routed dashboard that surfaces live queue, task, event, and worker data. It connects through the same broker/result-backend contracts as your workers, so Redis, Postgres, and in-memory deployments all work without code changes.

## Quick start (local)

1. Ensure the routed ecosystem checkout lives alongside this repo (the dashboard overrides `routed`, `routed_hotwire`, and related packages to `../routed_ecosystem` relative to `packages/dashboard`). If you do not have the routed workspace available, `dart pub get` in `packages/dashboard` will fail because of the local dependency overrides.
2. Install dependencies:

   ```bash
   dart pub get
   cd packages/dashboard
   dart pub get
   ```

3. Start the server:

   ```bash
   dart run bin/dashboard.dart
   ```

4. Visit `http://127.0.0.1:3080/`.

## Environment variables

The dashboard reuses `StemConfig`, so it accepts the same environment settings as workers and the CLI:

| Variable | Purpose | Default |
| --- | --- | --- |
| `STEM_BROKER_URL` | Broker URL | `redis://127.0.0.1:6379/0` |
| `STEM_RESULT_BACKEND_URL` | Result backend URL | broker URL |
| `STEM_NAMESPACE` / `STEM_DASHBOARD_NAMESPACE` | Namespace | `stem` |
| `STEM_TLS_*` | TLS configuration | unset |

Supported schemes include `redis://`, `rediss://`, `postgres://`, `postgresql://`, and `memory://`.

```dart title="lib/dashboard.dart" file=/../packages/stem/example/docs_snippets/lib/dashboard.dart#dashboard-config
```

```dart title="lib/dashboard.dart" file=/../packages/stem/example/docs_snippets/lib/dashboard.dart#dashboard-overrides
```

```dart title="lib/dashboard.dart" file=/../packages/stem/example/docs_snippets/lib/dashboard.dart#dashboard-tls
```

## Deployment & auth guidance

The dashboard does not ship with its own auth layer yet. For anything beyond local use, place it behind your standard perimeter controls:

- **Reverse proxy + auth**: run behind Nginx/Envoy/Traefik with SSO, basic auth, or IP allowlists.
- **Private network**: expose the dashboard only inside a VPN/VPC. - **TLS termination**: terminate HTTPS at the proxy and forward to `http://127.0.0.1:3080`. - **Audit controls**: restrict who can issue control commands from the UI. ## Reverse proxy notes When deploying behind a proxy, ensure the proxy forwards: - `Host` (or set `X-Forwarded-Host`) - `X-Forwarded-Proto` (so URLs and redirects stay correct) - `X-Forwarded-For` (for request logging) If you mount the dashboard at a subpath, configure the proxy to rewrite to the app root and to pass websocket/Turbo stream upgrades. ## What you can do - **Overview**: queue + worker cards and busiest queues table. - **Tasks**: sortable queue listings, filters, row expansion, and ad-hoc enqueue. - **Events**: live stream of queue/worker deltas over Turbo Streams. - **Workers**: heartbeat freshness, isolate counts, queue assignments, and control actions (ping, soft shutdown, hard shutdown). - **Queue recovery**: replay dead letters per queue. ## Required dependencies (local dev) The dashboard depends on local `routed` ecosystem overrides: - `routed`, `routed_hotwire` - `server_testing`, `routed_testing`, `property_testing` - `third_party/dartastic_opentelemetry_sdk` stub (keeps tests passing) Make sure the `../routed_ecosystem` checkout exists relative to `packages/dashboard` before running `dart pub get`. ## Troubleshooting: “No data” If the UI loads but queues/workers are empty: 1. **Broker URL**: confirm `STEM_BROKER_URL` points at the broker you expect. 2. **Namespace**: verify `STEM_NAMESPACE` / `STEM_DASHBOARD_NAMESPACE` matches your worker namespace. 3. **Result backend**: set `STEM_RESULT_BACKEND_URL` explicitly if it differs from the broker (worker heartbeats live there). 4. **Redis permissions**: for Redis, the dashboard scans `stem:stream:*` and reads `stem:worker:heartbeat` keys. ACLs must permit `SCAN`, `XLEN`, `XPENDING`, `ZCARD`, and `GET` on those keys. 5. 
**TLS**: when using `rediss://`, make sure `STEM_TLS_*` variables are set. 6. **Workers online**: confirm workers are running and emitting heartbeats; if `stem worker stats --json` shows no heartbeats, the dashboard will too. ## Diagnostics Use the CLI to confirm what the dashboard should show: ```bash stem health \ --broker "$STEM_BROKER_URL" \ --backend "$STEM_RESULT_BACKEND_URL" stem worker stats --json stem observe queues ``` ## Notes - The Events page currently synthesizes deltas from polling. Wiring Stem's signal bus will replace this with true lifecycle events. - The dashboard uses the same control plane as `stem worker` (control queues + command replies), so the UI reflects real worker state. --- ## Core Concepts Understand the building blocks that power Stem. These pages explain how tasks, workers, routing, signals, and canvases fit together so you can reason about behavior before touching production. ### Feature Highlights - Queueing and retries with `Stem.enqueue`, `TaskOptions`, and retry strategies. - `StemClient` entrypoint to share broker/backend configuration across workers and workflow apps. - Worker lifecycle management, concurrency controls, and graceful shutdown. - Beat scheduler for interval/cron/solar/clocked jobs. - Canvas primitives (chains, groups, chords) for task composition. - First-class batch submissions with durable aggregate status inspection. - Lifecycle signals for instrumentation and integrations. - Queue-scoped custom events via `QueueEventsProducer`/`QueueEvents`. - Declarative routing across queues and broadcast channels. - Result backends and progress reporting via `TaskContext`. 
### Core Concept Snippets ```dart title="lib/tasks.dart" file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-register-in-memory ``` ```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-inline ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-configure ``` ```dart title="lib/canvas_chain.dart" file=/../packages/stem/example/docs_snippets/lib/canvas_chain.dart#canvas-chain ``` - **[Tasks & Retries](./tasks.md)** – Task handlers, options, retries, and idempotency guidelines. - **[Producer API](./producer.md)** – Enqueue tasks with args, metadata, signing, and delays. - **[Payload Signing](./signing.md)** – Sign envelopes, rotate keys, and verify signatures. - **[Rate Limiting](./rate-limiting.md)** – Throttle hot handlers with shared limits. - **[Uniqueness](./uniqueness.md)** – Deduplicate naturally unique tasks. - **[Namespaces](./namespaces.md)** – Isolate environments and tenants. - **[Routing](./routing.md)** – Queue aliases, priorities, and broadcast channels. - **[Signals](./signals.md)** – Lifecycle hooks for instrumentation and integrations. - **[Queue Events](./queue-events.md)** – Publish/listen to queue-scoped custom events. - **[Canvas Patterns](./canvas.md)** – Chains, groups, and chords for composing work. - **[Observability](./observability.md)** – Metrics, traces, logging, and lifecycle signals. - **[Persistence & Stores](./persistence.md)** – Result backends, schedule/lock stores, and revocation storage. - **[Workflows](./workflows.md)** – Durable Flow/Script runtimes with typed results, suspensions, and event watchers. - **[CLI & Control](./cli-control.md)** – Quickly inspect queues, workers, and health from the command line. Continue with the [Workers guide](../workers/index.md) for operational details. 
--- ## Namespaces Namespaces provide logical isolation between environments (dev/staging/prod) or between tenants sharing the same infrastructure. Most Stem components honor the namespace when naming queues, result keys, and control channels. Unless you pass an explicit namespace, adapters default to `stem`. ## What uses the namespace - **Brokers & backends**: queue streams, task results, heartbeats, and dead letters are scoped by the namespace configured on each adapter. - **Schedule & lock stores**: schedule entries and scheduler locks use namespace-prefixed keys/tables so multiple schedulers can share a store. - **Revoke stores**: revocation entries are scoped by namespace. - **Workflow stores**: workflow runs and topic queues are namespaced. - **Control plane**: worker control channels and heartbeat topics use the worker namespace. - **Dashboard**: reads control/observability data using its configured namespace. ## Configure namespaces ```dart title="Broker namespace" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-broker ``` ```dart title="Backend namespace" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-backend ``` ```dart title="Broker + Backend (combined)" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-broker-backend ``` ```dart title="lib/namespaces.dart" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-worker ``` ```dart title="Namespace isolation" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-isolation ``` ## Environment variables Stem does not use a single global namespace variable; instead, namespaces are configured per adapter or via the worker observability settings. The default is `stem` when no override is provided. 
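To picture how namespace scoping isolates data, here is an illustrative sketch. The helper functions are hypothetical (real adapters manage their own key layouts), but the key shapes mirror the Redis keys mentioned in the dashboard troubleshooting guide (`stem:stream:*`, `stem:worker:heartbeat`):

```dart
// Illustrative only: real Stem adapters own their key layouts.
// These hypothetical helpers show how a namespace prefix keeps two
// environments from colliding in a shared store.
String streamKey(String namespace, String queue) => '$namespace:stream:$queue';
String heartbeatKey(String namespace) => '$namespace:worker:heartbeat';
```

With distinct namespaces, `streamKey('staging', 'default')` and `streamKey('prod', 'default')` can never collide, which is the isolation the adapters rely on.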
| Variable | Purpose | Default | | --- | --- | --- | | `STEM_WORKER_NAMESPACE` | Worker heartbeat/control namespace when using `ObservabilityConfig.fromEnvironment` | `stem` | | `STEM_WORKFLOW_NAMESPACE` | Workflow store namespace used by the CLI workflow runner | `stem` | | `STEM_DASHBOARD_NAMESPACE` | Namespace the dashboard reads | falls back to `STEM_NAMESPACE` or `stem` | | `STEM_NAMESPACE` | Dashboard fallback namespace value | `stem` | ## CLI usage When using a non-default namespace, pass it explicitly to worker control commands: ```bash stem worker stats --namespace "staging" stem worker ping --namespace "staging" ``` ## Tips - Keep namespaces consistent across producers, workers, schedulers, and CLI. - Use per-environment namespaces (e.g., `dev`, `staging`, `prod`) to avoid cross-environment interference. - If the dashboard shows no data, verify `STEM_DASHBOARD_NAMESPACE` matches your worker namespace. ## Next steps - See [Workers](../workers/index.md) for worker namespace variables. - See [Dashboard](./dashboard.md) for dashboard namespace settings. --- ## Observability Instrument Stem applications with built-in metrics, traces, and lifecycle signals. ## Metrics Stem exports OpenTelemetry metrics via `StemMetrics`. Enable exporters with environment variables or programmatically. ```bash export STEM_METRIC_EXPORTERS=otlp:http://localhost:4318/v1/metrics export STEM_OTLP_ENDPOINT=http://localhost:4318 ``` ```dart file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-metrics ``` Common metric names: | Metric | Type | Description | | --- | --- | --- | | `stem.tasks.started` | Counter | Incremented when a worker begins executing a task. | | `stem.tasks.succeeded` / `stem.tasks.failed` | Counter | Outcome counters per task/queue. | | `stem.tasks.retried` | Counter | Number of retries scheduled. | | `stem.task.duration` | Histogram | Task execution time in milliseconds. 
| | `stem.worker.concurrency` | Gauge | Current active isolates vs configured concurrency. | | `stem.worker.inflight` | Gauge | Messages currently reserved by the worker. | ## Tracing Wrap enqueue and task handlers with traces by enabling the built-in tracer. ```bash export STEM_TRACE_EXPORTER=otlp:http://localhost:4318/v1/traces ``` ```dart file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-tracing ``` Traces include spans for `stem.enqueue`, `stem.consume`, and task execution. Use attributes (`stem.task`, `stem.queue`, `stem.retry.attempt`) to filter in your tracing backend. ## Signals `StemSignals` fire lifecycle hooks for tasks, workers, scheduler events, and control-plane commands. ```dart file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-signals ``` ## Workflow Introspection Workflow runtimes can emit step-level events (started/completed/failed/retrying) through a `WorkflowIntrospectionSink`. Use it to publish step telemetry or bridge to your own tracing/logging systems. ```dart class LoggingWorkflowIntrospectionSink implements WorkflowIntrospectionSink { @override Future<void> recordStepEvent(WorkflowStepEvent event) async { stemLogger.info('workflow.step', { 'run': event.runId, 'workflow': event.workflow, 'step': event.stepId, 'type': event.type.name, 'iteration': event.iteration, }); } } ``` ## Logging Use `stemLogger` (Contextual logger) for structured logs. ```dart file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-logging ``` Workers automatically include attempt, queue, and worker id in log contexts when `StemSignals` are enabled. ## Health checks Run the CLI to verify connectivity before deployments: ```bash stem health --broker "$STEM_BROKER_URL" --backend "$STEM_RESULT_BACKEND_URL" ``` This checks broker/backend reachability, TLS certificates, and signing configuration, returning a non-zero exit code on failure. 
## Dashboards A minimal dashboard typically charts: - Task throughput (`stem.tasks.started`, `stem.tasks.succeeded`, `stem.tasks.failed`). - Retry delay distribution (`stem.tasks.retried`, `stem.task.duration`). - Worker heartbeats and concurrency (`stem.worker.concurrency`, `stem.worker.inflight`). - Scheduler drift (`StemSignals.onScheduleEntryDispatched` drift metrics). Exporters can be mixed: enable console during development and OTLP in staging/production. For local exploration, run the `examples/otel_metrics` stack to see metrics in a collector + Jaeger pipeline. --- ## Persistence & Stores Use persistence when you need durable task state, shared schedules, or revocation storage. Stem ships with Redis, Postgres, and SQLite adapters plus in-memory variants for local development. ## Result backend ```dart file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-in-memory ``` ```dart file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-sqlite ``` ```dart file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-redis ``` ```dart file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-postgres ``` ### Payload encoders Result backends now respect pluggable `TaskPayloadEncoder`s. Producers encode arguments before publishing, workers decode them once before invoking handlers, and handler return values are encoded before they hit the backend. Every stored status contains the encoder id (`__stemResultEncoder`), letting other processes decode payloads without guessing formats. 
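The encode, stamp, and decode round trip described above can be pictured with a small stand-in. Everything here except the `__stemResultEncoder` marker is hypothetical; the real `TaskPayloadEncoder` interface and registry live in Stem:

```dart
import 'dart:convert';

// Hypothetical stand-in for Stem's TaskPayloadEncoder.
abstract class EncoderSketch {
  String get id;
  Object? encode(Object? value);
  Object? decode(Object? value);
}

class JsonEncoderSketch implements EncoderSketch {
  @override
  String get id => 'json';
  @override
  Object? encode(Object? value) => jsonEncode(value);
  @override
  Object? decode(Object? value) => jsonDecode(value as String);
}

/// Writer side: encode the handler result and stamp the encoder id,
/// mirroring the `__stemResultEncoder` field Stem stores with each status.
Map<String, Object?> storeResult(EncoderSketch encoder, Object? result) => {
      'result': encoder.encode(result),
      '__stemResultEncoder': encoder.id,
    };

/// Reader side: another process looks up the encoder by the stored id,
/// so it never has to guess the payload format.
Object? readResult(
  Map<String, EncoderSketch> registry,
  Map<String, Object?> status,
) =>
    registry[status['__stemResultEncoder']]!.decode(status['result']);
```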
Configure defaults when bootstrapping `Stem`, `StemApp`, `Canvas`, or workflow apps: ```dart title="lib/bootstrap_encoders.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-encoders ``` Handlers needing bespoke treatment can override `TaskMetadata.argsEncoder` and `TaskMetadata.resultEncoder`; the worker ensures only that task uses the custom encoder while the rest fall back to the global defaults. ## Schedule & lock stores ```dart title="lib/beat_bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-beat-stores ``` Switch to Postgres with `PostgresScheduleStore.connect` / `PostgresLockStore.connect`. ## Revoke store Store revocations in Redis/Postgres/SQLite so workers can honour `stem worker revoke`: ```bash export STEM_REVOKE_STORE_URL=postgres://postgres:postgres@localhost:5432/stem ``` ```dart title="Postgres revoke store" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-revoke-store ``` ```dart title="SQLite revoke store" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-revoke-store-sqlite ``` ## Tips - In-memory adapters are great for local tests; switch to Redis/Postgres when you need persistence or multi-process coordination. - SQLite is single-writer: keep only workers connected to the backend and use a separate SQLite file for the broker. - Postgres adapters automatically migrate required tables on first connect. - Configure TTLs on the result backend via `backend.set` to limit retained data. - For HA Beat deployments, use the same lock store across instances. --- ## Producer API Enqueue tasks from your Dart services using `Stem.enqueue`. Start with the in-memory broker, then opt into Redis/Postgres as needed. 
## Enqueue tasks ```dart file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-in-memory ``` ```dart file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-redis ``` ```dart file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-signed ``` ## Typed Enqueue Helpers When you need compile-time guarantees for task arguments and result types, wrap your handler in a `TaskDefinition`. The definition knows how to encode args and decode results, and exposes a fluent builder for overrides (headers, meta, options, scheduling): ```dart title="bin/producer_typed.dart" file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-typed ``` Typed helpers are also available on `Canvas` (`definition.toSignature`) so group/chain/chord APIs produce strongly typed `TaskResult` streams. Need to tweak headers/meta/queue at call sites? Wrap the definition in a `TaskEnqueueBuilder` and invoke `await builder.enqueueWith(stem);`. ## Enqueue options Use `TaskEnqueueOptions` to override scheduling, routing, retry behavior, and callbacks for a single publish. Common fields include `countdown`, `eta`, `expires`, `queue`, `exchange`, `routingKey`, `priority`, `serializer`, `compression`, `ignoreResult`, `taskId`, `retry`, `retryPolicy`, `link`, and `linkError`. Adapter support varies; for example, not every broker honors priorities or delayed delivery. Stem falls back to best-effort behavior when a capability is unsupported. Example: ```dart await stem.enqueue( 'tasks.email', args: {'to': 'ops@example.com'}, enqueueOptions: TaskEnqueueOptions( countdown: const Duration(seconds: 30), queue: 'critical', retry: true, retryPolicy: TaskRetryPolicy( backoff: true, defaultDelay: const Duration(seconds: 2), maxRetries: 5, ), ), ); ``` ## Tips - Reuse a single `Stem` instance; create it during application bootstrap. - Capture the returned task id when you need to poll status from the result backend. 
- Use `TaskOptions` to set queue, retries, priority, isolation, and visibility timeouts. - `meta` is stored with result backend entries—great for audit trails. - `headers` travel with the envelope and can carry tracing information. - To schedule tasks in the future, set `notBefore`. - For signing configuration, see [Payload Signing](./signing.md). ## Configuring Payload Encoders Every `Stem`, `StemApp`, `StemWorkflowApp`, and `Canvas` now accepts a `TaskPayloadEncoderRegistry` or explicit `argsEncoder`/`resultEncoder` values. Encoders run exactly once in each direction: producers encode arguments, workers decode them before invoking handlers, and handler return values are encoded before hitting the result backend. Example: ```dart title="lib/bootstrap_typed_encoders.dart" file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-encoders ``` Handlers needing different encoders can override `TaskMetadata.argsEncoder` and `TaskMetadata.resultEncoder`. The worker automatically stamps every task status with the encoder id (`__stemResultEncoder`), so downstream consumers and adapters always know how to decode stored payloads. Continue with the [Worker guide](../workers/programmatic-integration.md) to consume the tasks you enqueue. --- ## Queue Events Stem supports queue-scoped custom events similar to BullMQ `QueueEvents` and "custom events" patterns. Use this when you need lightweight event streams for domain notifications (`order.created`, `invoice.settled`) without creating task handlers. ## API Surface - `QueueEventsProducer.emit(queue, eventName, payload, headers, meta)` - `QueueEvents.start()` / `QueueEvents.close()` - `QueueEvents.events` stream (all events for that queue) - `QueueEvents.on(eventName)` stream (filtered by name) All events are delivered as `QueueCustomEvent`, which implements `StemEvent`. 
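Conceptually, `on(eventName)` is an exact-name filter over the `events` stream. A simplified stand-in (the event class below is a sketch, not the real `QueueCustomEvent`):

```dart
import 'dart:async';

// Simplified stand-in for QueueCustomEvent: just a name and a payload.
class EventSketch {
  EventSketch(this.name, this.payload);
  final String name;
  final Map<String, Object?> payload;
}

/// Mirrors the documented `on(eventName)` semantics: exact name match,
/// no prefix or pattern matching.
Stream<EventSketch> filterByName(Stream<EventSketch> events, String name) =>
    events.where((event) => event.name == name);
```

`filterByName(events, 'order.created')` passes through only `order.created` events, just as `QueueEvents.on('order.created')` does.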
## Producer + Listener ```dart title="lib/queue_events.dart" file=/../packages/stem/example/docs_snippets/lib/queue_events.dart#queue-events-producer-listener ``` ## Fan-out to Multiple Listeners Multiple listeners on the same queue receive each emitted event. ```dart title="lib/queue_events.dart" file=/../packages/stem/example/docs_snippets/lib/queue_events.dart#queue-events-fanout ``` ## Semantics - Events are queue-scoped: listeners receive only events for their configured queue. - `on(eventName)` matches exact event names. - `headers` and `meta` round-trip to listeners. - Event names and queue names must be non-empty. - Delivery follows the underlying broker's broadcast behavior for active listeners (no historical replay API is built into `QueueEvents`). ## When to Use Queue Events vs Signals - Use [Signals](./signals.md) for runtime lifecycle hooks (task/worker/scheduler/control). - Use Queue Events for application-domain events you publish and consume. --- ## Rate Limiting Stem supports per-task rate limits via `TaskOptions.rateLimit` and a pluggable `RateLimiter` interface. This lets you throttle hot handlers with a shared Redis-backed limiter or custom driver. Stem also supports group-scoped rate limits with `TaskOptions.groupRateLimit` for shared quotas across multiple task types/tenants. 
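Rate-limit policies on this page are short `count/unit` strings such as `10/s` or `100/m`. Stem parses these internally inside the worker; the sketch below is a hypothetical parser that only illustrates the format:

```dart
/// Illustrative parser for `count/unit` rate-limit strings
/// (`10/s`, `100/m`, `500/h`). Stem's real parsing is internal.
({int tokens, Duration window}) parseRateLimit(String spec) {
  final parts = spec.split('/');
  if (parts.length != 2) {
    throw FormatException('Expected count/unit', spec);
  }
  final tokens = int.parse(parts[0]);
  final window = switch (parts[1]) {
    's' => const Duration(seconds: 1),
    'm' => const Duration(minutes: 1),
    'h' => const Duration(hours: 1),
    _ => throw FormatException('Unknown rate-limit unit', spec),
  };
  return (tokens: tokens, window: window);
}
```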
## Quick start ```dart title="lib/shared.dart" file=/../packages/stem/example/rate_limit_delay/lib/shared.dart#rate-limit-task-options ``` ```dart title="bin/worker.dart" file=/../packages/stem/example/rate_limit_delay/bin/worker.dart#rate-limit-worker ``` ```dart title="bin/producer.dart" file=/../packages/stem/example/rate_limit_delay/bin/producer.dart#rate-limit-producer-enqueue ``` ### Docs snippet (in-memory demo) ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-task-options ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-limiter-config ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-limiter-acquire ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-worker ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-producer ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-registry ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-worker-start ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-stem ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-enqueue ``` ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-demo-shutdown ``` Run the `rate_limit_delay` example for a full demo: - `packages/stem/example/rate_limit_delay` ## Rate limit syntax `rateLimit` accepts short strings like: - `10/s` — 10 tokens per second - `100/m` — 100 tokens per 
minute - `500/h` — 500 tokens per hour `groupRateLimit` uses the same syntax. ## How it works - The worker parses `rateLimit` for each task. - The worker asks the `RateLimiter` for an acquire decision. - If denied, the task is retried with backoff and `rateLimited=true` metadata. - Retry delays come from the limiter `retryAfter` if provided, otherwise the worker’s retry strategy. - If granted, the task executes immediately. ## Group rate limiting Group rate limits share a limiter bucket across related tasks. - `groupRateLimit`: limiter policy for the shared group bucket - `groupRateKey`: optional static key (if omitted, Stem resolves from header) - `groupRateKeyHeader`: header used when `groupRateKey` is not set (default: `tenant`) - `groupRateLimiterFailureMode` (default: `failOpen`): - `failOpen`: continue execution if limiter backend fails - `failClosed`: requeue/retry when limiter backend fails ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-group-task-options ``` ## Redis-backed limiter example The `example/rate_limit_delay` demo ships a Redis fixed-window limiter. It: - shares tokens across multiple workers, - logs when a token is granted or denied, - reschedules denied tasks with retry metadata. Inspect it here: ```dart title="lib/rate_limiter.dart" file=/../packages/stem/example/rate_limit_delay/lib/rate_limiter.dart#rate-limit-redis-limiter ``` ## Observability When a task is rate limited: - `context.meta['rateLimited']` is set on the retry attempt, - `taskRetry` signals include retry metadata, - worker logs show the limiter decision (if you log it). ## Keying behavior The worker builds the default rate-limit key from the task name and the tenant: ``` <taskName>:<tenant> ``` If no tenant header is set, the tenant component defaults to `global`. Add a `tenant` header when enqueuing tasks to enforce per-tenant limits. ## Redis limiter wiring The `rate_limit_delay` example reads `STEM_RATE_LIMIT_URL` to point the limiter at Redis. 
Use a dedicated Redis DB or key prefix to keep limiter state isolated from your broker/result backend. ```dart title="lib/shared.dart" file=/../packages/stem/example/rate_limit_delay/lib/shared.dart#rate-limit-redis-connector ``` ## Tips - Use shared Redis for global limits across worker processes. - Keep the rate limit key stable (by default it uses task name + tenant). - Start with generous limits, then tighten after observing throughput. ## Next steps - See [Tasks & Retries](./tasks.md) for other `TaskOptions` knobs. - Use [Observability](./observability.md) to instrument rate-limited flows. --- ## Routing Configuration Stem workers and publishers resolve queue and broadcast targets from the routing file referenced by `STEM_ROUTING_CONFIG`. The file is parsed into a typed `RoutingConfig`, validated by the `RoutingRegistry`, and used by helpers such as `buildWorkerSubscription`. The loader helpers (`RoutingConfigLoader`, `WorkerSubscriptionBuilder`, `buildWorkerSubscription`) live in the `stem_cli` package and are used by the CLI to produce friendly diagnostics. In application code you can use the core `stem` types directly (see “Loading subscriptions”). When no file is supplied Stem falls back to a legacy single-queue configuration. ```yaml default_queue: alias: default queue: primary fallbacks: - secondary queues: primary: exchange: jobs routing_key: jobs.default priority_range: [0, 9] secondary: {} broadcasts: control: delivery: at-least-once updates: delivery: at-most-once routes: - match: task: reports.* target: type: queue name: primary - match: task: control.* target: type: broadcast name: control ``` ```dart file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-load ``` ## Queue priority ranges - Each queue definition accepts an optional `priority_range` either as a two element list (`[min, max]`) or an object (`{ min: 0, max: 9 }`). - When omitted, the queue defaults to `min: 0`, `max: 9`. 
Values outside the range trigger a `FormatException` during load. - The routing registry clamps any priority assigned via `RoutingInfo.priority` or route overrides into the configured range, guaranteeing that both Redis and Postgres brokers store buckets within `[min, max]`. - Publishers may continue to set `Envelope.priority`; the registry will respect that hint when resolving a route. When no range is defined the value is clamped to `[0, 9]`. ```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-priority-range ``` ## Broadcast channels - Add broadcast channels under `broadcasts`; each entry is keyed by the logical channel name and may declare: - `delivery`: semantic hint for consumers. `at-least-once` is the default and matches the behaviour of both Redis Streams and Postgres fan-out tables. - `durability`: optional metadata surfaced in `RoutingInfo.meta`. Current brokers treat it as an advisory flag. - Routes may target broadcasts via `target: { type: broadcast, name: channel }` and workers subscribe by listing the channel under `STEM_WORKER_BROADCASTS` or via the CLI `--broadcast` flag. - Broadcast deliveries reuse the same envelope payload; brokers set `RoutingInfo.broadcastChannel` to the logical channel and ensure each subscriber receives the message exactly once when acked. ## Loading subscriptions - `RoutingConfigLoader`, `WorkerSubscriptionBuilder`, and `buildWorkerSubscription` live in the `stem_cli` package. The CLI uses them to provide friendly diagnostics and to honor `--queue`/`--broadcast` when building worker subscriptions. - In application code you can either import `package:stem_cli/stem_cli.dart` (see `packages/stem/example/email_service`) or build the registry directly: load YAML with `RoutingConfig.fromYaml`, then construct a `RoutingRegistry` and `RoutingSubscription` yourself. 
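The clamping described under “Queue priority ranges” reduces to a one-liner; this sketch is illustrative (the registry performs the equivalent internally when resolving a route):

```dart
/// Clamp a requested priority into a queue's configured range,
/// mirroring the registry behavior. Queues without an explicit
/// `priority_range` default to min 0, max 9.
int clampPriority(int requested, {int min = 0, int max = 9}) =>
    requested < min ? min : (requested > max ? max : requested);
```

A publisher hint of `15` on a queue with `priority_range: [0, 9]` is stored as bucket `9`.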
## Using the config in Dart Load the routing file once during service start-up and reuse the registry across producers and workers: ```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-bootstrap ``` For lightweight services or tests, you can construct the registry inline: ```dart file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-inline ``` Both approaches keep routing logic declarative while letting you evolve queue topology without editing code. --- ## Stem Signals Stem exposes lifecycle signals so instrumentation can react to publish, worker, scheduler, workflow, and control-plane events without modifying runtime code. All signal payloads implement `StemEvent` and dispatch through `Signal`, giving every handler a shared event shape: - `eventName` - `occurredAt` - `attributes` ## Signal Catalog | Category | Stem Signal | Payload Highlights | Celery Equivalent | | --- | --- | --- | --- | | Publish | `beforeTaskPublish`, `afterTaskPublish` | `Envelope`, attempt metadata, task id | `before_task_publish`, `after_task_publish` | | Worker lifecycle | `workerInit`, `workerReady`, `workerStopping`, `workerShutdown`, `workerHeartbeat`, `workerChildInit`, `workerChildShutdown` | `WorkerInfo`, optional reason/timestamps | `worker_init`, `worker_ready`, `worker_shutting_down`, `worker_shutdown`, `heartbeat_sent`, `worker_process_init/shutdown` | | Task lifecycle | `taskReceived`, `taskPrerun`, `taskPostrun`, `taskRetry`, `taskSucceeded`, `taskFailed`, `taskRevoked` | `Envelope`, `WorkerInfo`, attempt, result/error context | `task_received`, `task_prerun`, `task_postrun`, `task_retry`, `task_success`, `task_failure`, `task_revoked` | | Workflow lifecycle | `workflowRunStarted`, `workflowRunSuspended`, `workflowRunResumed`, `workflowRunCompleted`, `workflowRunFailed`, `workflowRunCancelled` | run id, workflow name, status, optional step metadata | n/a | | Scheduler | `scheduleEntryDue`, 
`scheduleEntryDispatched`, `scheduleEntryFailed` | `ScheduleEntry`, tick timestamp, drift, error stack | `beat_scheduler_ready`, `beat_schedule` | | Control plane | `controlCommandReceived`, `controlCommandCompleted` | `ControlCommandMessage`, reply status, payload/error maps | `control_command_sent`, `control_command_received` | ## Ordering & Dispatch Semantics - `beforeTaskPublish` fires immediately before broker IO; `afterTaskPublish` runs once persistence succeeds. - `taskReceived` is emitted when a worker claims/dequeues a task. - `taskPrerun` fires immediately before handler invocation. - Execution ordering is `taskReceived` -> `taskPrerun` -> handler -> `taskPostrun`. - Worker lifecycle follows `workerInit` -> `workerReady` -> optional `workerStopping` -> `workerShutdown`. - Scheduler signals emit due -> dispatched/failed. - Dispatch is sequential and priority-aware; `async` callbacks are awaited. - Listener errors are routed to `StemSignals.configure(onError: ...)` and do not crash the worker loop. - `SignalContext.cancel()` stops lower-priority listeners for the current emit. ## Configuration Use `StemSignals.configure` or supply environment variables consumed by `ObservabilityConfig.fromEnvironment`: ```dart file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-configure ``` Environment knobs: - `STEM_SIGNALS_ENABLED=false` disables all signals. - `STEM_SIGNALS_DISABLED=worker-heartbeat,task-prerun` disables selected ones. 
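The dispatch semantics above (sequential, priority-aware, cancellable) can be modeled with a minimal stand-in. This is not Stem's `Signal` implementation, just an illustration of the described behavior:

```dart
// Minimal model of the documented dispatch semantics; not Stem's Signal.
class CancelContext {
  bool cancelled = false;

  /// Mirrors SignalContext.cancel(): stops lower-priority listeners.
  void cancel() => cancelled = true;
}

class MiniSignal<T> {
  final List<({int priority, void Function(T, CancelContext) handler})>
      _listeners = [];

  void listen(void Function(T event, CancelContext ctx) handler,
      {int priority = 0}) {
    _listeners.add((priority: priority, handler: handler));
    // Higher-priority listeners run first.
    _listeners.sort((a, b) => b.priority.compareTo(a.priority));
  }

  void emit(T event) {
    final ctx = CancelContext();
    for (final listener in _listeners) {
      if (ctx.cancelled) break; // cancel() skips the remaining listeners
      listener.handler(event, ctx);
    }
  }
}
```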
## Listening for Signals ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-publish-listeners ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-task-listeners ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-worker-listeners ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-worker-scoped ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-scheduler-listeners ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-control-listeners ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-stem-event ``` Worker-scoped filtering is available on these convenience helpers: - `onWorkerInit`, `onWorkerReady`, `onWorkerStopping`, `onWorkerShutdown` - `onWorkerHeartbeat`, `onWorkerChildInit`, `onWorkerChildShutdown` - `onTaskReceived`, `onTaskPrerun`, `onTaskPostrun`, `onTaskSuccess`, `onTaskFailure`, `onTaskRetry`, `onTaskRevoked` - `onControlCommandReceived`, `onControlCommandCompleted` ## Custom Queue Events Signals cover runtime lifecycle hooks. For application-domain events (BullMQ `QueueEvents` style), use [`QueueEventsProducer` and `QueueEvents`](./queue-events.md). ## Adapters & Middleware - `StemSignalEmitter` builds payloads and emits signals; Stem runtime uses this same emitter internally. - `SignalMiddleware.coordinator()` forwards enqueue middleware to publish signals. - `SignalMiddleware.worker()` emits receive/prerun/failure hooks from existing worker middleware chains. 
```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-middleware-producer ``` ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-middleware-worker ``` ## Celery Comparison | Celery | Stem | Notes | | --- | --- | --- | | `task_prerun` / `task_postrun` | `taskPrerun` / `taskPostrun` | Payload includes `TaskContext` and worker metadata. | | `worker_ready` | `workerReady` | Worker-scoped filters available via `onWorkerReady(workerId: ...)`. | | `worker_process_init/shutdown` | `workerChildInit` / `workerChildShutdown` | Mirrors isolate pool spawn/recycle notifications. | | `before_task_publish` | `beforeTaskPublish` | Fires before broker writes. | | `beat_schedule` | `scheduleEntryDispatched` | Carries scheduled vs executed timestamps plus drift duration. | Signals tied to Celery-specific pools remain out of scope. --- ## Payload Signing Stem can sign every task envelope so workers can detect tampering or untrusted publishers. This guide focuses on newcomers: how signing works, how to wire it into your app, and where to look when something fails. ## Why sign envelopes? Signing lets workers verify that the envelope payload (args, headers, metadata, and timing fields) is unchanged between the producer and the broker. When signing is enabled on workers, any envelope missing a signature or carrying an invalid signature is rejected and moved to the DLQ with a `signature-invalid` reason. ## How signing works in Stem - Producers create a `PayloadSigner` from environment-derived config and pass it into `Stem` to sign new envelopes. - Workers create the same signer (or verification-only config) and pass it into `Worker` to verify each delivery. - Schedulers/Beat that enqueue tasks should also sign. - Signatures are stored in envelope headers: `stem-signature` and `stem-signature-key`. Signing is opt-in: if no signing keys are configured, envelopes are sent and accepted unsigned. 
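To make the header flow concrete, here is an illustrative sketch of stamping and checking the `stem-signature` / `stem-signature-key` headers. `fakeSign` is a placeholder for real HMAC-SHA256 or Ed25519 signing; only the header names and the accept-any-configured-key behavior (which is what makes key rotation safe) come from this guide:

```dart
// Illustrative only: `fakeSign` stands in for real cryptographic signing.
// The header names match the ones Stem stores on envelopes.
String fakeSign(String secret, String payload) =>
    '${secret.hashCode ^ payload.hashCode}';

/// Producer side: sign with the active key and stamp both headers.
Map<String, String> signHeaders(
  Map<String, String> keys,
  String activeKeyId,
  String payload,
) =>
    {
      'stem-signature': fakeSign(keys[activeKeyId]!, payload),
      'stem-signature-key': activeKeyId,
    };

/// Worker side: accept any configured key id; unknown ids or mismatched
/// signatures are rejected (Stem dead-letters these as `signature-invalid`).
bool verifyHeaders(
  Map<String, String> keys,
  Map<String, String> headers,
  String payload,
) {
  final secret = keys[headers['stem-signature-key']];
  if (secret == null) return false;
  return headers['stem-signature'] == fakeSign(secret, payload);
}
```

Because verification succeeds for any configured key, workers keep accepting envelopes signed with the old key while producers roll to the new one.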
## Quick start (HMAC) 1) Generate a shared secret and export signing variables on **every** producer and worker (and any scheduler that enqueues tasks): ```bash export STEM_SIGNING_ALGORITHM=hmac-sha256 export STEM_SIGNING_KEYS="v1:$(openssl rand -base64 32)" export STEM_SIGNING_ACTIVE_KEY=v1 ``` 2) Wire the signer into producers, workers, and schedulers. These snippets come from the `example/microservice` project so you can see the full context. Load signing config once at startup: ```dart title="example/microservice/enqueuer/bin/main.dart" file=/../packages/stem/example/microservice/enqueuer/bin/main.dart#signing-producer-config ``` Create a signer from that config: ```dart title="example/microservice/enqueuer/bin/main.dart" file=/../packages/stem/example/microservice/enqueuer/bin/main.dart#signing-producer-signer ``` Attach the signer to the producer so envelopes are signed: ```dart title="example/microservice/enqueuer/bin/main.dart" file=/../packages/stem/example/microservice/enqueuer/bin/main.dart#signing-producer-stem ``` Load signing config once at startup: ```dart title="example/microservice/worker/bin/worker.dart" file=/../packages/stem/example/microservice/worker/bin/worker.dart#signing-worker-config ``` If your worker only needs to verify, the signer can be created from public keys: ```dart title="example/microservice/worker/bin/worker.dart" file=/../packages/stem/example/microservice/worker/bin/worker.dart#signing-worker-signer ``` Attach the signer to the worker so signatures are verified: ```dart title="example/microservice/worker/bin/worker.dart" file=/../packages/stem/example/microservice/worker/bin/worker.dart#signing-worker-wire ``` Load signing config once at startup: ```dart title="example/microservice/beat/bin/beat.dart" file=/../packages/stem/example/microservice/beat/bin/beat.dart#signing-beat-config ``` Schedulers that enqueue tasks should also sign: ```dart title="example/microservice/beat/bin/beat.dart" 
file=/../packages/stem/example/microservice/beat/bin/beat.dart#signing-beat-signer ``` ```dart title="example/microservice/beat/bin/beat.dart" file=/../packages/stem/example/microservice/beat/bin/beat.dart#signing-beat-wire ``` ## Ed25519 (asymmetric signing) Ed25519 keeps private keys only on producers while workers verify with public keys. 1) Generate keys and export the values: ```bash dart run scripts/security/generate_ed25519_keys.dart ``` 2) Set variables on producers, workers, and schedulers: ```bash export STEM_SIGNING_ALGORITHM=ed25519 export STEM_SIGNING_PUBLIC_KEYS=primary: export STEM_SIGNING_PRIVATE_KEYS=primary: export STEM_SIGNING_ACTIVE_KEY=primary ``` 3) For workers, you may omit `STEM_SIGNING_PRIVATE_KEYS` if you only want to verify signatures. ## Key rotation (safe overlap) 1) Add the new key alongside the old one in your key list. 2) Update `STEM_SIGNING_ACTIVE_KEY` on producers first. 3) Roll workers (they accept all configured keys). 4) Remove the old key after the backlog drains. Example: producer logging the active key and enqueuing during rotation (from `example/signing_key_rotation`): ```dart title="example/signing_key_rotation/bin/producer.dart" file=/../packages/stem/example/signing_key_rotation/bin/producer.dart#signing-rotation-producer-active-key ``` ```dart title="example/signing_key_rotation/bin/producer.dart" file=/../packages/stem/example/signing_key_rotation/bin/producer.dart#signing-rotation-producer-enqueue ``` ## Reference: signing environment variables | Variable | Purpose | Notes | | --- | --- | --- | | `STEM_SIGNING_ALGORITHM` | `hmac-sha256` (default) or `ed25519` | Defaults to HMAC. | | `STEM_SIGNING_KEYS` | HMAC secrets (`keyId:base64`) | Comma-separated list. Required for HMAC. | | `STEM_SIGNING_ACTIVE_KEY` | Key id used for new signatures | Required when signing. | | `STEM_SIGNING_PUBLIC_KEYS` | Ed25519 public keys (`keyId:base64`) | Comma-separated list. Required for Ed25519. 
| | `STEM_SIGNING_PRIVATE_KEYS` | Ed25519 private keys (`keyId:base64`) | Only needed by signers. | ## Failure behavior & troubleshooting - Missing or invalid signatures are dead-lettered with reason `signature-invalid` and increment the `stem.tasks.signature_invalid` metric. - If you see `signature-invalid` in the DLQ, confirm all producers are signing and that workers have the same key set. - If the active key id is not present in the key list, producers will fail fast when trying to sign. ## Next steps - Review [Prepare for Production](../getting-started/production-checklist.md) for TLS guidance and deployment hardening. - Use the [Producer API](./producer.md) guide for advanced enqueue patterns. --- ## Tasks & Retries Tasks are the units of work executed by Stem workers. Each task is represented by a handler registered in a `TaskRegistry`. Handlers expose metadata through `TaskOptions`, which control routing, retry behavior, timeouts, and isolation. ## Registering Handlers ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-register-in-memory ``` ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-register-redis ``` ## Typed Task Definitions Stem ships with `TaskDefinition` so producers get compile-time checks for required arguments and result types. A definition bundles the task name, argument encoder, optional metadata, and default `TaskOptions`. Build a call with `.call(args)` or `TaskEnqueueBuilder` and hand it to `Stem.enqueueCall` or `Canvas` helpers: ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-typed-definition ``` Typed results flow through `TaskResult` when you call `Stem.waitForTask`, `Canvas.group`, `Canvas.chain`, or `Canvas.chord`. Supplying a custom `decode` callback on the task signature lets you deserialize complex objects before they reach application code. ## Configuring Retries Workers apply an `ExponentialJitterRetryStrategy` by default. 
Each retry is scheduled by publishing a new envelope with an updated `notBefore`. Control retry cadence by: - Setting `TaskOptions.maxRetries` (initial attempt + `maxRetries`). - Supplying a custom `RetryStrategy` to the worker. - Tuning the broker connection (e.g. Redis `blockTime`, `claimInterval`, `defaultVisibilityTimeout`) so delayed messages are drained quickly. See the `examples/retry_task` Compose demo for a runnable setup that prints every retry signal and shows how the strategy interacts with broker timings. ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-strategy ``` ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-worker ``` ## Task Context `TaskContext` provides metadata and control helpers: - `context.attempt` – current attempt number (0-based). - `context.heartbeat()` – extend the lease to avoid timeouts. - `context.extendLease(Duration by)` – request additional processing time. - `context.progress(percent, data: {...})` – emit progress signals for UI hooks. Use the context to build idempotent handlers. Re-enqueue work, cancel jobs, or store audit details in `context.meta`. See the `example/task_context_mixed` demo for a runnable sample that exercises inline + isolate enqueue, TaskRetryPolicy overrides, and enqueue options. The `example/task_usage_patterns.dart` sample shows in-memory TaskContext and TaskInvocationContext patterns without external dependencies. ### Enqueue from a running task Use `TaskContext.enqueue`/`spawn` to schedule follow-up work with the same defaults as `Stem.enqueue`. For isolate entrypoints, `TaskInvocationContext` exposes the same API plus the fluent builder. 
```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-context-enqueue ``` Inside isolate entrypoints: ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-invocation-builder ``` ### Retry from a running task Handlers can request a retry directly from the context: ```dart await context.retry(countdown: const Duration(seconds: 10)); ``` Retries respect `TaskOptions.retryPolicy` unless you override it with `TaskEnqueueOptions.retryPolicy` or `context.retry(retryPolicy: ...)`. ### Retry policy overrides `TaskRetryPolicy` captures backoff controls and can be applied per handler or per enqueue: ```dart final options = TaskOptions( maxRetries: 3, retryPolicy: TaskRetryPolicy( backoff: true, defaultDelay: const Duration(seconds: 1), backoffMax: const Duration(seconds: 30), ), ); ``` ## Isolation & Timeouts Set soft/hard timeouts to guard against runaway tasks: ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-timeouts ``` - **Soft timeouts** trigger `WorkerEventType.timeout` so you can log or notify. - **Hard timeouts** raise `TimeoutException` to force retries or failure. - Provide an `isolateEntrypoint` to run the task in a dedicated isolate when enforcing hard limits or dealing with CPU-intensive code. ## Idempotency Checklist - Make task inputs explicit (`args`, `headers`, `meta`). - Guard external calls with idempotency keys. - Store state transitions atomically (e.g. using Postgres or Redis transactions). - Set `TaskOptions.unique`/`uniqueFor` for naturally unique jobs (see [Uniqueness](./uniqueness.md)). - Use `TaskOptions.rateLimit` with a worker `RateLimiter` to throttle hot tasks (see [Rate Limiting](./rate-limiting.md)). With these practices in place, tasks can be retried safely and composed via chains, groups, and chords (see [Canvas Patterns](./canvas.md)). 
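The "guard external calls with idempotency keys" item from the checklist above can be sketched in a few lines. This is a hypothetical helper, not a Stem API: in production the `_seen` set would live in Redis or Postgres (an atomic `SETNX` or unique insert), not in process memory.

```dart
import 'dart:convert';

/// Hypothetical idempotency guard: derives a stable key from the task name
/// and args, and runs the side effect only the first time that key appears.
class IdempotencyGuard {
  final Set<String> _seen = {};

  /// Returns true when [action] ran, false for a duplicate delivery.
  bool runOnce(
      String task, Map<String, Object?> args, void Function() action) {
    final key = '$task:${jsonEncode(args)}';
    if (!_seen.add(key)) return false; // duplicate delivery: skip side effect
    action();
    return true;
  }
}

void main() {
  final guard = IdempotencyGuard();
  var charges = 0;
  // A retried delivery re-runs the handler with identical args:
  guard.runOnce('billing.charge', {'order': 42}, () => charges++);
  guard.runOnce('billing.charge', {'order': 42}, () => charges++);
  print(charges); // 1: the external call happened exactly once
}
```

With a guard like this in the handler body, a redelivery after a crashed acknowledgement becomes a cheap no-op instead of a double charge.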
## Task Payload Encoders Handlers often need to encrypt, compress, or otherwise transform arguments and results before they leave the process. Stem exposes `TaskPayloadEncoder` so you can swap out the default JSON pass-through behavior: ```dart title="Encoders/global.dart" file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-encoders-global ``` Workers automatically decode arguments once (`stem-args-encoder` header / `__stemArgsEncoder` meta) and encode results once (`__stemResultEncoder` meta) before writing to the backend. When you need task-specific behavior, set the metadata overrides: ```dart file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-encoders-metadata ``` Because encoders are centrally registered inside the `TaskPayloadEncoderRegistry`, every producer/worker instance that shares the registry can resolve encoder ids reliably—even across processes or languages. --- ## Uniqueness & Deduplication Stem can prevent duplicate enqueues for naturally unique tasks. Enable `TaskOptions.unique` and configure a `UniqueTaskCoordinator` backed by a `LockStore` (Redis or in-memory). ## Quick start 1) Mark the task as unique: ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-task-options ``` 2) Create a coordinator with a shared lock store: ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-coordinator-inmemory ``` ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-coordinator-redis ``` 3) Wire the coordinator into the producer and worker: ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-stem-worker ``` ## How uniqueness works - The unique key is derived from task name, queue, args, headers, and meta (excluding keys prefixed with `stem.`). 
- Keys are canonicalized (sorted maps, stable JSON) to ensure equivalent inputs hash to the same key. - Use `uniqueFor` to control the lock TTL. When unset, Stem falls back to the task `visibilityTimeout`, envelope visibility timeout, or the coordinator default TTL (in that order). ## Override the unique key Override the computed key when you need custom grouping: ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-override-key ``` ## What happens on duplicates - Duplicate enqueues return the existing task id. - The `stem.tasks.deduplicated` metric increments. - Duplicates are recorded on the task status metadata under `stem.unique.duplicates`. ## Release semantics Unique locks are released after task completion (success or failure) by the worker that holds the lock. If a worker crashes mid-task, the lock expires when the TTL elapses, allowing a future enqueue. ## Retry behavior Retries keep the original task id. Uniqueness is evaluated at enqueue time, so retries do not create new unique claims. ## Example The snippet below shows the enqueue behavior (see `unique_tasks` for a full demo): ```dart title="lib/uniqueness.dart" file=/../packages/stem/example/docs_snippets/lib/uniqueness.dart#uniqueness-enqueue ``` ## Next steps - See [Tasks & Retries](./tasks.md) for other `TaskOptions` settings. - Combine uniqueness with [Rate Limiting](./rate-limiting.md) to guard hot paths. --- ## Workflows Stem Workflows let you orchestrate multi-step business processes with durable state, typed results, automatic retries, and event-driven resumes. The `StemWorkflowApp` helper wires together a `Stem` instance, workflow store, event bus, and runtime so you can start runs, monitor progress, and interact with suspended steps from one place. 
## Runtime Overview ```dart title="bin/workflows.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-app-create ``` Start the runtime once the app is constructed: ```dart file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-app-start ``` `StemWorkflowApp` exposes: - `runtime` – registers `Flow`/`WorkflowScript` definitions and dequeues runs. - `store` – persists checkpoints, suspension metadata, and results. - `eventBus` – emits topics that resume waiting steps. - `app` – the underlying `StemApp` (broker + result backend + worker). ## StemClient Entrypoint `StemClient` is the shared entrypoint when you want a single object to own the broker, result backend, and workflow helpers. It creates workflow apps and workers with consistent configuration so you don't pass broker/backend handles around. ```dart title="bin/workflows_client.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-client ``` ## Declaring Typed Flows Flows use the declarative DSL (`FlowBuilder`) to capture ordered steps. Specify `Flow` to document the completion type; generic metadata is preserved all the way through `WorkflowResult`. ```dart title="lib/workflows/approvals_flow.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-flow ``` Steps re-run from the top after every suspension, so handlers must be idempotent and rely on `FlowContext` helpers: `iteration`, `takeResumeData`, `sleep`, `awaitEvent`, `idempotencyKey`, and persisted step outputs. ## Workflow Scripts `WorkflowScript` offers a higher-level facade that feels like a regular async function. You still get typed results and step-level durability, but the DSL handles `ctx.step` registration automatically. 
```dart title="lib/workflows/retry_script.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-script ``` Scripts can enable `autoVersion: true` inside `script.step` calls to track loop iterations using the `stepName#iteration` naming convention. ## Annotated Workflows (stem_builder) If you prefer decorators over the DSL, annotate workflow classes and tasks with `@WorkflowDefn`, `@workflow.run`, `@workflow.step`, and `@TaskDefn`, then generate the registry with `stem_builder`. ```dart title="lib/workflows/annotated.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-annotated ``` Build the registry (example): ```bash dart pub add --dev build_runner stem_builder dart run build_runner build ``` ## Starting & Awaiting Workflows ```dart title="bin/run_workflow.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-run ``` `waitForCompletion` returns a `WorkflowResult` that includes the decoded value, original `RunState`, and a `timedOut` flag so callers can decide whether to keep polling or surface status upstream. ### Cancellation policies `WorkflowCancellationPolicy` guards long-running runs. Use it to auto-cancel workflows that exceed a wall-clock budget or remain suspended longer than allowed. ## Suspension, Events, and Groups of Runs - `sleep(duration)` stores a wake-up timestamp; the runtime polls `dueRuns` and resumes those runs by re-enqueuing the internal workflow task. - `awaitEvent(topic, deadline: ...)` registers durable watchers so external services can `emit(topic, payload)`. The payload becomes `resumeData` for the awaiting step. - `runsWaitingOn(topic)` exposes all runs suspended on a channel—useful for CLI tooling or dashboards. After a topic resumes the runtime calls `markResumed(runId, data: suspensionData)` so flows can inspect the payload. 
Because watchers and due runs are persisted in the `WorkflowStore`, you can operate on *groups* of workflows (pause, resume, or inspect every run waiting on a topic) even if no worker is currently online. ## Run Leases & Multi-Worker Recovery Workflow runs are lease-based: a worker claims a run for a fixed duration, renews the lease while executing, and releases it on completion. This prevents two workers from executing the same run concurrently while still allowing takeover after crashes. Operational guidance: - Keep `runLeaseDuration` **>=** the broker visibility timeout so redelivered workflow tasks retry instead of being dropped before the lease expires. - Ensure workers renew leases (`leaseExtension`) before either the workflow lease or broker visibility timeout expires. - Keep system clocks in sync (NTP) because lease expiry is time-based across workers and the shared store. ## Deterministic Tests with WorkflowClock Inject a `WorkflowClock` when you need deterministic timestamps (e.g. for lease expiry or due run scheduling). The `FakeWorkflowClock` lets tests advance time without waiting on real timers. ```dart final clock = FakeWorkflowClock(DateTime.utc(2024, 1, 1)); final store = InMemoryWorkflowStore(clock: clock); final runtime = WorkflowRuntime( stem: stem, store: store, eventBus: InMemoryEventBus(store: store), clock: clock, ); ``` ## Payload Encoders in Workflow Apps Workflows execute on top of a `Stem` worker, so they inherit the same `TaskPayloadEncoder` facilities as regular tasks. 
`StemWorkflowApp.create` accepts either a shared `TaskPayloadEncoderRegistry` or explicit defaults: ```dart title="lib/workflows/bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/workflows.dart#workflows-encoders ``` Every workflow run task stores the result encoder id in `RunState.resultMeta`, and the internal tasks dispatched by workflows reuse the same registry—so typed steps can safely emit encrypted/binary payloads while workers decode them exactly once. Need per-workflow overrides? Register custom encoders on individual task handlers (via `TaskMetadata`) or attach a specialized encoder to a `Flow`/script step that persists sensitive data in the workflow store. ## Tooling Tips - Use `workflowApp.store.listRuns(...)` to filter by workflow/status when building admin dashboards. - `workflowApp.runtime.emit(topic, payload)` is the canonical way to resume batches of runs waiting on external events. - CLI integrations (see `stem workflow ...`) rely on the same store APIs, so keeping the store tidy (expired runs, watchers) ensures responsive tooling. --- ## Best Practices These guidelines help keep task systems reliable and observable as you scale. They are framework-agnostic and apply directly to Stem. ## Task design - Keep task arguments small and serializable. - Store large payloads in object storage and pass references instead. - Make tasks idempotent; assume retries can happen. - Wrap enqueue + state changes in a transaction or outbox pattern when interacting with databases. - Prefer deterministic task names and queues for routing clarity. - Surface structured metadata for tracing and auditing. ```dart title="lib/best_practices.dart" file=/../packages/stem/example/docs_snippets/lib/best_practices.dart#best-practices-task ``` ```dart title="lib/best_practices.dart" file=/../packages/stem/example/docs_snippets/lib/best_practices.dart#best-practices-enqueue ``` ## Error handling - Treat transient failures as retryable; use explicit backoff policies. 
- Fail fast on validation errors to avoid wasted retries. - Send poison-pill tasks to a DLQ and fix root causes before replaying. ```dart title="lib/workers_programmatic.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-retry ``` ## Concurrency & load - Start with conservative concurrency and scale up with metrics. - Embrace concurrency by running more worker processes instead of single hot loops. - Use rate limits for hot handlers or fragile downstreams. - Avoid long-running inline loops without heartbeats or progress signals. ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-task ``` ## Observability - Emit lifecycle signals early so you can build dashboards from day one. - Track queue depth, retry rates, and DLQ volume as leading indicators. - Correlate task IDs with business logs for easier incident response. ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-configure ``` ## Operations - Separate environments with namespaces and credentials. - Bake health checks into deploy pipelines. - Automate rotation of signing keys and TLS certificates. ```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-config ``` ## Terminology clarity - **Task**: a unit of work executed by a worker. - **Queue**: the channel tasks are routed through. - **Worker**: the process that consumes tasks and executes handlers. - **Backend**: the store for task results and group state. ## Next steps - [Observability & Ops](./observability-and-ops.md) - [Production Checklist](./production-checklist.md) - [Tasks & Retries](../core-concepts/tasks.md) --- ## Connect to Infrastructure Graduate from the in-memory demo to a multi-process setup backed by Redis or Postgres. 
You will run workers, Beat, and the CLI in separate terminals while exploring routing, broadcast delivery, and canvas composition with persistent storage. ## 1. Run Redis and Postgres Locally Docker is the fastest way to spin up dependencies: ```bash # Redis Streams for broker, locks, rate limiting, and schedules. docker run --rm -p 6379:6379 redis:7-alpine # Postgres for durable task results or schedule storage (optional now, useful later). docker run --rm -p 5432:5432 \ -e POSTGRES_PASSWORD=postgres \ postgres:14 ``` Export the connection details so producers, workers, and Beat share them: ```bash export STEM_BROKER_URL=redis://localhost:6379 export STEM_RESULT_BACKEND_URL=redis://localhost:6379/1 export STEM_SCHEDULE_STORE_URL=redis://localhost:6379/2 export STEM_CONTROL_NAMESPACE=stem ``` ## 2. Bootstrap Stem Config Use `StemConfig.fromEnvironment()` to hydrate adapters from the environment and share them across your app. Split the bootstrap into smaller steps so each piece is easy to scan and reuse: ### Load configuration ```dart title="lib/stem_bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-config ``` ### Connect adapters ```dart title="lib/stem_bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-adapters ``` ### Create the Stem producer ```dart title="lib/stem_bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-stem ``` ### Create the worker ```dart title="lib/stem_bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-worker ``` Together, these steps give you access to routing, rate limiting, revoke storage, and queue configuration—all backed by Redis. ## 3. Launch Workers, Beat, and Producers With the environment configured, run Stem components from separate terminals: ```bash # Terminal 1 — run a worker process (set STEM_WORKER_COMMAND or pass --command). 
export STEM_WORKER_COMMAND="dart run bin/worker.dart" stem worker multi start alpha --queue default --queue reports --queue emails # Terminal 2 — apply schedules and run Beat (Dart entrypoint). stem schedule apply --file config/schedules.json --yes stem schedule list dart run packages/stem/example/scheduler_observability/bin/beat.dart ``` Use a producer entrypoint to enqueue work: ```dart title="lib/producer.dart" file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-redis ``` Routing configuration supports default queue aliases, glob-based routing rules, and broadcast channels. A minimal `config/routing.yaml` might look like: ```yaml title="config/routing.yaml" default_queue: critical queues: reports: routing_key: reports.generate priority_range: [2, 7] emails: routing_key: billing.email-* broadcasts: maintenance: delivery: fanout ``` Stem clamps priorities to queue-defined ranges and publishes broadcast tasks to all subscribed workers exactly once per acknowledgement window. ## 4. Coordinate Work with Canvas and Result Backend Now that Redis backs the result store, you can orchestrate more complex pipelines and query progress from any process: ```dart file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-canvas ``` Later, you can monitor status from any machine: ```dart file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-status ``` ## 5. Listen to Signals for Cross-Cutting Integrations Signals surface lifecycle milestones that you can pipe into analytics or incident tooling: ```dart title="lib/signals.dart" file=/../packages/stem/example/docs_snippets/lib/developer_environment.dart#dev-env-signals ``` Call `installSignalHandlers()` during bootstrap before workers or producers start emitting events. ## 6. 
What’s Next - Keep the infrastructure running and head to [Observe & Operate](./observability-and-ops.md) to enable telemetry, inspect heartbeats, replay DLQs, and issue remote control commands. - Browse the runnable examples under `examples/` for Redis/Postgres, mixed-cluster, autoscaling, scheduler observability, and signing-key rotation drills you can adapt to your environment. --- ## First Steps This walkthrough stays in-memory so you can learn the pipeline without running external services. It defines a task, starts a worker, enqueues a message, then verifies the result inside a single Dart process. ## 1. Define a task handler Create a task handler (StemApp will register it for you): ```dart file=/../packages/stem/example/docs_snippets/lib/first_steps.dart#first-steps-task ``` ## 2. Bootstrap the in-memory runtime Use `StemApp` to create the broker, backend, and worker in memory: ```dart file=/../packages/stem/example/docs_snippets/lib/first_steps.dart#first-steps-bootstrap ``` ## 3. Enqueue from a producer Enqueue a task from the same process: ```dart file=/../packages/stem/example/docs_snippets/lib/first_steps.dart#first-steps-enqueue ``` ## 4. Fetch task results Result backends store the task state and payload: ```dart file=/../packages/stem/example/docs_snippets/lib/first_steps.dart#first-steps-results ``` ## Choose a broker and backend Stem lets you mix brokers and backends. Use this quick guide when selecting your first deployment: | Need | Broker | Backend | | ---- | ------ | ------- | | Lowest latency, simplest ops | Redis Streams | Redis or Postgres | | SQL visibility & durability | Postgres broker | Postgres | | Local dev & tests | In-memory | In-memory | Decision shortcuts: - Start with **Redis** unless your org mandates another transport. - Use **Postgres** when you want a single durable store or SQL-level visibility. - Use **in-memory** only for local tests/demos. 
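To make the broker decision concrete: Stem reads `STEM_BROKER_URL` and hydrates the matching adapter via `StemConfig.fromEnvironment()` (see Connect to Infrastructure). The helper below is a hypothetical sketch of that scheme-to-adapter mapping, written for illustration; the function name and the `memory` scheme are assumptions, not Stem's actual resolution logic.

```dart
/// Hypothetical mapping from a broker URL scheme to the broker families
/// in the table above (illustration only).
String brokerKindFor(String brokerUrl) {
  final scheme = Uri.parse(brokerUrl).scheme;
  switch (scheme) {
    case 'redis':
    case 'rediss': // Redis over TLS
      return 'redis-streams';
    case 'postgres':
    case 'postgresql':
      return 'postgres';
    case 'memory': // assumed scheme for the in-memory broker
      return 'in-memory';
    default:
      throw ArgumentError('Unsupported broker scheme: $scheme');
  }
}

void main() {
  print(brokerKindFor('redis://localhost:6379'));       // redis-streams
  print(brokerKindFor('postgres://localhost:5432/db')); // postgres
}
```

Failing fast on an unknown scheme at bootstrap is usually preferable to a worker silently starting without a broker.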
For more detail, see [Broker Overview](../brokers/overview.md) and [Persistence](../core-concepts/persistence.md). ## Install - Install Stem and the CLI as shown in [Quick Start](./quick-start.md). - Ensure `stem --version` runs in your shell. ## App setup - Register tasks and options via `StemApp` or a shared registry (see [Tasks & Retries](../core-concepts/tasks.md)). - Wire producers with the same task list/registry (see [Producer API](../core-concepts/producer.md)). ## Run a worker - Start a worker against your broker and queues (see [Connect to Infrastructure](./developer-environment.md)). - Use [Worker Control CLI](../workers/worker-control.md) to confirm it is responding. ## Call a task - Enqueue from your app or the CLI (see [Producer API](../core-concepts/producer.md)). ## Keeping results - Configure a result backend for stored task results and groups (see [Persistence](../core-concepts/persistence.md)). ## Configuration - Use `STEM_*` environment variables for brokers, routing, scheduling, and signing (see [CLI & Control](../core-concepts/cli-control.md)). - Define routing rules in `STEM_ROUTING_CONFIG` for multi-queue setups (see [Routing](../core-concepts/routing.md)). ## Troubleshooting - Diagnose common errors in [Troubleshooting](./troubleshooting.md). ## Next steps - Move to [Connect to Infrastructure](./developer-environment.md) to wire routing, autoscaling, and multi-queue workers. - Review [Broker Overview](../brokers/overview.md) for transport tradeoffs. - Explore [Persistence](../core-concepts/persistence.md) to store results in Redis or Postgres. --- ## Getting Started This track is the recommended path for anyone new to Stem. Follow the pages in order—each builds on the previous one and links to deeper references when you want to explore further. - **[Introduction](./intro.md)** – Prerequisites, the feature tour, and how the onboarding journey is structured. 
- **[Quick Start](./quick-start.md)** – Create your first Stem tasks, enqueue with delays/priorities, and inspect results in memory. - **[First Steps](./first-steps.md)** – Define a task, run an in-memory worker, enqueue from a producer, and read results. - **[Connect to Infrastructure](./developer-environment.md)** – Run Redis/Postgres locally, configure brokers/backends, experiment with routing and canvas patterns. - **[Observe & Operate](./observability-and-ops.md)** – Enable OpenTelemetry export, inspect workers/queues/DLQ via CLI, and wire lifecycle signals. - **[Prepare for Production](./production-checklist.md)** – Apply signing/TLS, deploy with systemd or CLI multi-process tooling, and run quality gates before launch. - **[Troubleshooting](./troubleshooting.md)** – Common errors and quick fixes while onboarding. - **[Stem vs BullMQ](../comparisons/stem-vs-bullmq.md)** – Canonical feature mapping with `✓/~/✗` parity semantics. Once you complete the journey, continue with the in-depth material under [Core Concepts](../core-concepts/index.md) and [Workers](../workers/index.md). ## Preview: a full Stem pipeline in one file Use this example as a mental model for how tasks, workers, and brokers fit together. ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-task-definition ``` ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-task-options ``` ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-runtime-setup ``` ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-enqueue ``` --- ## Welcome to Stem Stem is a Dart-native background work platform that gives you Celery‑level capabilities without leaving the Dart ecosystem. This onboarding path assumes you have never touched Stem before and walks you from “what is this?” to “I can ship a production deployment.” ## What is a task queue? 
A task queue lets you push work to background workers instead of blocking your web or API process. The core pipeline looks like: ``` Producer → Broker → Worker → Result Backend ``` - **Architecture at a glance** ![Task queue pipeline](/img/task-queue-pipeline.svg) - **Producer** enqueues a task (e.g. send email). - **Broker** stores and delivers tasks to workers. - **Worker** executes tasks and reports status. - **Result backend** keeps history and outputs. In Stem, you can mix and match brokers and backends (for example, Redis for fast delivery and Postgres for durable results). ## A minimal Stem pipeline The core objects are a task handler, a worker, and a producer. This example keeps everything in a single file so you can see the moving parts together. ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-task-definition ``` ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-runtime-setup ``` ```dart title="stem_example.dart" file=/../packages/stem/example/stem_example.dart#getting-started-enqueue ``` ## What You’ll Unlock - **Core pipeline** – Enqueue tasks with delays, priorities, retries, rate limits, and canvas compositions, backed by result stores. - **Workers & signals** – Operate isolate-based workers, autoscale them, and react to lifecycle signals. - **Observability & tooling** – Stream metrics, traces, heartbeats, and inspect queues, DLQs, and schedules from the CLI. - **Security & deployment** – Sign payloads, enable TLS, and run Stem via systemd/SysV or the multi-worker CLI wrapper. - **Enablement & quality** – Use runnable examples, runbooks, and automated quality gates to keep deployments healthy. ## Prerequisites - Dart **3.3+** installed (`dart --version`). - Access to the Dart pub cache (`dart pub ...`). - Optional but recommended: Docker Desktop or another container runtime for local Redis/Postgres instances. 
- Optional: Node.js 18+ if you plan to run the documentation site locally. - A text editor capable of running Dart tooling (VS Code, IntelliJ, Neovim). ## Onboarding Path 1. **[Quick Start](./quick-start.md)** – Build and run your first Stem worker entirely in memory while you learn the task pipeline primitives. 2. **[First Steps](./first-steps.md)** – Use Redis to run producers and workers in separate processes, then fetch results. 3. **[Connect to Infrastructure](./developer-environment.md)** – Point Stem at Redis/Postgres, run workers/Beat across processes, and try routing/canvas patterns. 4. **[Observe & Operate](./observability-and-ops.md)** – Enable telemetry, inspect heartbeats, replay DLQ entries, and wire control commands. 5. **[Prepare for Production](./production-checklist.md)** – Enable signing, TLS, daemonization, and automated quality gates before launch. Each step includes copy-pasteable code or CLI examples and ends with pointers to deeper reference material. > **Next:** Jump into the [Quick Start](./quick-start.md) to see Stem in action. --- ## Monitoring Guide This guide focuses on visibility: dashboards, metrics, and signals that help catch issues early. ## Dashboard deployment - **Local**: run the dashboard on a developer workstation for quick checks. - **Internal**: deploy behind an auth proxy (SSO, basic auth, or IP allowlist). - **Shared ops**: place behind a reverse proxy with TLS termination and strict access controls. Restrict control actions to operators and ensure TLS is terminated at the proxy or upstream load balancer. See the [Dashboard](../core-concepts/dashboard.md) guide for setup details. ## Key indicators - **Queue depth**: growing queues indicate stalled workers or upstream spikes. - **Queue latency**: long gaps between enqueue time and start time point to saturation or routing issues. - **Retry rate**: a rise in retries usually signals transient dependency issues. 
- **DLQ volume**: poison pills or schema mismatches show up here. - **Worker heartbeats**: missing heartbeats point to crashed or partitioned workers. - **Scheduler drift**: drift spikes indicate schedule store or broker delays. ## Signals & metrics - Use lifecycle signals to emit structured events early. - Export metrics to your standard stack (OTLP, Prometheus, Datadog, etc.). - Correlate task IDs with traces/logs. ```dart title="lib/observability.dart" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-metrics ``` ```dart title="lib/observability.dart" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-signals ``` ```dart title="lib/observability.dart" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-queue-depth ``` ```bash export STEM_METRIC_EXPORTERS=otlp:http://localhost:4318/v1/metrics export STEM_OTLP_ENDPOINT=http://localhost:4318 ``` ## CLI probes ```bash stem observe queues stem observe workers stem observe schedules stem worker stats --json ``` Set `STEM_SCHEDULE_STORE_URL` before running `stem observe schedules`. ```dart title="lib/observability_ops.dart" file=/../packages/stem/example/docs_snippets/lib/observability_ops.dart#ops-heartbeats ``` ## Next steps - [Observability & Ops](./observability-and-ops.md) - [Dashboard](../core-concepts/dashboard.md) - [Worker Control CLI](../workers/worker-control.md) --- ## Next Steps Use this page as a jump table once you’ve finished the first walkthroughs. 
## Calling tasks - [Producer API](../core-concepts/producer.md) - [Tasks & Retries](../core-concepts/tasks.md) - [Retry & Backoff](./retry-backoff.md) ```dart title="lib/producer.dart" file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-redis ``` ```dart title="lib/tasks.dart" file=/../packages/stem/example/docs_snippets/lib/tasks.dart#tasks-register-redis ``` ## Canvas/Workflows - [Canvas Patterns](../core-concepts/canvas.md) - [Workflows](../core-concepts/workflows.md) ```dart title="lib/canvas_chain.dart" file=/../packages/stem/example/docs_snippets/lib/canvas_chain.dart#canvas-chain ``` ## Routing - [Routing](../core-concepts/routing.md) ```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-inline ``` ## Remote control - [CLI & Control](../core-concepts/cli-control.md) - [Worker Control CLI](../workers/worker-control.md) ```dart title="lib/worker_control.dart" file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-autoscale ``` ## Observability & ops - [Observability](../core-concepts/observability.md) - [Dashboard](../core-concepts/dashboard.md) - [Production Checklist](./production-checklist.md) ```dart title="lib/observability.dart" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-metrics ``` ## Timezone - [Scheduler](../scheduler/index.md) - [Beat Scheduler Guide](../scheduler/beat-guide.md) ```dart title="lib/scheduler.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-specs ``` ## Optimization - [Rate Limiting](../core-concepts/rate-limiting.md) - [Uniqueness](../core-concepts/uniqueness.md) - [Payload Signing](../core-concepts/signing.md) ```dart title="lib/rate_limiting.dart" file=/../packages/stem/example/docs_snippets/lib/rate_limiting.dart#rate-limit-task ``` --- ## Observe & Operate With Redis/Postgres in place, it’s time to watch the system run. 
This guide covers telemetry, worker heartbeats, DLQ tooling, and the remote-control channel—all the pieces you need to operate Stem confidently. ## 1. Enable OpenTelemetry Export Stem emits metrics, traces, and structured logs out of the box. Point it at an OTLP endpoint (the repo ships a ready-made stack under `examples/otel_metrics/`): ```bash # Start the example collector, Prometheus, and Grafana stack. docker compose -f examples/otel_metrics/docker-compose.yml up # Export OTLP details for producers and workers. export STEM_OTLP_ENDPOINT=http://localhost:4318 export STEM_OTLP_HEADERS="authorization=Basic c3RlbTpwYXNz" export STEM_OTLP_METRICS_INTERVAL=10s export STEM_LOG_FORMAT=json ``` In Dart, no extra code is required—the env vars activate exporters. Metrics include queue depth, retry counts, lease renewals, and worker concurrency; traces connect `Stem.enqueue` spans with worker execution spans so you can follow a task end-to-end. If you want an explicit in-process configuration, wire metrics and tracing directly: ```dart title="Configure metrics" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-metrics ``` ```dart title="Tracing-enabled Stem" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-tracing ``` ## 2. Inspect Worker Heartbeats & Status Workers publish detailed heartbeats (in-flight counts, leases, queues) to the result backend. Use the CLI to view them live: ```bash # Snapshot the latest heartbeat for every worker. stem worker status \ --backend "$STEM_RESULT_BACKEND_URL" # Stream live updates (press Ctrl+C to stop). stem worker status \ --broker "$STEM_BROKER_URL" \ --follow ``` From Dart you can pull the same data: ```dart file=/../packages/stem/example/docs_snippets/lib/observability_ops.dart#ops-heartbeats ``` ## 3. Operate Workers via the Control Channel Stem exposes a built-in control bus so you can interact with workers without SSH or custom wiring. 
```bash
# Discover workers and latency.
stem worker ping

# Collect stats (queues, concurrency, runtimes) as JSON.
stem worker stats --json

# Revoke a problematic task globally (optionally terminate in-flight).
stem worker revoke \
  --task 1f23c6a1-... \
  --terminate

# Issue a warm shutdown to drain work gracefully.
stem worker shutdown \
  --worker default@host-a
```

Need to manage multiple instances on one host? Ship the bundled daemonization templates or lean on the `stem worker multi` wrapper:

```bash
# Launch and supervise multiple workers with templated PID/log files.
stem worker multi start alpha beta \
  --pidfile /var/run/stem/%n.pid \
  --logfile /var/log/stem/%n.log \
  --command "/usr/bin/dart run bin/worker.dart"

# Drain and stop the same fleet.
stem worker multi stop alpha beta
```

## 4. Manage Queues, Retries, and DLQ

The CLI exposes queues, retries, and dead letters so operators can recover quickly.

```bash
# Inspect queue depth and inflight counts.
stem observe queues

# Inspect worker snapshots from the result backend.
stem observe workers

# Inspect the dead-letter queue with pagination.
stem dlq list --queue default --limit 20

# Replay failed tasks back onto their original queues.
stem dlq replay --queue default --limit 10 --confirm
```

Behind the scenes the CLI talks to the same Redis data structures used by workers, so you see the exact state the runtime is using.

## 5. Alert on Scheduler Drift & Schedules

Beat records run history, drift, and errors. Keep an eye on it with:

```bash
stem observe schedules \
  --file config/schedules.yaml
stem schedule dry-run --spec "every:5m" --count 5
```

These commands surface the same drift metrics your Grafana dashboards chart and help confirm schedule definitions before they go live.

## 6. React to Signals for Custom Integrations

Signals fire for task, worker, and scheduler lifecycle events.
Wire them into chat, incident tooling, or analytics:

```dart title="lib/analytics.dart" file=/../packages/stem/example/docs_snippets/lib/observability_ops.dart#ops-analytics
```

```dart title="Signal handlers" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-signals
```

```dart title="Structured logging" file=/../packages/stem/example/docs_snippets/lib/observability.dart#observability-logging
```

Combine signal handlers with telemetry to build rich observability without scattering logic across the codebase.

## 7. Next Stop

You now have dashboards, CLI tooling, and remote control over workers. Finish the onboarding journey by applying security hardening, TLS, and production checklists in [Prepare for Production](./production-checklist.md). If you want more hands-on drills:

- Run `example/ops_health_suite` to practice `stem health` and `stem observe` flows.
- Run `example/scheduler_observability` to watch drift metrics and schedule signals.

---

## Prepare for Production

You have Stem running with observability and operations tooling. This final step hardens the deployment: signing, TLS, daemon supervision, and automated quality gates so every rollout is repeatable.

## 1. Sign Payloads and Rotate Keys

Enable signing for producers and workers to detect tampering:

```bash
export STEM_SIGNING_ALGORITHM=hmac-sha256
export STEM_SIGNING_KEYS="v1:$(openssl rand -base64 32)"
export STEM_SIGNING_ACTIVE_KEY=v1
```

In code, wire the signer into both producers and workers:

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-config
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-signer
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-registry
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-stem
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-worker
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-enqueue
```

```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-shutdown
```

When you rotate keys, set `STEM_SIGNING_KEYS` to include both the old and new entries, update `STEM_SIGNING_ACTIVE_KEY` with the new identifier, then deploy workers. Stem will accept signatures from any configured key until you remove retired entries. See [Payload Signing](../core-concepts/signing.md) for a full reference and Ed25519 guidance.

## 2. Secure Connections with TLS

Use the repo’s helper script to generate local certificates or plug in the ones issued by your platform:

```bash
scripts/security/generate_tls_assets.sh --out tmp/tls
export STEM_TLS_CA_CERT=$PWD/tmp/tls/ca.pem
export STEM_TLS_CLIENT_CERT=$PWD/tmp/tls/client.pem
export STEM_TLS_CLIENT_KEY=$PWD/tmp/tls/client-key.pem
```

TLS handshake failures surface actionable logs; set `STEM_TLS_ALLOW_INSECURE=true` only temporarily while debugging. Update Redis/Postgres URLs to use TLS schemes where required (for example, `rediss://host:port`).

## 3. Supervise Processes with Managed Services

Stem ships ready-to-use templates under `templates/systemd/` and `templates/sysv/`. Drop in environment files with your Stem variables and enable the services:

```bash
sudo cp templates/systemd/stem-worker@.service /etc/systemd/system/
sudo systemctl enable stem-worker@default.service
sudo systemctl start stem-worker@default.service
sudo systemctl enable stem-scheduler.service
sudo systemctl start stem-scheduler.service
```

For bare-metal or container images, the CLI can manage multiple instances with templated PID/log locations:

```bash
stem worker multi start web-1 web-2 \
  --command "/usr/bin/dart run bin/worker.dart" \
  --pidfile /var/run/stem/%n.pid \
  --logfile /var/log/stem/%n.log \
  --env-file /etc/stem/worker.env
```

Verify health from your orchestration probes:

```bash
stem worker healthcheck --node web-1 --json
stem worker diagnose --node web-1 \
  --pidfile /var/run/stem/web-1.pid \
  --logfile /var/log/stem/web-1.log
```

## 4. Final Pre-Flight Checklist

Before every deployment, run through these guardrails:

- **Quality gates** – run `example/quality_gates` (`just quality`) to execute format, analyze, unit/chaos/perf tests, and coverage targets.
- **Observability** – confirm Grafana dashboards (task success rate, latency p95, queue depth) and OpenTelemetry exporters are healthy.
- **Routing & schedules** – `stem routing dump --json` to confirm the active configuration and `stem schedule dry-run` for all modified entries. - **DLQ hygiene** – ensure dead letters are empty or triaged with `stem dlq list`. - **Control plane** – dry-run worker commands (`stem worker ping`, `stem worker stats`) against staging to verify access. Document the results in your team’s runbook (see `docs/process/observability-runbook.md` and `docs/process/scheduler-parity.md`) so the production checklist stays auditable. ## 5. Where to Go Next - Deep-dive into the [Core Concepts](../core-concepts/index.md) section for everything you saw at a higher level. - Explore the [Workers](../workers/index.md) and [Scheduler](../scheduler/index.md) docs for advanced tuning. - If you’re planning larger architecture changes, follow the OpenSpec workflow documented in `openspec/AGENTS.md`. --- ## Quick Start Spin up Stem in minutes with nothing but Dart installed. This walkthrough stays fully in-memory so you can focus on the core pipeline: enqueueing, retries, delays, priorities, and chaining work together. ## 1. Create a Demo Project ```bash dart create stem_quickstart cd stem_quickstart # Add Stem as a dependency and activate the CLI. dart pub add stem dart pub global activate stem ``` Add the Dart pub cache to your `PATH` so the `stem` CLI is reachable: ```bash export PATH="$HOME/.pub-cache/bin:$PATH" stem --version ``` ## 2. Register Tasks with Options Replace the generated `bin/stem_quickstart.dart` with the script built from the snippets below. The full, runnable version lives at `packages/stem/example/docs_snippets/lib/quick_start.dart` in the repository. ### Define task handlers Each task declares its name and retry/timeout options. 
```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-task-resize
```

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-task-email
```

### Bootstrap worker + Stem

Use `StemApp` to wire tasks, the in-memory broker/backend, and the worker:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-bootstrap
```

### Enqueue tasks

Publish an immediate task plus a delayed task with custom metadata:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-enqueue
```

Run the script:

```bash
dart run bin/stem_quickstart.dart
```

Stem handles retries, time limits, rate limiting, and priority ordering even with the in-memory adapters—great for tests and local demos.

## 3. Compose Work with Canvas

Stem’s canvas API lets you chain, group, or create chords of tasks. Add this helper to the bottom of the file above to try a chain:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-canvas-example
```

Then call it from `main` once the worker has started:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-canvas-call
```

Finally, inspect the result state before shutting down:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start.dart#quickstart-inspect
```

Each step records progress in the result backend, and failures trigger retries or DLQ placement according to `TaskOptions`.

## 4. Peek at Retries and DLQ

Force a failure to see retry behavior:

```dart file=/../packages/stem/example/docs_snippets/lib/quick_start_failure.dart#quickstart-email-failure
```

The retry pipeline and DLQ logic are built into the worker. When the task exceeds `maxRetries`, the envelope moves to the DLQ; you’ll learn how to inspect and replay those entries in the next guide.

## 5. Where to Next

- Connect Stem to Redis/Postgres, try broadcast routing, and run Beat in [Connect to Infrastructure](./developer-environment.md).
- Explore worker control commands, DLQ tooling, and OpenTelemetry export in [Observe & Operate](./observability-and-ops.md).
- Keep the script—you’ll reuse the tasks and app bootstrap in later steps.

---

## Reliability Guide

This guide summarizes reliability practices for task systems using Stem.

## Recovery workflow

1. Identify the failing task or queue.
2. Inspect recent errors and DLQ entries.
3. Fix the root cause before replaying.
4. Replay only the affected tasks.

## Broker fetch notes

- **Redis Streams** uses consumer groups plus `XAUTOCLAIM` to reclaim idle deliveries; long-running tasks should emit heartbeats or extend leases.
- **Postgres** uses polling with `locked_until` leases; tasks become visible again after the lease expires.

## Workflow lease notes

- Workflow runs are lease-based. Workers must renew leases while executing, and other workers can take over after the lease expires.
- Keep `runLeaseDuration` **>=** broker visibility timeout to prevent redelivered workflow tasks from being dropped before takeover is possible.
- Keep `leaseExtension` renewals ahead of both the workflow lease expiry and the broker visibility timeout.

## Poison-pill handling

- If a task fails repeatedly for the same reason, treat it as a poison pill.
- Move it to the DLQ and add guardrails or validation to prevent repeats.
- Record the failure pattern for future detection.

## Scheduler reliability

- Run multiple Beat instances only when backed by a shared lock store.
- Monitor schedule drift and failures to detect store latency.
- Re-apply schedules after deploys to ensure definitions stay current.

## Retries and backoff

- Use bounded retries with jittered backoff to avoid thundering herds.
- Separate transient failures from permanent failures.
- For permanent errors, fail fast and alert.
```dart title="retry_task/bin/worker.dart" file=/../packages/stem/example/retry_task/bin/worker.dart#reliability-retry-worker
```

```dart title="retry_task/lib/shared.dart" file=/../packages/stem/example/retry_task/lib/shared.dart#reliability-retry-signals
```

```dart title="retry_task/lib/shared.dart" file=/../packages/stem/example/retry_task/lib/shared.dart#reliability-retry-entrypoint
```

## Heartbeats and progress

Use heartbeats and progress updates to prevent long-running tasks from being reclaimed prematurely.

```dart title="progress_heartbeat/bin/worker.dart" file=/../packages/stem/example/progress_heartbeat/bin/worker.dart#reliability-heartbeat-worker
```

```dart title="progress_heartbeat/lib/shared.dart" file=/../packages/stem/example/progress_heartbeat/lib/shared.dart#reliability-progress-task
```

```dart title="progress_heartbeat/lib/shared.dart" file=/../packages/stem/example/progress_heartbeat/lib/shared.dart#reliability-worker-event-logging
```

## Observability signals

- Track retry rates and DLQ volume as reliability signals.
- Monitor queue backlog and worker heartbeats to detect stalls.
- Tie task IDs to business logs for fast root-cause analysis.
- Use `StemSignals.taskRetry` / `taskFailed` to drive notifications when error rates spike.

## Operational checks

```bash
stem health --broker "$STEM_BROKER_URL" --backend "$STEM_RESULT_BACKEND_URL"
stem observe queues
stem observe workers
stem dlq list --queue <queue>
```

## Next steps

- [Troubleshooting](./troubleshooting.md)
- [Observability & Ops](./observability-and-ops.md)
- [Worker Control CLI](../workers/worker-control.md)

---

## Retry & Backoff

Retries keep transient failures from becoming outages. Use backoff to prevent retry storms.

## Principles

- Treat transient errors as retryable.
- Fail fast on permanent errors (bad input, missing resources).
- Add jitter to spread retries over time.
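These principles (grow the delay, cap it, add jitter) reduce to a few lines of arithmetic. A sketch in plain Dart using a 2-second base and a 5-minute cap, matching the defaults described later on this page; this illustrates the shape of the curve, not Stem’s actual `ExponentialJitterRetryStrategy` implementation:

```dart
import 'dart:math';

/// Capped exponential backoff with full jitter (illustrative only).
/// attempt 0 -> up to 2s, attempt 1 -> up to 4s, attempt 2 -> up to 8s, ...
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 2),
  Duration max = const Duration(minutes: 5),
  Random? random,
}) {
  // Exponential growth, capped so late attempts stay bounded.
  final exponentialMs = base.inMilliseconds * pow(2, attempt);
  final cappedMs =
      min(exponentialMs.toDouble(), max.inMilliseconds.toDouble());
  // Full jitter: pick uniformly in [0, capped] to spread retries out
  // and avoid synchronized thundering herds.
  final rng = random ?? Random();
  return Duration(milliseconds: (rng.nextDouble() * cappedMs).round());
}

void main() {
  for (var attempt = 0; attempt < 10; attempt++) {
    final delay = backoffDelay(attempt);
    assert(delay <= const Duration(minutes: 5));
    print('attempt $attempt -> ${delay.inMilliseconds} ms');
  }
}
```

Note that with full jitter a retry can land almost immediately; if you need a floor, pick uniformly in `[capped / 2, capped]` instead ("equal jitter").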
## When to retry - Network timeouts - Rate-limited downstream APIs - Temporary resource exhaustion Avoid retrying when: - Validation fails - Authorization fails - The task is not idempotent ## Backoff strategy - Start with a short delay, then increase gradually. - Cap the maximum delay. - Add random jitter to avoid spikes. ## Stem defaults Workers use `ExponentialJitterRetryStrategy` by default: - `base`: 2 seconds - `max`: 5 minutes Retries are scheduled by publishing a new envelope with `notBefore` set to the next retry time. Each retry increments the attempt counter until `TaskOptions.maxRetries` is exhausted. ## Task options Use `maxRetries` on task handlers to cap retries: ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-task-options ``` ## Custom strategies Provide a custom `RetryStrategy` to the worker when you need fixed delays, linear backoff, or bespoke logic: ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-strategy ``` ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-worker ``` ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-custom-strategy ``` ```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-custom-worker ``` You can also implement your own strategy by conforming to the `RetryStrategy` interface and returning the desired delay for each attempt. ## Observability cues Watch these signals and metrics to verify retry behavior: - `StemSignals.taskRetry` includes the next retry timestamp. - `stem.tasks.retried` and `stem.tasks.failed` counters highlight spikes. - DLQ volume indicates retries are exhausting or errors are permanent. 
```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-signals ``` ## Operational checklist - Monitor retry rates and DLQ volume. - Alert on sustained retry spikes. - Requeue only after the root cause is fixed. ## Next steps - [Tasks & Retries](../core-concepts/tasks.md) - [Reliability Guide](./reliability.md) - [Troubleshooting](./troubleshooting.md) --- ## Troubleshooting Common issues when getting started with Stem and how to resolve them. ## Worker starts but no tasks are processed Checklist: - Make sure the producer and worker share the same broker URL. - Confirm the worker is subscribed to the queue you are enqueueing into. - If routing is enabled, verify the routing file and default queue. ```dart title="lib/troubleshooting.dart" file=/../packages/stem/example/docs_snippets/lib/troubleshooting.dart#troubleshooting-task ``` ```dart title="lib/troubleshooting.dart" file=/../packages/stem/example/docs_snippets/lib/troubleshooting.dart#troubleshooting-bootstrap ``` ```dart title="lib/troubleshooting.dart" file=/../packages/stem/example/docs_snippets/lib/troubleshooting.dart#troubleshooting-enqueue ``` ```dart title="lib/troubleshooting.dart" file=/../packages/stem/example/docs_snippets/lib/troubleshooting.dart#troubleshooting-results ``` Helpful commands: ```bash stem worker stats --json stem worker inspect stem observe queues ``` ## Routing file fails to parse Checklist: - Validate the routing file path and format (YAML/JSON). - Confirm `STEM_ROUTING_CONFIG` points at the file you expect. - Confirm the registry matches the task names referenced in the file. - If you use queue priorities, ensure the broker supports them. 
```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-load ``` ```dart title="lib/routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-inline ``` Helpful commands: ```bash stem routing dump stem routing dump --json stem routing dump --sample ``` ## Missing or misconfigured result backend Symptoms: `stem observe` fails or task results never appear. Checklist: - Set `STEM_RESULT_BACKEND_URL` for any workflow that needs stored results. - Ensure the backend URL uses the correct scheme (`redis://`, `postgres://`). - Confirm the worker is configured with the same result backend. ```dart title="lib/persistence.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-redis ``` ```dart title="lib/persistence.dart" file=/../packages/stem/example/docs_snippets/lib/persistence.dart#persistence-backend-postgres ``` Helpful commands: ```bash stem health --backend "$STEM_RESULT_BACKEND_URL" stem observe workers stem observe queues ``` ## TLS or signing failures Symptoms: health checks fail or tasks land in the DLQ with signature errors. Checklist: - Verify `STEM_TLS_*` variables are set on every component that connects. - Confirm `STEM_SIGNING_KEYS`/`STEM_SIGNING_PUBLIC_KEYS` match across producers and workers. - Ensure `STEM_SIGNING_ACTIVE_KEY` is set and present in the key list. - Check DLQ entries for `signature-invalid` reasons. 
```dart title="lib/producer.dart" file=/../packages/stem/example/docs_snippets/lib/producer.dart#producer-signed ``` ```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-config ``` ```dart title="lib/production_checklist.dart" file=/../packages/stem/example/docs_snippets/lib/production_checklist.dart#production-signing-runtime ``` Helpful commands: ```bash stem health \ --broker "$STEM_BROKER_URL" \ --backend "$STEM_RESULT_BACKEND_URL" stem dlq list --queue stem dlq show --queue --id ``` ## Namespace mismatch Symptoms: CLI sees no data or control commands return empty responses. Checklist: - Ensure all processes (producer, worker, CLI) use the same namespace string. - For workers, confirm `STEM_WORKER_NAMESPACE` matches your CLI `--namespace`. ```dart title="lib/namespaces.dart" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-broker-backend ``` ```dart title="lib/namespaces.dart" file=/../packages/stem/example/docs_snippets/lib/namespaces.dart#namespaces-worker ``` Helpful commands: ```bash stem worker stats --namespace "stem" stem worker ping --namespace "stem" ``` ## Migrations or schema errors Checklist: - Run the migration commands shipped with the adapter (Redis/Postgres). - Ensure your store URLs point to the migrated database/schema. - Set `STEM_SCHEDULE_STORE_URL` before running schedule commands. Helpful commands: ```bash stem schedule list ``` ## DLQ stalls or poison-pill tasks Checklist: - Inspect DLQ entries and replay only after fixing the root cause. - For repeat failures, consider lowering retries or adding task-level guards. 
Helpful commands:

```bash
stem dlq list --queue <queue>
stem dlq show --queue <queue> --id <id>
stem dlq replay --queue <queue> --id <id>
```

Example enqueue that will land in the DLQ:

```dart title="bin/producer.dart" file=/../packages/stem/example/dlq_sandbox/bin/producer.dart#dlq-producer-enqueue
```

## Control commands return no replies

This usually means the control broadcast channel is not being consumed. Checklist:

- Ensure the worker is running and connected to the same broker.
- If you use a custom namespace, pass `--namespace` to CLI commands.
- Verify that the broker supports broadcast/control channels.

Helpful commands:

```bash
stem worker stats --json
stem observe workers
```

## Task retries instantly or too quickly

Checklist:

- Confirm your task’s `TaskOptions` (`maxRetries`, `visibilityTimeout`).
- Ensure the broker supports delayed deliveries (`notBefore`).
- Check broker clock drift if delays feel inconsistent.

```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-task-options
```

```dart title="lib/retry_backoff.dart" file=/../packages/stem/example/docs_snippets/lib/retry_backoff.dart#retry-backoff-strategy
```

## Connection refused

Checklist:

- Verify Redis/Postgres is running and reachable.
- Confirm the URL scheme (`redis://`, `postgres://`).
- Ensure Docker ports are mapped (`-p 6379:6379`, `-p 5432:5432`).

Helpful commands:

```bash
stem health \
  --broker "$STEM_BROKER_URL" \
  --backend "$STEM_RESULT_BACKEND_URL"
```

## Still stuck?

- Review the [Observability & Ops](./observability-and-ops.md) guide for heartbeats, DLQ inspection, and control commands.
- Check the runnable examples under `packages/stem/example/`.

---

## Beat Scheduler Guide

Stem Beat enqueues periodic tasks so you can keep background jobs on schedule. This guide shows how to define schedules, load them, run Beat alongside your workers, and monitor results.

## Define schedules

Beat accepts YAML, JSON, or programmatic entries.
YAML keeps schedules under version control and mirrors the CLI format:

```yaml title="config/schedules.yaml"
cleanup-temp-files:
  task: maintenance.cleanup
  spec: every:5m
  queue: maintenance
  args:
    path: /tmp
  options:
    maxRetries: 2
midnight-report:
  task: reports.generate
  spec:
    cron: '0 0 * * *'
    timezone: America/New_York
  queue: reports
  kwargs:
    report: daily-summary
solar-check:
  task: solar.notify
  spec:
    solar: sunset
    latitude: 40.7128
    longitude: -74.0060
    offset: -15m
one-off-reconcile:
  task: billing.reconcile
  spec:
    clocked: 2025-02-01T00:00:00Z
  queue: finance
  options:
    runOnce: true
```

## Load schedules

Apply schedule files to the schedule store before calling `beat.start()`:

```dart file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-load
```

To build schedules imperatively, call `store.upsert` with the spec classes (`IntervalScheduleSpec`, `CronScheduleSpec`, `SolarScheduleSpec`, `ClockedScheduleSpec`).

## Start Beat

```dart title="bin/beat_dev.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-dev
```

```dart title="bin/beat_redis.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-redis
```

```dart title="bin/beat_postgres.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-postgres
```

### CLI alternative

Prefer configuration over code? Use the CLI with the same schedule file:

```bash
stem schedule apply \
  --file config/schedules.yaml \
  --yes
stem schedule list
stem schedule dry-run --spec "every:5m"
```

## Programmatic spec helpers

```dart file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-specs
```

| Spec | Description |
| ---------- | ----------- |
| `Interval` | Millisecond-resolution interval with optional jitter, `startAt`, `endAt`. |
| `Cron` | Classic 5/6-field cron with optional timezone per entry. |
| `Solar` | Sunrise, sunset, or solar noon with lat/long and optional offsets. |
| `Clocked` | One-shot timestamp; set `runOnce` to prevent rescheduling. |

## Timezone handling

- Schedule entries accept an optional IANA timezone identifier.
- If your schedule store uses the default calculator, schedules evaluate in UTC.
- To honor per-entry timezones, construct the schedule store with a `ScheduleCalculator` configured with a timezone data provider.
- You must load timezone data in your process (for example, `timezone/data/latest.dart`) before using a timezone-aware calculator.

## Observe Beat activity

Stem emits scheduler signals that mirror Celery Beat hooks:

```dart file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-signals
```

You can also query the schedule store directly:

```dart file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-due
```

## Tips & tricks

- Use `lockStore` (Redis or Postgres) when running Beat in HA mode so only one instance triggers jobs at a time.
- Call `Beat.stop()` on shutdown to flush outstanding timers and release locks.
- Run Beat from a Dart entrypoint wired to your schedule store (see the Redis example below):

```dart title="lib/scheduler.dart" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-redis
```

- Store schedules in source control and re-apply them with `stem schedule apply --yes` after deployments.

Next, hook Beat into your deployment automation and monitor the scheduler signals alongside worker metrics.

---

## Scheduler

Stem Beat coordinates periodic work across your cluster. Explore the scheduler capabilities, storage backends, and operational tooling.

## What Beat does

Beat reads schedule definitions from a schedule store, evaluates when each entry is due, and enqueues the corresponding task. Use it for cron-style jobs, interval tasks, solar events, or one-off clocked runs.

## Schedule spec types

Stem ships concrete spec types you can store or generate:

- **Interval** (`IntervalScheduleSpec`) — run every N seconds/minutes/hours.
- **Cron** (`CronScheduleSpec`) — standard cron expressions. - **Solar** (`SolarScheduleSpec`) — sunrise/sunset-based schedules. - **Clocked** (`ClockedScheduleSpec`) — single run at a specific time. ## Beat in production Beat is a separate process from workers. It only enqueues tasks; workers still execute them. That separation means you can scale Beat (and its schedule store) independently from worker fleets. ## HA and lock stores To run Beat in high availability mode, multiple Beat instances can share the same schedule store and a lock store. The lock store ensures only one scheduler emits a given schedule entry at a time. Redis and Postgres stores support this pattern out of the box. ## Schedule stores Beat persists schedule entries so restarts do not lose state. For production, use a shared schedule store (Redis/Postgres) and a lock store to coordinate HA instances. The CLI schedule commands use `STEM_SCHEDULE_STORE_URL` when set; otherwise they operate on local schedule files. ## CLI entrypoints Common scheduler CLI commands: - `stem schedule apply` — load schedule entries from JSON/YAML into the store. - `stem schedule list` — inspect entries in the store. - `stem schedule dry-run` — preview due times before rollout. - `stem observe schedules` — inspect schedule drift and dispatch status. ## Scheduler snippets ```dart title="Interval schedule" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-interval-spec ``` ```dart title="Cron schedule" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-cron-spec ``` ```dart title="Solar schedule" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-solar-spec ``` ```dart title="Clocked schedule" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-clocked-spec ``` ```dart title="Scheduler signals" file=/../packages/stem/example/docs_snippets/lib/scheduler.dart#beat-signals ``` Beat itself runs as a Dart process; see the Beat guide for entrypoints. 
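As a rough illustration of the spec types above (the class names come from this page; the constructor parameters are assumptions, so treat this as a sketch rather than the verified API):

```dart
// Hedged sketch: building schedule specs in code instead of YAML.
// IntervalScheduleSpec, CronScheduleSpec, SolarScheduleSpec, and
// ClockedScheduleSpec are named in this document; the parameter
// names below are illustrative assumptions.
final everyFiveMinutes = IntervalScheduleSpec(Duration(minutes: 5));
final nightlyReport =
    CronScheduleSpec('0 0 * * *', timezone: 'America/New_York');
final sunsetCheck = SolarScheduleSpec(
  event: 'sunset',
  latitude: 40.7128,
  longitude: -74.0060,
);
final oneOffReconcile = ClockedScheduleSpec(DateTime.utc(2025, 2, 1));
```

Register the resulting specs with `store.upsert` (as described in the Beat guide above) when you want schedules without a schedule file.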
- **[Beat Scheduler Guide](./beat-guide.md)** – Configure Beat, load schedules, and run it with in-memory, Redis, or Postgres stores. - **Example:** `example/scheduler_observability` shows drift metrics, schedule signals, and CLI inspection. Looking for locking and storage details? See the Postgres and Redis sections in [Broker Overview](../brokers/overview.md). --- ## Daemonization Guide Stem now ships opinionated service templates and CLI helpers so you can manage workers like you would with Celery’s `celery multi`. This guide mirrors `docs/process/daemonization.md` and walks through real examples. ## Prerequisites - Create an unprivileged `stem` user/group. - Install the Stem CLI and your worker launcher binary/script (for example, `/usr/local/bin/stem-worker`). - Copy templates from the repository (`templates/`) into your packaging step: systemd units, SysV scripts, and `/etc/default/stem`. ## Worker entrypoint The daemonization templates expect a worker launcher that runs until signaled. 
This stub worker lives in `examples/daemonized_worker/bin/worker.dart`: ```dart title="examples/daemonized_worker/bin/worker.dart" file=/../packages/stem/example/daemonized_worker/bin/worker.dart#daemonized-worker-entrypoint ``` ```dart title="examples/daemonized_worker/bin/worker.dart" file=/../packages/stem/example/daemonized_worker/bin/worker.dart#daemonized-worker-signal-handlers ``` ```dart title="examples/daemonized_worker/bin/worker.dart" file=/../packages/stem/example/daemonized_worker/bin/worker.dart#daemonized-worker-loop ``` ## Systemd Example ```bash sudo install -D templates/systemd/stem-worker@.service \ /etc/systemd/system/stem-worker@.service sudo install -D templates/etc/default/stem /etc/stem/stem.env sudo install -d -o stem -g stem /var/lib/stem /var/log/stem /var/run/stem ``` Set the worker command and queues inside `/etc/stem/stem.env`: ```bash STEM_WORKER_COMMAND="/usr/local/bin/stem-worker --queues=default,critical" ``` Enable an instance named `alpha`: ```bash sudo systemctl daemon-reload sudo systemctl enable --now stem-worker@alpha.service sudo journalctl -u stem-worker@alpha.service ``` The unit expands PID/log templates, reloads via `stem worker multi restart`, and applies the hardening flags (`NoNewPrivileges`, `ProtectSystem`, etc.). Install a matching logrotate snippet (for example, `/etc/logrotate.d/stem`) when journald is not used so `/var/log/stem/*.log` is rotated regularly. 
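When journald is not in use, a logrotate snippet along these lines keeps `/var/log/stem/*.log` bounded (the rotation policy is illustrative; the paths match the directories created above):

```
/var/log/stem/*.log {
    daily
    rotate 14
    compress
    delaycompress
    missingok
    notifempty
    copytruncate
}
```

`copytruncate` avoids requiring the worker to reopen its log file after rotation; if your launcher reopens logs on a signal instead, swap it for a `postrotate` script.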
## SysV Example ```bash sudo install -D templates/sysv/init.d/stem-worker /etc/init.d/stem-worker sudo install -D templates/etc/default/stem /etc/default/stem sudo chmod 755 /etc/init.d/stem-worker sudo update-rc.d stem-worker defaults ``` `/etc/default/stem` controls the nodes and command: ```bash STEMD_NODES="alpha beta" STEMD_COMMAND="/usr/local/bin/stem-worker --queues=background" ``` Run it like any other service: ```bash sudo service stem-worker start sudo service stem-worker status sudo service stem-worker stop ``` ## Scheduler Set `STEM_SCHEDULER_COMMAND` in the environment file and enable `stem-scheduler.service` (systemd) or `/etc/init.d/stem-scheduler` (SysV). ## Docker Example `examples/daemonized_worker/` contains a Dockerfile and entrypoint that run `stem worker multi` directly. Build and run from the repo root: ```bash docker build -f examples/daemonized_worker/Dockerfile -t stem-multi . docker run --rm -e STEM_WORKER_COMMAND="dart run examples/daemonized_worker/bin/worker.dart" stem-multi ``` Override `STEM_WORKER_*` environment variables to control nodes, PID/log templates, and the worker command. ## Troubleshooting - Missing directories → ensure `/var/log/stem` and `/var/run/stem` exist with `stem:stem` ownership. - Stale PID files → `stem worker multi status` cleans them up. - Custom systemd options → use `systemctl edit stem-worker@.service` to create a drop-in override. - Health probes → `stem worker healthcheck --pidfile=/var/run/stem/<node>.pid` returns exit code `0` when the process is alive. Run `stem worker diagnose` with the PID/log paths to identify missing directories or stale PID files. --- ## Workers Workers pull tasks, manage concurrency, and publish lifecycle signals. Use these guides to embed workers programmatically and operate them in production.
## Minimal entrypoints ```dart title="workers_programmatic.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-minimal ``` ```dart title="workers_programmatic.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-producer-minimal ``` ## Redis-backed worker ```dart title="workers_programmatic.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-redis ``` ```dart title="workers_programmatic.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-producer-redis ``` ## Lifecycle overview Workers connect to the broker, claim deliveries, execute task handlers, and emit lifecycle signals as they progress (`taskReceived`, `taskPrerun`, `taskPostrun`, `taskSucceeded`, `taskFailed`). Worker-level signals announce startup, readiness, heartbeat, and shutdown so dashboards and alerts can track capacity in near real time. ```dart title="signals.dart" file=/../packages/stem/example/docs_snippets/lib/signals.dart#signals-worker-listeners ``` Shutdowns are cooperative: warm stops fetching new work, soft requests termination at the next cooperative checkpoint, and hard requeues active deliveries. The Worker Control CLI sends those commands through the same control queues the dashboard uses, so operational tooling stays consistent. ## Queue subscriptions Workers can subscribe to: - **A single queue** (default: `default`) for straightforward deployments. - **Multiple queues** by configuring a routing subscription (priority queues, fan-out, or dedicated lanes per workload). Queue subscriptions determine which stream shards the worker polls, so keep queue names stable and document them alongside task registries. ```dart title="routing.dart" file=/../packages/stem/example/docs_snippets/lib/routing.dart#routing-bootstrap ``` ## Concurrency & autoscaling Workers run multiple tasks in parallel using isolate pools.
Configure base concurrency with `concurrency`, then enable autoscaling to expand/contract within a min/max range based on backlog and inflight counts. Prefetch controls how aggressively a worker claims work ahead of execution. Use smaller values for fairness and larger values for throughput. If you're using autoscaling, align the prefetch multiplier with your maximum concurrency so scaling does not starve queues. ```dart title="worker_control.dart" file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-autoscale ``` ## Key environment variables - `STEM_BROKER_URL` – broker connection string (Redis/Postgres/memory). - `STEM_RESULT_BACKEND_URL` – durable result backend (optional but recommended). - `STEM_DEFAULT_QUEUE` – fallback queue when routing is unset. - `STEM_PREFETCH_MULTIPLIER` – prefetch multiplier applied to concurrency. - `STEM_WORKER_QUEUES` – explicit queue subscriptions (comma separated). - `STEM_WORKER_BROADCASTS` – broadcast channel subscriptions (comma separated). - `STEM_WORKER_NAMESPACE` – worker heartbeat/control namespace (observability). - `STEM_ROUTING_CONFIG` – path to routing config (YAML/JSON). - `STEM_SIGNING_*` – enable payload signing for tamper detection. - `STEM_TLS_*` – TLS settings for broker/backends. - **[Programmatic Integration](./programmatic-integration.md)** – Wire producers and workers inside your Dart services (includes in-memory and Redis examples). - **[Worker Control CLI](./worker-control.md)** – Inspect, revoke, scale, and shut down workers remotely. - **[Daemonization Guide](./daemonization.md)** – Run workers under systemd, launchd, or custom supervisors. Looking for retry tuning or task registries? See the [Core Concepts](../core-concepts/index.md). --- ## Programmatic Workers & Enqueuers Use Stem's Dart APIs to embed task production and processing inside your application services. This guide focuses on the two core roles: **producer** (enqueuer) and **worker**. 
## Producer (Enqueuer) ```dart title="lib/producer.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-producer-minimal ``` ```dart title="lib/producer_redis.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-producer-redis ``` ```dart title="lib/producer_signed.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-producer-signed ``` ### Tips - Always reuse a `Stem` instance rather than creating one per request. - Use `TaskOptions` to set queue, retries, timeouts, and isolation. - Add custom metadata via the `meta` argument for observability or downstream processing. ## Worker ```dart title="bin/worker.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-minimal ``` ```dart title="bin/worker_redis.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-redis ``` ```dart title="bin/worker_retry.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-worker-retry ``` ### Lifecycle Tips - Call `worker.shutdown()` on SIGINT/SIGTERM to drain in-flight tasks and emit `workerStopping`/`workerShutdown` signals. - Monitor heartbeats via `StemSignals.workerHeartbeat` or the heartbeat backend for liveness checks. - Use `WorkerLifecycleConfig` to install signal handlers, configure soft/hard shutdown timeouts, and recycle isolates after N tasks or memory thresholds. ## Putting It Together A lightweight service wires the producer and worker into your application startup: ```dart title="lib/bootstrap.dart" file=/../packages/stem/example/docs_snippets/lib/workers_programmatic.dart#workers-bootstrap ``` Swap the in-memory adapters for Redis/Postgres when you deploy, keeping the API surface the same. ## Checklist - Reuse producer and worker objects—avoid per-request construction. 
- Inject the `TaskRegistry` from a central module so producers and workers stay in sync. - Capture task IDs returned by `Stem.enqueue` when you need to poll results or correlate with your own auditing. - Emit lifecycle signals (`StemSignals`) and wire logs/metrics early so production instrumentation is already in place. - For HTTP/GraphQL handlers, wrap enqueues in try/catch to surface validation errors before tasks hit the queue. Next, continue with the [Worker Control CLI](./worker-control.md) or explore [Signals](../core-concepts/signals.md) for advanced instrumentation. --- ## Worker Control Stem exposes a broker-backed control plane so operators can inspect, revoke, and coordinate workers without restarts. This guide walks through the CLI surface, revocation durability, and termination semantics for inline vs isolate handlers. ## Remote control primer - **Inspect** commands read worker state (`stem worker stats`, `stem worker inspect`). - **Control** commands mutate state (`stem worker revoke`, `stem worker shutdown`). - Commands broadcast to all workers unless you pass `--worker` to target specific IDs. - Use `--namespace` to match the control namespace used by your workers. ## CLI Overview | Command | Purpose | | ------- | ------- | | `stem worker ping` | Broadcast a ping and aggregate worker responses. | | `stem worker inspect` | List in-flight tasks (and optional revoke cache) per worker. | | `stem worker stats` | Summarize inflight counts, queue depth, and metadata. | | `stem worker revoke` | Persist revocations and broadcast terminate/best-effort revokes. | | `stem worker shutdown` | Request warm/soft/hard shutdown via the control channel. | | `stem worker pause` | Pause one or more queues on target workers. | | `stem worker resume` | Resume paused queues on target workers. | | `stem worker status` | Stream heartbeats or snapshot the backend (existing command). | | `stem worker healthcheck` | Probe worker processes for readiness/liveness. 
| | `stem worker diagnose` | Run local diagnostics for pid/log/env configuration issues. | | `stem worker multi` | Manage multiple worker processes (start/stop/restart/status). | Use `--namespace` to target non-default control namespaces. Omitting `--worker` broadcasts to every worker. All commands honour the same environment variables as `stem health` (`STEM_BROKER_URL`, `STEM_RESULT_BACKEND_URL`, TLS/signing flags). ### Quick Examples ```bash # Ping a subset of workers by identifier stem worker ping --worker worker-a --worker worker-b # Inspect all workers (JSON output) stem worker inspect --json # Revoke a task and request termination stem worker revoke --task 1761057... --terminate # Pause default queue on one worker stem worker pause --worker worker-a --queue default # Resume that queue later stem worker resume --worker worker-a --queue default ``` For a runnable lab that exercises ping/stats/revoke/shutdown against real workers, see `example/worker_control_lab` in the repository. ## Autoscaling Concurrency Workers can autoscale their isolate pools between configured minimum and maximum bounds. Enable the evaluator by passing `WorkerAutoscaleConfig` to the worker constructor: ```dart file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-autoscale ``` The autoscaler samples broker queue depth alongside inflight counts to decide when to scale. Metrics expose the current setting via `stem.worker.concurrency`, and `stem worker stats --json` includes the live `activeConcurrency` value so dashboards can observe adjustments. See `example/autoscaling_demo` for a queue-backlog scenario that triggers scale-up and scale-down events. ## CLI Multi-Instance Management `stem worker multi` orchestrates OS processes for worker nodes. It honours the same placeholders as the service templates (`%n`, `%h`, `%I`, `%d`) when expanding PID/log/workdir templates and uses `STEM_WORKER_COMMAND` (or `--command`/`--command-line`) for the executable. 
```bash export STEM_WORKER_COMMAND="/usr/bin/stem-worker" stem worker multi start alpha beta \ --pidfile=/var/run/stem/%n.pid \ --logfile=/var/log/stem/%n.log \ --env-file=/etc/stem/stem.env stem worker multi status alpha beta --pidfile=/var/run/stem/%n.pid stem worker multi stop alpha beta --pidfile=/var/run/stem/%n.pid ``` The CLI auto-creates directories and exposes `STEM_WORKER_NODE`, `STEM_WORKER_PIDFILE`, and `STEM_WORKER_LOGFILE` to the launched process so apps can discover their runtime context. ## Worker Healthcheck Use `stem worker healthcheck` inside systemd `ExecStartPost=`, Kubernetes probes, or shell scripts to determine whether a worker process is running: ```bash stem worker healthcheck \ --node alpha \ --pidfile=/var/run/stem/alpha.pid \ --logfile=/var/log/stem/alpha.log \ --json ``` Exit code `0` indicates the PID file exists and the process is alive. The JSON payload includes the pid, timestamp captured from the PID file, and the uptime in seconds. ## Worker Diagnostics `stem worker diagnose` performs common checks (PID/log directories, stale PID files, environment file parsing) to help troubleshoot daemonization issues: ```bash stem worker diagnose \ --pidfile=/var/run/stem/alpha.pid \ --logfile=/var/log/stem/alpha.log \ --env-file=/etc/stem/stem.env ``` Warnings and errors are printed for missing directories, unparseable PIDs, and other configuration gaps. Use `--json` when integrating with tooling. ## Persistent Revokes Revocations are durable so new workers or restarts continue honouring them. The CLI resolves the backing store in this order: 1. `STEM_REVOKE_STORE_URL` 2. `STEM_RESULT_BACKEND_URL` 3. `STEM_BROKER_URL` Supported schemes: Redis (`redis://`, `rediss://`), Postgres (`postgres://`, `postgresql://`), SQLite (`sqlite:///path/to/stem.sqlite`), a newline-delimited file (`file:///path/to/revokes.stem` or bare path), and in-memory (`memory://` – useful for tests).
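Given the resolution order above, you can pin revocations to their own store without touching the broker or backend URLs. A sketch (the URLs and database numbers are illustrative):

```bash
# Resolution order: STEM_REVOKE_STORE_URL first, then
# STEM_RESULT_BACKEND_URL, then STEM_BROKER_URL.
export STEM_BROKER_URL="redis://localhost:6379/0"
export STEM_RESULT_BACKEND_URL="redis://localhost:6379/1"
# Isolate revocations in their own logical database:
export STEM_REVOKE_STORE_URL="redis://localhost:6379/2"
```

Because `STEM_REVOKE_STORE_URL` wins the resolution, the revoke store can even use a different scheme (for example SQLite) than the broker.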
Workers hydrate the revocation cache at startup, prune expired records, and apply new control messages. The CLI writes through the store *before* broadcasting control messages to guarantee durability precedes visibility. ## Termination Semantics ### Inline vs isolate handlers Stem executes tasks either inline (worker main isolate) or in dedicated child isolates when a handler exposes an `isolateEntrypoint`. Inline handlers share the worker event loop and can be interrupted immediately at the next checkpoint. Isolate handlers communicate with the worker only when they emit control signals (heartbeat, lease extension, progress). That difference governs how quickly a `--terminate` revoke takes effect. If you are using `FunctionTaskHandler`, keep execution inline by passing `runInIsolate: false` or using the `FunctionTaskHandler.inline(...)` factory when the entrypoint captures state that cannot cross isolate boundaries. ### Inline handler example ```dart title="tasks/inline_report_task.dart" file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-inline ``` ### Isolate handler example ```dart title="tasks/image_render_task.dart" file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-isolate ``` `stem worker revoke --terminate` throws `TaskRevokedException` the next time an inline handler calls `TaskContext.heartbeat`, `extendLease`, or `progress`, allowing the worker to cancel and record the task as cancelled. Isolate handlers must emit cooperative checkpoints (heartbeat/lease/progress) to be interrupted; otherwise they finish naturally. ### Cooperative checkpoints for isolate handlers Make sure isolate entrypoints call one of the cooperative helpers inside any long-running loop. Each helper throws `TaskRevokedException` when a terminate revoke is pending, which lets the handler fail fast. 
```dart title="tasks/crunch.dart" file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-crunch ``` For CPU-bound workloads, batch work or insert `await Future.delayed(...)` so the isolate yields periodically. Without checkpoints the worker cannot pre-empt the task until it returns on its own. Operators should pair `--terminate` with `stem worker inspect` to monitor inflight tasks that still need to quiesce. If a handler never emits heartbeats, add them or implement explicit cancellation logic. ## Shutdown Modes and Lifecycle Guards Use `stem worker shutdown --mode warm|soft|hard` to trigger runtime shutdowns: - **Warm** stops fetching new deliveries and drains current work. - **Soft** issues terminate revocations, then escalates to hard after the configured grace period if tasks continue running. - **Hard** immediately requeues active deliveries and terminates isolates. By default, workers install signal handlers that map `SIGTERM` to warm, `SIGINT` to soft, and `SIGQUIT` to hard. Disable them by constructing the worker with `WorkerLifecycleConfig(installSignalHandlers: false)` when embedding Stem inside a larger host that already owns signal routing. ## Queue Pause/Resume `stem worker pause` and `stem worker resume` target queue names (repeatable `--queue`) and optionally specific workers (`--worker`). Paused queues are requeued instead of executed until resumed. - Pause/resume state is persisted when a revoke store is configured. - Without a revoke store, pause/resume still works for active workers but does not survive worker restarts. Lifecycle guards can also recycle isolates automatically: ```dart file=/../packages/stem/example/docs_snippets/lib/worker_control.dart#worker-control-lifecycle ``` Recycling occurs after the active task finishes; the worker logs the recycle reason and spawns a fresh isolate before accepting new work. 
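When the built-in signal handlers are disabled, the host process owns the mapping from signals to shutdown behaviour. A minimal sketch, assuming a worker constructed with `WorkerLifecycleConfig(installSignalHandlers: false)` and the `worker.shutdown()` call described in the lifecycle tips above (the `Worker` type name and any mode parameter are assumptions):

```dart
import 'dart:io';

// Hedged sketch: embedding Stem in a host that already routes signals.
// `worker.shutdown()` is named in this document; the `Worker` type and
// everything else here is illustrative.
void wireShutdown(Worker worker) {
  for (final signal in [ProcessSignal.sigterm, ProcessSignal.sigint]) {
    signal.watch().listen((_) async {
      // Drain in-flight tasks and emit the stopping/shutdown signals,
      // then let the host exit on its own terms.
      await worker.shutdown();
      exit(0);
    });
  }
}
```

This keeps signal ownership in one place, which matters when the worker shares a process with an HTTP server or other long-lived components.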
## Configuration Summary | Variable | Purpose | | --- | --- | | `STEM_REVOKE_STORE_URL` | Override the revoke store target (defaults to backend or broker). | | `STEM_CONTROL_NAMESPACE` | Override the control namespace (defaults to heartbeat namespace). | | `STEM_WORKER_NAMESPACE` | Logical grouping for worker IDs/queues. | | `STEM_CONTROL_TIMEOUT` | Default control command timeout (e.g. `5s`). | Set `STEM_REVOKE_STORE_URL` to the datastore you want to back revocations. For example, to use Postgres alongside the result backend: ```bash export STEM_REVOKE_STORE_URL=postgres://stem:secret@db:5432/stem ``` For local single-node deployments, SQLite works as well: ```bash export STEM_REVOKE_STORE_URL=sqlite:///var/lib/stem/revokes.sqlite ``` ## Additional Resources - `stem worker --help` – built-in CLI usage for each subcommand. - The `examples/` directory in the Stem repository demonstrates control commands alongside worker lifecycle signals.