Skip to content

Commit f20704c

Browse files
authored
Merge pull request #475 from chaitin/fix/round-ended-error-log
fix: round ended 不再作为错误记录日志
2 parents 3290f93 + 5007f16 commit f20704c

2 files changed

Lines changed: 11 additions & 58 deletions

File tree

backend/biz/task/handler/v1/task.go

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package v1
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"io"
89
"log/slog"
@@ -30,6 +31,8 @@ import (
3031
"github.com/chaitin/MonkeyCode/backend/pkg/ws"
3132
)
3233

34+
var errRoundEnded = errors.New("round ended")
35+
3336
// TaskHandler 任务处理器
3437
type TaskHandler struct {
3538
cfg *config.Config
@@ -481,66 +484,13 @@ func (h *TaskHandler) consumeLiveStream(ctx context.Context, cancel context.Canc
481484
return
482485
}
483486
if chunk.Event == "task-ended" {
484-
cancel(fmt.Errorf("round ended"))
487+
cancel(errRoundEnded)
485488
return
486489
}
487490
}
488491
}
489492
}
490493

491-
func (h *TaskHandler) findTailStart(ctx context.Context, taskID string, taskCreatedAt time.Time) time.Time {
492-
lastInputTS, err := h.loki.FindLastEvent(ctx, taskID, "user-input", taskCreatedAt, time.Time{})
493-
if err != nil {
494-
h.logger.With("error", err).WarnContext(ctx, "failed to find last task-ended")
495-
}
496-
497-
if !lastInputTS.IsZero() {
498-
return lastInputTS
499-
}
500-
501-
return taskCreatedAt
502-
}
503-
504-
func (h *TaskHandler) tailLogs(ctx context.Context, cancel context.CancelCauseFunc, wsConn *ws.WebsocketManager, logger *slog.Logger, taskID string, tailStart time.Time) {
505-
logLimit := h.cfg.Task.LogLimit
506-
if logLimit <= 0 {
507-
logLimit = 200
508-
}
509-
510-
err := h.loki.Tail(ctx, taskID, tailStart, logLimit, time.Time{}, func(le []loki.LogEntry) error {
511-
for _, l := range le {
512-
if l.Line == "" {
513-
continue
514-
}
515-
var chunk taskflow.TaskChunk
516-
if err := json.Unmarshal([]byte(l.Line), &chunk); err != nil {
517-
logger.ErrorContext(ctx, "failed to unmarshal log entry", "line", l.Line, "error", err)
518-
continue
519-
}
520-
if err := wsConn.WriteJSON(domain.TaskStream{
521-
Type: consts.TaskStreamType(chunk.Event),
522-
Data: chunk.Data,
523-
Kind: chunk.Kind,
524-
Timestamp: l.Timestamp.UnixMilli(),
525-
}); err != nil {
526-
return fmt.Errorf("failed to write to websocket: %w", err)
527-
}
528-
529-
if chunk.Event == "task-ended" {
530-
cancel(fmt.Errorf("round ended"))
531-
return fmt.Errorf("round ended")
532-
}
533-
}
534-
return nil
535-
})
536-
537-
if err != nil {
538-
logger.ErrorContext(ctx, "tailer failed", "error", err)
539-
h.writeError(wsConn, fmt.Errorf("failed to tail logs %w", err))
540-
cancel(fmt.Errorf("failed to tail logs %w", err))
541-
}
542-
}
543-
544494
func (h *TaskHandler) subscribeRealtimeStream(ctx context.Context, cancel context.CancelCauseFunc, wsConn *ws.WebsocketManager, logger *slog.Logger, taskID string) {
545495
err := h.taskflow.TaskLive(ctx, taskID, false, func(chunk *taskflow.TaskChunk) error {
546496
if err := wsConn.WriteJSON(domain.TaskStream{
@@ -553,13 +503,13 @@ func (h *TaskHandler) subscribeRealtimeStream(ctx context.Context, cancel contex
553503
}
554504

555505
if chunk.Event == "task-ended" {
556-
cancel(fmt.Errorf("round ended"))
557-
return fmt.Errorf("round ended")
506+
cancel(errRoundEnded)
507+
return errRoundEnded
558508
}
559509
return nil
560510
})
561511

562-
if err != nil {
512+
if err != nil && !errors.Is(err, errRoundEnded) {
563513
logger.ErrorContext(ctx, "realtime stream failed", "error", err)
564514
h.writeError(wsConn, fmt.Errorf("failed to subscribe realtime stream: %w", err))
565515
cancel(fmt.Errorf("failed to subscribe realtime stream: %w", err))

backend/pkg/lifecycle/taskhook.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ func (h *TaskHook) handleProcessing(ctx context.Context, id uuid.UUID, metadata
101101
}
102102

103103
h.logger.With("task_id", id).InfoContext(ctx, "creating taskflow task")
104-
return h.taskflow.TaskManager().Create(ctx, createReq)
104+
if err := h.taskflow.TaskManager().Create(ctx, createReq); err != nil {
105+
h.logger.With("error", err, "task_id", id).ErrorContext(ctx, "failed to create task")
106+
}
107+
return nil
105108
})
106109

107110
return nil

0 commit comments

Comments
 (0)