Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added
- Added support for YAML-based validation files in DevContext (https://github.com/authzed/spicedb/pull/3024)
- Added support for YAML-based validation files in the Language Server (https://github.com/authzed/spicedb/pull/3024)
- CockroachDB: sentinel-drain cancel handling for connection pools. Cancelled contexts now send a PostgreSQL cancel request to CRDB (stopping server-side work early) and drain the connection deterministically before returning it to the pool, preventing pool depletion. The `SeparatingContextDatastoreProxy` (which previously severed read contexts to avoid this problem) has been removed. The `write-conn-acquisition-timeout` default is now 0 (disabled). Controlled by `--datastore-experimental-cancel-draining` (default: on).

### Changed
- Removed MySQL metrics prefixed with `go_sql_stats_connections_*` in favor of those prefixed with `go_sql_*` (https://github.com/authzed/spicedb/pull/2980)
Expand Down
9 changes: 6 additions & 3 deletions docs/spicedb.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,5 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/jackc/pgx/v5 => github.com/ecordell/pgx/v5 v5.2.1-0.20260421021323-3c7831eb5e6f
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/ecordell/optgen v0.2.6 h1:qPglm/JyuW6vL9IYnSwgFMces2dLFCuEd2Yw6B0VRmU=
github.com/ecordell/optgen v0.2.6/go.mod h1:pqjipFkG6vAwvKgjPGWaZyqmtWAqdb2w6EcTnP+kgqQ=
github.com/ecordell/pgx/v5 v5.2.1-0.20260421021323-3c7831eb5e6f h1:goNAeHXhnD58PqnWB4o3dxkN+GT/s32F8w71eAFDdss=
github.com/ecordell/pgx/v5 v5.2.1-0.20260421021323-3c7831eb5e6f/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -1046,8 +1048,6 @@ github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7Ulw
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb h1:pSv+zRVeAYjbXRFjyytFIMRBSKWVowCi7KbXSMR/+ug=
github.com/jackc/pgx-zerolog v0.0.0-20230315001418-f978528409eb/go.mod h1:CRUuPsmIajLt3dZIlJ5+O8IDSib6y8yrst8DkCthTa4=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jeroenrinzema/psql-wire v0.17.0 h1:2U5ElqxglXbStaoh6liohLjxkWIjvUamgVwcr8a90Mk=
Expand Down
27 changes: 21 additions & 6 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ccoveille/go-safecast/v2"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgconn/ctxwatch"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/shopspring/decimal"
Expand Down Expand Up @@ -202,6 +203,18 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas

// this ctx and cancel is tied to the lifetime of the datastore
ds.ctx, ds.cancel = context.WithCancel(context.Background())

if config.experimentalCancelDraining {
// Install cancel-and-drain handler on both pools: on context cancellation,
// pgx sends a PostgreSQL cancel request and drains any in-flight 57014 via
// SELECT 1 before returning the connection to the pool.
cancelHandler := func(pgConn *pgconn.PgConn) ctxwatch.Handler {
return &pgconn.CancelAndDrainContextWatcherHandler{Conn: pgConn}
}
readPoolConfig.ConnConfig.BuildContextWatcherHandler = cancelHandler
writePoolConfig.ConnConfig.BuildContextWatcherHandler = cancelHandler
}

ds.writePool, err = pool.NewRetryPool(ds.ctx, "write", writePoolConfig, healthChecker, config.maxRetries, config.connectRate)
if err != nil {
ds.cancel()
Expand Down Expand Up @@ -242,18 +255,20 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
})
}

// When cancel draining is disabled, wrap with the context-severing proxy to
// restore the pre-cancellation behavior: read contexts are severed so
// cancellations never reach the pool (preventing connection closure).
if !config.experimentalCancelDraining {
return datastore.NewSeparatingContextDatastoreProxy(ds), nil
}

return ds, nil
}

