Commit d082087

feat: add .dedup() API for job deduplication
1 parent b53a6df commit d082087

21 files changed

Lines changed: 1299 additions & 244 deletions

README.md

Lines changed: 69 additions & 27 deletions
@@ -134,7 +134,9 @@ The `groupId` is stored with job data and accessible via `job.data.groupId`.

 ## Job Deduplication

-Prevent the same job from being pushed to the queue twice using `.dedup()`:
+Prevent the same job from being pushed multiple times. Four modes, all via `.dedup()`:
+
+### Simple (skip while job exists)

 ```typescript
 // First dispatch - job is created
@@ -144,25 +146,65 @@ await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
 await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
 ```

-The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts:
+### Throttle (skip within TTL window)

 ```typescript
-// These are two different jobs, no conflict
-await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
-await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
+// Within 5s, duplicates are skipped. After 5s, a new job is created.
+await SendEmailJob.dispatch({ to: 'user@example.com' })
+  .dedup({ id: 'welcome-123', ttl: '5s' })
+  .run()
 ```

-Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks:
+### Extend (reset TTL on duplicate)

-- **Redis**: Uses `HSETNX` (set-if-not-exists)
-- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING`
-- **SyncAdapter**: Executes jobs inline and does not support deduplication
+```typescript
+// Each duplicate push resets the TTL timer.
+await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run()
+```

-> [!NOTE]
-> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`.
+### Debounce (replace payload + reset TTL)
+
+```typescript
+// Within the 2s window, the latest payload overwrites the previous pending job.
+await SaveDraftJob.dispatch({ content: 'latest draft' })
+  .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true })
+  .run()
+```
+
+### Inspecting the outcome
+
+`DispatchResult` tells you what happened:
+
+```typescript
+const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' })
+  .dedup({ id: 'draft-42', ttl: '2s', replace: true })
+  .run()
+
+// deduped: 'added' | 'skipped' | 'replaced' | 'extended'
+// jobId: the UUID of the job (the existing one when deduped)
+```
+
+### How it works
+
+- The dedup ID is automatically prefixed with the job name (`SendInvoiceJob::order-123`), so different job types can reuse the same key.
+- `ttl` accepts a Duration (`'5s'`, `'1m'`) or milliseconds.
+- `extend` and `replace` **require** `ttl` — calling them without `ttl` throws.
+- `replace` only applies to jobs in `pending` or `delayed` state. Active (executing) jobs are left alone; the dispatch returns `{ deduped: 'skipped' }`.
+- `replace` swaps the **payload only** — priority, queue, delay, and groupId of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire.
+- `retryJob` does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped.
+- Atomic and race-free:
+  - **Redis**: `SET + PEXPIRE` under a Lua script with `HSETNX`-style guards.
+  - **Knex**: transactional `SELECT ... FOR UPDATE` + insert/update inside a transaction.
+  - **SyncAdapter**: executes inline, no dedup support.
+
+### Caveats

-> [!TIP]
-> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally.
+- Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated.
+- `.dedup()` is only available on single dispatch. `dispatchMany` / `pushManyOn` reject jobs with a `dedup` field.
+- Scheduled jobs (`.schedule()`) do not support dedup — each cron/interval fire is an independent dispatch.
+- With no `ttl`, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned.
+- With `ttl`, dedup expires after the window — a new job (new UUID) is created. The old job still runs.
+- Knex concurrent race: two `pushOn` calls with the same dedup id firing at the exact same instant on Postgres (READ COMMITTED) can both succeed (rare). Serialize at the app layer if strict guarantees are required, or use Redis.

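The modes and rules listed in the README changes above can be modelled in a few dozen lines. What follows is a minimal in-memory sketch, not the library's implementation: `DedupModel`, `toMs`, `markActive`, and the `Entry` shape are hypothetical names, and the duration grammar is a guess. Two behaviours are assumptions the README does not settle: that `replace` without `extend` leaves the deadline untouched, and that an entry counts as expired once its deadline is reached. Time is passed in explicitly so the example stays deterministic.

```typescript
// Hypothetical in-memory model of the dedup modes. Not the library's implementation.
type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended'
type JobState = 'pending' | 'delayed' | 'active'

interface DedupOptions {
  id: string
  ttl?: number | string // milliseconds, or a duration string such as '5s' or '1m'
  extend?: boolean
  replace?: boolean
}

interface Entry {
  jobId: string
  payload: unknown
  state: JobState
  expiresAt: number | null // null: no TTL, the slot is held until the job is removed
}

const UNIT_MS: Record<string, number> = { ms: 1, s: 1000, m: 60000, h: 3600000 }

// Assumed duration grammar for this sketch: an integer followed by ms, s, m or h.
function toMs(ttl: number | string): number {
  if (typeof ttl === 'number') return ttl
  const match = /^(\d+)(ms|s|m|h)$/.exec(ttl)
  if (match === null) throw new Error(`Unsupported duration: ${ttl}`)
  const [, amount = '0', unit = 'ms'] = match
  return Number(amount) * (UNIT_MS[unit] ?? 1)
}

class DedupModel {
  private entries = new Map<string, Entry>()
  private nextId = 1

  push(
    jobName: string,
    payload: unknown,
    dedup: DedupOptions,
    now: number
  ): { jobId: string; deduped: DedupOutcome } {
    if ((dedup.extend || dedup.replace) && dedup.ttl === undefined) {
      throw new Error('extend and replace require ttl')
    }
    const key = `${jobName}::${dedup.id}` // job-name prefix, as in `SendInvoiceJob::order-123`
    const expiresAt = dedup.ttl === undefined ? null : now + toMs(dedup.ttl)
    const existing = this.entries.get(key)

    // No entry yet, or the TTL window has passed: create a fresh job.
    if (existing === undefined || (existing.expiresAt !== null && existing.expiresAt <= now)) {
      const jobId = `job-${this.nextId++}`
      this.entries.set(key, { jobId, payload, state: 'pending', expiresAt })
      return { jobId, deduped: 'added' }
    }
    if (dedup.replace) {
      // Active (executing) jobs are left alone.
      if (existing.state === 'active') return { jobId: existing.jobId, deduped: 'skipped' }
      existing.payload = payload
      if (dedup.extend) existing.expiresAt = expiresAt // assumption: only `extend` moves the deadline
      return { jobId: existing.jobId, deduped: 'replaced' }
    }
    if (dedup.extend) {
      existing.expiresAt = expiresAt
      return { jobId: existing.jobId, deduped: 'extended' }
    }
    return { jobId: existing.jobId, deduped: 'skipped' }
  }

  // Hook for this sketch only: simulate a worker picking the job up.
  markActive(jobName: string, id: string): void {
    const entry = this.entries.get(`${jobName}::${id}`)
    if (entry !== undefined) entry.state = 'active'
  }
}

const model = new DedupModel()
const debounce: DedupOptions = { id: 'draft-42', ttl: '2s', replace: true, extend: true }
const a = model.push('SaveDraftJob', { content: 'v1' }, debounce, 0)
const b = model.push('SaveDraftJob', { content: 'v2' }, debounce, 1000)
const c = model.push('SaveDraftJob', { content: 'v3' }, debounce, 3500)
console.log(a.deduped, b.deduped, c.deduped) // added replaced added
```

The third push creates a new job because the second push moved the deadline to 3000 ms, which has passed by 3500 ms. A real adapter has to make the same read-then-write decision atomically, which is what the Lua script and the `SELECT ... FOR UPDATE` transaction mentioned above are for.
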
 ## Job History & Retention

@@ -569,7 +611,7 @@ import * as boringqueue from '@boringnode/queue'

 const instrumentation = new QueueInstrumentation({
   messagingSystem: 'boringqueue', // default
-  executionSpanLinkMode: 'link', // or 'parent'
+  executionSpanLinkMode: 'link', // or 'parent'
 })

 instrumentation.enable()
@@ -582,19 +624,19 @@ The instrumentation patches `QueueManager.init()` to automatically inject its wr

 The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes.

-| Attribute | Kind | Description |
-| ------------------------------- | ------- | ------------------------------------------ |
-| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
-| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
-| `messaging.destination.name` | Semconv | Queue name |
-| `messaging.message.id` | Semconv | Job ID for single-message spans |
-| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
-| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
-| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
-| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
-| `messaging.job.group_id` | Custom | Queue-specific group identifier |
-| `messaging.job.priority` | Custom | Queue-specific job priority |
-| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
+| Attribute | Kind | Description |
+| ------------------------------- | ------- | --------------------------------------------- |
+| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
+| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
+| `messaging.destination.name` | Semconv | Queue name |
+| `messaging.message.id` | Semconv | Job ID for single-message spans |
+| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
+| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
+| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
+| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
+| `messaging.job.group_id` | Custom | Queue-specific group identifier |
+| `messaging.job.priority` | Custom | Queue-specific job priority |
+| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
 | `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |

 ### Trace Context Propagation

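To make the attribute table in the hunk above concrete, here is a sketch of the attribute set a single `process` span might carry. Only the attribute keys and the allowed status values come from the table; the `JobSnapshot` shape and the `processSpanAttributes` helper are hypothetical and are not part of the instrumentation's API.

```typescript
// Hypothetical job shape for illustration; the field names are assumptions.
interface JobSnapshot {
  id: string
  name: string
  queue: string
  attempt: number // 0-based retry count
  status: 'completed' | 'failed' | 'retrying'
  groupId?: string
  priority?: number
  delayMs?: number
  queueTimeMs?: number
}

type Attributes = Record<string, string | number>

// Builds the attributes for one `process` span, using the keys from the table.
function processSpanAttributes(job: JobSnapshot, system = 'boringqueue'): Attributes {
  const attrs: Attributes = {
    'messaging.system': system,
    'messaging.operation.name': 'process',
    'messaging.destination.name': job.queue,
    'messaging.message.id': job.id,
    'messaging.message.retry.count': job.attempt,
    'messaging.job.name': job.name,
    'messaging.job.status': job.status,
  }
  // Optional attributes are only set when the job carries the value.
  if (job.groupId !== undefined) attrs['messaging.job.group_id'] = job.groupId
  if (job.priority !== undefined) attrs['messaging.job.priority'] = job.priority
  if (job.delayMs !== undefined) attrs['messaging.job.delay_ms'] = job.delayMs
  if (job.queueTimeMs !== undefined) attrs['messaging.job.queue_time_ms'] = job.queueTimeMs
  return attrs
}

const example = processSpanAttributes({
  id: 'job-1',
  name: 'SendEmailJob',
  queue: 'default',
  attempt: 0,
  status: 'completed',
  queueTimeMs: 12,
})
console.log(example['messaging.job.name']) // SendEmailJob
```
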
src/contracts/adapter.ts

Lines changed: 20 additions & 4 deletions
@@ -1,4 +1,5 @@
 import type {
+  DedupOutcome,
   JobData,
   JobRecord,
   JobRetention,
@@ -7,6 +8,17 @@ import type {
   ScheduleListOptions,
 } from '../types/main.js'

+/**
+ * Result of a push operation when dedup was involved.
+ * `outcome` tells the dispatcher what happened; `jobId` is the ID of the
+ * existing job when deduped (skipped/replaced/extended).
+ */
+export interface PushResult {
+  outcome: DedupOutcome
+  /** ID of the existing job when a duplicate was detected, otherwise the newly added job's id. */
+  jobId: string
+}
+
 /**
  * A job that has been acquired by a worker for processing.
  * Extends JobData with the timestamp when the job was acquired.
@@ -119,33 +131,37 @@ export interface Adapter {
    * Push a job to the default queue for immediate processing.
    *
    * @param jobData - The job data to push
+   * @returns PushResult if jobData.dedup is set, otherwise void
    */
-  push(jobData: JobData): Promise<void>
+  push(jobData: JobData): Promise<PushResult | void>

   /**
    * Push a job to a specific queue for immediate processing.
    *
    * @param queue - The queue name to push to
    * @param jobData - The job data to push
+   * @returns PushResult if jobData.dedup is set, otherwise void
    */
-  pushOn(queue: string, jobData: JobData): Promise<void>
+  pushOn(queue: string, jobData: JobData): Promise<PushResult | void>

   /**
    * Push a job to the default queue with a delay.
    *
    * @param jobData - The job data to push
    * @param delay - Delay in milliseconds before the job becomes available
+   * @returns PushResult if jobData.dedup is set, otherwise void
    */
-  pushLater(jobData: JobData, delay: number): Promise<void>
+  pushLater(jobData: JobData, delay: number): Promise<PushResult | void>

   /**
    * Push a job to a specific queue with a delay.
    *
    * @param queue - The queue name to push to
    * @param jobData - The job data to push
    * @param delay - Delay in milliseconds before the job becomes available
+   * @returns PushResult if jobData.dedup is set, otherwise void
    */
-  pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
+  pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<PushResult | void>

   /**
    * Push multiple jobs to the default queue for immediate processing.

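Because the push methods now return `Promise<PushResult | void>`, a caller has to narrow the awaited value before reading `outcome` or `jobId`. The sketch below shows one way to do that. `resolvePush` and its fallback to a locally generated id are hypothetical, and the `DedupOutcome` union is assumed to match the values listed in the README (`'added' | 'skipped' | 'replaced' | 'extended'`), since its definition is not part of this diff.

```typescript
// Assumed to mirror the union documented in the README; the real type lives in types/main.js.
type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended'

interface PushResult {
  outcome: DedupOutcome
  jobId: string
}

// Hypothetical helper: turn an adapter's push result into a dispatch-style result.
// When the adapter returns nothing (no dedup involved), fall back to the id
// the caller generated for the job.
function resolvePush(
  result: PushResult | void,
  generatedId: string
): { jobId: string; deduped?: DedupOutcome } {
  if (result) {
    // Narrowed to PushResult: the adapter reports which job now owns the dedup slot.
    return { jobId: result.jobId, deduped: result.outcome }
  }
  return { jobId: generatedId }
}

const plain = resolvePush(undefined, 'uuid-1')
const dup = resolvePush({ outcome: 'skipped', jobId: 'uuid-0' }, 'uuid-2')
console.log(plain.jobId, dup.jobId, dup.deduped) // uuid-1 uuid-0 skipped
```

Returning the existing job's id on a duplicate matches the `PushResult` doc comment above: the caller ends up holding the id of the job that actually occupies the slot, not the id of the job that was never stored.
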