
Persistence & Stores

Use persistence when you need durable task state, workflow state, shared schedules, or revocation storage. Stem ships with Redis, Postgres, and SQLite adapters plus in-memory variants for local development.

For the normal path, prefer StemClient.inMemory(...), StemClient.fromUrl(...), or a reusable StemStack.fromUrl(...).createClient(...). Drop to StemClient.create(...) only when you really need custom broker or backend factories that the adapter stack cannot express.
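As a sketch of those three entry points (only the `tasks:` parameter and the constructor names appear on this page; the URL strings are placeholders and the remaining signatures are assumptions — check the API reference for exact shapes):

```dart
Future<void> chooseClient() async {
  // Local development: broker and backend run in-process.
  final local = await StemClient.inMemory(tasks: demoTasks);
  await local.close();

  // One-off client against a running Redis instance.
  final remote = await StemClient.fromUrl(
    'redis://localhost:6379',
    tasks: demoTasks,
  );
  await remote.close();

  // Reusable stack: share one set of connections across several clients.
  final stack = await StemStack.fromUrl('redis://localhost:6379');
  final client = stack.createClient(tasks: demoTasks);
  await client.close();
  await stack.close();
}
```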

Result backend

The result backend stores task statuses and handler return values. For local development, the in-memory variant wired up by StemClient.inMemory(...) is enough:

Future<void> connectInMemoryBackend() async {
  final client = await StemClient.inMemory(tasks: demoTasks);
  await demoTaskDefinition.enqueue(client);
  await client.close();
}

Payload encoders

Result backends respect pluggable TaskPayloadEncoders. Producers encode arguments before publishing, workers decode them once before invoking handlers, and handler return values are encoded before they reach the backend. Every stored status records the encoder id (__stemResultEncoder), letting other processes decode payloads without guessing formats.

Configure defaults when bootstrapping Stem, StemApp, Canvas, or workflow apps:

lib/bootstrap_encoders.dart
import 'dart:convert';

class Base64PayloadEncoder extends TaskPayloadEncoder {
  const Base64PayloadEncoder();

  @override
  Object? encode(Object? value) =>
      value is String ? base64Encode(utf8.encode(value)) : value;

  @override
  Object? decode(Object? stored) =>
      stored is String ? utf8.decode(base64Decode(stored)) : stored;
}

Future<void> configureEncoders() async {
  final app = await StemApp.inMemory(
    tasks: const [],
    argsEncoder: const JsonTaskPayloadEncoder(),
    resultEncoder: const Base64PayloadEncoder(),
    additionalEncoders: const [GzipPayloadEncoder()],
  );
  await app.close();
}

Handlers needing bespoke treatment can override TaskMetadata.argsEncoder and TaskMetadata.resultEncoder; the worker ensures that only that task uses the custom encoder while the rest fall back to the global defaults.
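For illustration, a per-task override might look like the sketch below. Only the TaskMetadata.argsEncoder and TaskMetadata.resultEncoder names come from this page; the TaskDefinition shape, the metadata: parameter, and the handler signature are hypothetical.

```dart
// Hypothetical task wiring; only argsEncoder/resultEncoder are documented names.
final compressedReport = TaskDefinition(
  name: 'reports.render',
  metadata: const TaskMetadata(
    argsEncoder: JsonTaskPayloadEncoder(), // documented default encoder type
    resultEncoder: GzipPayloadEncoder(),   // compress only this task's results
  ),
  handler: (args) async => renderReport(args), // renderReport is a placeholder
);
```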

Workflow store

Workflow stores persist:

  • workflow runs and status
  • flow step results and script checkpoint results
  • suspension/watcher records
  • due-run scheduling metadata

That store is what allows workflow resumes, run inspection, and recovery across worker restarts. See the top-level Workflows section for the durable orchestration model and runtime behavior.

Schedule & lock stores

lib/beat_bootstrap.dart
Future<void> configureBeatStores() async {
  final scheduleStore = await RedisScheduleStore.connect(
    'redis://localhost:6379/2',
  );
  final lockStore = await RedisLockStore.connect('redis://localhost:6379/3');
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');

  final beat = Beat(
    broker: broker,
    store: scheduleStore,
    lockStore: lockStore,
  );

  await beat.stop();
  await broker.close();
  await scheduleStore.close();
  await lockStore.close();
}

Switch to Postgres with PostgresScheduleStore.connect / PostgresLockStore.connect.
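By analogy with the Redis example above, a Postgres-backed Beat could be wired like this. The connect signatures are assumed to mirror the Redis stores, and the connection URL is a placeholder.

```dart
Future<void> configurePostgresBeatStores() async {
  // Both stores can share one database; the Postgres adapters migrate
  // their tables on first connect.
  const url = 'postgres://postgres:postgres@localhost:5432/stem';
  final scheduleStore = await PostgresScheduleStore.connect(url);
  final lockStore = await PostgresLockStore.connect(url);
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');

  final beat = Beat(
    broker: broker,
    store: scheduleStore,
    lockStore: lockStore,
  );

  await beat.stop();
  await broker.close();
  await scheduleStore.close();
  await lockStore.close();
}
```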

Revoke store

Store revocations in Redis/Postgres/SQLite so workers can honour the stem worker revoke command:

export STEM_REVOKE_STORE_URL=postgres://postgres:postgres@localhost:5432/stem
Postgres revoke store
Future<void> configurePostgresRevokeStore() async {
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final revokeStore = await PostgresRevokeStore.connect(
    'postgres://postgres:postgres@localhost:5432/stem',
  );
  final worker = Worker(
    broker: broker,
    backend: backend,
    tasks: demoTasks,
    revokeStore: revokeStore,
  );

  await worker.shutdown();
  await revokeStore.close();
  await backend.close();
  await broker.close();
}
SQLite revoke store
Future<void> configureSqliteRevokeStore() async {
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final revokeStore = await SqliteRevokeStore.open(
    File('stem_revoke.sqlite'),
    namespace: 'stem',
  );
  final worker = Worker(
    broker: broker,
    backend: backend,
    tasks: demoTasks,
    revokeStore: revokeStore,
  );

  await worker.shutdown();
  await revokeStore.close();
  await backend.close();
  await broker.close();
}

Tips

  • In-memory adapters are great for local tests; switch to Redis/Postgres when you need persistence or multi-process coordination.
  • SQLite is single-writer: connect only the workers to the backend file, and use a separate SQLite file for the broker.
  • Postgres adapters automatically migrate required tables on first connect.
  • Configure TTLs on the result backend via backend.set to limit retained data.
  • For HA Beat deployments, use the same lock store across instances.
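The TTL tip above could look roughly like this. The ResultBackend type name, the positional arguments to set, and the ttl: parameter are all assumptions; only the backend.set entry point is mentioned on this page.

```dart
// Hypothetical sketch: cap how long a stored result is retained.
Future<void> storeWithTtl(ResultBackend backend) async {
  await backend.set(
    'task-id-123',                        // task id (placeholder)
    {'status': 'success', 'result': 42},  // stored status payload (shape assumed)
    ttl: const Duration(hours: 1),        // expire after one hour
  );
}
```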