Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/backend/distributed/operations/shard_cleaner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,29 @@ TupleToCleanupRecord(HeapTuple heapTuple, TupleDesc tupleDescriptor)
/*
* CleanupRecordExists returns whether a cleanup record with the given
* record ID exists in pg_dist_cleanup.
*
* The scan deliberately uses a freshly acquired GetLatestSnapshot()
* rather than the caller's transaction snapshot. This is required so
* that the cleanup worker, after a successful TryLockOperationId(),
* can observe a deletion of the cleanup record committed by the
* operation owner immediately before it released the operation-ID
* advisory lock. Reading under the older outer-transaction snapshot
* (the default when NULL is passed to systable_beginscan) would still
* see the row as live and trigger a redundant drop of the underlying
* resource. Callers that need to read pg_dist_cleanup under their
* own (possibly older) snapshot semantics should NOT use this helper.
*
* Note: there is no automated regression test for this race today.
* TryLockOperationId() uses dontWait = true (cleanup never blocks),
* so pg_isolationtester cannot orchestrate the required interleave
* because it schedules sessions by detecting *blocked* steps. A
* deterministic test will require Citus to adopt PostgreSQL's
* INJECTION_POINT() infrastructure (or an equivalent test-only
* delay GUC inserted between TryLockOperationId() success and this
* scan) so a competing op-completing session can be raced into the
* gap. Until that test infrastructure exists, the GetLatestSnapshot()
* call below is load-bearing: replacing it with NULL silently
* reintroduces the double-drop race.
*/
static bool
CleanupRecordExists(uint64 recordId)
Expand All @@ -1192,10 +1215,14 @@ CleanupRecordExists(uint64 recordId)
ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_record_id,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(recordId));

/* make sure we can always see deletion after acquiring the operation ID lock */
Copy link
Copy Markdown
Contributor

@colm-mchugh colm-mchugh May 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment format strays from convention. Maybe add this text to the function comment at line 1179 ?

Snapshot snapshot = GetLatestSnapshot();

SysScanDesc scanDescriptor = systable_beginscan(pgDistCleanup,
DistCleanupPrimaryKeyIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
snapshot,
scanKeyCount, scanKey);

HeapTuple heapTuple = systable_getnext(scanDescriptor);
bool recordExists = HeapTupleIsValid(heapTuple);
Expand Down
Loading