Skip to content

Commit b53a6df

Browse files
committed
refactor: replace .id() with .dedup() API for extensible job deduplication
1 parent 69c0635 commit b53a6df

11 files changed

Lines changed: 87 additions & 78 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ coverage
1515
npm-debug.log
1616
yarn-error.log
1717

18+
# OS specific
19+
.DS_Store
20+
1821
# Editors specific
1922
.fleet
2023
.idea

README.md

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,22 +134,22 @@ The `groupId` is stored with job data and accessible via `job.data.groupId`.
134134

135135
## Job Deduplication
136136

137-
Prevent the same job from being pushed to the queue twice using custom job IDs:
137+
Prevent the same job from being pushed to the queue twice using `.dedup()`:
138138

139139
```typescript
140140
// First dispatch - job is created
141-
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
141+
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
142142

143-
// Second dispatch with same ID - silently skipped
144-
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
143+
// Second dispatch with same dedup ID - silently skipped
144+
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
145145
```
146146

147-
The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts:
147+
The dedup ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts:
148148

149149
```typescript
150150
// These are two different jobs, no conflict
151-
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
152-
await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run()
151+
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
152+
await SendReceiptJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()
153153
```
154154

155155
Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks:
@@ -159,10 +159,10 @@ Deduplication is atomic and race-condition-free for adapters that support storag
159159
- **SyncAdapter**: Executes jobs inline and does not support deduplication
160160

