Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions internal/datastore/mysql/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"database/sql"
"fmt"
"time"

sq "github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -44,14 +45,6 @@
mcc.isClosed = true
}

func (mcc *mysqlGarbageCollector) LockForGCRun(ctx context.Context) (bool, error) {
return mcc.mds.tryAcquireLock(ctx, gcRunLock)
}

func (mcc *mysqlGarbageCollector) UnlockAfterGCRun() error {
return mcc.mds.releaseLock(context.Background(), gcRunLock)
}

func (mcc *mysqlGarbageCollector) Now(ctx context.Context) (time.Time, error) {
if mcc.isClosed {
return time.Time{}, spiceerrors.MustBugf("mysqlGarbageCollector is closed")
Expand Down Expand Up @@ -105,7 +98,9 @@
return revisions.NewForTransactionID(uintValue), nil
}

// - implementation misses metrics
// NOTE: MySQL uses GET_LOCK/RELEASE_LOCK (session-level) rather than per-batch
// transaction-level locks like PostgreSQL, because MySQL lacks xact-level advisory
// locks. The lock is held for the entire method, so preemption is coarser-grained.
func (mcc *mysqlGarbageCollector) DeleteBeforeTx(
ctx context.Context,
txID datastore.Revision,
Expand All @@ -114,6 +109,19 @@
return removed, spiceerrors.MustBugf("mysqlGarbageCollector is closed")
}

acquired, err := mcc.mds.tryAcquireLock(ctx, gcRunLock)
if err != nil {
return removed, fmt.Errorf("error acquiring gc lock: %w", err)
}

Check warning on line 115 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L114-L115

Added lines #L114 - L115 were not covered by tests
if !acquired {
return removed, datastore.ErrGCPreempted
}

Check warning on line 118 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L117-L118

Added lines #L117 - L118 were not covered by tests
defer func() {
if releaseErr := mcc.mds.releaseLock(context.Background(), gcRunLock); releaseErr != nil {
log.Warn().Err(releaseErr).Msg("error releasing gc lock")
}

Check warning on line 122 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}()

// Delete any relationship rows with deleted_transaction <= the transaction ID.
removed.Relationships, err = mcc.batchDelete(ctx, mcc.mds.driver.RelationTuple(), sq.LtOrEq{colDeletedTxn: txID})
if err != nil {
Expand All @@ -139,6 +147,19 @@
return 0, nil
}

acquired, err := mcc.mds.tryAcquireLock(ctx, gcRunLock)
if err != nil {
return 0, fmt.Errorf("error acquiring gc lock: %w", err)
}

Check warning on line 153 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L152-L153

Added lines #L152 - L153 were not covered by tests
if !acquired {
return 0, datastore.ErrGCPreempted
}

Check warning on line 156 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L155-L156

Added lines #L155 - L156 were not covered by tests
defer func() {
if releaseErr := mcc.mds.releaseLock(context.Background(), gcRunLock); releaseErr != nil {
log.Warn().Err(releaseErr).Msg("error releasing gc lock")
}

Check warning on line 160 in internal/datastore/mysql/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/mysql/gc.go#L159-L160

Added lines #L159 - L160 were not covered by tests
}()

now, err := mcc.Now(ctx)
if err != nil {
return 0, err
Expand Down
61 changes: 35 additions & 26 deletions internal/datastore/postgres/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"time"

sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/authzed/spicedb/internal/datastore/common"
Expand Down Expand Up @@ -58,14 +58,6 @@
pgg.conn.Release()
}

func (pgg *pgGarbageCollector) LockForGCRun(ctx context.Context) (bool, error) {
return pgg.pgd.tryAcquireLock(ctx, pgg.conn, gcRunLock)
}

func (pgg *pgGarbageCollector) UnlockAfterGCRun() error {
return pgg.pgd.releaseLock(context.Background(), pgg.conn, gcRunLock)
}

func (pgg *pgGarbageCollector) Now(ctx context.Context) (time.Time, error) {
if pgg.isClosed {
return time.Time{}, spiceerrors.MustBugf("pgGarbageCollector is closed")
Expand Down Expand Up @@ -136,10 +128,6 @@
)
}

type exec interface {
Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
}

func (pgg *pgGarbageCollector) DeleteBeforeTx(ctx context.Context, txID datastore.Revision) (datastore.DeletionCounts, error) {
if pgg.isClosed {
return datastore.DeletionCounts{}, spiceerrors.MustBugf("pgGarbageCollector is closed")
Expand All @@ -148,16 +136,22 @@
return pgg.deleteBeforeTx(ctx, pgg.conn, txID)
}

func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, conn exec, txID datastore.Revision) (datastore.DeletionCounts, error) {
// txBeginner is satisfied by *pgxpool.Conn (used in production) and by any
// ConnPooler (used in tests to route batch transactions through the
// QueryInterceptor).
type txBeginner interface {
Begin(ctx context.Context) (pgx.Tx, error)
}

func (pgg *pgGarbageCollector) deleteBeforeTx(ctx context.Context, beginner txBeginner, txID datastore.Revision) (datastore.DeletionCounts, error) {
revision := txID.(postgresRevision)

minTxAlive := NewXid8(revision.snapshot.xmin)
removed := datastore.DeletionCounts{}
var err error
// Delete any relationship rows that were already dead when this transaction started
removed.Relationships, err = pgg.batchDelete(
ctx,
conn,
beginner,
schema.TableTuple,
gcPKCols,
sq.Lt{schema.ColDeletedXid: minTxAlive},
Expand All @@ -167,13 +161,9 @@
return removed, fmt.Errorf("failed to GC relationships table: %w", err)
}

// Delete all transaction rows with ID < the transaction ID.
//
// We don't delete the transaction itself to ensure there is always at least
// one transaction present.
removed.Transactions, err = pgg.batchDelete(
ctx,
conn,
beginner,
schema.TableTransaction,
gcPKCols,
sq.Lt{schema.ColXID: minTxAlive},
Expand All @@ -183,10 +173,9 @@
return removed, fmt.Errorf("failed to GC transactions table: %w", err)
}

// Delete any namespace rows with deleted_transaction <= the transaction ID.
removed.Namespaces, err = pgg.batchDelete(
ctx,
conn,
beginner,
schema.TableNamespace,
gcPKCols,
sq.Lt{schema.ColDeletedXid: minTxAlive},
Expand All @@ -201,7 +190,7 @@

func (pgg *pgGarbageCollector) batchDelete(
ctx context.Context,
conn exec,
beginner txBeginner,
tableName string,
pkCols []string,
filter sqlFilter,
Expand All @@ -212,7 +201,6 @@
return -1, err
}
if index != nil {
// Force the proper index for the GC operation.
sql = "/*+ IndexOnlyScan(" + tableName + " " + index.Name + ") */" + sql
}

Expand All @@ -224,11 +212,32 @@

var deletedCount int64
for {
cr, err := conn.Exec(ctx, query, args...)
tx, err := beginner.Begin(ctx)
if err != nil {
return deletedCount, fmt.Errorf("error starting transaction for gc batch: %w", err)
}

locked, err := pgg.pgd.tryAcquireXactLock(ctx, tx, gcRunLock)
if err != nil {
_ = tx.Rollback(ctx)
return deletedCount, fmt.Errorf("error acquiring gc lock: %w", err)
}

Check warning on line 224 in internal/datastore/postgres/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/postgres/gc.go#L222-L224

Added lines #L222 - L224 were not covered by tests

if !locked {
_ = tx.Rollback(ctx)
return deletedCount, datastore.ErrGCPreempted
}

cr, err := tx.Exec(ctx, query, args...)
if err != nil {
_ = tx.Rollback(ctx)

Check warning on line 233 in internal/datastore/postgres/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/postgres/gc.go#L233

Added line #L233 was not covered by tests
return deletedCount, err
}

if err := tx.Commit(ctx); err != nil {
return deletedCount, fmt.Errorf("error committing gc batch: %w", err)
}

Check warning on line 239 in internal/datastore/postgres/gc.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/postgres/gc.go#L238-L239

Added lines #L238 - L239 were not covered by tests

rowsDeleted := cr.RowsAffected()
deletedCount += rowsDeleted
if rowsDeleted < gcBatchDeleteSize {
Expand Down
16 changes: 16 additions & 0 deletions internal/datastore/postgres/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

Expand Down Expand Up @@ -38,6 +39,21 @@
return lockAcquired, nil
}

// tryAcquireXactLock attempts to acquire a transaction-level advisory lock.
// The lock is automatically released when the transaction commits or rolls back.
// Returns true if the lock was acquired, false if another session holds it.
func (pgd *pgDatastore) tryAcquireXactLock(ctx context.Context, tx pgx.Tx, lockID lockID) (bool, error) {
row := tx.QueryRow(ctx, `
SELECT pg_try_advisory_xact_lock($1)
`, lockID)

var lockAcquired bool
if err := row.Scan(&lockAcquired); err != nil {
return false, err
}

Check warning on line 53 in internal/datastore/postgres/locks.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/postgres/locks.go#L52-L53

Added lines #L52 - L53 were not covered by tests
return lockAcquired, nil
}

func (pgd *pgDatastore) releaseLock(ctx context.Context, conn *pgxpool.Conn, lockID lockID) error {
row := conn.QueryRow(ctx, `
SELECT pg_advisory_unlock($1)
Expand Down
72 changes: 72 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,21 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
WriteConnsMinOpen(10),
WriteConnsMaxOpen(10),
))

t.Run("TestGCPreemption", createMultiDatastoreTest(
b,
GCPreemptionTest,
RevisionQuantization(0),
GCWindow(1*time.Millisecond),
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
ReadConnsMinOpen(10),
ReadConnsMaxOpen(10),
WriteConnsMinOpen(10),
WriteConnsMaxOpen(10),
WithRevisionHeartbeat(false),
))
})

t.Run(fmt.Sprintf("%spostgres-%s-%s-%s", pgbouncerStr, config.pgVersion, config.targetMigration, config.migrationPhase), func(t *testing.T) {
Expand Down Expand Up @@ -1780,6 +1795,63 @@ func LockingTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore)
require.NoError(t, err)
}

// GCPreemptionTest verifies that when another session holds the GC advisory
// lock, batchDelete returns ErrGCPreempted instead of blocking, and that GC
// succeeds again once the holder releases the lock.
func GCPreemptionTest(t *testing.T, ds datastore.Datastore, ds2 datastore.Datastore) {
require := require.New(t)

pds := ds.(*pgDatastore)
pds2 := ds2.(*pgDatastore)
ctx := t.Context()

// Seed two writes so there is something for GC to delete.
_, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
return rwt.LegacyWriteNamespaces(ctx,
namespace.Namespace("resource", namespace.MustRelation("reader", nil)),
namespace.Namespace("user"))
})
require.NoError(err)

writeRev, err := ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
return rwt.LegacyWriteNamespaces(ctx,
namespace.Namespace("resource",
namespace.MustRelation("reader", nil),
namespace.MustRelation("unused", nil)),
namespace.Namespace("user"))
})
require.NoError(err)

// Hold the GC xact-lock from a second session via an uncommitted transaction.
holderConn, err := pds2.writePool.Acquire(ctx)
require.NoError(err)
defer holderConn.Release()

holderTx, err := holderConn.Begin(ctx)
require.NoError(err)
// Ensure the lock is released even if an assertion fails.
defer func() { _ = holderTx.Rollback(ctx) }()

acquired, err := pds2.tryAcquireXactLock(ctx, holderTx, gcRunLock)
require.NoError(err)
require.True(acquired, "holder should have acquired the GC lock")

// GC from the other datastore should observe the lock and preempt.
pgg, err := pds.BuildGarbageCollector(ctx)
require.NoError(err)
defer pgg.Close()

_, err = pgg.DeleteBeforeTx(ctx, writeRev)
require.ErrorIs(err, datastore.ErrGCPreempted)

// Release the lock by rolling back the holder transaction, then GC should succeed.
require.NoError(holderTx.Rollback(ctx))

removed, err := pgg.DeleteBeforeTx(ctx, writeRev)
require.NoError(err)
require.GreaterOrEqual(removed.Transactions, int64(1))
}

func StrictReadModeFallbackTest(t *testing.T, primaryDS datastore.Datastore, unwrappedReplicaDS datastore.Datastore) {
require := require.New(t)

Expand Down
Loading
Loading