diff --git a/internal/dispatch/remote/cluster.go b/internal/dispatch/remote/cluster.go index 21e3d7634e..6d4cc55ea2 100644 --- a/internal/dispatch/remote/cluster.go +++ b/internal/dispatch/remote/cluster.go @@ -423,6 +423,7 @@ type receiver[S any] interface { const ( secondaryCursorPrefix = "$$secondary:" primaryDispatcher = "$primary" + noDispatcherResults = "$$no_dispatcher_results" ) func publishClient[R any](ctx context.Context, client receiver[R], reqKey string, stream dispatch.Stream[R], secondaryDispatchName string) error { @@ -565,7 +566,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any]( wg.Add(len(validSecondaryDispatchers)) - returnedResultsDispatcherName := atomic.NewString("") + returnedResultsDispatcherName := atomic.NewString(noDispatcherResults) primarySleeper := cr.newPrimarySleeper(reqKey, tuple.FromCoreRelationReference(req.GetResourceRelation()), @@ -574,6 +575,12 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any]( ) runHandler := func(name string, clusterClient ClusterClient) { + if name == "" { + log.Warn().Msg("attempting to run a dispatch handler with an empty name, skipping") + wg.Done() + return + } + ctx := contexts[name].ctx log.Debug().Str("dispatcher", name).Msg("running secondary dispatcher") defer wg.Done() @@ -644,13 +651,14 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any]( // If a valid result, and we have not yet returned any results, try take a "lock" on // returning results to ensure only a single dispatcher returns results. - swapped := returnedResultsDispatcherName.CompareAndSwap("", name) + swapped := returnedResultsDispatcherName.CompareAndSwap(noDispatcherResults, name) if !swapped { // Another dispatcher has started returning results, so terminate. log.Trace().Str("dispatcher", name).Msg("another dispatcher has already returned results") return } + log.Trace().Str("dispatcher", name).Msg("this dispatcher is the first to return results, will publish them and cancel the others") dispatchCounter.WithLabelValues(reqKey, name).Add(1) // Cancel all other contexts to prevent them from running, or stop them @@ -658,6 +666,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any]( log.Trace().Str("dispatcher", name).Msg("canceling other dispatchers") for key, ctxAndCancel := range contexts { if key != name { + log.Trace().Str("canceling-dispatcher", key).Msg("canceling dispatcher context") ctxAndCancel.cancel() } } @@ -706,8 +715,14 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any]( // Check for the first dispatcher that returned results and return its error, if any. resultHandlerName := returnedResultsDispatcherName.Load() - if resultHandlerName != "" { + if resultHandlerName == "" { + log.Error().Msg("got empty result handler name; this should never happen") + return spiceerrors.MustBugf("got empty result handler name") + } + + if resultHandlerName != noDispatcherResults { if err, ok := errorsByDispatcherName.Load(resultHandlerName); ok { + log.Warn().Err(err).Str("dispatcher", resultHandlerName).Msg("dispatcher that returned results encountered an error during streaming") return err } return nil