
@boringnode/queue


A simple and efficient queue system for Node.js applications. Built for simplicity and ease of use, @boringnode/queue allows you to dispatch background jobs and process them asynchronously with support for multiple queue adapters.

Installation

npm install @boringnode/queue

Features

  • Multiple Queue Adapters: Redis, Knex (PostgreSQL, MySQL, SQLite), and Sync
  • Type-Safe Jobs: TypeScript classes with typed payloads
  • Delayed Jobs: Schedule jobs to run after a delay
  • Priority Queues: Process high-priority jobs first
  • Bulk Dispatch: Efficiently dispatch thousands of jobs at once
  • Job Grouping: Organize related jobs for monitoring
  • Job Deduplication: Prevent duplicate jobs with custom IDs
  • Retry with Backoff: Exponential, linear, or fixed backoff strategies
  • Job Timeout: Fail or retry jobs that exceed a time limit
  • Job History: Retain completed/failed jobs for debugging
  • Scheduled Jobs: Cron or interval-based recurring jobs
  • Auto-Discovery: Automatically register jobs from specified locations

Quick Start

1. Define a Job

import { Job } from '@boringnode/queue'
import type { JobOptions } from '@boringnode/queue/types'

interface SendEmailPayload {
  to: string
}

export default class SendEmailJob extends Job<SendEmailPayload> {
  static options: JobOptions = {
    queue: 'email',
  }

  async execute(): Promise<void> {
    console.log(`Sending email to: ${this.payload.to}`)
  }
}

Note

The job name defaults to the class name (SendEmailJob). You can override it with name: 'CustomName' in options.

Warning

If you minify your code in production, class names may be mangled. Always specify name explicitly in your job options.

2. Configure the Queue Manager

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

3. Dispatch Jobs

// Simple dispatch
await SendEmailJob.dispatch({ to: 'user@example.com' })

// With options
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .toQueue('high-priority')
  .priority(1)
  .in('5m')

4. Start a Worker

import { Worker } from '@boringnode/queue'

const worker = new Worker(config) // config: see "Worker Configuration" below
await worker.start(['default', 'email'])

Bulk Dispatch

Efficiently dispatch thousands of jobs in a single batch operation:

const { jobIds } = await SendEmailJob.dispatchMany([
  { to: 'user1@example.com' },
  { to: 'user2@example.com' },
  { to: 'user3@example.com' },
])
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)

console.log(`Dispatched ${jobIds.length} jobs`)

This uses Redis MULTI/EXEC or SQL batch insert for optimal performance.

Job Grouping

Organize related jobs together for monitoring and filtering:

// Group newsletter jobs
await SendEmailJob.dispatch({ to: 'user@example.com' }).group('newsletter-jan-2025')

// Group with bulk dispatch
await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')

The groupId is stored with job data and accessible via job.data.groupId.

Job Deduplication

Prevent the same job from being pushed multiple times. Four modes, all via .dedup():

Simple (skip while job exists)

// First dispatch - job is created
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()

// Second dispatch with same dedup ID - silently skipped
await SendInvoiceJob.dispatch({ orderId: 123 }).dedup({ id: 'order-123' }).run()

Throttle (skip within TTL window)

// Within 5s, duplicates are skipped. After 5s, a new job is created.
await SendEmailJob.dispatch({ to: 'user@example.com' })
  .dedup({ id: 'welcome-123', ttl: '5s' })
  .run()

Extend (reset TTL on duplicate)

// Each duplicate push resets the TTL timer.
await RateLimitJob.dispatch({ userId: 42 }).dedup({ id: 'rate-42', ttl: '1m', extend: true }).run()

Debounce (replace payload + reset TTL)

// Within the 2s window, the latest payload overwrites the previous pending job.
await SaveDraftJob.dispatch({ content: 'latest draft' })
  .dedup({ id: 'draft-42', ttl: '2s', replace: true, extend: true })
  .run()

Inspecting the outcome

DispatchResult tells you what happened:

const { jobId, deduped } = await SaveDraftJob.dispatch({ content: '...' })
  .dedup({ id: 'draft-42', ttl: '2s', replace: true })
  .run()

// deduped: 'added' | 'skipped' | 'replaced' | 'extended'
// jobId: the UUID of the job (the existing one when deduped)

How it works

  • The dedup ID is automatically prefixed with the job name (SendInvoiceJob::order-123), so different job types can reuse the same key.
  • ttl accepts a Duration ('5s', '1m') or milliseconds.
  • extend and replace require ttl — calling them without ttl throws.
  • replace only applies to jobs in pending or delayed state. Active (executing) jobs are left alone; the dispatch returns { deduped: 'skipped' }.
  • replace swaps the payload only — priority, queue, delay, and groupId of the existing job are retained. To change those, use a different dedup id or wait for the TTL to expire.
  • retryJob does not touch the dedup entry — a retried job continues to occupy the dedup slot. TTL runs on wall-clock time, so long-running retries may outlive the TTL window. Use a generous TTL or no TTL if retries must stay deduped.
  • Atomic and race-free:
    • Redis: SET + PEXPIRE under a Lua script with HSETNX-style guards.
    • Knex: transactional SELECT ... FOR UPDATE + insert/update inside a transaction.
    • SyncAdapter: executes inline, no dedup support.
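The dedup semantics above can be sketched with a hypothetical in-memory model. This is not the library's implementation (the real adapters do this atomically in Redis Lua scripts or SQL transactions); it only illustrates how the four outcomes relate:

```typescript
// Illustrative in-memory model of the dedup outcomes. Names and shapes here
// are assumptions for the sketch, not the library's internals.

type DedupOutcome = 'added' | 'skipped' | 'replaced' | 'extended'

interface DedupEntry {
  jobId: string
  payload: unknown
  expiresAt?: number // undefined = no TTL
}

const entries = new Map<string, DedupEntry>()
let nextId = 0

function dedupDispatch(
  jobName: string,
  dedupId: string,
  payload: unknown,
  opts: { ttl?: number; extend?: boolean; replace?: boolean } = {},
  now = Date.now()
): { jobId: string; deduped: DedupOutcome } {
  if ((opts.extend || opts.replace) && opts.ttl === undefined) {
    throw new Error('extend/replace require ttl')
  }
  // The dedup id is namespaced by job name, e.g. "SendInvoiceJob::order-123".
  const key = `${jobName}::${dedupId}`
  const existing = entries.get(key)

  // No live entry (or the TTL window has passed): create a new job.
  if (!existing || (existing.expiresAt !== undefined && existing.expiresAt <= now)) {
    const entry: DedupEntry = {
      jobId: `job-${++nextId}`,
      payload,
      expiresAt: opts.ttl !== undefined ? now + opts.ttl : undefined,
    }
    entries.set(key, entry)
    return { jobId: entry.jobId, deduped: 'added' }
  }

  let outcome: DedupOutcome = 'skipped'
  if (opts.replace) {
    existing.payload = payload // debounce: latest payload wins
    outcome = 'replaced'
  }
  if (opts.extend && opts.ttl !== undefined) {
    existing.expiresAt = now + opts.ttl // reset the TTL timer
    if (outcome === 'skipped') outcome = 'extended'
  }
  return { jobId: existing.jobId, deduped: outcome }
}
```

Note how `skipped`/`replaced`/`extended` always return the existing job's ID, matching the `DispatchResult` behavior described earlier.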

Caveats

  • Without .dedup(), jobs use auto-generated UUIDs and are never deduplicated.
  • .dedup() is only available on single dispatch. dispatchMany / pushManyOn reject jobs with a dedup field.
  • Scheduled jobs (.schedule()) do not support dedup — each cron/interval fire is an independent dispatch.
  • With no ttl, dedup persists until the job is removed (completed/failed without retention). When retention keeps the record, re-dispatch stays blocked until the record is pruned.
  • With ttl, dedup expires after the window — a new job (new UUID) is created. The old job still runs.
  • Knex concurrent race: two pushOn calls with the same dedup id firing at the exact same instant on Postgres (READ COMMITTED) can both succeed (rare). Serialize at the app layer if strict guarantees are required, or use Redis.

Job History & Retention

Keep completed and failed jobs for debugging:

export default class ImportantJob extends Job<Payload> {
  static options: JobOptions = {
    // Keep last 1000 completed jobs
    removeOnComplete: { count: 1000 },

    // Keep failed jobs for 7 days
    removeOnFail: { age: '7d' },
  }
}
Retention options

| Value | Behavior |
| --- | --- |
| `true` (default) | Remove immediately |
| `false` | Keep forever |
| `{ count: n }` | Keep last n jobs |
| `{ age: '7d' }` | Keep for duration |
| `{ count: 100, age: '1d' }` | Both limits apply |
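As a rough sketch of how the count and age limits combine (illustrative only, not the library's pruning code): a finished job is removed when it falls outside the newest `count` records or is older than `age`.

```typescript
// Illustrative pruning sketch -- both limits apply when both are set.

interface FinishedJob {
  id: string
  finishedAt: number // epoch ms
}

type Retention = boolean | { count?: number; age?: number }

function prune(jobs: FinishedJob[], retention: Retention, now = Date.now()): FinishedJob[] {
  if (retention === true) return [] // remove immediately (default)
  if (retention === false) return jobs // keep forever

  // Newest first, so the `count` limit keeps the most recent records.
  const sorted = [...jobs].sort((a, b) => b.finishedAt - a.finishedAt)
  return sorted.filter((job, index) => {
    if (retention.count !== undefined && index >= retention.count) return false
    if (retention.age !== undefined && now - job.finishedAt > retention.age) return false
    return true
  })
}
```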

Query job history:

const job = await adapter.getJob('job-id', 'queue-name')
console.log(job.status) // 'completed' | 'failed'
console.log(job.finishedAt) // timestamp
console.log(job.error) // error message (if failed)

Adapters

Redis (recommended for production)

import { redis } from '@boringnode/queue/drivers/redis_adapter'

// With options
const adapter = redis({ host: 'localhost', port: 6379 })

// With existing ioredis instance
import { Redis } from 'ioredis'
const connection = new Redis({ host: 'localhost' })
const adapter = redis(connection)

Knex (PostgreSQL, MySQL, SQLite)

import { knex } from '@boringnode/queue/drivers/knex_adapter'

const adapter = knex({
  client: 'pg',
  connection: { host: 'localhost', database: 'myapp' },
})
More Knex examples
// With existing Knex instance
import Knex from 'knex'
const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)

// Custom table name
const adapter = knex(config, 'custom_jobs_table')
Database setup with QueueSchemaService

The Knex adapter requires tables to be created before use. Use QueueSchemaService to create them:

import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})

AdonisJS migration example:

import { BaseSchema } from '@adonisjs/lucid/schema'
import { QueueSchemaService } from '@boringnode/queue'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}

Fake (testing + assertions)

import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

// The `using` keyword automatically restores the real adapters when
// the variable goes out of scope (at the end of the test function).
using fake = QueueManager.fake()

await SendEmailJob.dispatch({ to: 'user@example.com' })

fake.assertPushed(SendEmailJob)
fake.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: (payload) => payload.to === 'user@example.com',
})
fake.assertPushedCount(1)

You can also call QueueManager.restore() manually if you need more control over when the real adapters are restored.

Sync (for testing)

import { sync } from '@boringnode/queue/drivers/sync_adapter'

const adapter = sync() // Jobs execute immediately

Use the sync adapter for tests and lightweight local development only.

  • await MyJob.dispatch(payload).run() waits for the job to fully finish.
  • Retries are executed inline, not by a background worker.
  • If you configure backoff, the adapter will sleep between attempts.
  • This means the caller can stay blocked for the full retry duration.

Example: with maxRetries: 3 and an exponential backoff of 1s, 2s, 4s, the request or command that dispatched the job can stay busy for about 7 seconds before the job exhausts its retries and runs failed().
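The arithmetic behind that estimate can be sketched as a small helper (illustrative only; it ignores the job's own execution time):

```typescript
// Total time the caller sleeps through retry delays with the sync adapter,
// assuming exponential backoff: baseDelay, baseDelay*m, baseDelay*m^2, ...

function totalInlineWaitMs(maxRetries: number, baseDelayMs: number, multiplier: number): number {
  let total = 0
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    total += baseDelayMs * multiplier ** attempt // 1s, 2s, 4s, ...
  }
  return total
}
```

With `maxRetries: 3`, a 1s base delay, and multiplier 2, `totalInlineWaitMs(3, 1000, 2)` gives 7000ms, matching the ~7 seconds above.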

Job Options

export default class MyJob extends Job<Payload> {
  static options: JobOptions = {
    queue: 'email', // Queue name (default: 'default')
    priority: 1, // Lower = higher priority (default: 5)
    maxRetries: 3, // Retry attempts before failing
    timeout: '30s', // Max execution time
    failOnTimeout: true, // Fail permanently on timeout (default: retry)
    removeOnComplete: { count: 100 }, // Keep last 100 completed
    removeOnFail: { age: '7d' }, // Keep failed for 7 days
  }
}

Delayed Jobs

await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
await SendEmailJob.dispatch(payload).in('1d') // 1 day

Retry & Backoff

import { exponentialBackoff } from '@boringnode/queue'

export default class ReliableJob extends Job<Payload> {
  static options: JobOptions = {
    maxRetries: 5,
    retry: {
      backoff: () =>
        exponentialBackoff({
          baseDelay: '1s',
          maxDelay: '1m',
          multiplier: 2,
          jitter: true,
        }),
    },
  }
}

maxRetries can be defined directly on the job options, and retry.backoff controls the delay between attempts.

With the sync adapter, these delays happen inline in the caller via sleep. If a job fails repeatedly, dispatch().run() will take as long as the total backoff duration. Use a worker-backed adapter when you do not want retries to slow down the request/command that dispatched the job.

Available strategies
import { exponentialBackoff, linearBackoff, fixedBackoff } from '@boringnode/queue'

// Exponential: 1s, 2s, 4s, 8s...
exponentialBackoff({ baseDelay: '1s', maxDelay: '1m', multiplier: 2 })

// Linear: 1s, 2s, 3s, 4s...
linearBackoff({ baseDelay: '1s', maxDelay: '30s', multiplier: 1 })

// Fixed: 5s, 5s, 5s...
fixedBackoff({ baseDelay: '5s', jitter: true })
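The delay sequences the three strategies produce can be sketched as follows. This helper is illustrative, not one of the library's exports, and it omits jitter, which randomizes each delay:

```typescript
// Delay sequences per strategy, capped at maxDelay (sketch, assumed formulas).

function delays(
  attempts: number,
  baseDelayMs: number,
  multiplier: number,
  maxDelayMs: number,
  mode: 'exponential' | 'linear' | 'fixed'
): number[] {
  const out: number[] = []
  for (let attempt = 1; attempt <= attempts; attempt++) {
    let delay: number
    if (mode === 'exponential') delay = baseDelayMs * multiplier ** (attempt - 1)
    else if (mode === 'linear') delay = baseDelayMs + baseDelayMs * multiplier * (attempt - 1)
    else delay = baseDelayMs // fixed
    out.push(Math.min(delay, maxDelayMs))
  }
  return out
}
```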

Job Timeout

export default class LongRunningJob extends Job<Payload> {
  static options: JobOptions = {
    timeout: '30s',
    failOnTimeout: false, // Will retry (default)
  }

  async execute(): Promise<void> {
    for (const item of this.payload.items) {
      // Check abort signal for graceful timeout handling
      if (this.signal?.aborted) {
        throw new Error('Job timed out')
      }
      await this.processItem(item)
    }
  }
}

Job Context

Access execution metadata via this.context:

async execute(): Promise<void> {
  console.log(this.context.jobId)       // Unique job ID
  console.log(this.context.attempt)     // 1, 2, 3...
  console.log(this.context.queue)       // Queue name
  console.log(this.context.priority)    // Priority value
  console.log(this.context.acquiredAt)  // When acquired
  console.log(this.context.stalledCount) // Stall recoveries
}

Scheduled Jobs

Run jobs on a recurring basis:

// Every 10 seconds
await MetricsJob.schedule({ endpoint: '/health' }).every('10s')

// Cron schedule
await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *') // Midnight daily
  .timezone('Europe/Paris')
Schedule management
import { Schedule } from '@boringnode/queue'

// Find and manage
const schedule = await Schedule.find('daily-cleanup')
await schedule.pause()
await schedule.resume()
await schedule.trigger() // Run now
await schedule.delete()

// List schedules
const all = await Schedule.list()
const active = await Schedule.list({ status: 'active' })

Schedule options:

| Method | Description |
| --- | --- |
| `.id(string)` | Unique identifier |
| `.every(duration)` | Fixed interval (`'5s'`, `'1m'`, `'1h'`) |
| `.cron(expression)` | Cron schedule |
| `.timezone(tz)` | Timezone (default: `'UTC'`) |
| `.from(date)` | Start boundary |
| `.to(date)` | End boundary |
| `.limit(n)` | Maximum runs |
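As a rough sketch of how the boundary options can interact for an interval schedule (illustrative only; this is not the library's scheduler, and the names here are assumptions):

```typescript
// Computing an interval schedule's next run, honoring from/to/limit.

interface ScheduleState {
  everyMs: number
  from?: number // start boundary (epoch ms)
  to?: number // end boundary (epoch ms)
  limit?: number // maximum runs
  runCount: number
}

function nextRun(s: ScheduleState, lastRun: number | undefined, now: number): number | null {
  if (s.limit !== undefined && s.runCount >= s.limit) return null // limit exhausted
  const candidate = lastRun === undefined ? Math.max(now, s.from ?? now) : lastRun + s.everyMs
  const next = s.from !== undefined ? Math.max(candidate, s.from) : candidate
  if (s.to !== undefined && next > s.to) return null // past the end boundary
  return next
}
```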

Dependency Injection

Integrate with IoC containers:

await QueueManager.init({
  // ...
  jobFactory: async (JobClass) => {
    return app.container.make(JobClass)
  },
})
Example with injected services
export default class SendEmailJob extends Job<SendEmailPayload> {
  constructor(
    private mailer: MailerService,
    private logger: Logger
  ) {
    super()
  }

  async execute(): Promise<void> {
    this.logger.info(`Sending email to ${this.payload.to}`)
    await this.mailer.send(this.payload)
  }
}

Worker Configuration

const config = {
  worker: {
    concurrency: 5, // Parallel jobs
    idleDelay: '2s', // Poll interval when idle
    timeout: '1m', // Default job timeout
    stalledThreshold: '30s', // When to consider job stalled
    stalledInterval: '30s', // How often to check
    maxStalledCount: 1, // Max recoveries before failing
    gracefulShutdown: true, // Wait for jobs on SIGTERM
  },
}

Logging

import { pino } from 'pino'

await QueueManager.init({
  // ...
  logger: pino(),
})

OpenTelemetry Instrumentation (experimental)

Warning

The OpenTelemetry instrumentation is experimental and its API may change in future releases.

@boringnode/queue ships with built-in OpenTelemetry instrumentation that creates PRODUCER spans for job dispatch and CONSUMER spans for job execution, following OTel messaging semantic conventions.

Quick Setup

import { QueueInstrumentation } from '@boringnode/queue/otel'
import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
  messagingSystem: 'boringqueue', // default
  executionSpanLinkMode: 'link', // or 'parent'
})

instrumentation.enable()
instrumentation.manuallyRegister(boringqueue)

The instrumentation patches QueueManager.init() to automatically inject its wrappers — no config changes needed in your queue setup.

Span Attributes

The instrumentation uses standard OTel messaging semantic conventions where they map cleanly, plus a few queue-specific custom attributes.

| Attribute | Kind | Description |
| --- | --- | --- |
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
| `messaging.destination.name` | Semconv | Queue name |
| `messaging.message.id` | Semconv | Job ID for single-message spans |
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
| `messaging.job.priority` | Custom | Queue-specific job priority |
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |
| `messaging.job.queue_time_ms` | Custom | Time spent waiting in queue before processing |

Trace Context Propagation

The instrumentation automatically propagates trace context from dispatch to execution:

  • Link mode (default): Each job execution is an independent trace, linked to the dispatch span
  • Parent mode: Job execution is a child of the dispatch span (same trace)

Child spans created inside execute() (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span.
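The difference between the two modes can be illustrated with plain objects. Real spans come from `@opentelemetry/api`; the minimal records below are hypothetical and only show the trace/parent/link wiring:

```typescript
// Sketch of link vs parent context propagation (not the instrumentation's code).

interface SpanRecord {
  traceId: string
  spanId: string
  parentSpanId?: string
  links: { traceId: string; spanId: string }[]
}

let idCounter = 0
const newId = () => `id-${++idCounter}`

// PRODUCER span created at dispatch time; its context travels with the job.
function dispatchSpan(): SpanRecord {
  return { traceId: newId(), spanId: newId(), links: [] }
}

// CONSUMER span created at execution time.
function executionSpan(producer: SpanRecord, mode: 'link' | 'parent'): SpanRecord {
  if (mode === 'parent') {
    // Same trace: the consumer span is a child of the dispatch span.
    return { traceId: producer.traceId, spanId: newId(), parentSpanId: producer.spanId, links: [] }
  }
  // Link mode: independent trace, linked back to the dispatch span.
  return {
    traceId: newId(),
    spanId: newId(),
    links: [{ traceId: producer.traceId, spanId: producer.spanId }],
  }
}
```

Link mode keeps long-lived traces short (each execution stands alone), while parent mode gives one end-to-end trace per dispatch.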

diagnostics_channel

Raw telemetry events are available via diagnostics_channel for custom subscribers:

import { tracingChannels } from '@boringnode/queue'

const { executeChannel } = tracingChannels

executeChannel.subscribe({
  start() {},
  end() {},
  asyncStart() {},
  asyncEnd(message) {
    console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`)
  },
  error() {},
})

Benchmarks

Performance comparison with BullMQ (5ms simulated work per job):

| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
| --- | --- | --- | --- | --- |
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |
| 100K | 20 | 29.1s | 29.6s | 1.7% faster |

Reproduce locally with `npm run benchmark -- --realistic`.