Skip to main content

Uniqueness & Deduplication

Stem can prevent duplicate enqueues for naturally unique tasks. Enable TaskOptions.unique and configure a UniqueTaskCoordinator backed by a LockStore (Redis or in-memory).

Quick start

  1. Mark the task as unique:
lib/uniqueness.dart
  
TaskOptions get options => const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
maxRetries: 2,
);
  1. Create a coordinator with a shared lock store:
lib/uniqueness.dart
UniqueTaskCoordinator buildInMemoryCoordinator() {
final lockStore = InMemoryLockStore();
return UniqueTaskCoordinator(
lockStore: lockStore,
defaultTtl: const Duration(minutes: 5),
);
}
  1. Wire the coordinator into the producer and worker:
lib/uniqueness.dart
  final app = await StemApp.fromUrl(
'memory://',
tasks: [SendDigestTask()],
uniqueTasks: true,
uniqueTaskDefaultTtl: const Duration(minutes: 5),
workerConfig: const StemWorkerConfig(
queue: 'email',
consumerName: 'unique-worker',
),
);

How uniqueness works

  • The unique key is derived from task name, queue, args, headers, and meta (excluding keys prefixed with stem.).
  • Keys are canonicalized (sorted maps, stable JSON) to ensure equivalent inputs hash to the same key.
  • Use uniqueFor to control the lock TTL. When unset, Stem falls back to the task visibilityTimeout, envelope visibility timeout, or the coordinator default TTL (in that order).

Override the unique key

Override the computed key when you need custom grouping:

lib/uniqueness.dart
Future<void> enqueueWithOverride(Stem stem) async {
await stem.enqueue(
'orders.sync',
args: const {'id': 42},
options: const TaskOptions(unique: true, uniqueFor: Duration(minutes: 10)),
meta: const {UniqueTaskMetadata.override: 'order-42'},
);
}

What happens on duplicates

  • Duplicate enqueues return the existing task id.
  • The stem.tasks.deduplicated metric increments.
  • Duplicates are recorded on the task status metadata under stem.unique.duplicates.

Release semantics

Unique locks are released after task completion (success or failure) by the worker that holds the lock. If a worker crashes mid-task, the lock expires when the TTL elapses, allowing a future enqueue.

Retry behavior

Retries keep the original task id. Uniqueness is evaluated at enqueue time, so retries do not create new unique claims.

Example

The snippet below shows the enqueue behavior (see unique_tasks for a full demo):

lib/uniqueness.dart
Future<void> enqueueDigest(Stem stem) async {
final firstId = await stem.enqueue(
'email.sendDigest',
args: const {'userId': 42},
options: const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
),
);

final secondId = await stem.enqueue(
'email.sendDigest',
args: const {'userId': 42},
options: const TaskOptions(
queue: 'email',
unique: true,
uniqueFor: Duration(minutes: 15),
),
);

print('first enqueue id: $firstId');
print('second enqueue id: $secondId (dup is re-used)');
}

Next steps