This directory contains SQL migration files for setting up the Dead Letter Queue (DLQ) functionality in the backfill library.
- `dlq_schema.sql`: Complete DLQ schema with table creation, indexes, and documentation
The backfill library can automatically initialize the DLQ schema when needed:
```rust
use backfill::BackfillClient;

let client = BackfillClient::new("postgresql://localhost/mydb", "my_schema").await?;

// This will create the DLQ table if it doesn't exist
client.init_dlq().await?;
```

For production environments where you prefer explicit migration control:
```sh
# Replace the schema name if needed
sed 's/graphile_worker/your_schema_name/g' docs/dlq_schema.sql | psql -d your_database

# Or run directly if using the default schema
psql -d your_database -f docs/dlq_schema.sql
```

With diesel-cli:
```sh
# Generate a new diesel migration
diesel migration generate add_dlq_schema
# Then copy the dlq_schema.sql content into the generated up.sql file
```

With sqlx-cli:
```sh
# Create a new migration
sqlx migrate add add_dlq_schema
# Then copy the dlq_schema.sql content into the migration file
```

With refinery or other tools:
```sh
# Copy dlq_schema.sql to your migrations directory
cp docs/dlq_schema.sql migrations/001_add_dlq_schema.sql
```

The default schema is `graphile_worker`, matching graphile-worker conventions. To use a different schema:
```sh
# Replace all occurrences of the schema name
sed 's/graphile_worker/your_custom_schema/g' docs/dlq_schema.sql > custom_dlq_schema.sql
psql -d your_database -f custom_dlq_schema.sql
```

You can extend the DLQ table with additional columns for your specific needs:
```sql
-- Add custom columns after initial schema creation
ALTER TABLE your_schema.backfill_dlq
    ADD COLUMN tenant_id TEXT,
    ADD COLUMN environment TEXT DEFAULT 'production',
    ADD COLUMN custom_metadata JSONB;

-- Add corresponding indexes
CREATE INDEX idx_backfill_dlq_tenant
    ON your_schema.backfill_dlq (tenant_id);
```

The DLQ table provides comprehensive job failure tracking:
- `job_id`: Original job identifier
- `task_identifier`: Job type/handler name
- `queue_name`: Processing queue
- `payload`: Original job data (JSON)
- `failure_count`: Total failures before the job entered the DLQ
- `last_error`: Most recent error message
- `failed_at`: When the job was moved to the DLQ
- `max_attempts`: Retry limit in effect when the job failed
- `requeued_count`: Times the job has been requeued from the DLQ
- `last_requeued_at`: Most recent requeue
- `created_by`: System that moved the job to the DLQ
- `notes`: Admin notes for manual intervention
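To make the field list concrete, a row from this table could be modeled in Rust roughly as follows. This is an illustrative sketch, not the library's actual type: the `DlqJob` struct, its field types, and the `example_job` helper are all assumptions (timestamps are shown as plain strings for simplicity):

```rust
// Illustrative shape of a DLQ row based on the column list above.
// Field types are assumptions (timestamps as RFC 3339 strings, payload
// as a raw JSON string); the real library may expose a different type.
#[derive(Debug, Clone)]
pub struct DlqJob {
    pub job_id: String,
    pub task_identifier: String,
    pub queue_name: Option<String>,
    pub payload: String, // JSON payload kept as a raw string here
    pub failure_count: i32,
    pub last_error: Option<String>,
    pub failed_at: String,
    pub max_attempts: i32,
    pub requeued_count: i32,
    pub last_requeued_at: Option<String>,
    pub created_by: Option<String>,
    pub notes: Option<String>,
}

// Hypothetical helper producing a sample row for illustration.
pub fn example_job() -> DlqJob {
    DlqJob {
        job_id: "42".to_string(),
        task_identifier: "send_email".to_string(),
        queue_name: Some("notifications".to_string()),
        payload: r#"{"to":"user@example.com"}"#.to_string(),
        failure_count: 6, // the job exhausted its max_attempts of 5
        last_error: Some("SMTP timeout".to_string()),
        failed_at: "2024-01-01T00:00:00Z".to_string(),
        max_attempts: 5,
        requeued_count: 0,
        last_requeued_at: None,
        created_by: Some("backfill-worker".to_string()),
        notes: None,
    }
}
```

Note how `failure_count` exceeds `max_attempts` in the sample: a job only lands in the DLQ once its retries are exhausted.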
The schema includes optimized indexes for:
- Admin interfaces - Fast filtering by task type, queue, and time
- Statistics queries - Efficient aggregation for monitoring
- Requeue operations - Quick lookup for admin actions
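As an illustration, indexes serving those access patterns might look like the following. The index names and column choices here are assumptions; the authoritative definitions live in `dlq_schema.sql`:

```sql
-- Sketch only: admin filtering by task type and time
CREATE INDEX idx_backfill_dlq_task_time
    ON your_schema.backfill_dlq (task_identifier, failed_at DESC);

-- Sketch only: queue-scoped listings
CREATE INDEX idx_backfill_dlq_queue_time
    ON your_schema.backfill_dlq (queue_name, failed_at DESC);
```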
Once the schema is created, the library provides full admin functionality:
```rust
use backfill::DlqFilter;

// Get DLQ statistics
let stats = client.dlq_stats().await?;
println!("Total DLQ jobs: {}", stats.total_jobs);

// List failed jobs with filtering
let filter = DlqFilter {
    task_identifier: Some("send_email".to_string()),
    queue_name: Some("notifications".to_string()),
    limit: Some(50),
    offset: Some(0),
};
let jobs = client.list_dlq_jobs(filter).await?;

// Requeue a job for retry
client.requeue_dlq_job(job_id, None).await?;

// Delete permanently failed jobs
client.delete_dlq_job(job_id).await?;
```

The DLQ schema is designed for easy monitoring integration:
```sql
-- Monitor DLQ growth
SELECT COUNT(*) FROM your_schema.backfill_dlq
WHERE failed_at > NOW() - INTERVAL '1 hour';

-- Check failure patterns
SELECT task_identifier, COUNT(*), AVG(failure_count)
FROM your_schema.backfill_dlq
GROUP BY task_identifier
ORDER BY COUNT(*) DESC;

-- Recent high-failure jobs
SELECT * FROM your_schema.backfill_dlq
WHERE failure_count > 5
  AND failed_at > NOW() - INTERVAL '24 hours'
ORDER BY failed_at DESC;
```

Set up alerts when DLQ job counts exceed thresholds or when specific job types consistently fail.
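For example, a cron-driven probe could page when hourly DLQ intake crosses a threshold. This is a sketch: the 100-job threshold is arbitrary, and in practice the count would come from running the growth query above through `psql -tA`:

```shell
# Sketch of an alert probe; the threshold is illustrative.
# In practice the count would come from psql, e.g.:
#   dlq_count=$(psql -tA -d your_database -c \
#     "SELECT COUNT(*) FROM your_schema.backfill_dlq WHERE failed_at > NOW() - INTERVAL '1 hour'")
check_dlq_threshold() {
  local count=$1 threshold=$2
  if [ "$count" -gt "$threshold" ]; then
    echo "ALERT: $count DLQ jobs in the last hour (threshold $threshold)"
  else
    echo "OK: $count DLQ jobs in the last hour"
  fi
}

check_dlq_threshold 150 100
```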