Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions internal/dispatch/remote/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@
const (
secondaryCursorPrefix = "$$secondary:"
primaryDispatcher = "$primary"
noDispatcherResults = "$$no_dispatcher_results"
)

func publishClient[R any](ctx context.Context, client receiver[R], reqKey string, stream dispatch.Stream[R], secondaryDispatchName string) error {
Expand Down Expand Up @@ -565,7 +566,7 @@

wg.Add(len(validSecondaryDispatchers))

returnedResultsDispatcherName := atomic.NewString("")
returnedResultsDispatcherName := atomic.NewString(noDispatcherResults)

primarySleeper := cr.newPrimarySleeper(reqKey,
tuple.FromCoreRelationReference(req.GetResourceRelation()),
Expand All @@ -574,6 +575,12 @@
)

runHandler := func(name string, clusterClient ClusterClient) {
if name == "" {
log.Warn().Msg("attempting to run a dispatch handler with an empty name, skipping")
wg.Done()
return
}

Check warning on line 582 in internal/dispatch/remote/cluster.go

View check run for this annotation

Codecov / codecov/patch

internal/dispatch/remote/cluster.go#L579-L582

Added lines #L579 - L582 were not covered by tests

ctx := contexts[name].ctx
log.Debug().Str("dispatcher", name).Msg("running secondary dispatcher")
defer wg.Done()
Expand Down Expand Up @@ -644,20 +651,22 @@

// If a valid result, and we have not yet returned any results, try take a "lock" on
// returning results to ensure only a single dispatcher returns results.
swapped := returnedResultsDispatcherName.CompareAndSwap("", name)
swapped := returnedResultsDispatcherName.CompareAndSwap(noDispatcherResults, name)
if !swapped {
// Another dispatcher has started returning results, so terminate.
log.Trace().Str("dispatcher", name).Msg("another dispatcher has already returned results")
return
}

log.Trace().Str("dispatcher", name).Msg("this dispatcher is the first to return results, will publish them and cancel the others")
dispatchCounter.WithLabelValues(reqKey, name).Add(1)

// Cancel all other contexts to prevent them from running, or stop them
// from running if started.
log.Trace().Str("dispatcher", name).Msg("canceling other dispatchers")
for key, ctxAndCancel := range contexts {
if key != name {
log.Trace().Str("canceling-dispatcher", key).Msg("canceling dispatcher context")
ctxAndCancel.cancel()
}
}
Expand Down Expand Up @@ -706,8 +715,14 @@

// Check for the first dispatcher that returned results and return its error, if any.
resultHandlerName := returnedResultsDispatcherName.Load()
if resultHandlerName != "" {
if resultHandlerName == "" {
log.Error().Msg("got empty result handler name; this should never happen")
return spiceerrors.MustBugf("got empty result handler name")
}

Check warning on line 721 in internal/dispatch/remote/cluster.go

View check run for this annotation

Codecov / codecov/patch

internal/dispatch/remote/cluster.go#L719-L721

Added lines #L719 - L721 were not covered by tests

if resultHandlerName != noDispatcherResults {
if err, ok := errorsByDispatcherName.Load(resultHandlerName); ok {
log.Warn().Err(err).Str("dispatcher", resultHandlerName).Msg("dispatcher that returned results encountered an error during streaming")

Check warning on line 725 in internal/dispatch/remote/cluster.go

View check run for this annotation

Codecov / codecov/patch

internal/dispatch/remote/cluster.go#L725

Added line #L725 was not covered by tests
return err
}
return nil
Expand Down
Loading