Skip to main content

Beat Scheduler Guide

Stem Beat enqueues periodic tasks so you can keep background jobs on schedule. This guide shows how to define schedules, load them, run Beat alongside your workers, and monitor results.

Define schedules

Beat accepts YAML, JSON, or programmatic entries. YAML keeps schedules under version control and mirrors the CLI format:

config/schedules.yaml
cleanup-temp-files:
task: maintenance.cleanup
spec: every:5m
queue: maintenance
args:
path: /tmp
options:
maxRetries: 2

midnight-report:
task: reports.generate
spec:
cron: '0 0 * * *'
timezone: America/New_York
queue: reports
kwargs:
report: daily-summary

solar-check:
task: solar.notify
spec:
solar: sunset
latitude: 40.7128
longitude: -74.0060
offset: -15m

one-off-reconcile:
task: billing.reconcile
spec:
clocked: 2025-02-01T00:00:00Z
queue: finance
options:
runOnce: true

Load schedules

Apply schedule files to the schedule store before calling beat.start():

Future<void> loadSchedules() async {
final store = await RedisScheduleStore.connect('redis://localhost:6379/2');
final beat = Beat(
broker: await RedisStreamsBroker.connect('redis://localhost:6379'),
store: store,
lockStore: await RedisLockStore.connect('redis://localhost:6379/3'),
);

await applyScheduleFile(store, 'config/schedules.yaml');
await beat.start();
}

To build schedules imperatively, call store.upsert with the spec classes (IntervalScheduleSpec, CronScheduleSpec, SolarScheduleSpec, ClockedScheduleSpec).

Start Beat

bin/beat_dev.dart
Future<void> main() async {
final registry = SimpleTaskRegistry()..register(DemoTask());
final broker = InMemoryBroker();
final store = InMemoryScheduleStore();
final lockStore = InMemoryLockStore();

final beat = Beat(
broker: broker,
store: store,
lockStore: lockStore,
);

await store.upsert(
ScheduleEntry(
id: 'demo-once',
taskName: 'demo.run',
queue: 'default',
spec: ClockedScheduleSpec(
runAt: DateTime.now().add(const Duration(seconds: 5)),
),
),
);

await beat.start();
await Future<void>.delayed(const Duration(seconds: 6));
await beat.stop();
await broker.close();
}

class DemoTask extends TaskHandler<void> {

String get name => 'demo.run';


TaskOptions get options => const TaskOptions(queue: 'default');


Future<void> call(TaskContext context, Map<String, Object?> args) async {
print('Running scheduled job at ${DateTime.now()}');
}
}

CLI alternative

Prefer configuration over code? Use the CLI with the same schedule file:

stem schedule apply \
--file config/schedules.yaml \
--yes

stem schedule list
stem schedule dry-run --spec "every:5m"

Programmatic spec helpers

Future<void> addScheduleSpecs(ScheduleStore store) async {
await addIntervalSchedule(store);
await addCronSchedule(store);
await addSolarSchedule(store);
await addClockedSchedule(store);
}
SpecDescription
IntervalMillisecond-resolution interval with optional jitter, startAt, endAt.
CronClassic 5/6-field cron with optional timezone per entry.
SolarSunrise, sunset, or solar noon with lat/long and optional offsets.
ClockedOne-shot timestamp; set runOnce to prevent rescheduling.

Timezone handling

  • Schedule entries accept an optional IANA timezone identifier.
  • If your schedule store uses the default calculator, schedules evaluate in UTC.
  • To honor per-entry timezones, construct the schedule store with a ScheduleCalculator configured with a timezone data provider.
  • You must load timezone data in your process (for example, timezone/data/latest.dart) before using a timezone-aware calculator.

Observe Beat activity

Stem emits scheduler signals that mirror Celery Beat hooks:

void registerBeatSignals() {
StemSignals.scheduleEntryDue.connect((payload, _) {
print('[due] ${payload.entry.id} @ ${payload.tickAt}');
});

StemSignals.scheduleEntryDispatched.connect((payload, _) {
print('[dispatched] drift=${payload.drift.inMilliseconds}ms');
});

StemSignals.scheduleEntryFailed.connect((payload, _) {
print('[failed] ${payload.entry.id}: ${payload.error}');
});
}

You can also query the schedule store directly:

Future<void> listDueEntries(ScheduleStore store) async {
final dueEntries = await store.due(DateTime.now());
for (final entry in dueEntries) {
print('Upcoming: ${entry.id} at ${entry.nextRunAt}');
}
}

Tips & tricks

  • Use lockStore (Redis or Postgres) when running Beat in HA mode so only one instance triggers jobs at a time.
  • Call Beat.stop() on shutdown to flush outstanding timers and release locks.
  • Run Beat from a Dart entrypoint wired to your schedule store (see the Redis example below):
lib/scheduler.dart
Future<void> startRedisBeat() async {
final brokerUrl =
Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';
final store = await RedisScheduleStore.connect('$brokerUrl/2');
final beat = Beat(
broker: await RedisStreamsBroker.connect(brokerUrl),
store: store,
lockStore: await RedisLockStore.connect('$brokerUrl/3'),
);

await applyScheduleFile(store, 'config/schedules.yaml');
await beat.start();
}

class EmailTask extends TaskHandler<void> {

String get name => 'email.send';


TaskOptions get options => const TaskOptions(queue: 'default');


Future<void> call(TaskContext context, Map<String, Object?> args) async {}
}
  • Store schedules in source control and re-apply them with stem schedule apply --yes after deployments.

Next, hook Beat into your deployment automation and monitor the scheduler signals alongside worker metrics.