Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 86 additions & 61 deletions src/backend/distributed/utils/background_jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,85 +244,110 @@ citus_task_wait(PG_FUNCTION_ARGS)


/*
 * citus_job_wait_internal implements the waiting on a job, e.g. for the background
 * rebalancer. It polls the job until it reaches desiredStatus or, when desiredStatus
 * is NULL, until it reaches any terminal state.
 *
 * If desiredStatus is provided, we throw an error when the job reaches a different
 * terminal state, because it can then never transition to the desired state anymore.
 * An error is also thrown when no job with the given jobid exists.
 *
 * The polling loop runs inside a PG_TRY/PG_CATCH block: if the wait is interrupted
 * (Ctrl+C, statement_timeout, etc.) or fails unexpectedly, we cancel the job so it
 * doesn't remain running in the background, and then re-throw the original error.
 */
void
citus_job_wait_internal(int64 jobid, BackgroundJobStatus *desiredStatus)
{
	/*
	 * Since we are wait polling, we actually allocate memory on every poll. To avoid
	 * putting unneeded pressure on memory, we create a context that we reset every
	 * iteration.
	 *
	 * Both variables are assigned before PG_TRY and never modified afterwards, so
	 * they are in scope and safe to read in PG_CATCH without being volatile.
	 */
	MemoryContext waitContext = AllocSetContextCreate(CurrentMemoryContext,
													  "JobsWaitContext",
													  ALLOCSET_DEFAULT_MINSIZE,
													  ALLOCSET_DEFAULT_INITSIZE,
													  ALLOCSET_DEFAULT_MAXSIZE);
	MemoryContext oldContext = MemoryContextSwitchTo(waitContext);

	/*
	 * Written inside PG_TRY and read inside PG_CATCH, hence volatile. It is cleared
	 * right before the errors we raise ourselves: in those cases the job either does
	 * not exist or has already reached a terminal state, so there is nothing to
	 * cancel.
	 */
	volatile bool cancelJobOnError = true;

	PG_TRY();
	{
		while (true)
		{
			MemoryContextReset(waitContext);

			BackgroundJob *job = GetBackgroundJobByJobId(jobid);
			if (!job)
			{
				cancelJobOnError = false;
				ereport(ERROR,
						(errmsg("no job found for job with jobid: " INT64_FORMAT,
								jobid)));
			}

			/* If we have a desiredStatus and we've reached it, we're done */
			if (desiredStatus && job->state == *desiredStatus)
			{
				break;
			}

			if (IsBackgroundJobStatusTerminal(job->state))
			{
				if (desiredStatus)
				{
					/*
					 * We have reached a terminal state which is not the desired
					 * state, otherwise we would have left the loop above. Since it
					 * is terminal, the desired state can never be reached.
					 */
					Oid reachedStatusOid = BackgroundJobStatusOid(job->state);
					Datum reachedStatusNameDatum = DirectFunctionCall1(enum_out,
																	   reachedStatusOid);
					char *reachedStatusName = DatumGetCString(reachedStatusNameDatum);

					Oid desiredStatusOid = BackgroundJobStatusOid(*desiredStatus);
					Datum desiredStatusNameDatum = DirectFunctionCall1(enum_out,
																	   desiredStatusOid);
					char *desiredStatusName = DatumGetCString(desiredStatusNameDatum);

					cancelJobOnError = false;
					ereport(ERROR,
							(errmsg("Job reached terminal state \"%s\" instead of "
									"desired state \"%s\"", reachedStatusName,
									desiredStatusName)));
				}

				/* no desiredStatus was given, so any terminal state ends the wait */
				break;
			}

			/* Before sleeping, check for user interrupts (Ctrl+C, statement_timeout, etc.) */
			CHECK_FOR_INTERRUPTS();

			/* wait for up to one second (or a latch wakeup) before re-checking the job */
			const long delay_ms = 1000;
			(void) WaitLatch(MyLatch,
							 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
							 delay_ms,
							 WAIT_EVENT_PG_SLEEP);

			ResetLatch(MyLatch);
		}
	}
	PG_CATCH();
	{
		/*
		 * Return to the caller's memory context and release the polling context
		 * before doing anything else, so the error path cleans up exactly like the
		 * success path. We deliberately do not switch to TopMemoryContext: anything
		 * allocated there would live until the end of the session.
		 */
		MemoryContextSwitchTo(oldContext);
		MemoryContextDelete(waitContext);

		if (cancelJobOnError)
		{
			/*
			 * The wait was interrupted or failed while the job may still be
			 * running. Cancel it so that it doesn't keep running in the background
			 * (e.g. leaving shard moves or replication slots behind).
			 *
			 * NOTE(review): if citus_job_cancel itself raises an error, that error
			 * replaces the original one - confirm this is acceptable.
			 */
			(void) DirectFunctionCall1(citus_job_cancel, Int64GetDatum(jobid));
		}

		/* Re-throw the original error so Postgres knows this statement failed. */
		PG_RE_THROW();
	}
	PG_END_TRY();

	MemoryContextSwitchTo(oldContext);
	MemoryContextDelete(waitContext);
}


