Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli/tui/headless/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (r *MultiHostHeadlessRunner) RunWithParams(ctx context.Context, image strin
select {
case <-ctx.Done():
done <- ctx.Err()
case <-testOrchestrator.CompletionCh():
done <- nil
case sig := <-sigChan:
r.output("SIGNAL", fmt.Sprintf("Received signal: %v", sig))
done <- fmt.Errorf("interrupted by signal: %v", sig)
Expand Down
18 changes: 18 additions & 0 deletions cli/tui/orchestrator_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,9 @@ type MultiHostTestOrchestrator struct {
onOutput func(line string)
onError func(err string)

completionCh chan struct{}
completionOnce sync.Once

// New fields for multi-node metrics
aggregator *MetricsAggregator
metricsPoller MetricsPoller
Expand Down Expand Up @@ -808,6 +811,7 @@ func NewMultiHostTestOrchestrator(multiHost *MultiHostOrchestrator) *MultiHostTe

return &MultiHostTestOrchestrator{
multiHost: multiHost,
completionCh: make(chan struct{}),
aggregator: entryAggregator,
metricsPoller: NewAPIMetricsPoller(), // Default implementation
pollInterval: constants.MetricsPollInterval, // Default poll interval
Expand Down Expand Up @@ -979,6 +983,9 @@ func (m *MultiHostTestOrchestrator) metricsPollingLoop(ctx context.Context) {
if update != nil && m.onMetrics != nil {
m.onMetrics(update)
}
if update != nil && update.Aggregated != nil && update.Aggregated.TestState >= constants.TestStateCompleted {
m.signalCompletion()
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -1565,10 +1572,21 @@ func (m *MultiHostTestOrchestrator) StopTest() error {
Message: "Multi-host test monitoring stopped",
})
}
m.signalCompletion()

return nil
}

func (m *MultiHostTestOrchestrator) signalCompletion() {
m.completionOnce.Do(func() {
close(m.completionCh)
})
}

func (m *MultiHostTestOrchestrator) CompletionCh() <-chan struct{} {
return m.completionCh
}

// attachWorkerNode marks a prestarted worker node as ready for orchestration.
func (o *MultiHostOrchestrator) attachWorkerNode(host *HostConnection) error {
if host.DockerManager == nil {
Expand Down
81 changes: 81 additions & 0 deletions cli/tui/orchestrator_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,87 @@ func TestMultiHostTestOrchestrator_StartTest_MultiHost_Logic(t *testing.T) {
}
}

func TestMultiHostTestOrchestrator_CompletionCh_ClosesOnTerminalStatus(t *testing.T) {
var runStarted atomic.Bool
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/ready":
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"ready":true,"status":"ready","scope":"node","role":"entry","node_id":"n0"}`))
case "/health":
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"status":"ok","scope":"node","role":"entry","node_id":"n0"}`))
case "/run":
if r.Method == http.MethodPost {
runStarted.Store(true)
w.Header().Set("ETag", "\"run-123\"")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"runId":"run-123"}`))
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
case "/status":
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"state":"COMPLETED","message":"all steps done","runId":"run-123"}`))
case "/metrics/json":
now := time.Now().UTC().Format(time.RFC3339Nano)
payload := fmt.Sprintf(`[{"metrics_schema":2,"scope":"node","role":"entry","node_id":"n0","run_id":"run-123","sample_ts":"%s","step_id":"s-create","op_type":"CREATE","timestamp":%d,"elapsed_time_seconds":0.1,"test_state":2,"completion_percent":100,"overall_completion_percent":100,"unbounded":false,"overall_unbounded":false,"operations":{"success_count":1,"failed_count":0,"success_rate_last":1,"failed_rate_last":0},"bandwidth":{"bytes_total":1024,"bytes_rate_last":1024},"timing":{"latency_mean_us":1000,"duration_mean_us":1000},"concurrency":{"current":0,"mean":0}}]`, now, time.Now().UnixMilli())
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(payload))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer srv.Close()

u, _ := url.Parse(srv.URL)
_, port, _ := net.SplitHostPort(u.Host)

hostInfos := []*hostparse.HostInfo{
{Host: "127.0.0.1", IsLocal: true, Original: "primary"},
{Host: "127.0.0.1", IsLocal: true, Original: "worker1"},
}

orchestrator := NewMultiHostOrchestrator(hostInfos, 2)
orchestrator.apiPort = port

primary := orchestrator.hosts[0]
worker := orchestrator.hosts[1]
primary.SetStatus(HostStatusReady)
worker.SetStatus(HostStatusReady)
primary.DockerManager = NewMockDockerManager()
worker.DockerManager = NewMockDockerManager()

wrapper := NewMultiHostTestOrchestrator(orchestrator)
wrapper.SetCallbacks(
func(*TestStatus) {},
func(*MultiNodeMetricsUpdate) {},
func(string) {},
func(string) {},
)

ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()

params := scenario.ScenarioParams{WorkloadType: scenario.WorkloadTypeList, Threads: 1, Bucket: "demo", Endpoint: "http://minio:9000"}
if err := wrapper.StartTest(ctx, "test-image", params); err != nil {
t.Fatalf("StartTest returned error: %v", err)
}
if !runStarted.Load() {
t.Fatalf("expected run to be started via /run API")
}

select {
case <-wrapper.CompletionCh():
case <-time.After(6 * time.Second):
t.Fatal("CompletionCh did not close after terminal status")
}

if err := wrapper.StopTest(); err != nil {
t.Fatalf("StopTest returned error: %v", err)
}
}

func TestMultiHostOrchestrator_StartDistributedTest_AttachWorkers(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:1099")
if err != nil {
Expand Down
Loading