From dd4de569f8d08b9d64df7326c79f605da1be7c97 Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Tue, 31 Mar 2026 16:23:57 -0700
Subject: [PATCH 1/7] Allow for task finishing on shutdown

---
 internal/internal_task_pollers.go     | 25 +++-----
 internal/internal_worker_base.go      | 45 +++++++-------
 internal/internal_worker_base_test.go | 88 +++++++++++++++++++++++++++
 3 files changed, 118 insertions(+), 40 deletions(-)

diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index bc148edf9..97d04b1ca 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -310,24 +310,13 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
 	}()
 
 	if bp.workerPollCompleteOnShutdown != nil && bp.workerPollCompleteOnShutdown.Load() {
-		// Don't kill the gRPC stream. After ShutdownWorker, the server returns empty responses.
-		select {
-		case <-doneC:
-			return result, err
-		case <-bp.stopC:
-			// TEMP FIX: Give the server a reasonable window to complete the poll after
-			// ShutdownWorker. Fall back to cancelling the poll if it takes too
-			// long, e.g. when the gRPC connection was closed before Stop().
-			timer := time.NewTimer(5 * time.Second)
-			defer timer.Stop()
-			select {
-			case <-doneC:
-			case <-timer.C:
-				cancel()
-				<-doneC
-			}
-			return result, err
-		}
+		// Don't cancel the gRPC stream. After ShutdownWorker, the server
+		// completes the poll with an empty response. The poll is bounded by
+		// the gRPC timeout (pollTaskServiceTimeOut). The worker's stop
+		// timeout (WorkerStopTimeout) controls how long Stop() blocks;
+		// goroutines clean up in the background within the gRPC deadline.
+		<-doneC
+		return result, err
 	}
 
 	// Legacy: cancel in-flight polls immediately on shutdown
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index fc3ae45dd..5b8bac19c 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -216,6 +216,7 @@ type (
 		lastPollTaskErrLock sync.Mutex
 
 		noRepoll atomic.Bool
+		pollerWG sync.WaitGroup
 	}
 
 	eagerOrPolledTask interface {
@@ -391,6 +392,7 @@ func (bw *baseWorker) Start() {
 		for i := 0; i < taskWorker.pollerCount; i++ {
 			bw.stopWG.Add(1)
+			bw.pollerWG.Add(1)
 			go bw.runPoller(taskWorker)
 		}
@@ -403,6 +405,15 @@ func (bw *baseWorker) Start() {
 		}
 	}
 
+	// When all pollers have exited, close taskQueueCh so the dispatcher
+	// knows no more polled tasks will arrive and can drain what remains.
+	bw.stopWG.Add(1)
+	go func() {
+		defer bw.stopWG.Done()
+		bw.pollerWG.Wait()
+		close(bw.taskQueueCh)
+	}()
+
 	bw.stopWG.Add(1)
 	go bw.runTaskDispatcher()
@@ -428,6 +439,7 @@ func (bw *baseWorker) isStop() bool {
 
 func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
 	defer bw.stopWG.Done()
+	defer bw.pollerWG.Done()
 
 	// Note: With poller autoscaling, this metric doesn't make a lot of sense since the number of pollers can go up and down.
 	bw.metricsHandler.Counter(metrics.PollerStartCounter).Inc(1)
@@ -561,24 +573,14 @@ func (bw *baseWorker) processTaskAsync(eagerOrPolled eagerOrPolledTask) {
 func (bw *baseWorker) runTaskDispatcher() {
 	defer bw.stopWG.Done()
 
-	for {
-		// wait for new task or worker stop
-		select {
-		case <-bw.stopCh:
-			// Currently we can drop any tasks received when closing.
-			// https://github.com/temporalio/sdk-go/issues/1197
-			return
-		case task := <-bw.taskQueueCh:
-			// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
-			_, isPolledTask := task.(*polledTask)
-			if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
-				if bw.isStop() {
-					bw.releaseSlot(task.getPermit(), SlotReleaseReasonUnused)
-					return
-				}
-			}
-			bw.processTaskAsync(task)
+	for task := range bw.taskQueueCh {
+		// For non-polled-task (local activity result as task or eager task), we don't need to rate limit.
+		// During shutdown the limiter context is cancelled, so Wait returns
+		// immediately; we still process the task rather than dropping it.
+		if _, isPolledTask := task.(*polledTask); isPolledTask {
+			bw.taskLimiter.Wait(bw.limiterContext)
 		}
+		bw.processTaskAsync(task)
 	}
 }
@@ -639,11 +641,10 @@ func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPe
 			taskWorker.pollerAutoscalerReportHandle.handleTask(task)
 		}
 
-		select {
-		case bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}:
-			didSendTask = true
-		case <-bw.stopCh:
-		}
+		// The dispatcher is guaranteed to be alive: it only exits after
+		// taskQueueCh is closed, which happens after all pollers finish.
+		bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}
+		didSendTask = true
 	}
 }
diff --git a/internal/internal_worker_base_test.go b/internal/internal_worker_base_test.go
index 81918a33a..f52319a55 100644
--- a/internal/internal_worker_base_test.go
+++ b/internal/internal_worker_base_test.go
@@ -315,6 +315,94 @@ type noopTaskProcessor struct{}
 
 func (noopTaskProcessor) ProcessTask(any) error { return nil }
 
+// TestTaskNotDroppedDuringShutdown verifies the two-stage shutdown: when a
+// poller receives a task during shutdown, the task is still dispatched and
+// processed rather than silently dropped.
+func TestTaskNotDroppedDuringShutdown(t *testing.T) {
+	taskProcessed := make(chan struct{}, 1)
+	pollStarted := make(chan struct{}, 1)
+
+	// A poller that blocks until returnTask is closed, then returns a task.
+	tp := &shutdownTaskPoller{
+		pollStarted: pollStarted,
+		returnTask:  make(chan struct{}),
+		task:        &testTask{},
+	}
+
+	processor := &recordingTaskProcessor{
+		processed: taskProcessed,
+	}
+
+	bw := newBaseWorker(baseWorkerOptions{
+		slotSupplier:     &testSlotSupplier{},
+		maxTaskPerSecond: 1000,
+		taskPollers: []scalableTaskPoller{
+			{taskPollerType: "test", pollerCount: 1, taskPoller: tp},
+		},
+		taskProcessor:  processor,
+		workerType:     "ShutdownTest",
+		logger:         ilog.NewNopLogger(),
+		stopTimeout:    5 * time.Second,
+		metricsHandler: metrics.NopHandler,
+	})
+
+	bw.Start()
+
+	// Wait for the poller to start polling
+	<-pollStarted
+
+	// Signal the poller to return a task, then stop the worker.
+	// The task should be processed, not dropped.
+	bw.noRepoll.Store(true)
+	close(bw.stopCh)
+	close(tp.returnTask)
+	bw.limiterContextCancel()
+
+	select {
+	case <-taskProcessed:
+		// Success: the task was dispatched and processed
+	case <-time.After(5 * time.Second):
+		t.Fatal("task polled during shutdown was not processed (dropped)")
+	}
+
+	// Wait for full cleanup. We already closed stopCh manually, so
+	// replicate the remaining Stop() logic.
+	awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout)
+}
+
+// shutdownTaskPoller blocks until returnTask is closed, then returns a task
+// exactly once. Subsequent polls return nil.
+type shutdownTaskPoller struct {
+	pollStarted chan struct{}
+	returnTask  chan struct{}
+	task        taskForWorker
+	returned    atomic.Bool
+}
+
+func (p *shutdownTaskPoller) PollTask() (taskForWorker, error) {
+	select {
+	case p.pollStarted <- struct{}{}:
+	default:
+	}
+	<-p.returnTask
+	if p.returned.CompareAndSwap(false, true) {
+		return p.task, nil
+	}
+	return nil, nil
+}
+
+type recordingTaskProcessor struct {
+	processed chan struct{}
+}
+
+func (p *recordingTaskProcessor) ProcessTask(any) error {
+	select {
+	case p.processed <- struct{}{}:
+	default:
+	}
+	return nil
+}
+
 func (s *PollScalerReportHandleSuite) TestAutoscaleDownOnTimeoutWithCapability() {
 	targetSuggestion := 0
 	ps := newPollScalerReportHandle(pollScalerReportHandleOptions{

From ae7b8f827e2c46ff1c593a9af2df71e75186b64e Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Tue, 31 Mar 2026 16:35:25 -0700
Subject: [PATCH 2/7] go run . check

---
 internal/internal_worker_base.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index 5b8bac19c..c23633373 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -574,11 +574,15 @@ func (bw *baseWorker) runTaskDispatcher() {
 	defer bw.stopWG.Done()
 
 	for task := range bw.taskQueueCh {
-		// For non-polled-task (local activity result as task or eager task), we don't need to rate limit.
-		// During shutdown the limiter context is cancelled, so Wait returns
-		// immediately; we still process the task rather than dropping it.
+		// For non-polled-task (local activity result as task or eager task),
+		// we don't need to rate limit. During shutdown the limiter context
+		// is cancelled, so Wait returns immediately; we still process the
+		// task rather than dropping it.
 		if _, isPolledTask := task.(*polledTask); isPolledTask {
-			bw.taskLimiter.Wait(bw.limiterContext)
+			if err := bw.taskLimiter.Wait(bw.limiterContext); err != nil {
+				// Context cancelled during shutdown; skip rate limiting
+				// but still process remaining tasks.
+			}
 		}
 		bw.processTaskAsync(task)
 	}

From 5bdf128c2cc4bda28e710067f7425d9eb5e0ea3a Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Tue, 31 Mar 2026 16:42:54 -0700
Subject: [PATCH 3/7] Make test match prod behavior

---
 internal/internal_worker_base.go      |  7 +++---
 internal/internal_worker_base_test.go | 32 ++++++++++++++++++---------
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index c23633373..afd88ddb5 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -579,10 +579,9 @@ func (bw *baseWorker) runTaskDispatcher() {
 		// is cancelled, so Wait returns immediately; we still process the
 		// task rather than dropping it.
 		if _, isPolledTask := task.(*polledTask); isPolledTask {
-			if err := bw.taskLimiter.Wait(bw.limiterContext); err != nil {
-				// Context cancelled during shutdown; skip rate limiting
-				// but still process remaining tasks.
-			}
+			// Ignore error: during shutdown the limiter context is
+			// cancelled, but we still process remaining tasks.
+			_ = bw.taskLimiter.Wait(bw.limiterContext)
 		}
 		bw.processTaskAsync(task)
 	}
diff --git a/internal/internal_worker_base_test.go b/internal/internal_worker_base_test.go
index f52319a55..dde5f71c6 100644
--- a/internal/internal_worker_base_test.go
+++ b/internal/internal_worker_base_test.go
@@ -322,7 +322,8 @@ func TestTaskNotDroppedDuringShutdown(t *testing.T) {
 	taskProcessed := make(chan struct{}, 1)
 	pollStarted := make(chan struct{}, 1)
 
-	// A poller that blocks until returnTask is closed, then returns a task.
+	// A poller that blocks until returnTask is closed, then returns a task
+	// exactly once. Subsequent polls return nil so the poller can exit.
 	tp := &shutdownTaskPoller{
 		pollStarted: pollStarted,
 		returnTask:  make(chan struct{}),
@@ -348,26 +349,35 @@ func TestTaskNotDroppedDuringShutdown(t *testing.T) {
 
 	bw.Start()
 
-	// Wait for the poller to start polling
+	// Wait for the poller to be actively polling.
 	<-pollStarted
 
-	// Signal the poller to return a task, then stop the worker.
-	// The task should be processed, not dropped.
-	bw.noRepoll.Store(true)
-	close(bw.stopCh)
+	// Release the poller so it returns a task, then stop the worker.
+	// The poller returns a task and then nil on subsequent polls,
+	// allowing it to exit via noRepoll/stopCh during Stop().
 	close(tp.returnTask)
-	bw.limiterContextCancel()
+
+	// Stop exercises the real shutdown path: noRepoll, close(stopCh),
+	// limiterContextCancel, and awaitWaitGroup.
+	stopDone := make(chan struct{})
+	go func() {
+		bw.Stop()
+		close(stopDone)
+	}()
 
 	select {
 	case <-taskProcessed:
-		// Success: the task was dispatched and processed
+		// Success: the task was dispatched and processed during shutdown
 	case <-time.After(5 * time.Second):
 		t.Fatal("task polled during shutdown was not processed (dropped)")
 	}
 
-	// Wait for full cleanup. We already closed stopCh manually, so
-	// replicate the remaining Stop() logic.
-	awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout)
+	select {
+	case <-stopDone:
+		// Stop completed cleanly
+	case <-time.After(5 * time.Second):
+		t.Fatal("Stop() did not return in time")
+	}
 }

From 29b355a93d1f1d73bd08ee4614e6c1bc6640ce2f Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Wed, 1 Apr 2026 15:41:16 -0700
Subject: [PATCH 4/7] Fix test race with new behavior

---
 test/integration_test.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/test/integration_test.go b/test/integration_test.go
index 07af77117..ace208290 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -4303,8 +4303,10 @@ func (ts *IntegrationTestSuite) testUpdateOrderingCancel(cancelWf bool) {
 		}()
 	}
 
-	// The server does not support admitted updates, so we send the update in a separate goroutine
-	time.Sleep(5 * time.Second)
+	// The server does not support admitted updates, so we send the update in a separate goroutine.
+	// Keep this shorter than the activity's ScheduleToCloseTimeout (5s) so the new worker
+	// has time to execute activities before they time out.
+	time.Sleep(2 * time.Second)
 	// Now create a new worker on that same task queue to resume the work of the
 	// workflow
 	nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{})

From d1f9d7ce4c6d617445730cb0ba1936b1bacd4623 Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Wed, 1 Apr 2026 17:19:03 -0700
Subject: [PATCH 5/7] Prevent zombie poller goroutines

---
 internal/internal_task_pollers.go | 7 +++----
 internal/internal_worker_base.go  | 6 ++++++
 test/integration_test.go          | 4 +---
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index 97d04b1ca..c6454435e 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -311,10 +311,9 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
 
 	if bp.workerPollCompleteOnShutdown != nil && bp.workerPollCompleteOnShutdown.Load() {
 		// Don't cancel the gRPC stream. After ShutdownWorker, the server
-		// completes the poll with an empty response. The poll is bounded by
-		// the gRPC timeout (pollTaskServiceTimeOut). The worker's stop
-		// timeout (WorkerStopTimeout) controls how long Stop() blocks;
-		// goroutines clean up in the background within the gRPC deadline.
+		// completes the poll with an empty response. The poll is bounded
+		// by the gRPC timeout (pollTaskServiceTimeOut). Stop() waits for
+		// all pollers to finish before proceeding to task drain.
 		<-doneC
 		return result, err
 	}
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index afd88ddb5..c51da2128 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -707,6 +707,12 @@ func (bw *baseWorker) Stop() {
 	close(bw.stopCh)
 	bw.limiterContextCancel()
 
+	// Wait for pollers to finish. The gRPC timeout (pollTaskServiceTimeOut) bounds this if the connection is broken.
+	bw.pollerWG.Wait()
+
+	// Wait for task processing to complete. The dispatcher drains
+	// taskQueueCh (closed once all pollers finish, above), and the
+	// processTaskAsync goroutines are tracked in stopWG.
 	if success := awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout); !success {
 		traceLog(func() {
 			bw.logger.Info("Worker graceful stop timed out.", "Stop timeout", bw.options.stopTimeout)
diff --git a/test/integration_test.go b/test/integration_test.go
index ace208290..f517c4f0d 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -4306,7 +4306,5 @@ func (ts *IntegrationTestSuite) testUpdateOrderingCancel(cancelWf bool) {
 	// The server does not support admitted updates, so we send the update in a separate goroutine.
-	// Keep this shorter than the activity's ScheduleToCloseTimeout (5s) so the new worker
-	// has time to execute activities before they time out.
-	time.Sleep(2 * time.Second)
+	time.Sleep(5 * time.Second)
 	// Now create a new worker on that same task queue to resume the work of the
 	// workflow
 	nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{})

From be5c0e42d477e788963c5aa7de285497f5adff61 Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Wed, 1 Apr 2026 18:32:26 -0700
Subject: [PATCH 6/7] add debug logging

---
 internal/internal_task_pollers.go | 11 +++++++++--
 internal/internal_worker_base.go  |  8 ++++++--
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index c6454435e..57429db68 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -314,8 +314,15 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
 		// completes the poll with an empty response. The poll is bounded
 		// by the gRPC timeout (pollTaskServiceTimeOut). Stop() waits for
 		// all pollers to finish before proceeding to task drain.
-		<-doneC
-		return result, err
+		select {
+		case <-doneC:
+			return result, err
+		case <-bp.stopC:
+			fmt.Println("DEBUG: doPoll graceful path: stopC fired, waiting for poll to complete")
+			<-doneC
+			fmt.Println("DEBUG: doPoll graceful path: poll completed after stopC")
+			return result, err
+		}
 	}
 
 	// Legacy: cancel in-flight polls immediately on shutdown
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index c51da2128..3564a64ea 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -439,7 +439,10 @@ func (bw *baseWorker) isStop() bool {
 
 func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
 	defer bw.stopWG.Done()
-	defer bw.pollerWG.Done()
+	defer func() {
+		bw.logger.Info("Poller exiting", "pollerType", taskWorker.taskPollerType)
+		bw.pollerWG.Done()
+	}()
 
 	// Note: With poller autoscaling, this metric doesn't make a lot of sense since the number of pollers can go up and down.
 	bw.metricsHandler.Counter(metrics.PollerStartCounter).Inc(1)
@@ -707,8 +710,9 @@ func (bw *baseWorker) Stop() {
 	close(bw.stopCh)
 	bw.limiterContextCancel()
 
-	// Wait for pollers to finish. The gRPC timeout (pollTaskServiceTimeOut) bounds this if the connection is broken.
+	bw.logger.Info("Waiting for pollers to finish")
 	bw.pollerWG.Wait()
+	bw.logger.Info("All pollers finished")

From 57fdffe193f549560aceecc9da2759e4dfc0fe88 Mon Sep 17 00:00:00 2001
From: Andrew Yuan
Date: Thu, 30 Apr 2026 08:49:44 -1000
Subject: [PATCH 7/7] Revert "add debug logging"

This reverts commit be5c0e42d477e788963c5aa7de285497f5adff61.

---
 internal/internal_task_pollers.go | 11 ++---------
 internal/internal_worker_base.go  |  8 ++------
 2 files changed, 4 insertions(+), 15 deletions(-)

diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index c834db8e5..a6ad37bb9 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -319,15 +319,8 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
 		// completes the poll with an empty response. The poll is bounded
 		// by the gRPC timeout (pollTaskServiceTimeOut). Stop() waits for
 		// all pollers to finish before proceeding to task drain.
-		select {
-		case <-doneC:
-			return result, err
-		case <-bp.stopC:
-			fmt.Println("DEBUG: doPoll graceful path: stopC fired, waiting for poll to complete")
-			<-doneC
-			fmt.Println("DEBUG: doPoll graceful path: poll completed after stopC")
-			return result, err
-		}
+		<-doneC
+		return result, err
 	}
 
 	// Legacy: cancel in-flight polls immediately on shutdown
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index c8b29bb8a..cf8b921cf 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -439,10 +439,7 @@ func (bw *baseWorker) isStop() bool {
 
 func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
 	defer bw.stopWG.Done()
-	defer func() {
-		bw.logger.Info("Poller exiting", "pollerType", taskWorker.taskPollerType)
-		bw.pollerWG.Done()
-	}()
+	defer bw.pollerWG.Done()
 
 	// Note: With poller autoscaling, this metric doesn't make a lot of sense since the number of pollers can go up and down.
 	bw.metricsHandler.Counter(metrics.PollerStartCounter).Inc(1)
@@ -710,9 +707,8 @@ func (bw *baseWorker) Stop() {
 	close(bw.stopCh)
 	bw.limiterContextCancel()
 
-	bw.logger.Info("Waiting for pollers to finish")
+	// Wait for pollers to finish. The gRPC timeout (pollTaskServiceTimeOut) bounds this if the connection is broken.
 	bw.pollerWG.Wait()
-	bw.logger.Info("All pollers finished")
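
Note: the series converges on a two-stage graceful shutdown. Stage one, Stop() closes stopCh and waits on pollerWG so every in-flight poll can complete (bounded by the gRPC deadline); stage two, a watcher goroutine closes taskQueueCh and the dispatcher drains whatever the pollers delivered before exiting. The sketch below is a minimal, self-contained illustration of that ordering, not SDK code: the worker type, integer tasks, and the poll/process bodies are invented stand-ins, and only the stopCh/taskQueueCh/pollerWG/stopWG structure mirrors the patched baseWorker.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker models the patched baseWorker: pollers feed taskQueueCh, a watcher
// closes the channel once every poller has returned, and the dispatcher
// drains it to completion so no polled task is dropped.
type worker struct {
	stopCh      chan struct{}
	taskQueueCh chan int
	pollerWG    sync.WaitGroup // stage 1: in-flight polls
	stopWG      sync.WaitGroup // stage 2: everything else
}

func (w *worker) start(pollers int) {
	for i := 0; i < pollers; i++ {
		w.pollerWG.Add(1)
		w.stopWG.Add(1)
		go w.runPoller(i)
	}

	// Once all pollers have exited there can be no more sends, so closing
	// taskQueueCh is safe and tells the dispatcher to drain and exit.
	w.stopWG.Add(1)
	go func() {
		defer w.stopWG.Done()
		w.pollerWG.Wait()
		close(w.taskQueueCh)
	}()

	w.stopWG.Add(1)
	go w.runDispatcher()
}

func (w *worker) runPoller(id int) {
	defer w.stopWG.Done()
	defer w.pollerWG.Done()
	for seq := 0; ; seq++ {
		select {
		case <-w.stopCh:
			return
		default:
		}
		time.Sleep(10 * time.Millisecond) // stand-in for a long poll
		// Safe unconditional send: the dispatcher only exits after
		// taskQueueCh closes, which happens after all pollers return.
		w.taskQueueCh <- id*1000 + seq
	}
}

func (w *worker) runDispatcher() {
	defer w.stopWG.Done()
	for task := range w.taskQueueCh { // drains fully, then exits on close
		fmt.Println("processed", task)
	}
}

func (w *worker) stop() {
	close(w.stopCh)   // pollers finish their current poll and exit
	w.pollerWG.Wait() // stage 1: all polls complete
	w.stopWG.Wait()   // stage 2: queue closed and drained
}

func main() {
	w := &worker{stopCh: make(chan struct{}), taskQueueCh: make(chan int)}
	w.start(3)
	time.Sleep(35 * time.Millisecond)
	w.stop()
	fmt.Println("clean shutdown; no tasks dropped")
}

The key invariant is the one the patches rely on: taskQueueCh is closed only after pollerWG.Wait() returns, so a poller's unconditional send can never hit a closed channel, and the dispatcher's range loop cannot exit while a task is still in flight.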