From a0c6c4e7b57c61a3932ac1d5e5570d3deb98bf00 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Wed, 15 Apr 2026 11:44:15 -0700 Subject: [PATCH] fix: put a timeout to callers of methods that use singleflight --- CHANGELOG.md | 1 + internal/datastore/crdb/crdb.go | 8 ++--- internal/datastore/proxy/singleflight.go | 39 ++++++++++++++++++++++- internal/datastore/revisions/optimized.go | 9 ++++++ pkg/cmd/server/server_test.go | 2 +- pkg/datalayer/hashcache.go | 8 ++--- pkg/datastore/context.go | 10 ++++++ 7 files changed, 65 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62536492e4..c2a254e6cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Query plan contexts are written to during recursive calls -- for now, disble dispatch inside recursive calls (https://github.com/authzed/spicedb/pull/3078) +- Add a timeout to callers of singeflighted code (https://github.com/authzed/spicedb/pull/3048) ## [1.52.0] - 2026-04-30 ### Added diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 842e3d69e1..974f798c0e 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -20,7 +20,6 @@ import ( "github.com/shopspring/decimal" "go.opentelemetry.io/otel" "golang.org/x/sync/errgroup" - "resenje.org/singleflight" "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/migrations" @@ -278,7 +277,6 @@ type crdbDatastore struct { beginChangefeedQuery string transactionNowQuery string - featuresGroup singleflight.Group[string, *datastore.Features] cachedFeatures *datastore.Features // GUARDED_BY(featuresLock) featuresLock sync.Mutex @@ -562,9 +560,7 @@ func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, er return cached, nil } - features, _, err := cds.featuresGroup.Do(ctx, "", func(ictx context.Context) (*datastore.Features, error) { - return cds.features(ictx) - }) + features, err := cds.features(ctx) if err != nil { return nil, err } @@ -573,7 +569,7 @@ func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, er cds.cachedFeatures = features cds.featuresLock.Unlock() - return features, err + return features, nil } func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) { diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index 77cbf3f549..062bbf460c 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "time" "resenje.org/singleflight" @@ -9,6 +10,12 @@ import ( "github.com/authzed/spicedb/pkg/datastore/options" ) +// singleflightTimeout is the maximum time that a caller of a singleflighted method +// will wait before giving up on the singleflight executor. +// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the +// singleflight while the singleflight executor is blocked waiting for a connection. +const singleFlightTimeout = 1 * time.Second + // NewSingleflightDatastoreProxy creates a new Datastore proxy which // deduplicates calls to Datastore methods that can share results. func NewSingleflightDatastoreProxy(d datastore.Datastore) datastore.Datastore { @@ -19,6 +26,7 @@ type singleflightProxy struct { headRevGroup singleflight.Group[string, datastore.RevisionWithSchemaHash] checkRevGroup singleflight.Group[string, string] statsGroup singleflight.Group[string, datastore.Stats] + featuresGroup singleflight.Group[string, *datastore.Features] delegate datastore.Datastore } @@ -43,10 +51,18 @@ func (p *singleflightProxy) ReadWriteTx(ctx context.Context, f datastore.TxUserF func (p *singleflightProxy) OptimizedRevision(ctx context.Context) (datastore.RevisionWithSchemaHash, error) { // NOTE: Optimized revisions are singleflighted by the underlying datastore via the // CachedOptimizedRevisions struct. + ctx, span := tracer.Start(ctx, "singleflightProxy.OptimizedRevision") + defer span.End() return p.delegate.OptimizedRevision(ctx) } func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error { + ctx, span := tracer.Start(ctx, "singleflightProxy.CheckRevision") + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout) + defer cancel() + _, _, err := p.checkRevGroup.Do(ctx, revision.String(), func(ctx context.Context) (string, error) { return "", p.delegate.CheckRevision(ctx, revision) }) @@ -54,6 +70,12 @@ func (p *singleflightProxy) CheckRevision(ctx context.Context, revision datastor } func (p *singleflightProxy) HeadRevision(ctx context.Context) (datastore.RevisionWithSchemaHash, error) { + ctx, span := tracer.Start(ctx, "singleflightProxy.HeadRevision") + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout) + defer cancel() + rev, _, err := p.headRevGroup.Do(ctx, "", func(ctx context.Context) (datastore.RevisionWithSchemaHash, error) { return p.delegate.HeadRevision(ctx) }) @@ -69,6 +91,12 @@ func (p *singleflightProxy) Watch(ctx context.Context, afterRevision datastore.R } func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, error) { + ctx, span := tracer.Start(ctx, "singleflightProxy.Statistics") + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout) + defer cancel() + stats, _, err := p.statsGroup.Do(ctx, "", func(ctx context.Context) (datastore.Stats, error) { return p.delegate.Statistics(ctx) }) @@ -76,7 +104,16 @@ func (p *singleflightProxy) Statistics(ctx context.Context) (datastore.Stats, er } func (p *singleflightProxy) Features(ctx context.Context) (*datastore.Features, error) { - return p.delegate.Features(ctx) + ctx, span := tracer.Start(ctx, "singleflightProxy.Features") + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout) + defer cancel() + + features, _, err := p.featuresGroup.Do(ctx, "", func(ctx context.Context) (*datastore.Features, error) { + return p.delegate.Features(ctx) + }) + return features, err } func (p *singleflightProxy) OfflineFeatures() (*datastore.Features, error) { diff --git a/internal/datastore/revisions/optimized.go b/internal/datastore/revisions/optimized.go index 7e5a381b10..7ba001348b 100644 --- a/internal/datastore/revisions/optimized.go +++ b/internal/datastore/revisions/optimized.go @@ -17,6 +17,12 @@ import ( "github.com/authzed/spicedb/pkg/datastore" ) +// singleflightTimeout is the maximum time that a caller of a singleflighted method +// will wait before giving up on the singleflight executor. +// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the +// singleflight while the singleflight executor is blocked waiting for a connection. +const singleFlightTimeout = 1 * time.Second + var tracer = otel.Tracer("spicedb/internal/datastore/common/revisions") // OptimizedRevisionFunction instructs the datastore to compute its own current @@ -64,6 +70,9 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat } cor.RUnlock() + ctx, cancel := context.WithTimeout(ctx, singleFlightTimeout) + defer cancel() + result, _, err := cor.updateGroup.Do(ctx, "", func(ctx context.Context) (datastore.RevisionWithSchemaHash, error) { log.Ctx(ctx).Debug().Time("now", localNow).Msg("computing new revision") diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index f604fb277b..9fae2357d2 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -471,7 +471,7 @@ func TestServerGracefulTerminationOnError(t *testing.T) { }, WithPresharedSecureKey("psk"), WithDatastore(ds), WithEnableMemoryProtectionMiddleware(false)) cancel() _, err = c.Complete(ctx) - require.NoError(t, err) + require.ErrorIs(t, err, context.Canceled) } func TestReplaceUnaryMiddleware(t *testing.T) { diff --git a/pkg/datalayer/hashcache.go b/pkg/datalayer/hashcache.go index 39970d43f6..05d62c37fe 100644 --- a/pkg/datalayer/hashcache.go +++ b/pkg/datalayer/hashcache.go @@ -16,10 +16,10 @@ import ( var tracer = otel.Tracer("spicedb/pkg/datalayer") -// singleflightTimeout is the maximum time to wait for a singleflight peer to -// load a schema before falling back to a direct load. This prevents a possible -// deadlock when all connections in a pool are held by goroutines waiting on the -// singleflight while the singleflight leader is blocked waiting for a connection. +// singleflightTimeout is the maximum time that a caller of a singleflighted method +// will wait before giving up on the singleflight executor. +// This prevents a possible deadlock when all datastore connections are held by goroutines waiting on the +// singleflight while the singleflight executor is blocked waiting for a connection. // //nolint:revive // var instead of const to allow test overrides var singleflightTimeout = 1 * time.Second diff --git a/pkg/datastore/context.go b/pkg/datastore/context.go index 434fdea646..64a94043c9 100644 --- a/pkg/datastore/context.go +++ b/pkg/datastore/context.go @@ -3,10 +3,14 @@ package datastore import ( "context" + "go.opentelemetry.io/otel" + "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" ) +var tracer = otel.Tracer("spicedb/pkg/datastore/context") + // NewSeparatingContextDatastoreProxy severs any timeouts in the context being // passed to the datastore and only retains tracing metadata. // @@ -49,14 +53,20 @@ func (p *ctxProxy) IsStrictReadModeEnabled() bool { } func (p *ctxProxy) OptimizedRevision(ctx context.Context) (RevisionWithSchemaHash, error) { + ctx, span := tracer.Start(ctx, "ctxProxy.OptimizedRevision") + defer span.End() return p.delegate.OptimizedRevision(context.WithoutCancel(ctx)) } func (p *ctxProxy) CheckRevision(ctx context.Context, revision Revision) error { + ctx, span := tracer.Start(ctx, "ctxProxy.CheckRevision") + defer span.End() return p.delegate.CheckRevision(context.WithoutCancel(ctx), revision) } func (p *ctxProxy) HeadRevision(ctx context.Context) (RevisionWithSchemaHash, error) { + ctx, span := tracer.Start(ctx, "ctxProxy.HeadRevision") + defer span.End() return p.delegate.HeadRevision(context.WithoutCancel(ctx)) }