161161
> [!NOTE]
162-
> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`.
162+
> Without `.dedup()`, jobs use auto-generated UUIDs and are never deduplicated. The `.dedup()` method is only available on single dispatch, not `dispatchMany`.
163163
164164
> [!TIP]
165-
> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally.
165+
> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same dedup ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally.
166166
167167
## Job History & Retention
168168

src/drivers/fake_adapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ export class FakeAdapter implements Adapter {
160160
}
161161

162162
async pushOn(queue: string, jobData: JobData): Promise<void> {
163-
if (jobData.unique) {
163+
if (jobData.dedup) {
164164
const existing = await this.getJob(jobData.id, queue)
165165
if (existing) return
166166
}
@@ -174,7 +174,7 @@ export class FakeAdapter implements Adapter {
174174
}
175175

176176
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
177-
if (jobData.unique) {
177+
if (jobData.dedup) {
178178
const existing = await this.getJob(jobData.id, queue)
179179
if (existing) return
180180
}

src/drivers/knex_adapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ export class KnexAdapter implements Adapter {
378378
score,
379379
})
380380

381-
if (jobData.unique) {
381+
if (jobData.dedup) {
382382
await query.onConflict(['id', 'queue']).ignore()
383383
} else {
384384
await query
@@ -400,7 +400,7 @@ export class KnexAdapter implements Adapter {
400400
execute_at: executeAt,
401401
})
402402

403-
if (jobData.unique) {
403+
if (jobData.dedup) {
404404
await query.onConflict(['id', 'queue']).ignore()
405405
} else {
406406
await query

src/drivers/redis_adapter.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ const PUSH_JOB_SCRIPT = `
3636
`
3737

3838
/**
39-
* Lua script for pushing a unique job.
39+
* Lua script for pushing a dedup job.
4040
* Uses HSETNX to only store data if the job doesn't already exist.
4141
* Only adds to pending ZSET if the job was newly created.
4242
*/
43-
const PUSH_UNIQUE_JOB_SCRIPT = `
43+
const PUSH_DEDUP_JOB_SCRIPT = `
4444
local data_key = KEYS[1]
4545
local pending_key = KEYS[2]
4646
local job_id = ARGV[1]
@@ -73,11 +73,11 @@ const PUSH_DELAYED_JOB_SCRIPT = `
7373
`
7474

7575
/**
76-
* Lua script for pushing a unique delayed job.
76+
* Lua script for pushing a dedup delayed job.
7777
* Uses HSETNX to only store data if the job doesn't already exist.
7878
* Only adds to delayed ZSET if the job was newly created.
7979
*/
80-
const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = `
80+
const PUSH_DEDUP_DELAYED_JOB_SCRIPT = `
8181
local data_key = KEYS[1]
8282
local delayed_key = KEYS[2]
8383
local job_id = ARGV[1]
@@ -660,7 +660,7 @@ export class RedisAdapter implements Adapter {
660660
const keys = this.#getKeys(queue)
661661
const executeAt = Date.now() + delay
662662

663-
const script = jobData.unique ? PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT
663+
const script = jobData.dedup ? PUSH_DEDUP_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT
664664

665665
await this.#connection.eval(
666666
script,
@@ -679,7 +679,7 @@ export class RedisAdapter implements Adapter {
679679
const timestamp = Date.now()
680680
const score = calculateScore(priority, timestamp)
681681

682-
const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT
682+
const script = jobData.dedup ? PUSH_DEDUP_JOB_SCRIPT : PUSH_JOB_SCRIPT
683683

684684
await this.#connection.eval(
685685
script,

src/job_dispatcher.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ import { parse } from './utils.js'
1515
*
1616
* ```
1717
* Job.dispatch(payload)
18-
* .toQueue('emails') // optional: target queue
19-
* .priority(1) // optional: 1-10, lower = higher priority
20-
* .in('5m') // optional: delay before processing
21-
* .with('redis') // optional: specific adapter
22-
* .run() // dispatch the job
18+
* .toQueue('emails') // optional: target queue
19+
* .priority(1) // optional: 1-10, lower = higher priority
20+
* .in('5m') // optional: delay before processing
21+
* .dedup({ id: 'order-123' }) // optional: deduplication
22+
* .with('redis') // optional: specific adapter
23+
* .run() // dispatch the job
2324
* ```
2425
*
2526
* @typeParam T - The payload type for this job
@@ -47,7 +48,7 @@ export class JobDispatcher<T> {
4748
#delay?: Duration
4849
#priority?: number
4950
#groupId?: string
50-
#id?: string
51+
#dedup?: { id: string }
5152

5253
/**
5354
* Create a new job dispatcher.
@@ -150,35 +151,36 @@ export class JobDispatcher<T> {
150151
}
151152

152153
/**
153-
* Set a custom job ID for deduplication.
154+
* Configure deduplication for this job.
154155
*
155-
* When a custom ID is provided, the adapter will silently skip
156-
* the job if one with the same ID already exists in the queue.
156+
* When deduplication is configured, the adapter will silently skip
157+
* the job if one with the same dedup ID already exists in the queue.
157158
* The ID is automatically prefixed with the job name to prevent
158159
* collisions between different job types.
159160
*
160-
* @param jobId - Custom identifier for this job
161+
* @param options - Deduplication options
162+
* @param options.id - Unique deduplication key
161163
* @returns This dispatcher for chaining
162164
*
163165
* @example
164166
* ```typescript
165167
* // Prevent duplicate invoice jobs for the same order
166168
* await SendInvoiceJob.dispatch({ orderId: 123 })
167-
* .id('order-123')
169+
* .dedup({ id: 'order-123' })
168170
* .run()
169171
*
170-
* // Second dispatch with same ID is silently skipped
172+
* // Second dispatch with same dedup ID is silently skipped
171173
* await SendInvoiceJob.dispatch({ orderId: 123 })
172-
* .id('order-123')
174+
* .dedup({ id: 'order-123' })
173175
* .run()
174176
* ```
175177
*/
176-
id(jobId: string): this {
177-
if (!jobId) {
178-
throw new Error('Job ID must be a non-empty string')
178+
dedup(options: { id: string }): this {
179+
if (!options.id) {
180+
throw new Error('Dedup ID must be a non-empty string')
179181
}
180182

181-
this.#id = jobId
183+
this.#dedup = options
182184

183185
return this
184186
}
@@ -216,7 +218,7 @@ export class JobDispatcher<T> {
216218
* ```
217219
*/
218220
async run(): Promise<DispatchResult> {
219-
const id = this.#id ? `${this.#name}::${this.#id}` : randomUUID()
221+
const id = this.#dedup ? `${this.#name}::${this.#dedup.id}` : randomUUID()
220222

221223
debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload)
222224

