Next Steps
Use this page as a jump table once you’ve finished the first walkthroughs.
Calling tasks
- Producer enqueue
- Task handler
lib/producer.dart
/// Enqueues one `reports.generate` task through a [StemClient] built from a
/// broker URL.
///
/// The broker URL is read from the `STEM_BROKER_URL` environment variable and
/// falls back to `redis://localhost:6379`. The `backend` store override is set
/// to the broker URL with `/1` appended.
///
/// The client is closed in a `finally` block so it is released even when
/// enqueueing throws.
Future<void> enqueueWithRedis() async {
  final brokerUrl =
      Platform.environment['STEM_BROKER_URL'] ?? 'redis://localhost:6379';

  // The handler registered with the client for `reports.generate`.
  final tasks = [
    FunctionTaskHandler<void>(
      name: 'reports.generate',
      entrypoint: (context, args) async {
        final id = args['reportId'] as String? ?? 'unknown';
        print('Queued report $id');
        return null;
      },
    ),
  ];

  final client = await StemClient.fromUrl(
    brokerUrl,
    adapters: const [StemRedisAdapter()],
    overrides: StemStoreOverrides(backend: '$brokerUrl/1'),
    tasks: tasks,
  );

  try {
    await client.enqueue(
      'reports.generate',
      args: {'reportId': 'monthly-2025-10'},
      options: const TaskOptions(queue: 'reports', maxRetries: 3),
      meta: {'requestedBy': 'finance'},
    );
  } finally {
    // Always release the client, including when enqueue fails.
    await client.close();
  }
}
lib/tasks.dart
/// Task handler registered under the name `email.send`.
///
/// When invoked it forwards the task arguments to `sendEmailRemote` and
/// awaits the result.
class RedisEmailTask extends TaskHandler<void> {
  /// The task name this handler is registered under.
  @override
  String get name => 'email.send';

  /// Options declared for this task: the `email` queue, `maxRetries` of 4, a
  /// 30-second `visibilityTimeout`, and `unique` enabled with a 5-minute
  /// `uniqueFor` duration.
  @override
  TaskOptions get options => const TaskOptions(
        queue: 'email',
        maxRetries: 4,
        visibilityTimeout: Duration(seconds: 30),
        unique: true,
        uniqueFor: Duration(minutes: 5),
      );

