Skip to main content

Persistence & Stores

Use persistence when you need durable task state, shared schedules, or revocation storage. Stem ships with Redis, Postgres, and SQLite adapters plus in-memory variants for local development.

Result backend

Future<void> connectInMemoryBackend() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final stem = Stem(
broker: broker,
registry: registry,
backend: backend,
);
await stem.enqueue('demo', args: {});
await backend.close();
await broker.close();
}

Payload encoders

Result backends now respect pluggable TaskPayloadEncoders. Producers encode arguments before publishing, workers decode them once before invoking handlers, and handler return values are encoded before they hit the backend. Every stored status contains the encoder id (__stemResultEncoder), letting other processes decode payloads without guessing formats.

Configure defaults when bootstrapping Stem, StemApp, Canvas, or workflow apps:

lib/bootstrap_encoders.dart
class Base64PayloadEncoder extends TaskPayloadEncoder {
const Base64PayloadEncoder();

Object? encode(Object? value) =>
value is String ? base64Encode(utf8.encode(value)) : value;

Object? decode(Object? stored) =>
stored is String ? utf8.decode(base64Decode(stored)) : stored;
}

Future<void> configureEncoders() async {
final app = await StemApp.inMemory(
tasks: const [],
argsEncoder: const JsonTaskPayloadEncoder(),
resultEncoder: const Base64PayloadEncoder(),
additionalEncoders: const [GzipPayloadEncoder()],
);
await app.close();
}

Handlers needing bespoke treatment can override TaskMetadata.argsEncoder and TaskMetadata.resultEncoder; the worker ensures only that task uses the custom encoder while the rest fall back to the global defaults.

Schedule & lock stores

lib/beat_bootstrap.dart
Future<void> configureBeatStores() async {
final scheduleStore = await RedisScheduleStore.connect(
'redis://localhost:6379/2',
);
final lockStore = await RedisLockStore.connect('redis://localhost:6379/3');
final broker = await RedisStreamsBroker.connect('redis://localhost:6379');

final beat = Beat(
broker: broker,
store: scheduleStore,
lockStore: lockStore,
);

await beat.stop();
await broker.close();
await scheduleStore.close();
await lockStore.close();
}

Switch to Postgres with PostgresScheduleStore.connect / PostgresLockStore.connect.

Revoke store

Store revocations in Redis/Postgres/SQLite so workers can honour stem worker revoke:

export STEM_REVOKE_STORE_URL=postgres://postgres:postgres@localhost:5432/stem
Postgres revoke store
Future<void> configurePostgresRevokeStore() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final revokeStore = await PostgresRevokeStore.connect(
'postgres://postgres:postgres@localhost:5432/stem',
);
final worker = Worker(
broker: broker,
registry: registry,
backend: backend,
revokeStore: revokeStore,
);

await worker.shutdown();
await revokeStore.close();
await backend.close();
await broker.close();
}
SQLite revoke store
Future<void> configureSqliteRevokeStore() async {
final broker = InMemoryBroker();
final backend = InMemoryResultBackend();
final revokeStore = await SqliteRevokeStore.open(
File('stem_revoke.sqlite'),
namespace: 'stem',
);
final worker = Worker(
broker: broker,
registry: registry,
backend: backend,
revokeStore: revokeStore,
);

await worker.shutdown();
await revokeStore.close();
await backend.close();
await broker.close();
}

Tips

  • In-memory adapters are great for local tests; switch to Redis/Postgres when you need persistence or multi-process coordination.
  • SQLite is single-writer: keep only workers connected to the backend and use a separate SQLite file for the broker.
  • Postgres adapters automatically migrate required tables on first connect.
  • Configure TTLs on the result backend via backend.set to limit retained data.
  • For HA Beat deployments, use the same lock store across instances.