Skip to main content

Next Steps

Use this page as a jump table once you’ve finished the first walkthroughs.

Calling tasks

lib/producer.dart
/// Enqueues a single `reports.generate` task through Redis, then releases the
/// broker and result-backend connections.
///
/// The broker URL is read from the `STEM_BROKER_URL` environment variable and
/// defaults to `redis://localhost:6379`. The result backend connects to the
/// same URL with `/1` appended.
///
/// Both connections are closed even when connecting the backend or enqueueing
/// throws; the error still propagates to the caller.
Future<void> enqueueWithRedis() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';

  final broker = await RedisStreamsBroker.connect(brokerUrl);
  try {
    final backend = await RedisResultBackend.connect('$brokerUrl/1');
    try {
      // The handler registered here only logs the report id it receives.
      final registry = SimpleTaskRegistry()
        ..register(
          FunctionTaskHandler<void>(
            name: 'reports.generate',
            entrypoint: (context, args) async {
              final id = args['reportId'] as String? ?? 'unknown';
              print('Queued report $id');
              return null;
            },
          ),
        );

      final stem = Stem(
        broker: broker,
        registry: registry,
        backend: backend,
      );

      await stem.enqueue(
        'reports.generate',
        args: {'reportId': 'monthly-2025-10'},
        options: const TaskOptions(queue: 'reports', maxRetries: 3),
        meta: {'requestedBy': 'finance'},
      );
    } finally {
      // Same close order as before: backend first, then broker.
      await backend.close();
    }
  } finally {
    await broker.close();
  }
}

Canvas/Workflows

lib/canvas_chain.dart
/// Runs a three-step chain (`fetch.user` -> `enrich.user` -> `send.email`) on
/// an in-memory Stem app and prints the final state and value.
///
/// Each later handler reads its predecessor's result from
/// `context.meta['chainPrevResult']`, falling back to `'Friend'` when that
/// entry is missing or not a string.
///
/// The app is closed even if running the chain throws.
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<String>(
        name: 'fetch.user',
        entrypoint: (context, args) async => 'Ada',
      ),
      FunctionTaskHandler<String>(
        name: 'enrich.user',
        entrypoint: (context, args) async {
          final prev = context.meta['chainPrevResult'] as String? ?? 'Friend';
          return '$prev Lovelace';
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'send.email',
        entrypoint: (context, args) async {
          final fullName =
              context.meta['chainPrevResult'] as String? ?? 'Friend';
          print('Sending email to $fullName');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chain-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );
  await app.start();

  try {
    final canvas = app.canvas;
    final chainResult = await canvas.chain([
      task('fetch.user'),
      task('enrich.user'),
      task('send.email'),
    ]);

    print(
      'Chain completed with state: ${chainResult.finalStatus?.state} '
      'value=${chainResult.value}',
    );
  } finally {
    // Shut the app down even when the chain fails, so the worker does not
    // keep the process alive.
    await app.close();
  }
}

Routing

lib/routing.dart
/// Routing registry built from a [RoutingConfig] declared inline.
///
/// The config declares one queue named `primary`, a default-queue entry with
/// alias `default` pointing at `primary`, and a single route whose match is
/// `{'task': 'reports.*'}` and whose target is the `primary` queue.
// NOTE(review): `reports.*` is presumably a glob over task names - confirm
// against the RouteMatch documentation.
final inlineRegistry = RoutingRegistry(
RoutingConfig(
defaultQueue: const DefaultQueueConfig(alias: 'default', queue: 'primary'),
queues: {'primary': QueueDefinition(name: 'primary')},
routes: [
RouteDefinition(
match: RouteMatch.fromJson(const {'task': 'reports.*'}),
target: RouteTarget(type: 'queue', name: 'primary'),
),
],
),
);

Remote control

lib/worker_control.dart
/// Worker for the `critical` queue with autoscaling enabled.
///
/// Configured with `concurrency: 12` and an autoscale range of 2 to 12. It is
/// given an empty [SimpleTaskRegistry], so no task handlers are registered
/// here.
// NOTE(review): `_autoscaleBroker` and `_autoscaleBackend` are defined
// elsewhere in this library; their setup is not visible in this snippet.
final worker = Worker(
broker: _autoscaleBroker,
registry: SimpleTaskRegistry(),
backend: _autoscaleBackend,
queue: 'critical',
concurrency: 12,
autoscale: const WorkerAutoscaleConfig(
enabled: true,
minConcurrency: 2,
maxConcurrency: 12,
// Step sizes differ: up by 2, down by 1.
// NOTE(review): presumably applied per autoscale evaluation - confirm.
scaleUpStep: 2,
scaleDownStep: 1,
// NOTE(review): presumably the idle time required before scaling down and
// the evaluation interval, respectively - confirm against
// WorkerAutoscaleConfig.
idlePeriod: Duration(seconds: 45),
tick: Duration(milliseconds: 250),
),
);

Observability & ops

lib/observability.dart
/// Configures the shared [StemMetrics] instance with a single
/// [ConsoleMetricsExporter].
void configureMetrics() {
  final metrics = StemMetrics.instance;
  metrics.configure(
    exporters: [
      ConsoleMetricsExporter(),
    ],
  );
}

Timezone

lib/scheduler.dart
/// Adds the interval, cron, solar, and clocked schedules to [store].
///
/// The helpers run one at a time in that order; each is awaited before the
/// next one starts.
Future<void> addScheduleSpecs(ScheduleStore store) async {
  final registrations = [
    addIntervalSchedule,
    addCronSchedule,
    addSolarSchedule,
    addClockedSchedule,
  ];
  for (final register in registrations) {
    await register(store);
  }
}

Optimization

lib/rate_limiting.dart
// #region rate-limit-task-options
/// Task handler registered as `demo.rateLimited` whose [options] declare a
/// rate limit of `10/s` and up to 3 retries.
class RateLimitedTask extends TaskHandler<void> {
  /// The name this handler is registered under.
  @override
  String get name => 'demo.rateLimited';

  /// Task options carrying the `10/s` rate limit and `maxRetries: 3`.
  @override
  TaskOptions get options => const TaskOptions(
        rateLimit: '10/s',
        maxRetries: 3,
      );

  /// Prints a message naming the `actor` entry of [args].
  ///
  /// Falls back to `'anonymous'` when `actor` is missing or null.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final actor = args['actor'] as String? ?? 'anonymous';
    print('Handled rate-limited task for $actor');
  }
}
// #endregion rate-limit-task-options