// NewCRDBDatastore initializes a SpiceDB datastore that uses a CockroachDB
// database while leveraging its AOST functionality.
//
// Cancel handling is configured inside newCRDBDatastore: when cancel draining
// is enabled (the default) the datastore is returned directly, and when it is
// disabled newCRDBDatastore itself wraps the datastore in the
// SeparatingContextDatastoreProxy — so no wrapping happens here.
func NewCRDBDatastore(ctx context.Context, url string, options ...Option) (datastore.Datastore, error) {
	return newCRDBDatastore(ctx, url, options...)
}

type crdbDatastore struct {
Expand Down
35 changes: 32 additions & 3 deletions internal/datastore/crdb/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type crdbOptions struct {
includeQueryParametersInTraces bool
watchDisabled bool
acquireTimeout time.Duration
experimentalCancelDraining bool
}

const (
Expand All @@ -61,14 +62,21 @@ const (
defaultEnablePrometheusStats = false
defaultEnableConnectionBalancing = true
defaultConnectRate = 100 * time.Millisecond
defaultAcquireTimeout = 30 * time.Millisecond
defaultFilterMaximumIDCount = 100
defaultWithIntegrity = false
defaultColumnOptimizationOption = common.ColumnOptimizationOptionStaticValues
defaultIncludeQueryParametersInTraces = false
defaultWatchDisabled = false
)

// defaultAcquireTimeout is 0 (disabled). The write-conn-acquisition-timeout
// previously existed as backpressure against pool depletion caused by
// cancelled connections closing their DB connection. With the cancel handler
// installed, cancelled connections are drained and returned to the pool instead
// of closed, so pool depletion no longer occurs. Operators may set a non-zero
// value to enable explicit load-shedding via ResourceExhausted.
var defaultAcquireTimeout = time.Duration(0)

// Option provides the facility to configure how clients within the CRDB
// datastore interact with the running CockroachDB database.
type Option func(*crdbOptions)
Expand All @@ -94,6 +102,7 @@ func generateConfig(options []Option) (crdbOptions, error) {
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
watchDisabled: defaultWatchDisabled,
acquireTimeout: defaultAcquireTimeout,
experimentalCancelDraining: true,
}

for _, option := range options {
Expand Down Expand Up @@ -124,6 +133,14 @@ func generateConfig(options []Option) (crdbOptions, error) {
computed.writePoolOpts.ConnMaxLifetimeJitter = ptr.To(30 * time.Minute)
}

// When cancel draining is disabled, restore the 30ms acquisition timeout if
// the caller hasn't set an explicit non-zero value. The timeout exists as
// backpressure against pool depletion; without cancel draining the pool can
// be depleted by cancelled connections, so the timeout is load-bearing again.
if !computed.experimentalCancelDraining && computed.acquireTimeout == 0 {
computed.acquireTimeout = 30 * time.Millisecond
}

return computed, nil
}

Expand Down Expand Up @@ -399,8 +416,20 @@ func WithWatchDisabled(isDisabled bool) Option {
return func(po *crdbOptions) { po.watchDisabled = isDisabled }
}

// WithAcquireTimeout configures the amount of time to wait to acquire a connection
// from the pool with Try* methods before applying backpressure.
// WithExperimentalCancelDraining toggles cancel-and-drain handling on both
// connection pools. While enabled, a cancelled context causes pgx to send a
// PostgreSQL cancel request to CRDB (stopping server-side work early) and to
// drain the connection with SELECT 1 before handing it back to the pool,
// which prevents pool depletion. Enabling this also removes the
// SeparatingContextDatastoreProxy wrapper.
// Default: true. Disable to revert to pre-cancellation behavior if issues arise.
func WithExperimentalCancelDraining(enabled bool) Option {
	return func(po *crdbOptions) {
		po.experimentalCancelDraining = enabled
	}
}

// WithAcquireTimeout sets how long to wait when acquiring a write connection
// from the pool before the caller receives ResourceExhausted.
// The default of 0 disables the timeout entirely (wait indefinitely); supply
// a non-zero duration to opt into explicit load-shedding.
func WithAcquireTimeout(timeout time.Duration) Option {
	return func(po *crdbOptions) {
		po.acquireTimeout = timeout
	}
}
45 changes: 45 additions & 0 deletions internal/datastore/crdb/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,48 @@ func TestConfiguration(t *testing.T) {
})
}
}

