Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Fixed
- Query plan contexts are written to during recursive calls -- for now, disble dispatch inside recursive calls (https://github.com/authzed/spicedb/pull/3078)
- Add a timeout to callers of singeflighted code (https://github.com/authzed/spicedb/pull/3048)

## [1.52.0] - 2026-04-30
### Added
Expand Down
8 changes: 2 additions & 6 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"
"resenje.org/singleflight"

"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/crdb/migrations"
Expand Down Expand Up @@ -278,7 +277,6 @@ type crdbDatastore struct {
beginChangefeedQuery string
transactionNowQuery string

featuresGroup singleflight.Group[string, *datastore.Features]
cachedFeatures *datastore.Features // GUARDED_BY(featuresLock)
featuresLock sync.Mutex

Expand Down Expand Up @@ -562,9 +560,7 @@ func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, er
return cached, nil
}

features, _, err := cds.featuresGroup.Do(ctx, "", func(ictx context.Context) (*datastore.Features, error) {
return cds.features(ictx)
})
Comment on lines -565 to -567
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this is a refactor - i dislike having singleflight uses scattered in many places.

features, err := cds.features(ctx)
if err != nil {
return nil, err
}
Expand All @@ -573,7 +569,7 @@ func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, er
cds.cachedFeatures = features
cds.featuresLock.Unlock()

return features, err
return features, nil
}

func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) {
Expand Down
39 changes: 38 additions & 1 deletion internal/datastore/proxy/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@

import (
"context"
"time"

"resenje.org/singleflight"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
)

// singleflightTimeout is the maximum time that a caller of a singleflighted method
// will wait before giving up on the singleflight executor.
// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the
// singleflight while the singleflight executor is blocked waiting for a connection.
const singleFlightTimeout = 1 * time.Second

// NewSingleflightDatastoreProxy creates a new Datastore proxy which
// deduplicates calls to Datastore methods that can share results.
func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore {
Expand All @@ -19,6 +26,7 @@
headRevGroup singleflight.Group[string, datastore.RevisionWithSchemaHash]
checkRevGroup singleflight.Group[string, string]
statsGroup singleflight.Group[string, datastore.Stats]
featuresGroup singleflight.Group[string, *datastore.Features]
delegate datastore.Datastore
}

Expand All @@ -43,17 +51,31 @@
func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.RevisionWithSchemaHash, error) {
// NOTE: Optimized revisions are singleflighted by the underlying datastore via the
// CachedOptimizedRevisions struct.
ctx, span := tracer.Start(ctx, "singleflightProxy.OptimizedRevision")
defer span.End()
return p.delegate.OptimizedRevision(ctx)
}

func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error {
ctx, span := tracer.Start(ctx, "singleflightProxy.CheckRevision")
defer span.End()

Check warning on line 62 in internal/datastore/proxy/singleflight.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/proxy/singleflight.go#L62

Added line #L62 was not covered by tests
ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout)
defer cancel()

Check warning on line 65 in internal/datastore/proxy/singleflight.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/proxy/singleflight.go#L65

Added line #L65 was not covered by tests
_, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) {
return "", p.delegate.CheckRevision(ctx, revision)
})
return err
}

func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.RevisionWithSchemaHash, error) {
ctx, span := tracer.Start(ctx, "singleflightProxy.HeadRevision")
defer span.End()

Check warning on line 75 in internal/datastore/proxy/singleflight.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/proxy/singleflight.go#L75

Added line #L75 was not covered by tests
ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout)
defer cancel()

Check warning on line 78 in internal/datastore/proxy/singleflight.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/proxy/singleflight.go#L78

Added line #L78 was not covered by tests
rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.RevisionWithSchemaHash, error) {
return p.delegate.HeadRevision(ctx)
})
Expand All @@ -69,14 +91,29 @@
}

func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) {
ctx, span := tracer.Start(ctx, "singleflightProxy.Statistics")
defer span.End()

ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout)
defer cancel()

stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) {
return p.delegate.Statistics(ctx)
})
return stats, err
}

func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) {
return p.delegate.Features(ctx)
ctx, span := tracer.Start(ctx, "singleflightProxy.Features")
defer span.End()

ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout)
defer cancel()

features, _, err := p.featuresGroup.Do(ctx, "", func(ctx context.Context) (*datastore.Features, error) {
return p.delegate.Features(ctx)
})
return features, err
}

func (p *singleflightProxy) OfflineFeatures() (*datastore.Features, error) {
Expand Down
9 changes: 9 additions & 0 deletions internal/datastore/revisions/optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import (
"github.com/authzed/spicedb/pkg/datastore"
)

// singleflightTimeout is the maximum time that a caller of a singleflighted method
// will wait before giving up on the singleflight executor.
// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the
// singleflight while the singleflight executor is blocked waiting for a connection.
const singleFlightTimeout = 1 * time.Second

var tracer = otel.Tracer("spicedb/internal/datastore/common/revisions")

// OptimizedRevisionFunction instructs the datastore to compute its own current
Expand Down Expand Up @@ -64,6 +70,9 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
}
cor.RUnlock()

ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout)
defer cancel()

result, _, err := cor.updateGroup.Do(ctx, "", func(ctx context.Context) (datastore.RevisionWithSchemaHash, error) {
log.Ctx(ctx).Debug().Time("now", localNow).Msg("computing new revision")

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestServerGracefulTerminationOnError(t *testing.T) {
}, WithPresharedSecureKey("psk"), WithDatastore(ds), WithEnableMemoryProtectionMiddleware(false))
cancel()
_, err = c.Complete(ctx)
require.NoError(t, err)
require.ErrorIs(t, err, context.Canceled)
}

func TestReplaceUnaryMiddleware(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/datalayer/hashcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (

var tracer = otel.Tracer("spicedb/pkg/datalayer")

// singleflightTimeout is the maximum time to wait for a singleflight peer to
// load a schema before falling back to a direct load. This prevents a possible
// deadlock when all connections in a pool are held by goroutines waiting on the
// singleflight while the singleflight leader is blocked waiting for a connection.
// singleflightTimeout is the maximum time that a caller of a singleflighted method
// will wait before giving up on the singleflight executor.
// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the
// singleflight while the singleflight executor is blocked waiting for a connection.
//
//nolint:revive // var instead of const to allow test overrides
var singleflightTimeout = 1 * time.Second
Expand Down
10 changes: 10 additions & 0 deletions pkg/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package datastore
import (
"context"

"go.opentelemetry.io/otel"

"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

var tracer = otel.Tracer("spicedb/pkg/datastore/context")

// NewSeparatingContextDatastoreProxy severs any timeouts in the context being
// passed to the datastore and only retains tracing metadata.
//
Expand Down Expand Up @@ -49,14 +53,20 @@ func (p *ctxProxy) IsStrictReadModeEnabled() bool {
}

func (p *ctxProxy) OptimizedRevision(ctx context.Context) (RevisionWithSchemaHash, error) {
ctx, span := tracer.Start(ctx, "ctxProxy.OptimizedRevision")
defer span.End()
return p.delegate.OptimizedRevision(context.WithoutCancel(ctx))
}

func (p *ctxProxy) CheckRevision(ctx context.Context, revision Revision) error {
ctx, span := tracer.Start(ctx, "ctxProxy.CheckRevision")
defer span.End()
return p.delegate.CheckRevision(context.WithoutCancel(ctx), revision)
}

func (p *ctxProxy) HeadRevision(ctx context.Context) (RevisionWithSchemaHash, error) {
ctx, span := tracer.Start(ctx, "ctxProxy.HeadRevision")
defer span.End()
return p.delegate.HeadRevision(context.WithoutCancel(ctx))
}

Expand Down
Loading