  /// Passes [args] unchanged to `sendEmailRemote`; [context] is not used.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    await sendEmailRemote(args);
  }
}
/// Handler list containing a single [RedisEmailTask] instance.
final redisTasks = <RedisEmailTask>[
  RedisEmailTask(),
];
Canvas/Workflows
lib/canvas_chain.dart
/// Runs a three-step canvas chain (`fetch.user` -> `enrich.user` ->
/// `send.email`) on an in-memory [StemApp] and prints the outcome.
///
/// The second and third handlers read the `chainPrevResult` key from the task
/// context metadata, falling back to `'Friend'` (presumably the previous
/// link's result; confirm against the canvas implementation).
///
/// The app is closed in a `finally` block so it is shut down even when the
/// chain throws.
Future<void> main() async {
  final app = await StemApp.inMemory(
    tasks: [
      FunctionTaskHandler<String>(
        name: 'fetch.user',
        entrypoint: (context, args) async => 'Ada',
      ),
      FunctionTaskHandler<String>(
        name: 'enrich.user',
        entrypoint: (context, args) async {
          final prev = context.meta.valueOr<String>(
            'chainPrevResult',
            'Friend',
          );
          return '$prev Lovelace';
        },
      ),
      FunctionTaskHandler<Object?>(
        name: 'send.email',
        entrypoint: (context, args) async {
          final fullName = context.meta.valueOr<String>(
            'chainPrevResult',
            'Friend',
          );
          print('Sending email to $fullName');
          return null;
        },
      ),
    ],
    workerConfig: const StemWorkerConfig(
      consumerName: 'chain-worker',
      concurrency: 1,
      prefetchMultiplier: 1,
    ),
  );

  try {
    final canvas = app.canvas;
    final chainResult = await canvas.chain([
      task('fetch.user'),
      task('enrich.user'),
      task('send.email'),
    ]);
    print(
      'Chain completed with state: ${chainResult.finalStatus?.state} '
      'value=${chainResult.value}',
    );
  } finally {
    // Always shut the app down, including when the chain fails.
    await app.close();
  }
}
Routing
lib/routing.dart
/// Routing registry built from an inline [RoutingConfig].
///
/// The config declares:
/// - a default queue with alias `default` pointing at queue `primary`;
/// - a single queue definition named `primary`;
/// - one route whose match is `{'task': 'reports.*'}` and whose target is the
///   queue `primary`.
final RoutingRegistry inlineRegistry = RoutingRegistry(
  RoutingConfig(
    defaultQueue: const DefaultQueueConfig(
      alias: 'default',
      queue: 'primary',
    ),
    queues: {
      'primary': QueueDefinition(name: 'primary'),
    },
    routes: [
      RouteDefinition(
        match: RouteMatch.fromJson(const {'task': 'reports.*'}),
        target: RouteTarget(type: 'queue', name: 'primary'),
      ),
    ],
  ),
);
Remote control
lib/worker_control.dart
/// Worker bound to the `critical` queue with autoscaling enabled.
///
/// It is constructed with no task handlers, a `concurrency` of 12, and an
/// autoscale config of `minConcurrency` 2, `maxConcurrency` 12, a scale-up
/// step of 2, a scale-down step of 1, a 45-second `idlePeriod`, and a
/// 250 ms `tick`.
final Worker worker = Worker(
  broker: _autoscaleBroker,
  backend: _autoscaleBackend,
  tasks: const [],
  queue: 'critical',
  concurrency: 12,
  autoscale: const WorkerAutoscaleConfig(
    enabled: true,
    minConcurrency: 2,
    maxConcurrency: 12,
    scaleUpStep: 2,
    scaleDownStep: 1,
    idlePeriod: Duration(seconds: 45),
    tick: Duration(milliseconds: 250),
  ),
);
Observability & ops
lib/observability.dart
/// Configures the shared [StemMetrics] instance with a single
/// [ConsoleMetricsExporter].
void configureMetrics() {
  final metrics = StemMetrics.instance;
  metrics.configure(exporters: [ConsoleMetricsExporter()]);
}
Timezone
lib/scheduler.dart
/// Calls each schedule helper with [store], strictly one after another.
///
/// The helpers run in this order: interval, cron, solar, clocked. Each call
/// is awaited before the next starts, so a failure stops the remaining ones.
Future<void> addScheduleSpecs(ScheduleStore store) async {
  final helpers = [
    addIntervalSchedule,
    addCronSchedule,
    addSolarSchedule,
    addClockedSchedule,
  ];
  for (final addSchedule in helpers) {
    await addSchedule(store);
  }
}
Optimization
lib/rate_limiting.dart
// #region rate-limit-task-options
/// Task handler registered under the name `demo.rateLimited`.
///
/// Its options declare a `rateLimit` of `'10/s'` and `maxRetries` of 3.
class RateLimitedTask extends TaskHandler<void> {
  /// The task name this handler is registered under.
  @override
  String get name => 'demo.rateLimited';

  /// Options declared for this task: `rateLimit` `'10/s'` and 3 retries.
  @override
  TaskOptions get options => const TaskOptions(
        rateLimit: '10/s',
        maxRetries: 3,
      );

  /// Prints a message naming the `actor` entry of [args].
  ///
  /// Falls back to `'anonymous'` when the entry is missing or null.
  /// [context] is not used.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final actor = args['actor'] as String? ?? 'anonymous';
    print('Handled rate-limited task for $actor');
  }
}
// #endregion rate-limit-task-options