// TestDefaultAcquireTimeoutIsZero verifies that the generated default
// configuration leaves the write-connection acquisition timeout disabled.
func TestDefaultAcquireTimeoutIsZero(t *testing.T) {
	cfg, err := generateConfig(nil)
	require.NoError(t, err)
	require.Equal(t, time.Duration(0), cfg.acquireTimeout,
		"default acquireTimeout should be 0 (disabled) since cancel-and-drain handler keeps pool healthy")
}

// TestAcquireTimeoutWithCancelDraining covers the interaction between the
// experimental cancel-draining option and the acquire-timeout default.
func TestAcquireTimeoutWithCancelDraining(t *testing.T) {
	cases := []struct {
		name string
		opts []Option
		want time.Duration
	}{
		{
			name: "cancel draining enabled (default): timeout stays 0",
			opts: nil,
			want: 0,
		},
		{
			name: "cancel draining disabled: timeout restored to 30ms",
			opts: []Option{WithExperimentalCancelDraining(false)},
			want: 30 * time.Millisecond,
		},
		{
			name: "cancel draining disabled with explicit timeout: explicit value wins",
			opts: []Option{WithExperimentalCancelDraining(false), WithAcquireTimeout(100 * time.Millisecond)},
			want: 100 * time.Millisecond,
		},
		{
			// generateConfig cannot distinguish "explicitly set to 0" from the
			// default 0, so disabling cancel draining always restores 30ms
			// whenever the timeout is 0.
			name: "cancel draining disabled with explicit zero: treated same as unset, gets 30ms",
			opts: []Option{WithExperimentalCancelDraining(false), WithAcquireTimeout(0)},
			want: 30 * time.Millisecond,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			cfg, err := generateConfig(tc.opts)
			require.NoError(t, err)
			require.Equal(t, tc.want, cfg.acquireTimeout)
		})
	}
}
10 changes: 8 additions & 2 deletions internal/datastore/crdb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (p *RetryPool) withRetries(ctx context.Context, acquireTimeout time.Duratio
err = wrapRetryableError(ctx, fn(conn))
if err == nil {
conn.Release()
conn = nil // suppress the deferred release
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a potential footgun. Is there a different place that we can set the defer so that we don't have to do this bookkeeping?

if retries > 0 {
log.Ctx(ctx).Info().Uint8("retries", retries).Msg("resettable database error succeeded after retry")
}
Expand All @@ -319,12 +320,13 @@ func (p *RetryPool) withRetries(ctx context.Context, acquireTimeout time.Duratio
resettable *ResettableError
retryable *RetryableError
)
if errors.As(err, &resettable) || conn.Conn().IsClosed() {
if errors.As(err, &resettable) || (conn != nil && conn.Conn().IsClosed()) {
log.Ctx(ctx).Info().Err(err).Uint8("retries", retries).Msg("resettable error")

nodeID := p.Node(conn.Conn())
p.GC(conn.Conn())
conn.Release()
conn = nil // will be reassigned by acquireFromDifferentNode below

// After a resettable error, mark the node as unhealthy
// The health tracker enforces an error rate, so a single request
Expand All @@ -346,7 +348,10 @@ func (p *RetryPool) withRetries(ctx context.Context, acquireTimeout time.Duratio
common.SleepOnErr(ctx, err, retries)
continue
}
conn.Release()
if conn != nil {
conn.Release()
conn = nil // suppress the deferred release
}

// error is not resettable or retryable
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -459,5 +464,6 @@ func wrapRetryableError(ctx context.Context, err error) error {
if IsRetryableError(ctx, err) {
return &RetryableError{Err: err}
}

return err
}
Loading
Loading