Persistence & Stores
Use persistence when you need durable task state, shared schedules, or revocation storage. Stem ships with Redis, Postgres, and SQLite adapters plus in-memory variants for local development.
Result backend
- In-memory (lib/bootstrap.dart)
- SQLite (stem_sqlite)
- Redis (lib/bootstrap.dart)
- Postgres (lib/bootstrap.dart)
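Future<void> connectInMemoryBackend() async {
  // `registry` is assumed to be a task registry defined elsewhere in
  // lib/bootstrap.dart; it is not shown in this snippet.
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue('demo', args: {});
  // Close the backend and broker once the work is done.
  await backend.close();
  await broker.close();
}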
// Requires `import 'dart:io';` for File.
Future<void> connectSqliteBackend() async {
  // Broker and backend use separate SQLite files.
  final broker = await SqliteBroker.open(File('stem_broker.sqlite'));
  final backend = await SqliteResultBackend.open(File('stem_backend.sqlite'));
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue('demo', args: {});
  await backend.close();
  await broker.close();
}
Future<void> connectRedisBackend() async {
  final backend = await RedisResultBackend.connect('redis://localhost:6379/1');
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue('demo', args: {});
  await backend.close();
  await broker.close();
}
Future<void> connectPostgresBackend() async {
  final backend = await PostgresResultBackend.connect(
    connectionString: 'postgres://postgres:postgres@localhost:5432/stem',
  );
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final stem = Stem(
    broker: broker,
    registry: registry,
    backend: backend,
  );
  await stem.enqueue('demo', args: {});
  await backend.close();
  await broker.close();
}
Payload encoders
Result backends now respect pluggable TaskPayloadEncoders. Producers encode
arguments before publishing, workers decode them once before invoking handlers,
and handler return values are encoded before they hit the backend. Every stored
status contains the encoder id (__stemResultEncoder), letting other processes
decode payloads without guessing formats.
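The round trip described above can be sketched in plain Dart. This is a minimal illustration, not Stem's implementation: PayloadCodec, JsonCodec, Base64Codec, storeResult, readResult, and the 'payload' key are hypothetical names invented for the example. Only the __stemResultEncoder key comes from the text above, and where Stem actually places it in a stored status is an assumption here.

```dart
import 'dart:convert';

// Hypothetical stand-in for an encoder interface; not Stem's actual API.
abstract class PayloadCodec {
  String get id;
  Object? encode(Object? value);
  Object? decode(Object? stored);
}

class JsonCodec implements PayloadCodec {
  @override
  String get id => 'json';
  @override
  Object? encode(Object? value) => jsonEncode(value);
  @override
  Object? decode(Object? stored) =>
      stored is String ? jsonDecode(stored) : stored;
}

class Base64Codec implements PayloadCodec {
  @override
  String get id => 'base64';
  @override
  Object? encode(Object? value) =>
      value is String ? base64Encode(utf8.encode(value)) : value;
  @override
  Object? decode(Object? stored) =>
      stored is String ? utf8.decode(base64Decode(stored)) : stored;
}

/// Wraps an encoded result with the id of the encoder that produced it.
/// The 'payload' key is an assumption made for this sketch.
Map<String, Object?> storeResult(PayloadCodec codec, Object? value) => {
      '__stemResultEncoder': codec.id,
      'payload': codec.encode(value),
    };

/// Picks the decoder from the id recorded in the stored status,
/// so the reader never has to guess the format.
Object? readResult(
  Map<String, Object?> stored,
  Map<String, PayloadCodec> codecs,
) {
  final id = stored['__stemResultEncoder'] as String?;
  final codec = codecs[id];
  if (codec == null) {
    throw StateError('No encoder registered for id "$id"');
  }
  return codec.decode(stored['payload']);
}

void main() {
  final codecs = <String, PayloadCodec>{
    for (final c in <PayloadCodec>[JsonCodec(), Base64Codec()]) c.id: c,
  };
  final stored = storeResult(Base64Codec(), 'hello');
  print(stored['__stemResultEncoder']); // base64
  print(readResult(stored, codecs)); // hello
}
```

Keying the lookup on a stored id means a process that did not write the result can still decode it, provided it has registered an encoder with the same id.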
Configure defaults when bootstrapping Stem, StemApp, Canvas, or workflow
apps:
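// Requires `import 'dart:convert';` for base64Encode, base64Decode, and utf8.
class Base64PayloadEncoder extends TaskPayloadEncoder {
  const Base64PayloadEncoder();

  // Strings are stored as base64; other values pass through unchanged.
  @override
  Object? encode(Object? value) =>
      value is String ? base64Encode(utf8.encode(value)) : value;

  @override
  Object? decode(Object? stored) =>
      stored is String ? utf8.decode(base64Decode(stored)) : stored;
}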
Future<void> configureEncoders() async {
  final app = await StemApp.inMemory(
    tasks: const [],
    argsEncoder: const JsonTaskPayloadEncoder(),
    resultEncoder: const Base64PayloadEncoder(),
    additionalEncoders: const [GzipPayloadEncoder()],
  );
  await app.close();
}
Handlers needing bespoke treatment can override TaskMetadata.argsEncoder and
TaskMetadata.resultEncoder; the worker ensures only that task uses the custom
encoder while the rest fall back to the global defaults.
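The fallback rule can be pictured as a null-coalescing lookup. The sketch below is illustrative only: Encoder, TaskMeta, and EncoderDefaults are hypothetical types, not Stem's TaskMetadata or its worker internals, and the real resolution logic may differ.

```dart
// Hypothetical types for illustration only; not Stem's API.
class Encoder {
  const Encoder(this.id);
  final String id;
}

class TaskMeta {
  const TaskMeta({this.argsEncoder, this.resultEncoder});
  final Encoder? argsEncoder; // null means "use the global default"
  final Encoder? resultEncoder; // null means "use the global default"
}

class EncoderDefaults {
  const EncoderDefaults({required this.args, required this.result});
  final Encoder args;
  final Encoder result;

  // A per-task override wins; otherwise fall back to the global default.
  Encoder argsFor(TaskMeta meta) => meta.argsEncoder ?? args;
  Encoder resultFor(TaskMeta meta) => meta.resultEncoder ?? result;
}

void main() {
  const defaults = EncoderDefaults(
    args: Encoder('json'),
    result: Encoder('json'),
  );
  const plain = TaskMeta();
  const custom = TaskMeta(resultEncoder: Encoder('base64'));

  print(defaults.resultFor(plain).id); // json
  print(defaults.resultFor(custom).id); // base64
  print(defaults.argsFor(custom).id); // json
}
```

Note that overriding one encoder on a task leaves the other untouched: the custom task above still uses the default args encoder.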
Schedule & lock stores
Future<void> configureBeatStores() async {
  final scheduleStore = await RedisScheduleStore.connect(
    'redis://localhost:6379/2',
  );
  final lockStore = await RedisLockStore.connect('redis://localhost:6379/3');
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final beat = Beat(
    broker: broker,
    store: scheduleStore,
    lockStore: lockStore,
  );
  await beat.stop();
  await broker.close();
  await scheduleStore.close();
  await lockStore.close();
}
Switch to Postgres with PostgresScheduleStore.connect / PostgresLockStore.connect.
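To see why Beat takes a lock store in addition to a schedule store, it can help to picture the lock as a lease with an expiry. The sketch below is an in-memory illustration under that assumption. SketchLockStore, tryAcquire, release, and the 'schedule:cleanup' key are hypothetical names; Stem's lock store interface and the way Beat uses it may differ.

```dart
// Hypothetical in-memory lock store; Stem's lock store API may differ.
class _Lease {
  _Lease(this.owner, this.expiresAt);
  final String owner;
  final DateTime expiresAt;
}

class SketchLockStore {
  final Map<String, _Lease> _leases = {};

  /// Returns true if [owner] holds the lock for [key] after this call.
  /// A lease held by another owner blocks acquisition until it expires.
  bool tryAcquire(String key, String owner, Duration ttl, DateTime now) {
    final current = _leases[key];
    final heldByOther = current != null &&
        current.expiresAt.isAfter(now) &&
        current.owner != owner;
    if (heldByOther) return false;
    _leases[key] = _Lease(owner, now.add(ttl));
    return true;
  }

  /// Releases the lock only if [owner] currently holds it.
  void release(String key, String owner) {
    if (_leases[key]?.owner == owner) _leases.remove(key);
  }
}

void main() {
  final shared = SketchLockStore();
  final now = DateTime.utc(2024, 1, 1);
  const ttl = Duration(seconds: 30);
  final later = now.add(const Duration(seconds: 31));

  // Two scheduler instances compete for the same schedule entry.
  print(shared.tryAcquire('schedule:cleanup', 'beat-a', ttl, now)); // true
  print(shared.tryAcquire('schedule:cleanup', 'beat-b', ttl, now)); // false
  // Once the lease has expired, another instance can take over.
  print(shared.tryAcquire('schedule:cleanup', 'beat-b', ttl, later)); // true
}
```

The exclusion only works when every instance talks to the same store; two instances with separate lock stores would each see the lock as free.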
Revoke store
Store revocations in Redis/Postgres/SQLite so workers can honour
stem worker revoke:
export STEM_REVOKE_STORE_URL=postgres://postgres:postgres@localhost:5432/stem
Future<void> configurePostgresRevokeStore() async {
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final revokeStore = await PostgresRevokeStore.connect(
    'postgres://postgres:postgres@localhost:5432/stem',
  );
  final worker = Worker(
    broker: broker,
    registry: registry,
    backend: backend,
    revokeStore: revokeStore,
  );
  await worker.shutdown();
  await revokeStore.close();
  await backend.close();
  await broker.close();
}
// Requires `import 'dart:io';` for File.
Future<void> configureSqliteRevokeStore() async {
  final broker = InMemoryBroker();
  final backend = InMemoryResultBackend();
  final revokeStore = await SqliteRevokeStore.open(
    File('stem_revoke.sqlite'),
    namespace: 'stem',
  );
  final worker = Worker(
    broker: broker,
    registry: registry,
    backend: backend,
    revokeStore: revokeStore,
  );
  await worker.shutdown();
  await revokeStore.close();
  await backend.close();
  await broker.close();
}
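Conceptually, a revoke store lets a worker check whether a task was revoked before running its handler. The sketch below illustrates that check with an in-memory set. SketchRevokeStore, revoke, isRevoked, and runIfAllowed are hypothetical names, and this is an assumption about the general pattern rather than a description of how Stem's Worker consults its store.

```dart
// Hypothetical in-memory stand-in; not Stem's revoke store interface.
class SketchRevokeStore {
  final Set<String> _revoked = {};

  /// Records that the task with [taskId] must not run.
  void revoke(String taskId) => _revoked.add(taskId);

  bool isRevoked(String taskId) => _revoked.contains(taskId);
}

/// Runs [handler] unless [taskId] has been revoked.
/// Returns the handler's result, or 'revoked' when the task was skipped.
String runIfAllowed(
  SketchRevokeStore store,
  String taskId,
  String Function() handler,
) {
  if (store.isRevoked(taskId)) return 'revoked';
  return handler();
}

void main() {
  final store = SketchRevokeStore();
  store.revoke('task-2');

  print(runIfAllowed(store, 'task-1', () => 'done')); // done
  print(runIfAllowed(store, 'task-2', () => 'done')); // revoked
}
```

An in-memory set like this one is lost on restart and invisible to other processes, which is the gap a durable Redis, Postgres, or SQLite store fills.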
Tips
- In-memory adapters are great for local tests; switch to Redis/Postgres when you need persistence or multi-process coordination.
- SQLite is single-writer: connect only workers to the SQLite result backend, and use a separate SQLite file for the broker.
- Postgres adapters automatically migrate required tables on first connect.
- Configure TTLs on the result backend via backend.set to limit retained data.
- For HA Beat deployments, use the same lock store across instances.
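As a rough picture of what a TTL on stored results buys you, the sketch below expires entries on read. SketchResultStore and its set/get signatures are hypothetical and chosen for the example; they are not the signature of Stem's backend.set, which should be checked against the library itself.

```dart
// Hypothetical in-memory result store with per-entry expiry.
class _Entry {
  _Entry(this.value, this.expiresAt);
  final Object? value;
  final DateTime expiresAt;
}

class SketchResultStore {
  final Map<String, _Entry> _entries = {};

  /// Stores [result] for [taskId] and marks it to expire after [ttl].
  void set(
    String taskId,
    Object? result, {
    required Duration ttl,
    required DateTime now,
  }) {
    _entries[taskId] = _Entry(result, now.add(ttl));
  }

  /// Returns the stored result, or null if it is missing or has expired.
  /// Expired entries are dropped on access so they stop taking up space.
  Object? get(String taskId, DateTime now) {
    final entry = _entries[taskId];
    if (entry == null) return null;
    if (!entry.expiresAt.isAfter(now)) {
      _entries.remove(taskId);
      return null;
    }
    return entry.value;
  }

  int get length => _entries.length;
}

void main() {
  final store = SketchResultStore();
  final now = DateTime.utc(2024, 1, 1);
  store.set('task-1', 'ok', ttl: const Duration(minutes: 5), now: now);

  print(store.get('task-1', now.add(const Duration(minutes: 1)))); // ok
  print(store.get('task-1', now.add(const Duration(minutes: 6)))); // null
  print(store.length); // 0
}
```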