Rate Limiting

Stem supports per-task rate limits via TaskOptions.rateLimit and a pluggable RateLimiter interface, so you can throttle hot handlers with a shared Redis-backed limiter or a custom driver.

Stem also supports group-scoped rate limits with TaskOptions.groupRateLimit for shared quotas across multiple task types/tenants.

Quick start

lib/shared.dart
final registry = SimpleTaskRegistry()
  ..register(
    FunctionTaskHandler<void>(
      name: _taskName,
      options: const TaskOptions(
        queue: 'throttled',
        maxRetries: 0,
        visibilityTimeout: Duration(seconds: 60),
        rateLimit: '3/s',
      ),
      entrypoint: _renderEntrypoint,
    ),
  );

Docs snippet (in-memory demo)

lib/rate_limiting.dart
class RateLimitedTask extends TaskHandler<void> {
  @override
  String get name => 'demo.rateLimited';

  @override
  TaskOptions get options => const TaskOptions(
        rateLimit: '10/s',
        maxRetries: 3,
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final actor = args['actor'] as String? ?? 'anonymous';
    print('Handled rate-limited task for $actor');
  }
}

Run the rate_limit_delay example for a full demo:

  • packages/stem/example/rate_limit_delay

Rate limit syntax

rateLimit accepts short strings like:

  • 10/s — 10 tokens per second
  • 100/m — 100 tokens per minute
  • 500/h — 500 tokens per hour

groupRateLimit uses the same syntax.
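As a sketch of how these strings decompose into a token count and a window (illustrative only; this is not Stem's internal parser, and `parseRateLimit` is a hypothetical name):

```dart
// Illustrative parser for 'count/unit' rate strings such as '10/s'.
// Mirrors the documented syntax; not Stem's internal implementation.
({int tokens, Duration interval}) parseRateLimit(String spec) {
  final parts = spec.split('/');
  final tokens = int.parse(parts[0]);
  final interval = switch (parts[1]) {
    's' => const Duration(seconds: 1),
    'm' => const Duration(minutes: 1),
    'h' => const Duration(hours: 1),
    _ => throw FormatException('Unknown rate unit: ${parts[1]}'),
  };
  return (tokens: tokens, interval: interval);
}
```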

How it works

  • The worker parses rateLimit for each task.
  • The worker asks the RateLimiter for an acquire decision.
  • If denied, the task is retried with backoff and rateLimited=true metadata.
  • Retry delays come from the limiter's retryAfter hint if provided; otherwise the worker's retry strategy applies.
  • If granted, the task executes immediately.
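The grant/deny flow above can be sketched with a simplified in-memory fixed-window counter. Stem's real RateLimiter interface is richer (see the Redis example below); the class and method names here are illustrative stand-ins:

```dart
// Simplified stand-in for the decision flow: a fixed-window counter that
// either grants a token or reports how long to wait before retrying.
class InMemoryFixedWindow {
  InMemoryFixedWindow(this.limit, this.window);

  final int limit;
  final Duration window;
  int _count = 0;
  DateTime _windowStart = DateTime.now();

  /// Returns null when granted, or a suggested retryAfter when denied.
  Duration? acquire() {
    final now = DateTime.now();
    if (now.difference(_windowStart) >= window) {
      // Window elapsed: start a fresh window with a full token budget.
      _windowStart = now;
      _count = 0;
    }
    if (_count < limit) {
      _count++;
      return null; // granted: the worker executes the task immediately
    }
    // denied: the worker reschedules with this delay and rateLimited=true
    return window - now.difference(_windowStart);
  }
}
```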

Group rate limiting

Group rate limits share a limiter bucket across related tasks.

  • groupRateLimit: limiter policy for the shared group bucket
  • groupRateKey: optional static key (if omitted, Stem resolves the key from a message header)
  • groupRateKeyHeader: header used when groupRateKey is not set (default: tenant)
  • groupRateLimiterFailureMode (default: failOpen):
    • failOpen: continue execution if limiter backend fails
    • failClosed: requeue/retry when limiter backend fails
lib/rate_limiting.dart
class GroupRateLimitedTask extends TaskHandler<void> {
  @override
  String get name => 'demo.groupRateLimited';

  @override
  TaskOptions get options => const TaskOptions(
        groupRateLimit: '20/m',
        groupRateKeyHeader: 'tenant',
        groupRateLimiterFailureMode: RateLimiterFailureMode.failClosed,
        maxRetries: 5,
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final tenant = args['tenant'] as String? ?? 'global';
    print('Handled group-rate-limited task for $tenant');
  }
}
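The failure-mode semantics can be sketched as follows. Stem's worker implements this internally; `shouldExecute` and `tryAcquire` are hypothetical names used only to illustrate the documented failOpen/failClosed behavior:

```dart
enum RateLimiterFailureMode { failOpen, failClosed }

// Illustrative handling of a limiter-backend failure: failOpen lets the task
// run anyway, failClosed treats the failure as a denial (requeue/retry).
Future<bool> shouldExecute(
  Future<bool> Function() tryAcquire,
  RateLimiterFailureMode mode,
) async {
  try {
    // Normal path: the limiter's own grant/deny decision wins.
    return await tryAcquire();
  } catch (_) {
    // Backend failure: fall back to the configured failure mode.
    return mode == RateLimiterFailureMode.failOpen;
  }
}
```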

Redis-backed limiter example

The example/rate_limit_delay demo ships a Redis fixed-window limiter. It:

  • shares tokens across multiple workers,
  • logs when a token is granted or denied,
  • reschedules denied tasks with retry metadata.

Inspect it here:

lib/rate_limiter.dart
class RedisFixedWindowRateLimiter implements RateLimiter {
  RedisFixedWindowRateLimiter._(
    this._connection,
    this._command, {
    required this.namespace,
  });

  final RedisConnection _connection;
  final Command _command;
  final String namespace;
  bool _closed = false;

  static const _script = '''
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local interval = tonumber(ARGV[2])

local current = redis.call('GET', key)
if not current then
  redis.call('SET', key, 1, 'PX', interval)
  return {1, interval}
end

current = tonumber(current)
if current < limit then
  redis.call('INCR', key)
  local ttl = redis.call('PTTL', key)
  if ttl < 0 then ttl = interval end
  return {1, ttl}
end

local ttl = redis.call('PTTL', key)
if ttl < 0 then ttl = interval end
return {0, ttl}
''';

  static Future<RedisFixedWindowRateLimiter> connect(
    String uri, {
    String namespace = 'stem-demo',
  }) async {
    final parsed = Uri.parse(uri);
    final host = parsed.host.isNotEmpty ? parsed.host : 'localhost';
    final port = parsed.hasPort ? parsed.port : 6379;
    final connection = RedisConnection();
    final scheme = parsed.scheme.isEmpty ? 'redis' : parsed.scheme;

    if (scheme == 'rediss') {
      throw UnsupportedError(
        'TLS connections are not implemented for the rate limiter demo. '
        'Use redis:// URLs or extend the example.',
      );
    }

    final command = await connection.connect(host, port);

    if (parsed.userInfo.isNotEmpty) {
      final parts = parsed.userInfo.split(':');
      final password = parts.length == 2 ? parts[1] : parts[0];
      await command.send_object(['AUTH', password]);
    }

    if (parsed.pathSegments.isNotEmpty) {
      final db = int.tryParse(parsed.pathSegments.first);
      if (db != null) {
        await command.send_object(['SELECT', db]);
      }
    }

    final resolvedNamespace = parsed.queryParameters['ns'] ?? namespace.trim();

    return RedisFixedWindowRateLimiter._(
      connection,
      command,
      namespace: resolvedNamespace.isEmpty ? 'stem-demo' : resolvedNamespace,
    );
  }

  String _keyFor(String key) => '$namespace:rate:$key';

  @override
  Future<RateLimitDecision> acquire(
    String key, {
    int tokens = 1,
    Duration? interval,
    Map<String, Object?>? meta,
  }) async {
    final window = interval ?? const Duration(seconds: 1);
    final response = await _command.send_object([
      'EVAL',
      _script,
      1,
      _keyFor(key),
      tokens,
      window.inMilliseconds,
    ]);

    if (response is! List || response.length != 2) {
      throw StateError(
        'Unexpected response from rate limiter script: $response',
      );
    }

    final allowed = (response[0] as num).toInt() == 1;
    final ttlMs = (response[1] as num).toInt();
    final remainingMs = ttlMs < 0 ? window.inMilliseconds : ttlMs;
    final retryAfter = allowed ? null : Duration(milliseconds: remainingMs);

    final decision = RateLimitDecision(
      allowed: allowed,
      retryAfter: retryAfter,
      meta: {
        'windowMs': window.inMilliseconds,
        'remainingMs': remainingMs,
        if (meta != null) ...meta,
      },
    );

    final status = allowed ? 'granted' : 'denied';
    final retryText = retryAfter == null
        ? 'available immediately'
        : 'retry in ${retryAfter.inMilliseconds}ms';
    stdout.writeln(
      '[rate-limiter][$status] key=$key tokens=$tokens window=${window.inMilliseconds}ms -> $retryText',
    );

    return decision;
  }

  Future<void> close() async {
    if (_closed) return;
    _closed = true;
    await _connection.close();
  }
}

Observability

When a task is rate limited:

  • context.meta['rateLimited'] is set on the retry attempt,
  • taskRetry signals include retry metadata,
  • worker logs show the limiter decision (if you log it).

Keying behavior

The worker uses a default rate-limit key of:

<taskName>:<tenant>

If no tenant header is set, it defaults to global. Add a tenant header when enqueuing tasks to enforce per-tenant limits.
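That default key composition can be written out directly (an illustrative helper, not Stem's internal code; `rateLimitKey` is a hypothetical name):

```dart
// Illustrative composition of the default rate-limit key described above:
// '<taskName>:<tenant>', falling back to 'global' when no tenant is set.
String rateLimitKey(String taskName, {String? tenant}) =>
    '$taskName:${tenant ?? 'global'}';
```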

Redis limiter wiring

The rate_limit_delay example reads STEM_RATE_LIMIT_URL to point the limiter at Redis. Use a dedicated Redis DB or key prefix to keep limiter state isolated from your broker/result backend.

lib/shared.dart
Future<RedisFixedWindowRateLimiter> connectRateLimiter(String uri) =>
RedisFixedWindowRateLimiter.connect(uri);

Tips

  • Use shared Redis for global limits across worker processes.
  • Keep the rate limit key stable (by default it uses task name + tenant).
  • Start with generous limits, then tighten after observing throughput.

Next steps