Expand Down
155 changes: 155 additions & 0 deletions src/test/regress/expected/issue_7896.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
---------------------------------------------------------------------
-- Regression Test: Simulate zombie replication slot when
-- citus_rebalance_wait() is canceled.
--
-- In the buggy behavior, canceling citus_rebalance_wait()
-- (via a short statement_timeout or Ctrl+C) leaves behind an active logical
-- replication slot on a worker. This, in turn, prevents DROP DATABASE
-- (with FORCE) from succeeding.
--
-- With the fix applied, the underlying rebalance job is canceled,
-- no zombie slot remains.
---------------------------------------------------------------------
---------------------------------------------------------------------
-- 1) Setup the test environment.
---------------------------------------------------------------------
SET citus.next_shard_id TO 17560000;
CREATE SCHEMA issue_7896;
SET search_path TO issue_7896;
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 2) Set cluster parameters and initialize environment.
---------------------------------------------------------------------
SET citus.shard_replication_factor TO 2;
SET citus.enable_repartition_joins TO ON;
-- For faster background task processing, set a short background task queue interval.
ALTER SYSTEM SET citus.background_task_queue_interval TO '1s';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)

---------------------------------------------------------------------
-- 3) Create a distributed table.
---------------------------------------------------------------------
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (a int PRIMARY KEY);
SELECT create_distributed_table('t1', 'a', shard_count => 4, colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

---------------------------------------------------------------------
-- 4) Insert enough data so that a rebalance has measurable work.
---------------------------------------------------------------------
INSERT INTO t1
SELECT generate_series(1, 1000000);
---------------------------------------------------------------------
-- 5) Verify that a rebalance on a balanced cluster is a no-op.
---------------------------------------------------------------------
SELECT 1 FROM citus_rebalance_start();
?column?
---------------------------------------------------------------------
1
(1 row)

-- Expected: NOTICE "No moves available for rebalancing".
SELECT citus_rebalance_wait();
citus_rebalance_wait
---------------------------------------------------------------------

(1 row)

-- Expected: WARNING "no ongoing rebalance that can be waited on".
---------------------------------------------------------------------
-- 6) Force a shard movement so that a rebalance job is scheduled.
-- Remove and re-add a worker using a parameter placeholder.
---------------------------------------------------------------------
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------

(1 row)

SELECT citus_add_node('localhost', :worker_2_port);
citus_add_node
---------------------------------------------------------------------
30
(1 row)

---------------------------------------------------------------------
-- 7) Start a rebalance job that will do actual work.
---------------------------------------------------------------------
SELECT citus_rebalance_start(
rebalance_strategy := 'by_disk_size',
shard_transfer_mode := 'force_logical'
);
citus_rebalance_start
---------------------------------------------------------------------
1
(1 row)

-- Expected: Notice that moves are scheduled as a background job.
---------------------------------------------------------------------
-- 8) Attempt to wait on the rebalance with a short timeout so that the wait
-- is canceled. The PG_CATCH block in citus_job_wait_internal should then
-- cancel the underlying job (cleaning up temporary replication slots).
---------------------------------------------------------------------
SET statement_timeout = '2s';
SET client_min_messages TO NOTICE;
DO $$
BEGIN
BEGIN
RAISE NOTICE 'Waiting on rebalance with a 2-second timeout...';
-- Public function citus_rebalance_wait() takes no arguments.
PERFORM citus_rebalance_wait();
EXCEPTION
WHEN query_canceled THEN
RAISE NOTICE 'Rebalance wait canceled as expected';
-- Your fix should cancel the underlying rebalance job.
END;
END;
$$ LANGUAGE plpgsql;
NOTICE: Waiting on rebalance with a 2-second timeout...
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
NOTICE: Rebalance wait canceled as expected
CONTEXT: PL/pgSQL function inline_code_block line XX at RAISE
SET statement_timeout = '0';
SET client_min_messages TO ERROR;
---------------------------------------------------------------------
-- 9) Cleanup orphaned background resources (if any).
---------------------------------------------------------------------
CALL citus_cleanup_orphaned_resources();
---------------------------------------------------------------------
-- 10) Traverse nodes and check for active replication slots.
--
-- Connect to the coordinator and worker nodes, then query for replication slots.
-- Expected Outcome (with the fix applied): No active replication slots.
---------------------------------------------------------------------
\c - - - :master_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)

\c - - - :worker_1_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)

\c - - - :worker_2_port
SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size | two_phase | inactive_since | conflicting | invalidation_reason | failover | synced
---------------------------------------------------------------------
(0 rows)

---------------------------------------------------------------------
-- 11) Cleanup: Drop the test schema.
---------------------------------------------------------------------
\c - - - :master_port
SET search_path TO issue_7896;
SET client_min_messages TO WARNING;
DROP SCHEMA IF EXISTS issue_7896 CASCADE;
1 change: 1 addition & 0 deletions src/test/regress/multi_mx_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ test: local_shard_execution_dropped_column
test: metadata_sync_helpers

test: issue_6592
test: issue_7896
test: executor_local_failure

# test that no tests leaked intermediate results. This should always be last
Expand Down
Loading