Skip to content

Commit af6a51c

Browse files
authored
Merge pull request #533 from chaitin/feat/clickhouse-tasklogs
feat(task): 接入 ClickHouse 任务日志路由
2 parents bcdc528 + 27e20ff commit af6a51c

46 files changed

Lines changed: 2316 additions & 269 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ OUTPUT=type=docker,dest=$(HOME)/tmp/mcai_server.tar
44
GOCACHE=/root/.cache/go-build
55
GOMODCACHE?=/go/pkg/mod
66
REGISTRY=chaitin-registry.cn-hangzhou.cr.aliyuncs.com/monkeycode
7-
87
# make server PLATFORM= TAG= OUTPUT_SERVER= GOCACHE=
98
image:
109
docker buildx build \

backend/biz/host/handler/v1/internal.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
type InternalHostHandler struct {
3333
logger *slog.Logger
3434
repo domain.HostRepo
35+
taskRepo taskLogStoreRepo
3536
teamRepo domain.TeamHostRepo
3637
redis *redis.Client
3738
getAgentToken agentTokenGetter
@@ -46,6 +47,10 @@ type InternalHostHandler struct {
4647
tokenProvider *gituc.TokenProvider
4748
}
4849

50+
type taskLogStoreRepo interface {
51+
GetLogStore(ctx context.Context, id uuid.UUID) (consts.LogStore, error)
52+
}
53+
4954
func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
5055
w := do.MustInvoke[*web.Web](i)
5156
tf := do.MustInvoke[taskflow.Clienter](i)
@@ -54,6 +59,7 @@ func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
5459
h := &InternalHostHandler{
5560
logger: do.MustInvoke[*slog.Logger](i).With("module", "InternalHostHandler"),
5661
repo: do.MustInvoke[domain.HostRepo](i),
62+
taskRepo: do.MustInvoke[domain.TaskRepo](i),
5763
teamRepo: do.MustInvoke[domain.TeamHostRepo](i),
5864
redis: rdb,
5965
getAgentToken: defaultAgentTokenGetter(rdb),
@@ -78,6 +84,7 @@ func NewInternalHostHandler(i *do.Injector) (*InternalHostHandler, error) {
7884
g.POST("/coding-config", web.BindHandler(h.GetCodingConfig))
7985
g.POST("/git-credential", web.BindHandler(h.GitCredential))
8086
g.GET("/vm/list", web.BaseHandler(h.VMList))
87+
g.POST("/task-log-store", web.BindHandler(h.GetTaskLogStore))
8188
g.POST("/task-stream-ips", web.BindHandler(h.GetTaskStreamIPs))
8289

8390
return h, nil
@@ -186,6 +193,19 @@ func (h *InternalHostHandler) CheckToken(c *web.Context, req taskflow.CheckToken
186193
return c.Success(tk)
187194
}
188195

196+
func (h *InternalHostHandler) GetTaskLogStore(c *web.Context, req taskflow.GetTaskLogStoreReq) error {
197+
store, err := h.taskRepo.GetLogStore(c.Request().Context(), req.TaskID)
198+
if err != nil {
199+
return err
200+
}
201+
if store == "" {
202+
store = consts.LogStoreLoki
203+
}
204+
return c.Success(taskflow.GetTaskLogStoreResp{
205+
LogStore: string(store),
206+
})
207+
}
208+
189209
func (h *InternalHostHandler) agentAuth(ctx context.Context, token, mid string) (*taskflow.Token, error) {
190210
// 1) 优先从 Redis 读取一次性 agent token,并清除
191211
key := fmt.Sprintf("agent:token:%s", token)

backend/biz/host/handler/v1/internal_auth_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestAgentAuthSoftDeletedRecycledVMStillTriggersDelete(t *testing.T) {
130130
type internalHostRepoStub struct {
131131
vm *db.VirtualMachine
132132
assertSkipMarker bool
133-
skipMarkerKey interface{}
133+
skipMarkerKey any
134134
skipMarkerValue string
135135
}
136136

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package v1
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"io"
8+
"log/slog"
9+
"net/http"
10+
"net/http/httptest"
11+
"strings"
12+
"testing"
13+
14+
"github.com/GoYoko/web"
15+
"github.com/google/uuid"
16+
17+
"github.com/chaitin/MonkeyCode/backend/consts"
18+
taskflowpkg "github.com/chaitin/MonkeyCode/backend/pkg/taskflow"
19+
)
20+
21+
type taskLogStoreRepoStub struct {
22+
store consts.LogStore
23+
err error
24+
}
25+
26+
func (s *taskLogStoreRepoStub) GetLogStore(context.Context, uuid.UUID) (consts.LogStore, error) {
27+
return s.store, s.err
28+
}
29+
30+
func TestInternalHostHandler_GetTaskLogStore_EmptyMeansLoki(t *testing.T) {
31+
h := &InternalHostHandler{
32+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
33+
taskRepo: &taskLogStoreRepoStub{},
34+
}
35+
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
36+
resp := callGetTaskLogStore(t, h, req)
37+
if resp.LogStore != string(consts.LogStoreLoki) {
38+
t.Fatalf("log_store = %q, want %q", resp.LogStore, consts.LogStoreLoki)
39+
}
40+
}
41+
42+
func TestInternalHostHandler_GetTaskLogStore_ClickHousePassthrough(t *testing.T) {
43+
h := &InternalHostHandler{
44+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
45+
taskRepo: &taskLogStoreRepoStub{store: consts.LogStoreClickHouse},
46+
}
47+
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
48+
resp := callGetTaskLogStore(t, h, req)
49+
if resp.LogStore != string(consts.LogStoreClickHouse) {
50+
t.Fatalf("log_store = %q, want %q", resp.LogStore, consts.LogStoreClickHouse)
51+
}
52+
}
53+
54+
func TestInternalHostHandler_GetTaskLogStore_RepoError(t *testing.T) {
55+
h := &InternalHostHandler{
56+
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
57+
taskRepo: &taskLogStoreRepoStub{err: errors.New("boom")},
58+
}
59+
req := taskflowpkg.GetTaskLogStoreReq{TaskID: uuid.New()}
60+
body, err := json.Marshal(req)
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
w := web.New()
65+
w.POST("/internal/task-log-store", web.BindHandler(h.GetTaskLogStore))
66+
rec := httptest.NewRecorder()
67+
httpReq := httptest.NewRequest(http.MethodPost, "/internal/task-log-store", strings.NewReader(string(body)))
68+
httpReq.Header.Set("Content-Type", "application/json")
69+
w.Echo().ServeHTTP(rec, httpReq)
70+
71+
var resp web.Resp
72+
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
73+
t.Fatalf("unmarshal resp: %v", err)
74+
}
75+
if resp.Code == 0 {
76+
t.Fatalf("response = %+v, want error", resp)
77+
}
78+
}
79+
80+
func callGetTaskLogStore(t *testing.T, h *InternalHostHandler, req taskflowpkg.GetTaskLogStoreReq) taskflowpkg.GetTaskLogStoreResp {
81+
t.Helper()
82+
83+
body, err := json.Marshal(req)
84+
if err != nil {
85+
t.Fatal(err)
86+
}
87+
88+
w := web.New()
89+
w.POST("/internal/task-log-store", web.BindHandler(h.GetTaskLogStore))
90+
91+
rec := httptest.NewRecorder()
92+
httpReq := httptest.NewRequest(http.MethodPost, "/internal/task-log-store", strings.NewReader(string(body)))
93+
httpReq.Header.Set("Content-Type", "application/json")
94+
w.Echo().ServeHTTP(rec, httpReq)
95+
96+
if rec.Code != http.StatusOK {
97+
t.Fatalf("status = %d, body = %s", rec.Code, rec.Body.String())
98+
}
99+
100+
var resp web.Resp
101+
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
102+
t.Fatalf("unmarshal resp: %v", err)
103+
}
104+
data, err := json.Marshal(resp.Data)
105+
if err != nil {
106+
t.Fatalf("marshal resp data: %v", err)
107+
}
108+
109+
var out taskflowpkg.GetTaskLogStoreResp
110+
if err := json.Unmarshal(data, &out); err != nil {
111+
t.Fatalf("unmarshal typed resp: %v", err)
112+
}
113+
return out
114+
}

backend/biz/host/usecase/host_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,17 @@ func (s *hostTaskRepoStub) GetByID(ctx context.Context, id uuid.UUID) (*db.Task,
116116
return s.client.Task.Get(ctx, id)
117117
}
118118

119+
func (s *hostTaskRepoStub) GetLogStore(ctx context.Context, id uuid.UUID) (consts.LogStore, error) {
120+
tk, err := s.client.Task.Get(ctx, id)
121+
if err != nil {
122+
return "", err
123+
}
124+
if tk.LogStore == nil {
125+
return "", nil
126+
}
127+
return *tk.LogStore, nil
128+
}
129+
119130
func (s *hostTaskRepoStub) Stat(context.Context, uuid.UUID) (*domain.TaskStats, error) {
120131
panic("unexpected call to Stat")
121132
}

0 commit comments

Comments
 (0)