Persistence & Stores
Use persistence when you need durable task state, workflow state, shared schedules, or revocation storage. Stem ships with Redis, Postgres, and SQLite adapters plus in-memory variants for local development.
For the normal path, prefer StemClient.inMemory(...),
StemClient.fromUrl(...), or a reusable StemStack.fromUrl(...).createClient(...).
Drop to StemClient.create(...) only when you really need custom broker or
backend factories that the adapter stack cannot express.
Result backend
- In-memory (lib/bootstrap.dart)
- SQLite (stem_sqlite)
- Redis (lib/bootstrap.dart)
- Postgres (lib/bootstrap.dart)
Future<void> connectInMemoryBackend() async {
final client = await StemClient.inMemory(tasks: demoTasks);
await demoTaskDefinition.enqueue(client);
await client.close();
}
Future<void> connectSqliteBackend() async {
final client = await StemClient.fromUrl(
'sqlite:///${File('stem_broker.sqlite').absolute.path}',
adapters: const [StemSqliteAdapter()],
overrides: StemStoreOverrides(
backend: 'sqlite:///${File('stem_backend.sqlite').absolute.path}',
),
tasks: demoTasks,
);
await demoTaskDefinition.enqueue(client);
await client.close();
}
Future<void> connectRedisBackend() async {
final client = await StemClient.fromUrl(
'redis://localhost:6379',
adapters: const [StemRedisAdapter()],
overrides: const StemStoreOverrides(
backend: 'redis://localhost:6379/1',
),
tasks: demoTasks,
);
await demoTaskDefinition.enqueue(client);
await client.close();
}
Future<void> connectPostgresBackend() async {
final client = await StemClient.fromUrl(
'redis://localhost:6379',
adapters: const [StemRedisAdapter(), StemPostgresAdapter()],
overrides: const StemStoreOverrides(
backend: 'postgres://postgres:postgres@localhost:5432/stem',
),
tasks: demoTasks,
);
await demoTaskDefinition.enqueue(client);
await client.close();
}
Payload encoders
Result backends now respect pluggable TaskPayloadEncoders. Producers encode
arguments before publishing, workers decode them once before invoking handlers,
and handler return values are encoded before they hit the backend. Every stored
status contains the encoder id (__stemResultEncoder), letting other processes
decode payloads without guessing formats.
Configure defaults when bootstrapping Stem, StemApp, Canvas, or workflow
apps:
class Base64PayloadEncoder extends TaskPayloadEncoder {
const Base64PayloadEncoder();
Object? encode(Object? value) =>
value is String ? base64Encode(utf8.encode(value)) : value;
Object? decode(Object? stored) =>
stored is String ? utf8.decode(base64Decode(stored)) : stored;
}
Future<void> configureEncoders() async {
final app = await StemApp.inMemory(
tasks: const [],
argsEncoder: const JsonTaskPayloadEncoder(),
resultEncoder: const Base64PayloadEncoder(),
additionalEncoders: const [GzipPayloadEncoder()],
);
await app.close();
}
Handlers needing bespoke treatment can override TaskMetadata.argsEncoder and
TaskMetadata.resultEncoder; the worker ensures only that task uses the custom
encoder while the rest fall back to the global defaults.
Workflow store
Workflow stores persist:
- workflow runs and status
- flow step results and script checkpoint results
- suspension/watcher records
- due-run scheduling metadata
That store is what allows workflow resumes, run inspection, and recovery across worker restarts. See the top-level Workflows section for the durable orchestration model and runtime behavior.
Schedule & lock stores
Future<void> configureBeatStores() async {
final scheduleStore = await RedisScheduleStore.connect(
'redis://localhost:6379/2',
);
final lockStore = await RedisLockStore.connect('redis://localhost:6379/3');
final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
final beat = Beat(
broker: broker,
store: scheduleStore,
lockStore: lockStore,
);
await beat.stop();
await broker.close();
await scheduleStore.close();
await lockStore.close();
}
Switch to Postgres with PostgresScheduleStore.connect / PostgresLockStore.connect.
Revoke store
Store revocations in Redis/Postgres/SQLite so workers can honour
stem worker revoke:
export STEM_REVOKE_STORE_URL=postgres://postgres:postgres@localhost:5432/stem
Future<void> configurePostgresRevokeStore() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final revokeStore = await PostgresRevokeStore.connect(
'postgres://postgres:postgres@localhost:5432/stem',
);
final worker = Worker(
broker: broker,
backend: backend,
tasks: demoTasks,
revokeStore: revokeStore,
);
await worker.shutdown();
await revokeStore.close();
await backend.close();
await broker.close();
}
Future<void> configureSqliteRevokeStore() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final revokeStore = await SqliteRevokeStore.open(
File('stem_revoke.sqlite'),
namespace: 'stem',
);
final worker = Worker(
broker: broker,
backend: backend,
tasks: demoTasks,
revokeStore: revokeStore,
);
await worker.shutdown();
await revokeStore.close();
await backend.close();
await broker.close();
}
Tips
- In-memory adapters are great for local tests; switch to Redis/Postgres when you need persistence or multi-process coordination.
- SQLite is single-writer: keep only workers connected to the backend and use a separate SQLite file for the broker.
- Postgres adapters automatically migrate required tables on first connect.
- Configure TTLs on the result backend via
backend.setto limit retained data. - For HA Beat deployments, use the same lock store across instances.