From d5a1a830e69b744252aaeb68b193e143239e5d61 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Tue, 7 Apr 2026 16:15:00 -0400 Subject: [PATCH] refactor: make Postgres GC preemptible via per-batch xact locks Moves GC locking out of the GarbageCollector interface and into the datastore implementations. Postgres now acquires a transaction-level advisory lock per batch in batchDelete, so a running GC can be preempted between batches rather than holding a session-level lock for the full run. MySQL keeps session-level locks (no xact-level advisory lock equivalent) but scopes them to DeleteBeforeTx and DeleteExpiredRels. A new ErrGCPreempted sentinel lets RunGarbageCollection treat preemption as partial success: metric counters and MarkGCCompleted still fire, DeleteExpiredRels is skipped when DeleteBeforeTx is preempted, and the caller returns nil so the preempting node continues the work. --- internal/datastore/mysql/gc.go | 39 ++++-- internal/datastore/postgres/gc.go | 61 ++++++---- internal/datastore/postgres/locks.go | 16 +++ .../postgres/postgres_shared_test.go | 72 +++++++++++ pkg/datastore/gc.go | 57 ++++----- pkg/datastore/gc_test.go | 115 ++++++++++-------- 6 files changed, 243 insertions(+), 117 deletions(-) diff --git a/internal/datastore/mysql/gc.go b/internal/datastore/mysql/gc.go index d9d8fd2430..7e3ed10696 100644 --- a/internal/datastore/mysql/gc.go +++ b/internal/datastore/mysql/gc.go @@ -3,6 +3,7 @@ package mysql import ( "context" "database/sql" + "fmt" "time" sq "github.com/Masterminds/squirrel" @@ -44,14 +45,6 @@ func (mcc *mysqlGarbageCollector) Close() { mcc.isClosed = true } -func (mcc *mysqlGarbageCollector) LockForGCRun(ctx context.Context) (bool, error) { - return mcc.mds.tryAcquireLock(ctx, gcRunLock) -} - -func (mcc *mysqlGarbageCollector) UnlockAfterGCRun() error { - return mcc.mds.releaseLock(context.Background(), gcRunLock) -} - func (mcc *mysqlGarbageCollector) Now(ctx context.Context) (time.Time, error) { if mcc.isClosed { return time.Time{}, spiceerrors.MustBugf("mysqlGarbageCollector is closed") @@ -105,7 +98,9 @@ func (mcc *mysqlGarbageCollector) TxIDBefore(ctx context.Context, before time.Ti return revisions.NewForTransactionID(uintValue), nil } -// - implementation misses metrics +// NOTE: MySQL uses GET_LOCK/RELEASE_LOCK (session-level) rather than per-batch +// transaction-level locks like PostgreSQL, because MySQL lacks xact-level advisory +// locks. The lock is held for the entire method, so preemption is coarser-grained. func (mcc *mysqlGarbageCollector) DeleteBeforeTx( ctx context.Context, txID datastore.Revision, @@ -114,6 +109,19 @@ func (mcc *mysqlGarbageCollector) DeleteBeforeTx( return removed, spiceerrors.MustBugf("mysqlGarbageCollector is closed") } + acquired, err := mcc.mds.tryAcquireLock(ctx, gcRunLock) + if err != nil { + return removed, fmt.Errorf("error acquiring gc lock: %w", err) + } + if !acquired { + return removed, datastore.ErrGCPreempted + } + defer func() { + if releaseErr := mcc.mds.releaseLock(context.Background(), gcRunLock); releaseErr != nil { + log.Warn().Err(releaseErr).Msg("error releasing gc lock") + } + }() + // Delete any relationship rows with deleted_transaction <= the transaction ID. 
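+	// NOTE: the deletion passes below all run under the session-level lock
+	// acquired above, so another node can only preempt this run between whole
+	// method calls, never between batches (unlike the Postgres implementation).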
removed.Relationships, err = mcc.batchDelete(ctx, mcc.mds.driver.RelationTuple(), sq.LtOrEq{colDeletedTxn: txID}) if err != nil { @@ -139,6 +147,19 @@ func (mcc *mysqlGarbageCollector) DeleteExpiredRels(ctx context.Context) (int64, return 0, nil } + acquired, err := mcc.mds.tryAcquireLock(ctx, gcRunLock) + if err != nil { + return 0, fmt.Errorf("error acquiring gc lock: %w", err) + } + if !acquired { + return 0, datastore.ErrGCPreempted + } + defer func() { + if releaseErr := mcc.mds.releaseLock(context.Background(), gcRunLock); releaseErr != nil { + log.Warn().Err(releaseErr).Msg("error releasing gc lock") + } + }() + now, err := mcc.Now(ctx) if err != nil { return 0, err diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go index 5512ecefcf..bf42e43d3e 100644 --- a/internal/datastore/postgres/gc.go +++ b/internal/datastore/postgres/gc.go @@ -7,7 +7,7 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/authzed/spicedb/internal/datastore/common" @@ -58,14 +58,6 @@ func (pgg *pgGarbageCollector) Close() { pgg.conn.Release() } -func (pgg *pgGarbageCollector) LockForGCRun(ctx context.Context) (bool, error) { - return pgg.pgd.tryAcquireLock(ctx, pgg.conn, gcRunLock) -} - -func (pgg *pgGarbageCollector) UnlockAfterGCRun() error { - return pgg.pgd.releaseLock(context.Background(), pgg.conn, gcRunLock) -} - func (pgg *pgGarbageCollector) Now(ctx context.Context) (time.Time, error) { if pgg.isClosed { return time.Time{}, spiceerrors.MustBugf("pgGarbageCollector is closed") @@ -136,10 +128,6 @@ func (pgg *pgGarbageCollector) DeleteExpiredRels(ctx context.Context) (int64, er ) } -type exec interface { - Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) -} - func (pgg *pgGarbageCollector) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (datastore.DeletionCounts, error) { if pgg.isClosed { return datastore.DeletionCounts{}, spiceerrors.MustBugf("pgGarbageCollector is closed") @@ -148,16 +136,22 @@ func (pgg *pgGarbageCollector) DeleteBeforeTx(ctx context.Context, txID datastor return pgg.deleteBeforeTx(ctx, pgg.conn, txID) } -func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, conn exec, txID datastore.Revision) (datastore.DeletionCounts, error) { +// txBeginner is satisfied by *pgxpool.Conn (used in production) and by any +// ConnPooler (used in tests to route batch transactions through the +// QueryInterceptor). +type txBeginner interface { + Begin(ctx context.Context) (pgx.Tx, error) +} + +func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, beginner txBeginner, txID datastore.Revision) (datastore.DeletionCounts, error) { revision := txID.(postgresRevision) minTxAlive := NewXid8(revision.snapshot.xmin) removed := datastore.DeletionCounts{} var err error - // Delete any relationship rows that were already dead when this transaction started removed.Relationships, err = pgg.batchDelete( ctx, - conn, + beginner, schema.TableTuple, gcPKCols, sq.Lt{schema.ColDeletedXid: minTxAlive}, @@ -167,13 +161,9 @@ func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, conn exec, tx return removed, fmt.Errorf("failed to GC relationships table: %w", err) } - // Delete all transaction rows with ID < the transaction ID. - // - // We don't delete the transaction itself to ensure there is always at least - // one transaction present. 
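+	// Delete all transaction rows with ID < the transaction ID. We don't
+	// delete the transaction itself, to ensure there is always at least one
+	// transaction present.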
 	removed.Transactions, err = pgg.batchDelete(
 		ctx,
-		conn,
+		beginner,
 		schema.TableTransaction,
 		gcPKCols,
 		sq.Lt{schema.ColXID: minTxAlive},
@@ -183,10 +173,9 @@ func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, conn exec, tx
 		return removed, fmt.Errorf("failed to GC transactions table: %w", err)
 	}
 
 	// Delete any namespace rows with deleted_transaction <= the transaction ID.
 	removed.Namespaces, err = pgg.batchDelete(
 		ctx,
-		conn,
+		beginner,
 		schema.TableNamespace,
 		gcPKCols,
 		sq.Lt{schema.ColDeletedXid: minTxAlive},
@@ -201,7 +190,7 @@
 
 func (pgg *pgGarbageCollector) batchDelete(
 	ctx context.Context,
-	conn exec,
+	beginner txBeginner,
 	tableName string,
 	pkCols []string,
 	filter sqlFilter,
@@ -224,11 +212,32 @@ func (pgg *pgGarbageCollector) batchDelete(
 
 	var deletedCount int64
 	for {
-		cr, err := conn.Exec(ctx, query, args...)
+		// Each batch runs in its own transaction holding the xact-level GC
+		// advisory lock, so another node can preempt a long-running GC between
+		// batches rather than waiting for the entire run to finish.
+		tx, err := beginner.Begin(ctx)
+		if err != nil {
+			return deletedCount, fmt.Errorf("error starting transaction for gc batch: %w", err)
+		}
+
+		locked, err := pgg.pgd.tryAcquireXactLock(ctx, tx, gcRunLock)
 		if err != nil {
+			_ = tx.Rollback(ctx)
+			return deletedCount, fmt.Errorf("error acquiring gc lock: %w", err)
+		}
+
+		if !locked {
+			_ = tx.Rollback(ctx)
+			return deletedCount, datastore.ErrGCPreempted
+		}
+
+		cr, err := tx.Exec(ctx, query, args...)
+		if err != nil {
+			_ = tx.Rollback(ctx)
 			return deletedCount, err
 		}
 
+		if err := tx.Commit(ctx); err != nil {
+			return deletedCount, fmt.Errorf("error committing gc batch: %w", err)
+		}
+
 		rowsDeleted := cr.RowsAffected()
 		deletedCount += rowsDeleted
 		if rowsDeleted < gcBatchDeleteSize {
diff --git a/internal/datastore/postgres/locks.go b/internal/datastore/postgres/locks.go
index b15b3217c4..5536edbcbf 100644
--- a/internal/datastore/postgres/locks.go
+++ b/internal/datastore/postgres/locks.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgxpool"
 )
 
@@ -38,6 +39,21 @@ func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, conn *pgxpool.Conn,
 	return lockAcquired, nil
 }
 
+// tryAcquireXactLock attempts to acquire a transaction-level advisory lock.
+// The lock is automatically released when the transaction commits or rolls back.
+// Returns true if the lock was acquired, false if another session holds it.
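+// Unlike the session-level lock helpers above, an xact-level advisory lock
+// cannot be released explicitly; PostgreSQL frees it only when the holding
+// transaction ends.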
+func (pgd *pgDatastore) tryAcquireXactLock(ctx context.Context, tx pgx.Tx, lockID lockID) (bool, error) { + row := tx.QueryRow(ctx, ` + SELECT pg_try_advisory_xact_lock($1) + `, lockID) + + var lockAcquired bool + if err := row.Scan(&lockAcquired); err != nil { + return false, err + } + return lockAcquired, nil +} + func (pgd *pgDatastore) releaseLock(ctx context.Context, conn *pgxpool.Conn, lockID lockID) error { row := conn.QueryRow(ctx, ` SELECT pg_advisory_unlock($1) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 0f43b6274c..4ba3a330a2 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -107,6 +107,21 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) { WriteConnsMinOpen(10), WriteConnsMaxOpen(10), )) + + t.Run("TestGCPreemption", createMultiDatastoreTest( + b, + GCPreemptionTest, + RevisionQuantization(0), + GCWindow(1*time.Millisecond), + GCInterval(veryLargeGCInterval), + WatchBufferLength(50), + MigrationPhase(config.migrationPhase), + ReadConnsMinOpen(10), + ReadConnsMaxOpen(10), + WriteConnsMinOpen(10), + WriteConnsMaxOpen(10), + WithRevisionHeartbeat(false), + )) }) t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) { @@ -1780,6 +1795,63 @@ func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) require.NoError(t, err) } +// GCPreemptionTest verifies that when another session holds the GC advisory +// lock, batchDelete returns ErrGCPreempted instead of blocking, and that GC +// succeeds again once the holder releases the lock. +func GCPreemptionTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) { + require := require.New(t) + + pds := ds.(*pgDatastore) + pds2 := ds2.(*pgDatastore) + ctx := t.Context() + + // Seed two writes so there is something for GC to delete. + _, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.LegacyWriteNamespaces(ctx, + namespace.Namespace("resource", namespace.MustRelation("reader", nil)), + namespace.Namespace("user")) + }) + require.NoError(err) + + writeRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { + return rwt.LegacyWriteNamespaces(ctx, + namespace.Namespace("resource", + namespace.MustRelation("reader", nil), + namespace.MustRelation("unused", nil)), + namespace.Namespace("user")) + }) + require.NoError(err) + + // Hold the GC xact-lock from a second session via an uncommitted transaction. + holderConn, err := pds2.writePool.Acquire(ctx) + require.NoError(err) + defer holderConn.Release() + + holderTx, err := holderConn.Begin(ctx) + require.NoError(err) + // Ensure the lock is released even if an assertion fails. + defer func() { _ = holderTx.Rollback(ctx) }() + + acquired, err := pds2.tryAcquireXactLock(ctx, holderTx, gcRunLock) + require.NoError(err) + require.True(acquired, "holder should have acquired the GC lock") + + // GC from the other datastore should observe the lock and preempt. + pgg, err := pds.BuildGarbageCollector(ctx) + require.NoError(err) + defer pgg.Close() + + _, err = pgg.DeleteBeforeTx(ctx, writeRev) + require.ErrorIs(err, datastore.ErrGCPreempted) + + // Release the lock by rolling back the holder transaction, then GC should succeed. 
+ require.NoError(holderTx.Rollback(ctx)) + + removed, err := pgg.DeleteBeforeTx(ctx, writeRev) + require.NoError(err) + require.GreaterOrEqual(removed.Transactions, int64(1)) +} + func StrictReadModeFallbackTest(t *testing.T, primaryDS datastore.Datastore, unwrappedReplicaDS datastore.Datastore) { require := require.New(t) diff --git a/pkg/datastore/gc.go b/pkg/datastore/gc.go index f9a9433504..24d90fb8e9 100644 --- a/pkg/datastore/gc.go +++ b/pkg/datastore/gc.go @@ -2,6 +2,7 @@ package datastore import ( "context" + "errors" "fmt" "time" @@ -61,6 +62,11 @@ var ( gcFailureCounter = prometheus.NewCounter(gcFailureCounterConfig) ) +// ErrGCPreempted is returned when a garbage collection run is preempted by +// another node acquiring the GC lock. This is not an error condition — it +// means another node took over GC and this node should stop. +var ErrGCPreempted = errors.New("gc preempted by another node") + // RegisterGCMetrics registers garbage collection metrics to the default // registry and returns them (so that they be unregistered). func RegisterGCMetrics() ([]prometheus.Collector, error) { @@ -105,16 +111,6 @@ type GarbageCollectableDatastore interface { // GarbageCollector represents a runnable garbage collector for a datastore. type GarbageCollector interface { - // LockForGCRun attempts to acquire a lock for garbage collection. This lock - // is typically done at the datastore level, to ensure that no other nodes are - // running garbage collection at the same time. - LockForGCRun(ctx context.Context) (bool, error) - - // UnlockAfterGCRun releases the lock after a garbage collection run. - // NOTE: this method does not take a context, as the context used for the - // reset of the GC run can be canceled/timed out and the unlock will still need to happen. - UnlockAfterGCRun() error - // Now returns the current time from the datastore. Now(context.Context) (time.Time, error) @@ -122,9 +118,11 @@ type GarbageCollector interface { TxIDBefore(context.Context, time.Time) (Revision, error) // DeleteBeforeTx deletes all data before the provided transaction ID. + // Returns ErrGCPreempted if another node acquired the GC lock. DeleteBeforeTx(ctx context.Context, txID Revision) (DeletionCounts, error) // DeleteExpiredRels deletes all relationships that have expired. + // Returns ErrGCPreempted if another node acquired the GC lock. DeleteExpiredRels(ctx context.Context) (int64, error) // Close closes the garbage collector. @@ -230,26 +228,6 @@ func RunGarbageCollection(ctx context.Context, collectable GarbageCollectableDat } defer gc.Close() - ok, err := gc.LockForGCRun(ctx) - if err != nil { - return fmt.Errorf("error locking for gc run: %w", err) - } - - if !ok { - log.Info(). - Msg("datastore garbage collection already in progress on another node") - return nil - } - - defer func() { - err := gc.UnlockAfterGCRun() - if err != nil { - log.Error(). - Err(err). - Msg("error unlocking after gc run") - } - }() - now, err := gc.Now(ctx) if err != nil { return fmt.Errorf("error retrieving now: %w", err) @@ -262,7 +240,11 @@ func RunGarbageCollection(ctx context.Context, collectable GarbageCollectableDat collected, err := gc.DeleteBeforeTx(ctx, watermark) - expiredRelationshipsCount, eerr := gc.DeleteExpiredRels(ctx) + var expiredRelationshipsCount int64 + var eerr error + if !errors.Is(err, ErrGCPreempted) { + expiredRelationshipsCount, eerr = gc.DeleteExpiredRels(ctx) + } // even if an error happened, garbage would have been collected. 
This makes sure these are reflected even if the // worker eventually fails or times out. @@ -273,6 +255,19 @@ func RunGarbageCollection(ctx context.Context, collectable GarbageCollectableDat collectionDuration := time.Since(startTime) gcDurationHistogram.Observe(collectionDuration.Seconds()) + if errors.Is(err, ErrGCPreempted) || errors.Is(eerr, ErrGCPreempted) { + log.Ctx(ctx).Info(). + Stringer("highestTxID", watermark). + Dur("duration", collectionDuration). + Interface("collected", collected). + Int64("expiredRelationships", expiredRelationshipsCount). + Msg("datastore garbage collection partially completed: preempted by another node") + // Still mark as completed — partial GC is valid, and the other node + // will continue the work. + collectable.MarkGCCompleted() + return nil + } + if err != nil { return fmt.Errorf("error deleting in gc: %w", err) } diff --git a/pkg/datastore/gc_test.go b/pkg/datastore/gc_test.go index 3f23885a70..f75516e94c 100644 --- a/pkg/datastore/gc_test.go +++ b/pkg/datastore/gc_test.go @@ -35,8 +35,6 @@ type fakeGC struct { metrics gcMetrics // GUARDED_BY(lock) deleter gcDeleter // GUARDED_BY(lock) lastRevision uint64 // GUARDED_BY(lock) - wasLocked bool // GUARDED_BY(lock) - wasUnlocked bool // GUARDED_BY(lock) } type gcMetrics struct { @@ -85,20 +83,6 @@ func newFakeGCStore(deleter gcDeleter) *fakeGCStore { } } -func (gc *fakeGC) LockForGCRun(ctx context.Context) (bool, error) { - gc.lock.Lock() - defer gc.lock.Unlock() - gc.wasLocked = true - return true, nil -} - -func (gc *fakeGC) UnlockAfterGCRun() error { - gc.lock.Lock() - defer gc.lock.Unlock() - gc.wasUnlocked = true - return nil -} - func (*fakeGC) Now(_ context.Context) (time.Time, error) { return time.Now(), nil } @@ -134,17 +118,6 @@ type gcDeleter interface { DeleteExpiredRels() (int64, error) } -// Always error trying to perform a delete -type alwaysErrorDeleter struct{} - -func (alwaysErrorDeleter) DeleteBeforeTx(_ uint64) (DeletionCounts, error) { - return DeletionCounts{}, fmt.Errorf("delete error") -} - -func (alwaysErrorDeleter) DeleteExpiredRels() (int64, error) { - return 0, fmt.Errorf("delete error") -} - // Only error on specific revisions type revisionErrorDeleter struct { errorOnRevisions []uint64 @@ -241,31 +214,71 @@ func TestGCFailureBackoffReset(t *testing.T) { require.Greater(t, gc.markedCompleteCount, 20, "Next interval was not reset with backoff") } -func TestGCUnlockOnTimeout(t *testing.T) { - gc := newFakeGCStore(alwaysErrorDeleter{}) +// preemptingDeleter returns ErrGCPreempted from the selected methods. +type preemptingDeleter struct { + preemptDeleteBeforeTx bool + preemptDeleteExpiredRels bool +} - errCh := make(chan error, 1) - hasRunChan := make(chan bool, 1) - synctest.Test(t, func(t *testing.T) { - ctx, cancel := context.WithCancel(t.Context()) - t.Cleanup(func() { - cancel() - }) - go func() { - errCh <- StartGarbageCollector(ctx, gc, 10*time.Millisecond, 10*time.Second, 1*time.Minute) - }() - time.Sleep(30 * time.Millisecond) - hasRunChan <- gc.HasGCRun() - cancel() - synctest.Wait() - }) - require.Error(t, <-errCh) - require.False(t, <-hasRunChan, "GC should not have run because it should always be erroring.") +func (d *preemptingDeleter) DeleteBeforeTx(_ uint64) (DeletionCounts, error) { + if d.preemptDeleteBeforeTx { + return DeletionCounts{}, ErrGCPreempted + } + return DeletionCounts{}, nil +} - // TODO: should this be inside the goroutine as well? 
- gc.fakeGC.lock.Lock() - defer gc.fakeGC.lock.Unlock() +func (d *preemptingDeleter) DeleteExpiredRels() (int64, error) { + if d.preemptDeleteExpiredRels { + return 0, ErrGCPreempted + } + return 0, nil +} - require.True(t, gc.fakeGC.wasLocked, "GC should have been locked") - require.True(t, gc.fakeGC.wasUnlocked, "GC should have been unlocked") +func TestGCPreemption(t *testing.T) { + cases := []struct { + name string + preemptDeleteBeforeTx bool + preemptDeleteExpiredRels bool + expectDeleteBeforeTxCount int + expectDeleteExpiredRelsCount int + }{ + { + name: "preempted on DeleteBeforeTx skips DeleteExpiredRels", + preemptDeleteBeforeTx: true, + expectDeleteBeforeTxCount: 1, + expectDeleteExpiredRelsCount: 0, + }, + { + name: "preempted on DeleteExpiredRels", + preemptDeleteExpiredRels: true, + expectDeleteBeforeTxCount: 1, + expectDeleteExpiredRelsCount: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + gc := newFakeGCStore(&preemptingDeleter{ + preemptDeleteBeforeTx: tc.preemptDeleteBeforeTx, + preemptDeleteExpiredRels: tc.preemptDeleteExpiredRels, + }) + + synctest.Test(t, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + err := RunGarbageCollection(ctx, gc, 10*time.Second) + require.NoError(t, err, "preemption should not be treated as an error") + }) + + gc.lock.Lock() + defer gc.lock.Unlock() + require.Positive(t, gc.markedCompleteCount, "preemption should still mark GC as completed") + + gc.fakeGC.lock.Lock() + defer gc.fakeGC.lock.Unlock() + require.Equal(t, tc.expectDeleteBeforeTxCount, gc.fakeGC.metrics.deleteBeforeTxCount) + require.Equal(t, tc.expectDeleteExpiredRelsCount, gc.fakeGC.metrics.deleteExpiredRelsCount) + }) + } }