@@ -3,6 +3,7 @@ package v1
33import (
44 "context"
55 "encoding/json"
6+ "errors"
67 "fmt"
78 "io"
89 "log/slog"
@@ -30,6 +31,8 @@ import (
3031 "github.com/chaitin/MonkeyCode/backend/pkg/ws"
3132)
3233
34+ var errRoundEnded = errors .New ("round ended" )
35+
3336// TaskHandler 任务处理器
3437type TaskHandler struct {
3538 cfg * config.Config
@@ -481,66 +484,13 @@ func (h *TaskHandler) consumeLiveStream(ctx context.Context, cancel context.Canc
481484 return
482485 }
483486 if chunk .Event == "task-ended" {
484- cancel (fmt . Errorf ( "round ended" ) )
487+ cancel (errRoundEnded )
485488 return
486489 }
487490 }
488491 }
489492}
490493
491- func (h * TaskHandler ) findTailStart (ctx context.Context , taskID string , taskCreatedAt time.Time ) time.Time {
492- lastInputTS , err := h .loki .FindLastEvent (ctx , taskID , "user-input" , taskCreatedAt , time.Time {})
493- if err != nil {
494- h .logger .With ("error" , err ).WarnContext (ctx , "failed to find last task-ended" )
495- }
496-
497- if ! lastInputTS .IsZero () {
498- return lastInputTS
499- }
500-
501- return taskCreatedAt
502- }
503-
504- func (h * TaskHandler ) tailLogs (ctx context.Context , cancel context.CancelCauseFunc , wsConn * ws.WebsocketManager , logger * slog.Logger , taskID string , tailStart time.Time ) {
505- logLimit := h .cfg .Task .LogLimit
506- if logLimit <= 0 {
507- logLimit = 200
508- }
509-
510- err := h .loki .Tail (ctx , taskID , tailStart , logLimit , time.Time {}, func (le []loki.LogEntry ) error {
511- for _ , l := range le {
512- if l .Line == "" {
513- continue
514- }
515- var chunk taskflow.TaskChunk
516- if err := json .Unmarshal ([]byte (l .Line ), & chunk ); err != nil {
517- logger .ErrorContext (ctx , "failed to unmarshal log entry" , "line" , l .Line , "error" , err )
518- continue
519- }
520- if err := wsConn .WriteJSON (domain.TaskStream {
521- Type : consts .TaskStreamType (chunk .Event ),
522- Data : chunk .Data ,
523- Kind : chunk .Kind ,
524- Timestamp : l .Timestamp .UnixMilli (),
525- }); err != nil {
526- return fmt .Errorf ("failed to write to websocket: %w" , err )
527- }
528-
529- if chunk .Event == "task-ended" {
530- cancel (fmt .Errorf ("round ended" ))
531- return fmt .Errorf ("round ended" )
532- }
533- }
534- return nil
535- })
536-
537- if err != nil {
538- logger .ErrorContext (ctx , "tailer failed" , "error" , err )
539- h .writeError (wsConn , fmt .Errorf ("failed to tail logs %w" , err ))
540- cancel (fmt .Errorf ("failed to tail logs %w" , err ))
541- }
542- }
543-
544494func (h * TaskHandler ) subscribeRealtimeStream (ctx context.Context , cancel context.CancelCauseFunc , wsConn * ws.WebsocketManager , logger * slog.Logger , taskID string ) {
545495 err := h .taskflow .TaskLive (ctx , taskID , false , func (chunk * taskflow.TaskChunk ) error {
546496 if err := wsConn .WriteJSON (domain.TaskStream {
@@ -553,13 +503,13 @@ func (h *TaskHandler) subscribeRealtimeStream(ctx context.Context, cancel contex
553503 }
554504
555505 if chunk .Event == "task-ended" {
556- cancel (fmt . Errorf ( "round ended" ) )
557- return fmt . Errorf ( "round ended" )
506+ cancel (errRoundEnded )
507+ return errRoundEnded
558508 }
559509 return nil
560510 })
561511
562- if err != nil {
512+ if err != nil && ! errors . Is ( err , errRoundEnded ) {
563513 logger .ErrorContext (ctx , "realtime stream failed" , "error" , err )
564514 h .writeError (wsConn , fmt .Errorf ("failed to subscribe realtime stream: %w" , err ))
565515 cancel (fmt .Errorf ("failed to subscribe realtime stream: %w" , err ))
0 commit comments