@@ -232,7 +234,7 @@ export class JobDispatcher<T> {
232234
priority: this.#priority,
233235
groupId: this.#groupId,
234236
createdAt: Date.now(),
235-
...(this.#id ? { unique: true } : {}),
237+
...(this.#dedup ? { dedup: { id: this.#dedup.id } } : {}),
236238
}
237239

238240
const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }

src/types/main.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,15 @@ export interface JobData {
135135
traceContext?: Record<string, string>
136136

137137
/**
138-
* When true, adapters use atomic insert-if-not-exists semantics
138+
* Deduplication configuration for this job.
139+
* When set, adapters use atomic insert-if-not-exists semantics
139140
* to silently skip duplicate jobs with the same ID.
140-
* Set automatically when a custom job ID is provided via `.id()`.
141+
* Set automatically when `.dedup()` is called on the dispatcher.
141142
*/
142-
unique?: boolean
143+
dedup?: {
144+
/** The original dedup key provided by the caller (before name-prefixing). */
145+
id: string
146+
}
143147
}
144148

145149
/**

tests/_mocks/memory_adapter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class MemoryAdapter implements Adapter {
5151
}
5252

5353
async pushOn(queue: string, jobData: JobData): Promise<void> {
54-
if (jobData.unique) {
54+
if (jobData.dedup) {
5555
const existing = await this.getJob(jobData.id, queue)
5656
if (existing) return
5757
}
@@ -68,7 +68,7 @@ export class MemoryAdapter implements Adapter {
6868
}
6969

7070
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
71-
if (jobData.unique) {
71+
if (jobData.dedup) {
7272
const existing = await this.getJob(jobData.id, queue)
7373
if (existing) return
7474
}

tests/_utils/register_driver_test_suite.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1650,7 +1650,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
16501650
assert.equal(third!.id, 'low')
16511651
})
16521652

1653-
test('pushOn with unique flag should skip duplicate job', async ({ assert }) => {
1653+
test('pushOn with dedup should skip duplicate job', async ({ assert }) => {
16541654
const adapter = await options.createAdapter()
16551655
adapter.setWorkerId('worker-1')
16561656

@@ -1659,15 +1659,15 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
16591659
name: 'TestJob',
16601660
payload: { attempt: 1 },
16611661
attempts: 0,
1662-
unique: true,
1662+
dedup: { id: 'order-1' },
16631663
})
16641664

16651665
await adapter.pushOn('test-queue', {
16661666
id: 'TestJob::order-1',
16671667
name: 'TestJob',
16681668
payload: { attempt: 2 },
16691669
attempts: 0,
1670-
unique: true,
1670+
dedup: { id: 'order-1' },
16711671
})
16721672

16731673
const size = await adapter.sizeOf('test-queue')
@@ -1677,7 +1677,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
16771677
assert.deepEqual(job!.payload, { attempt: 1 })
16781678
})
16791679

1680-
test('pushOn without unique flag should insert normally', async ({ assert }) => {
1680+
test('pushOn without dedup should insert normally', async ({ assert }) => {
16811681
const adapter = await options.createAdapter()
16821682
adapter.setWorkerId('worker-1')
16831683

@@ -1699,7 +1699,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
16991699
assert.equal(size, 2)
17001700
})
17011701

1702-
test('pushLaterOn with unique flag should skip duplicate delayed job', async ({ assert }) => {
1702+
test('pushLaterOn with dedup should skip duplicate delayed job', async ({ assert }) => {
17031703
const adapter = await options.createAdapter()
17041704
adapter.setWorkerId('worker-1')
17051705

@@ -1710,7 +1710,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
17101710
name: 'TestJob',
17111711
payload: { attempt: 1 },
17121712
attempts: 0,
1713-
unique: true,
1713+
dedup: { id: 'delayed-1' },
17141714
},
17151715
60_000
17161716
)
@@ -1722,7 +1722,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
17221722
name: 'TestJob',
17231723
payload: { attempt: 2 },
17241724
attempts: 0,
1725-
unique: true,
1725+
dedup: { id: 'delayed-1' },
17261726
},
17271727
60_000
17281728
)
@@ -1732,7 +1732,7 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
17321732
assert.deepEqual(job!.data.payload, { attempt: 1 })
17331733
})
17341734

1735-
test('pushOn with unique flag should allow same id on different queues', async ({ assert }) => {
1735+
test('pushOn with dedup should allow same id on different queues', async ({ assert }) => {
17361736
const adapter = await options.createAdapter()
17371737
adapter.setWorkerId('worker-1')
17381738

@@ -1741,15 +1741,15 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
17411741
name: 'TestJob',
17421742
payload: { queue: 'a' },
17431743
attempts: 0,
1744-
unique: true,
1744+
dedup: { id: 'shared-id' },
17451745
})
17461746

17471747
await adapter.pushOn('queue-b', {
17481748
id: 'TestJob::shared-id',
17491749
name: 'TestJob',
17501750
payload: { queue: 'b' },
17511751
attempts: 0,
1752-
unique: true,
1752+
dedup: { id: 'shared-id' },
17531753
})
17541754

17551755
const sizeA = await adapter.sizeOf('queue-a')

0 commit comments

Comments
 (0)