Skip to content

Commit 9ae723f

Browse files
Fix/8787 cancel pipeline keeps running (#8832)
* fix(pipeline): cancel pipeline returns 200 OK but keeps running (#8787) Three independent bugs caused pipeline cancellation to silently fail: 1. CancelPipeline discarded errors from CancelTask and never cancelled TASK_CREATED tasks in future stages. Now running tasks are cancelled via context, non-running tasks are batch-updated to TASK_CANCELLED in the DB, and errors are logged and returned. 2. gitextractor's storeRepoSnapshot (go-git path) had no ctx.Done() checks in its commit/blame loops, making it unresponsive to cancellation for 30+ minutes on large repos. Added cancellation checkpoints following the pattern already used elsewhere in the file. 3. Cancelled tasks were marked TASK_FAILED instead of TASK_CANCELLED, and ComputePipelineStatus never returned TASK_CANCELLED. Now RunTask checks for context cancellation and writes TASK_CANCELLED, and ComputePipelineStatus returns TASK_CANCELLED when the pipeline was cancelled by the user. Test gaps: RunTask deferred status logic, CancelPipeline flow, and storeRepoSnapshot have no existing unit tests. These are pre-existing gaps not introduced by this change. The only existing test, TestComputePipelineStatus, has been extended to cover isCancelled=true. Closes #8787 Related: #5585, #4188 * refactor(pipeline): improve CancelPipeline readability and correctness Extract cancelRunningTasks and cancelPendingTasksInDB helpers from CancelPipeline for better separation of concerns. Also fixes: - .As(NotFound) replaced with .GetType() to prevent swallowing wrapped errors - Added TASK_RESUME to cancelPendingTasksInDB status filter - Error message now includes pipeline ID for traceability - Added TestCancelPipeline e2e tests with 3 subtests
1 parent bfebec3 commit 9ae723f

5 files changed

Lines changed: 199 additions & 13 deletions

File tree

backend/core/runner/run_task.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,12 @@ func RunTask(
7777
}
7878
finishedAt := time.Now()
7979
spentSeconds := finishedAt.Unix() - beganAt.Unix()
80+
isCancelled := errors.Is(err, gocontext.Canceled) || ctx.Err() == gocontext.Canceled
8081
if err != nil {
82+
taskStatus := models.TASK_FAILED
83+
if isCancelled {
84+
taskStatus = models.TASK_CANCELLED
85+
}
8186
lakeErr := errors.AsLakeErrorType(err)
8287
subTaskName := "unknown"
8388
if lakeErr = lakeErr.As(errors.SubtaskErr); lakeErr != nil {
@@ -87,16 +92,19 @@ func RunTask(
8792
} else {
8893
lakeErr = errors.Convert(err)
8994
}
90-
dbe := db.UpdateColumns(task, []dal.DalSet{
91-
{ColumnName: "status", Value: models.TASK_FAILED},
95+
columns := []dal.DalSet{
96+
{ColumnName: "status", Value: taskStatus},
9297
{ColumnName: "message", Value: lakeErr.Error()},
9398
{ColumnName: "error_name", Value: lakeErr.Messages().Format()},
9499
{ColumnName: "finished_at", Value: finishedAt},
95100
{ColumnName: "spent_seconds", Value: spentSeconds},
96-
{ColumnName: "failed_sub_task", Value: subTaskName},
97-
})
101+
}
102+
if taskStatus == models.TASK_FAILED {
103+
columns = append(columns, dal.DalSet{ColumnName: "failed_sub_task", Value: subTaskName})
104+
}
105+
dbe := db.UpdateColumns(task, columns)
98106
if dbe != nil {
99-
logger.Error(dbe, "failed to finalize task status into db (task failed)")
107+
logger.Error(dbe, "failed to finalize task status into db (task %s)", taskStatus)
100108
}
101109
} else {
102110
dbe := db.UpdateColumns(task, []dal.DalSet{
@@ -116,7 +124,7 @@ func RunTask(
116124
dal.Where("id=?", task.PipelineId),
117125
))
118126
// not return err if the `SkipOnFail` is true and the error is not canceled
119-
if dbPipeline.SkipOnFail && !errors.Is(err, gocontext.Canceled) {
127+
if dbPipeline.SkipOnFail && !isCancelled {
120128
err = nil
121129
}
122130
}()

backend/plugins/gitextractor/parser/repo_gogit.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,11 @@ func (r *GogitRepoCollector) storeRepoSnapshot(subtaskCtx plugin.SubTaskContext,
524524
ctx := subtaskCtx.GetContext()
525525
snapshot := make(map[string][]string) // {"filePathAndName": ["line1 commit sha", "line2 commit sha"]}
526526
for _, commit := range commitList {
527+
select {
528+
case <-ctx.Done():
529+
return ctx.Err()
530+
default:
531+
}
527532
commitTree, firstParentTree, err := r.getCurrentAndParentTree(ctx, commit)
528533
if err != nil {
529534
return err
@@ -533,6 +538,11 @@ func (r *GogitRepoCollector) storeRepoSnapshot(subtaskCtx plugin.SubTaskContext,
533538
return err
534539
}
535540
for _, p := range patch.Stats() {
541+
select {
542+
case <-ctx.Done():
543+
return ctx.Err()
544+
default:
545+
}
536546
fileName := p.Name
537547
if _, ok := snapshot[fileName]; !ok {
538548
snapshot[fileName] = []string{}

backend/server/services/pipeline.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,10 +461,54 @@ func CancelPipeline(pipelineId uint64) errors.Error {
461461
if count == 0 {
462462
return nil
463463
}
464-
for _, pendingTask := range pendingTasks {
465-
_ = CancelTask(pendingTask.ID)
464+
var runningTaskIds []uint64
465+
var pendingTaskIds []uint64
466+
for _, task := range pendingTasks {
467+
if task.Status == models.TASK_RUNNING {
468+
runningTaskIds = append(runningTaskIds, task.ID)
469+
} else {
470+
pendingTaskIds = append(pendingTaskIds, task.ID)
471+
}
472+
}
473+
failedCancels := cancelRunningTasks(runningTaskIds) + cancelPendingTasksInDB(pendingTaskIds)
474+
if failedCancels > 0 {
475+
return errors.Default.New(fmt.Sprintf("failed to cancel %d task(s) for pipeline #%d", failedCancels, pipelineId))
476+
}
477+
return nil
478+
}
479+
480+
// cancelRunningTasks cancels tasks that are actively running by triggering
481+
// their context cancellation. Returns the number of tasks that failed to cancel.
482+
func cancelRunningTasks(taskIds []uint64) int {
483+
failCount := 0
484+
for _, taskId := range taskIds {
485+
if err := CancelTask(taskId); err != nil {
486+
if err.GetType() == errors.NotFound {
487+
continue // task no longer tracked in-memory (finished or context lost after restart)
488+
}
489+
globalPipelineLog.Error(err, "failed to cancel running task #%d", taskId)
490+
failCount++
491+
}
492+
}
493+
return failCount
494+
}
495+
496+
// cancelPendingTasksInDB marks non-running pending tasks as cancelled directly
497+
// in the database. Returns the number of tasks that failed to update.
498+
func cancelPendingTasksInDB(taskIds []uint64) int {
499+
if len(taskIds) == 0 {
500+
return 0
501+
}
502+
err := db.UpdateColumn(
503+
&models.Task{},
504+
"status", models.TASK_CANCELLED,
505+
dal.Where("id IN ? AND status IN ?", taskIds, []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
506+
)
507+
if err != nil {
508+
globalPipelineLog.Error(err, "failed to cancel %d pending tasks in DB", len(taskIds))
509+
return len(taskIds)
466510
}
467-
return errors.Convert(err)
511+
return 0
468512
}
469513

470514
// getPipelineLogsPath gets the logs directory of this pipeline

backend/server/services/pipeline_runner.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,10 @@ func runPipeline(pipelineId uint64) errors.Error {
107107
}
108108

109109
// ComputePipelineStatus determines pipeline status by its latest(rerun included) tasks statuses
110-
// 1. TASK_COMPLETED: all tasks were executed successfully
111-
// 2. TASK_FAILED: SkipOnFail=false with failed task(s)
112-
// 3. TASK_PARTIAL: SkipOnFail=true with failed task(s)
110+
// 1. TASK_CANCELLED: pipeline was cancelled by the user (takes priority)
111+
// 2. TASK_COMPLETED: all tasks were executed successfully
112+
// 3. TASK_FAILED: SkipOnFail=false with failed task(s)
113+
// 4. TASK_PARTIAL: SkipOnFail=true with failed task(s)
113114
func ComputePipelineStatus(pipeline *models.Pipeline, isCancelled bool) (string, errors.Error) {
114115
tasks, err := GetLatestTasksOfPipeline(pipeline)
115116
if err != nil {
@@ -134,6 +135,9 @@ func ComputePipelineStatus(pipeline *models.Pipeline, isCancelled bool) (string,
134135
return "", errors.Default.New("unexpected status, did you call computePipelineStatus at a wrong timing?")
135136
}
136137

138+
if isCancelled {
139+
return models.TASK_CANCELLED, nil
140+
}
137141
if failed == 0 {
138142
return models.TASK_COMPLETED, nil
139143
}

backend/test/e2e/services/pipeline_runner_test.go

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ limitations under the License.
1818
package services
1919

2020
import (
21+
"testing"
22+
"time"
23+
24+
"github.com/apache/incubator-devlake/core/dal"
2125
"github.com/apache/incubator-devlake/core/models"
2226
"github.com/apache/incubator-devlake/server/services"
2327
"github.com/apache/incubator-devlake/test/helper"
2428
"github.com/stretchr/testify/assert"
25-
"testing"
2629
)
2730

2831
func TestComputePipelineStatus(t *testing.T) {
@@ -163,4 +166,121 @@ func TestComputePipelineStatus(t *testing.T) {
163166
status, err = services.ComputePipelineStatus(pipeline, false)
164167
assert.Nil(t, err)
165168
assert.Equal(t, models.TASK_PARTIAL, status)
169+
170+
// pipeline.status == "cancelled" if the pipeline was cancelled by the user
171+
// regardless of individual task statuses
172+
task_row1_col1_rerun.Status = models.TASK_COMPLETED
173+
err = db.Update(task_row1_col1_rerun)
174+
assert.Nil(t, err)
175+
status, err = services.ComputePipelineStatus(pipeline, true)
176+
assert.Nil(t, err)
177+
assert.Equal(t, models.TASK_CANCELLED, status)
178+
179+
// pipeline.status == "cancelled" even when some tasks failed
180+
task_row1_col1_rerun.Status = models.TASK_FAILED
181+
err = db.Update(task_row1_col1_rerun)
182+
assert.Nil(t, err)
183+
status, err = services.ComputePipelineStatus(pipeline, true)
184+
assert.Nil(t, err)
185+
assert.Equal(t, models.TASK_CANCELLED, status)
186+
}
187+
188+
func TestCancelPipeline(t *testing.T) {
189+
client := helper.StartDevLakeServer(t, nil)
190+
db := client.GetDal()
191+
192+
t.Run("cancels pending pipeline and all its tasks", func(t *testing.T) {
193+
pipeline := &models.Pipeline{
194+
TotalTasks: 2,
195+
Status: models.TASK_CREATED,
196+
}
197+
err := db.Create(pipeline)
198+
assert.Nil(t, err)
199+
assert.NotZero(t, pipeline.ID)
200+
201+
task1 := &models.Task{
202+
PipelineId: pipeline.ID,
203+
PipelineRow: 1,
204+
PipelineCol: 1,
205+
Plugin: "github",
206+
Status: models.TASK_CREATED,
207+
}
208+
task2 := &models.Task{
209+
PipelineId: pipeline.ID,
210+
PipelineRow: 1,
211+
PipelineCol: 2,
212+
Plugin: "gitextractor",
213+
Status: models.TASK_CREATED,
214+
}
215+
err = db.Create(task1)
216+
assert.Nil(t, err)
217+
assert.NotZero(t, task1.ID)
218+
err = db.Create(task2)
219+
assert.Nil(t, err)
220+
assert.NotZero(t, task2.ID)
221+
222+
err = services.CancelPipeline(pipeline.ID)
223+
assert.Nil(t, err)
224+
225+
cancelledPipeline := &models.Pipeline{}
226+
err = db.First(cancelledPipeline, dal.Where("id = ?", pipeline.ID))
227+
assert.Nil(t, err)
228+
assert.Equal(t, models.TASK_CANCELLED, cancelledPipeline.Status)
229+
230+
cancelledTask1, err := services.GetTask(task1.ID)
231+
assert.Nil(t, err)
232+
assert.Equal(t, models.TASK_CANCELLED, cancelledTask1.Status)
233+
cancelledTask2, err := services.GetTask(task2.ID)
234+
assert.Nil(t, err)
235+
assert.Equal(t, models.TASK_CANCELLED, cancelledTask2.Status)
236+
})
237+
238+
t.Run("cancels pending tasks but leaves completed tasks unchanged", func(t *testing.T) {
239+
pipeline := &models.Pipeline{
240+
TotalTasks: 2,
241+
Status: models.TASK_RUNNING,
242+
}
243+
err := db.Create(pipeline)
244+
assert.Nil(t, err)
245+
assert.NotZero(t, pipeline.ID)
246+
247+
finishedAt := time.Now()
248+
completedTask := &models.Task{
249+
PipelineId: pipeline.ID,
250+
PipelineRow: 1,
251+
PipelineCol: 1,
252+
Plugin: "github",
253+
Status: models.TASK_COMPLETED,
254+
FinishedAt: &finishedAt,
255+
}
256+
pendingTask := &models.Task{
257+
PipelineId: pipeline.ID,
258+
PipelineRow: 2,
259+
PipelineCol: 1,
260+
Plugin: "refdiff",
261+
Status: models.TASK_CREATED,
262+
}
263+
err = db.Create(completedTask)
264+
assert.Nil(t, err)
265+
assert.NotZero(t, completedTask.ID)
266+
err = db.Create(pendingTask)
267+
assert.Nil(t, err)
268+
assert.NotZero(t, pendingTask.ID)
269+
270+
err = services.CancelPipeline(pipeline.ID)
271+
assert.Nil(t, err)
272+
273+
reloadedCompleted, err := services.GetTask(completedTask.ID)
274+
assert.Nil(t, err)
275+
assert.Equal(t, models.TASK_COMPLETED, reloadedCompleted.Status)
276+
277+
reloadedPending, err := services.GetTask(pendingTask.ID)
278+
assert.Nil(t, err)
279+
assert.Equal(t, models.TASK_CANCELLED, reloadedPending.Status)
280+
})
281+
282+
t.Run("returns error for non-existent pipeline", func(t *testing.T) {
283+
err := services.CancelPipeline(999999)
284+
assert.NotNil(t, err)
285+
})
166286
}

0 commit comments

Comments
 (0)