Uniqueness & Deduplication
Stem can prevent duplicate enqueues for naturally unique tasks. Enable
TaskOptions.unique and configure a UniqueTaskCoordinator backed by a
LockStore (Redis or in-memory).
Quick start
- Mark the task as unique:
lib/uniqueness.dart
TaskOptions get options => const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
maxRetries: 2,
);
- Create a coordinator with a shared lock store:
- In-memory (local)
- Redis (shared)
lib/uniqueness.dart
UniqueTaskCoordinator buildInMemoryCoordinator() {
final lockStore = InMemoryLockStore();
return UniqueTaskCoordinator(
lockStore: lockStore,
defaultTtl: const Duration(minutes: 5),
);
}
lib/uniqueness.dart
Future<UniqueTaskCoordinator> buildRedisCoordinator() async {
final redisUrl =
Platform.environment['STEM_LOCK_STORE_URL'] ?? 'redis://localhost:6379/5';
final lockStore = await RedisLockStore.connect(redisUrl);
return UniqueTaskCoordinator(
lockStore: lockStore,
defaultTtl: const Duration(minutes: 5),
);
}
- Wire the coordinator into the producer and worker:
lib/uniqueness.dart
final app = await StemApp.fromUrl(
'memory://',
tasks: [SendDigestTask()],
uniqueTasks: true,
uniqueTaskDefaultTtl: const Duration(minutes: 5),
workerConfig: const StemWorkerConfig(
queue: 'email',
consumerName: 'unique-worker',
),
);
How uniqueness works
- The unique key is derived from task name, queue, args, headers, and meta
(excluding keys prefixed with
stem.). - Keys are canonicalized (sorted maps, stable JSON) to ensure equivalent inputs hash to the same key.
- Use
uniqueForto control the lock TTL. When unset, Stem falls back to the taskvisibilityTimeout, envelope visibility timeout, or the coordinator default TTL (in that order).
Override the unique key
Override the computed key when you need custom grouping:
lib/uniqueness.dart
Future<void> enqueueWithOverride(Stem stem) async {
await stem.enqueue(
'orders.sync',
args: const {'id': 42},
options: const TaskOptions(unique: true, uniqueFor: Duration(minutes: 10)),
meta: const {UniqueTaskMetadata.override: 'order-42'},
);
}
What happens on duplicates
- Duplicate enqueues return the existing task id.
- The
stem.tasks.deduplicatedmetric increments. - Duplicates are recorded on the task status metadata under
stem.unique.duplicates.
Release semantics
Unique locks are released after task completion (success or failure) by the worker that holds the lock. If a worker crashes mid-task, the lock expires when the TTL elapses, allowing a future enqueue.
Retry behavior
Retries keep the original task id. Uniqueness is evaluated at enqueue time, so retries do not create new unique claims.
Example
The snippet below shows the enqueue behavior (see unique_tasks for a full demo):
lib/uniqueness.dart
Future<void> enqueueDigest(Stem stem) async {
final firstId = await stem.enqueue(
'email.sendDigest',
args: const {'userId': 42},
options: const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
),
);
final secondId = await stem.enqueue(
'email.sendDigest',
args: const {'userId': 42},
options: const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
),
);
print('first enqueue id: $firstId');
print('second enqueue id: $secondId (dup is re-used)');
}
Next steps
- See Tasks & Retries for other
TaskOptionssettings. - Combine uniqueness with Rate Limiting to guard hot paths.