Rate Limiting
Stem supports per-task rate limits via TaskOptions.rateLimit and a pluggable
RateLimiter interface. This lets you throttle hot handlers with a shared
Redis-backed limiter or a custom driver.
Stem also supports group-scoped rate limits with TaskOptions.groupRateLimit
for shared quotas across multiple task types/tenants.
Quick start
- Task Options
- Worker Wiring
- Producer Enqueue
final registry = SimpleTaskRegistry()
  ..register(
    FunctionTaskHandler<void>(
      name: _taskName,
      options: const TaskOptions(
        queue: 'throttled',
        maxRetries: 0,
        visibilityTimeout: Duration(seconds: 60),
        rateLimit: '3/s',
      ),
      entrypoint: _renderEntrypoint,
    ),
  );

final worker = Worker(
  broker: broker,
  registry: registry,
  backend: backend,
  rateLimiter: rateLimiter,
  queue: 'throttled',
  consumerName:
      Platform.environment['WORKER_NAME'] ?? 'rate-limit-demo-worker',
  subscription: RoutingSubscription.singleQueue('throttled'),
  concurrency: 2,
);
for (var i = 0; i < totalJobs; i++) {
  final delaySeconds = i >= totalJobs / 2 ? 4 : 0;
  final notBefore = delaySeconds > 0
      ? DateTime.now().add(Duration(seconds: delaySeconds))
      : null;
  final priority = i.isEven ? 9 : 2;
  final route = routing.resolve(
    RouteRequest(
      task: taskName(),
      headers: const {},
      queue: 'throttled',
    ),
  );
  final appliedPriority = route.effectivePriority(priority);
  final id = await stem.enqueue(
    taskName(),
    args: {
      'job': i + 1,
      'scheduledFor': notBefore?.toIso8601String(),
      'requestedPriority': priority,
    },
    options: TaskOptions(
      queue: 'throttled',
      priority: priority,
      maxRetries: 0,
    ),
    notBefore: notBefore,
    meta: {
      'requestedPriority': priority,
      'appliedPriority': appliedPriority,
      if (notBefore != null) 'scheduledFor': notBefore.toIso8601String(),
    },
  );
  stdout.writeln(
    '[producer] job=${i + 1} priority=$priority '
    'applied=$appliedPriority delay=${delaySeconds}s id=$id',
  );
}
Docs snippet (in-memory demo)
- Define a rate-limited task
- Limiter config + state
- Limiter acquire decision
- Wire worker with rate limiter
- Enqueue with tenant header
- Bootstrap StemApp
- Start worker
- Create Stem client
- Enqueue demo task
- Shutdown cleanly
class RateLimitedTask extends TaskHandler<void> {
  @override
  String get name => 'demo.rateLimited';

  @override
  TaskOptions get options => const TaskOptions(
        rateLimit: '10/s',
        maxRetries: 3,
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final actor = args['actor'] as String? ?? 'anonymous';
    print('Handled rate-limited task for $actor');
  }
}
class DemoRateLimiter implements RateLimiter {
  DemoRateLimiter({required this.capacity, required this.interval});

  final int capacity;
  final Duration interval;

  int _used = 0;
  DateTime _windowStart = DateTime.now();

  @override
  Future<RateLimitDecision> acquire(
    String key, {
    int tokens = 1,
    Duration? interval,
    Map<String, Object?>? meta,
  }) async {
    final window = interval ?? this.interval;
    final now = DateTime.now();
    final elapsed = now.difference(_windowStart);
    if (elapsed >= window) {
      _windowStart = now;
      _used = 0;
    }
    if (_used + tokens <= capacity) {
      _used += tokens;
      return RateLimitDecision(allowed: true, meta: {'key': key});
    }
    final retryAfter = window - elapsed;
    return RateLimitDecision(
      allowed: false,
      retryAfter: retryAfter.isNegative ? Duration.zero : retryAfter,
      meta: {'key': key},
    );
  }
}
final limiter = DemoRateLimiter(
  capacity: 2,
  interval: const Duration(seconds: 1),
);
final workerConfig = StemWorkerConfig(rateLimiter: limiter);

Future<String> enqueueRateLimited(Stem stem) async {
  return stem.enqueue(
    'demo.rateLimited',
    args: {'actor': 'acme'},
    headers: const {'tenant': 'acme'},
  );
}
final app = await StemApp.inMemory(
  tasks: [RateLimitedTask()],
  workerConfig: workerConfig,
);
await app.start();

final stem = app.stem;
await enqueueRateLimited(stem);

await app.close();
Run the rate_limit_delay example for a full demo:
packages/stem/example/rate_limit_delay
Rate limit syntax
`rateLimit` accepts short strings like:

- `10/s`: 10 tokens per second
- `100/m`: 100 tokens per minute
- `500/h`: 500 tokens per hour

`groupRateLimit` uses the same syntax.
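To illustrate what that shorthand encodes, here is a minimal sketch of parsing it into a token budget and window; `parseRate` is an illustrative helper, not a Stem API:

```dart
// Hypothetical parser for the 'N/unit' rate shorthand described above.
({int tokens, Duration window}) parseRate(String spec) {
  final parts = spec.split('/');
  final tokens = int.parse(parts[0]);
  final window = switch (parts[1]) {
    's' => const Duration(seconds: 1),
    'm' => const Duration(minutes: 1),
    'h' => const Duration(hours: 1),
    _ => throw FormatException('Unknown unit in rate: $spec'),
  };
  return (tokens: tokens, window: window);
}

void main() {
  final rate = parseRate('100/m');
  print('${rate.tokens} tokens per ${rate.window.inSeconds}s'); // 100 tokens per 60s
}
```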
How it works
- The worker parses `rateLimit` for each task.
- The worker asks the `RateLimiter` for an acquire decision.
- If denied, the task is retried with backoff and `rateLimited=true` metadata.
- Retry delays come from the limiter's `retryAfter` if provided, otherwise the worker's retry strategy.
- If granted, the task executes immediately.
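The denial branch above can be sketched as a small pure function; `SketchDecision` and `nextRetryDelay` are illustrative stand-ins for Stem's `RateLimitDecision` and worker internals, not real APIs:

```dart
// Stand-in for Stem's RateLimitDecision: was the token granted, and if
// not, did the limiter suggest when to try again?
class SketchDecision {
  const SketchDecision(this.allowed, [this.retryAfter]);
  final bool allowed;
  final Duration? retryAfter;
}

/// Returns null when the task may run now, otherwise the delay before
/// the next attempt: the limiter's retryAfter hint wins over the
/// worker's own backoff.
Duration? nextRetryDelay(SketchDecision decision, Duration fallbackBackoff) {
  if (decision.allowed) return null; // granted: execute immediately
  return decision.retryAfter ?? fallbackBackoff;
}

void main() {
  final delay = nextRetryDelay(
    const SketchDecision(false, Duration(milliseconds: 250)),
    const Duration(seconds: 5),
  );
  print('retry in ${delay?.inMilliseconds}ms'); // retry in 250ms
}
```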
Group rate limiting
Group rate limits share a limiter bucket across related tasks.
- `groupRateLimit`: limiter policy for the shared group bucket
- `groupRateKey`: optional static key (if omitted, Stem resolves from header)
- `groupRateKeyHeader`: header used when `groupRateKey` is not set (default: `tenant`)
- `groupRateLimiterFailureMode` (default: `failOpen`):
  - `failOpen`: continue execution if limiter backend fails
  - `failClosed`: requeue/retry when limiter backend fails
class GroupRateLimitedTask extends TaskHandler<void> {
  @override
  String get name => 'demo.groupRateLimited';

  @override
  TaskOptions get options => const TaskOptions(
        groupRateLimit: '20/m',
        groupRateKeyHeader: 'tenant',
        groupRateLimiterFailureMode: RateLimiterFailureMode.failClosed,
        maxRetries: 5,
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final tenant = args['tenant'] as String? ?? 'global';
    print('Handled group-rate-limited task for $tenant');
  }
}
Redis-backed limiter example
The example/rate_limit_delay demo ships a Redis fixed-window limiter. It:
- shares tokens across multiple workers,
- logs when a token is granted or denied,
- reschedules denied tasks with retry metadata.
Inspect it here:
class RedisFixedWindowRateLimiter implements RateLimiter {
  RedisFixedWindowRateLimiter._(
    this._connection,
    this._command, {
    required this.namespace,
  });

  final RedisConnection _connection;
  final Command _command;
  final String namespace;
  bool _closed = false;

  static const _script = '''
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local interval = tonumber(ARGV[2])
local current = redis.call('GET', key)
if not current then
  redis.call('SET', key, 1, 'PX', interval)
  return {1, interval}
end
current = tonumber(current)
if current < limit then
  redis.call('INCR', key)
  local ttl = redis.call('PTTL', key)
  if ttl < 0 then ttl = interval end
  return {1, ttl}
end
local ttl = redis.call('PTTL', key)
if ttl < 0 then ttl = interval end
return {0, ttl}
''';

  static Future<RedisFixedWindowRateLimiter> connect(
    String uri, {
    String namespace = 'stem-demo',
  }) async {
    final parsed = Uri.parse(uri);
    final host = parsed.host.isNotEmpty ? parsed.host : 'localhost';
    final port = parsed.hasPort ? parsed.port : 6379;
    final connection = RedisConnection();
    final scheme = parsed.scheme.isEmpty ? 'redis' : parsed.scheme;
    if (scheme == 'rediss') {
      throw UnsupportedError(
        'TLS connections are not implemented for the rate limiter demo. '
        'Use redis:// URLs or extend the example.',
      );
    }
    final command = await connection.connect(host, port);
    if (parsed.userInfo.isNotEmpty) {
      final parts = parsed.userInfo.split(':');
      final password = parts.length == 2 ? parts[1] : parts[0];
      await command.send_object(['AUTH', password]);
    }
    if (parsed.pathSegments.isNotEmpty) {
      final db = int.tryParse(parsed.pathSegments.first);
      if (db != null) {
        await command.send_object(['SELECT', db]);
      }
    }
    final resolvedNamespace = parsed.queryParameters['ns'] ?? namespace.trim();
    return RedisFixedWindowRateLimiter._(
      connection,
      command,
      namespace: resolvedNamespace.isEmpty ? 'stem-demo' : resolvedNamespace,
    );
  }

  String _keyFor(String key) => '$namespace:rate:$key';

  @override
  Future<RateLimitDecision> acquire(
    String key, {
    int tokens = 1,
    Duration? interval,
    Map<String, Object?>? meta,
  }) async {
    final window = interval ?? const Duration(seconds: 1);
    final response = await _command.send_object([
      'EVAL',
      _script,
      1,
      _keyFor(key),
      tokens,
      window.inMilliseconds,
    ]);
    if (response is! List || response.length != 2) {
      throw StateError(
        'Unexpected response from rate limiter script: $response',
      );
    }
    final allowed = (response[0] as num).toInt() == 1;
    final ttlMs = (response[1] as num).toInt();
    final remainingMs = ttlMs < 0 ? window.inMilliseconds : ttlMs;
    final retryAfter = allowed ? null : Duration(milliseconds: remainingMs);
    final decision = RateLimitDecision(
      allowed: allowed,
      retryAfter: retryAfter,
      meta: {
        'windowMs': window.inMilliseconds,
        'remainingMs': remainingMs,
        if (meta != null) ...meta,
      },
    );
    final status = allowed ? 'granted' : 'denied';
    final retryText = retryAfter == null
        ? 'available immediately'
        : 'retry in ${retryAfter.inMilliseconds}ms';
    stdout.writeln(
      '[rate-limiter][$status] key=$key tokens=$tokens '
      'window=${window.inMilliseconds}ms -> $retryText',
    );
    return decision;
  }

  Future<void> close() async {
    if (_closed) return;
    _closed = true;
    await _connection.close();
  }
}
Observability
When a task is rate limited:
- `context.meta['rateLimited']` is set on the retry attempt,
- `taskRetry` signals include retry metadata,
- worker logs show the limiter decision (if you log it).
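A handler can branch on that metadata flag directly; here is a minimal sketch where `describeAttempt` is an illustrative helper, not a Stem API:

```dart
// Hypothetical helper: classify an attempt from the retry metadata the
// worker attaches after a rate-limit denial.
String describeAttempt(Map<String, Object?> meta) {
  if (meta['rateLimited'] == true) {
    return 'retry after a rate-limit denial';
  }
  return 'normal attempt';
}

void main() {
  // Inside a real handler you would pass context.meta here.
  print(describeAttempt({'rateLimited': true})); // retry after a rate-limit denial
}
```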
Keying behavior
The worker uses a default rate-limit key of:

`<taskName>:<tenant>`

If no tenant header is set, the tenant defaults to `global`. Add a tenant header
when enqueuing tasks to enforce per-tenant limits.
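The key derivation described above can be sketched as follows; `rateLimitKey` is an illustrative helper, not the worker's actual implementation:

```dart
// Hypothetical sketch of the default key: task name plus the tenant
// header, falling back to 'global' when no tenant is supplied.
String rateLimitKey(String taskName, Map<String, String> headers) {
  final tenant = headers['tenant'] ?? 'global';
  return '$taskName:$tenant';
}

void main() {
  print(rateLimitKey('demo.rateLimited', {'tenant': 'acme'})); // demo.rateLimited:acme
  print(rateLimitKey('demo.rateLimited', {})); // demo.rateLimited:global
}
```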
Redis limiter wiring
The rate_limit_delay example reads STEM_RATE_LIMIT_URL to point the limiter
at Redis. Use a dedicated Redis DB or key prefix to keep limiter state isolated
from your broker/result backend.
Future<RedisFixedWindowRateLimiter> connectRateLimiter(String uri) =>
RedisFixedWindowRateLimiter.connect(uri);
Tips
- Use shared Redis for global limits across worker processes.
- Keep the rate limit key stable (by default it uses task name + tenant).
- Start with generous limits, then tighten after observing throughput.
Next steps
- See Tasks & Retries for other `TaskOptions` knobs.
- Use Observability to instrument rate-limited flows.