diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go index 68bd0aa15fa..44c816ca236 100644 --- a/backend/plugins/github/tasks/cicd_run_collector.go +++ b/backend/plugins/github/tasks/cicd_run_collector.go @@ -25,7 +25,7 @@ import ( "time" "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/models/common" + "github.com/apache/incubator-devlake/core/log" "github.com/apache/incubator-devlake/core/plugin" helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" ) @@ -36,14 +36,20 @@ func init() { const RAW_RUN_TABLE = "github_api_runs" -// Although the API accepts a maximum of 100 entries per page, sometimes -// the response body is too large which would lead to request failures -// https://github.com/apache/incubator-devlake/issues/3199 +// PAGE_SIZE is below GitHub's 100-item max to avoid oversized response bodies (#3199). const PAGE_SIZE = 30 -type SimpleGithubApiJob struct { - ID int64 - CreatedAt common.Iso8601Time `json:"created_at"` +// FILTERED_SEARCH_CAP is GitHub's per-query item cap for `/actions/runs` in filtered mode +// (`created=..`); exceeding it triggers HTTP 422. See #8842. +const FILTERED_SEARCH_CAP = 1000 + +// githubTimeLayout is the ISO8601 format GitHub expects in the `created` filter. +const githubTimeLayout = "2006-01-02T15:04:05Z" + +// TimeWindow is an inclusive-both-ends range for the `/actions/runs` `created=..` query. +type TimeWindow struct { + From time.Time + To time.Time } var CollectRunsMeta = plugin.SubTaskMeta{ @@ -56,75 +62,265 @@ var CollectRunsMeta = plugin.SubTaskMeta{ ProductTables: []string{RAW_RUN_TABLE}, } +// CollectRuns collects GitHub Workflow Runs into the raw table, working around the two +// pagination caps of `/actions/runs`: 40k items in unfiltered mode and 1000 items per +// filtered window. It probes each candidate window with `per_page=1`, bisects recursively +// until every leaf is under FILTERED_SEARCH_CAP, then feeds the leaves to a single +// ApiCollector so the raw table is truncated only once per fullsync. func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error { data := taskCtx.GetData().(*GithubTaskData) - log := taskCtx.GetLogger() - collector, err := helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{ - RawDataSubTaskArgs: helper.RawDataSubTaskArgs{ - Ctx: taskCtx, - Params: GithubApiParams{ - ConnectionId: data.Options.ConnectionId, - Name: data.Options.Name, - }, - Table: RAW_RUN_TABLE, + logger := taskCtx.GetLogger() + + manager, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + Ctx: taskCtx, + Params: GithubApiParams{ + ConnectionId: data.Options.ConnectionId, + Name: data.Options.Name, }, - ApiClient: data.ApiClient, - CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{ - PageSize: PAGE_SIZE, - Concurrency: 10, - FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{ - UrlTemplate: "repos/{{ .Params.Name }}/actions/runs", - Query: func(reqData *helper.RequestData, createdAfter *time.Time) (url.Values, errors.Error) { - query := url.Values{} - // GitHub API returns only the first 34 pages (with a size of 30) when specifying status=compleleted, try the following API request to verify the problem. - // https://api.github.com/repos/apache/incubator-devlake/actions/runs?per_page=30&page=35&status=completed - // query.Set("status", "completed") - query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page)) - query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size)) - return query, nil - }, - ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { - body := &GithubRawRunsResult{} - err := helper.UnmarshalResponse(res, body) - if err != nil { - return nil, err - } - if len(body.WorkflowRuns) == 0 { - return nil, nil - } - // filter out the runs that are not completed - filteredRuns := make([]json.RawMessage, 0) - for _, run := range body.WorkflowRuns { - if run.Status == "completed" { - runJSON, err := json.Marshal(run) - if err != nil { - return nil, errors.Convert(err) - } - filteredRuns = append(filteredRuns, json.RawMessage(runJSON)) - } else { - log.Info("Skipping run{id: %d, number: %d} with status %s", run.ID, run.RunNumber, run.Status) - } - } - return filteredRuns, nil - }, - }, - GetCreated: func(item json.RawMessage) (time.Time, errors.Error) { - pj := &SimpleGithubApiJob{} - err := json.Unmarshal(item, pj) + Table: RAW_RUN_TABLE, + }) + if err != nil { + return err + } + + // Normalize both bounds to second precision so `created=..` queries and the + // persisted LatestSuccessStart share the same boundary; without this, every incremental + // sync would re-fetch up to 1s of overlap. For incremental syncs we also advance + // `windowStart` past the previously collected second (inclusive-both-ends), while + // fullsync + TimeAfter keeps the user-specified bound inclusive. + createdAfter := manager.GetSince() + untilPtr := manager.GetUntil() + *untilPtr = untilPtr.Truncate(time.Second) + until := *untilPtr + + var windowStart time.Time + if createdAfter != nil { + windowStart = createdAfter.Truncate(time.Second) + if manager.IsIncremental() { + windowStart = windowStart.Add(time.Second) + } + } else { + // 2018-01-01 conservatively predates GitHub Actions' late-2019 GA. + windowStart = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC) + } + + logger.Info("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v)", + windowStart.Format(githubTimeLayout), + until.Format(githubTimeLayout), + manager.IsIncremental()) + + leafWindows, err := newLeafWindowBuilder(taskCtx, data).build(windowStart, until) + if err != nil { + return err + } + logger.Info("cicd_run_collector: built %d leaf windows for collection", len(leafWindows)) + + if err := registerCollectorForLeafWindows(manager, data.ApiClient, leafWindows); err != nil { + return err + } + + return manager.Execute() +} + +// buildRunsQuery assembles the filtered-mode query for a single leaf TimeWindow. +// Shared between registerCollectorForLeafWindows and tests. +func buildRunsQuery(reqData *helper.RequestData) (url.Values, errors.Error) { + w, ok := reqData.Input.(*TimeWindow) + if !ok || w == nil { + return nil, errors.Default.New("cicd_run_collector: Input is not *TimeWindow") + } + q := url.Values{} + q.Set("created", fmt.Sprintf("%s..%s", + w.From.UTC().Format(githubTimeLayout), + w.To.UTC().Format(githubTimeLayout))) + q.Set("page", fmt.Sprintf("%d", reqData.Pager.Page)) + q.Set("per_page", fmt.Sprintf("%d", reqData.Pager.Size)) + return q, nil +} + +// registerCollectorForLeafWindows wires a single ApiCollector whose Input iterator feeds the +// leaf TimeWindows. +func registerCollectorForLeafWindows( + manager *helper.StatefulApiCollector, + apiClient helper.RateLimitedApiClient, + leafWindows []TimeWindow, +) errors.Error { + iterator := helper.NewQueueIterator() + for i := range leafWindows { + w := leafWindows[i] + iterator.Push(&w) + } + return manager.InitCollector(helper.ApiCollectorArgs{ + ApiClient: apiClient, + Input: iterator, + UrlTemplate: "repos/{{ .Params.Name }}/actions/runs", + Query: buildRunsQuery, + PageSize: PAGE_SIZE, + Concurrency: 10, + ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { + body := &GithubRawRunsResult{} + if err := helper.UnmarshalResponse(res, body); err != nil { + return nil, err + } + if len(body.WorkflowRuns) == 0 { + return nil, nil + } + // Range is already bounded in filtered mode; only keep completed runs. + filtered := make([]json.RawMessage, 0, len(body.WorkflowRuns)) + for _, run := range body.WorkflowRuns { + if run.Status != "completed" { + continue + } + runJSON, err := json.Marshal(run) if err != nil { - return time.Time{}, errors.BadInput.Wrap(err, "failed to unmarshal github run") + return nil, errors.Convert(err) } - return pj.CreatedAt.ToTime(), nil - }, + filtered = append(filtered, json.RawMessage(runJSON)) + } + return filtered, nil }, }) +} + +// probeFunc signature matches defaultProbeTotalCount so tests can inject a fake. +type probeFunc func(taskCtx plugin.SubTaskContext, data *GithubTaskData, from, to time.Time) (int, bool, errors.Error) + +// leafWindowBuilder recursively bisects a [from, to] range until every leaf window fits under +// FILTERED_SEARCH_CAP. The probe function is a field so tests can inject a fake without +// mutating package-level state. +type leafWindowBuilder struct { + taskCtx plugin.SubTaskContext + data *GithubTaskData + probe probeFunc + logger log.Logger +} + +func newLeafWindowBuilder(taskCtx plugin.SubTaskContext, data *GithubTaskData) *leafWindowBuilder { + return &leafWindowBuilder{ + taskCtx: taskCtx, + data: data, + probe: defaultProbeTotalCount, + logger: taskCtx.GetLogger(), + } +} + +// build recursively bisects [from, to] until every leaf has total_count < FILTERED_SEARCH_CAP +// (or the window is a single-second bucket that cannot be split further). Empty windows are +// dropped. +// +// Boundary policy (non-overlapping, full coverage at second precision): +// - left: created=.. +// - right: created=.. +// +// Bisection is done on integer Unix seconds because GitHub's `created` filter is +// second-precision; a single-second bucket (from.Unix() == to.Unix()) is the smallest +// indivisible unit. +func (b *leafWindowBuilder) build(from, to time.Time) ([]TimeWindow, errors.Error) { + if !from.Before(to) && !from.Equal(to) { + return nil, nil + } + total, is422, err := b.probe(b.taskCtx, b.data, from, to) if err != nil { - return err + return nil, err + } + + if total == 0 && !is422 { + return nil, nil } - return collector.Execute() + if total >= FILTERED_SEARCH_CAP || is422 { + fromSec := from.UTC().Unix() + toSec := to.UTC().Unix() + if fromSec == toSec { + return nil, errors.Default.New(fmt.Sprintf( + "cicd_run_collector: %d runs within a single 1-second bucket at %s; cannot bisect further. "+ + "Filtered GitHub search caps at %d items per window, so some runs would be missed. "+ + "Refusing to advance collector state.", + total, from.UTC().Format(time.RFC3339), FILTERED_SEARCH_CAP, + )) + } + if b.logger != nil { + b.logger.Debug("cicd_run_collector: bisecting [%s, %s] (total=%d, is422=%v)", + from.Format(githubTimeLayout), + to.Format(githubTimeLayout), + total, is422) + } + midSec := (fromSec + toSec) / 2 + leftTo := time.Unix(midSec, 0).UTC() + rightFrom := leftTo.Add(time.Second) + left, err := b.build(from, leftTo) + if err != nil { + return nil, err + } + right, err := b.build(rightFrom, to) + if err != nil { + return nil, err + } + return append(left, right...), nil + } + return []TimeWindow{{From: from, To: to}}, nil +} + +// defaultProbeTotalCount issues a filtered-mode GET with per_page=1 to learn total_count +// (or detect 422) cheaply. It runs under SubmitBlocking so it shares the rate-limit budget +// with the main collector; DoGetAsync is avoided because that path errors on >=400 before +// the callback, which would hide the 422 we use as a bisection signal. +func defaultProbeTotalCount( + taskCtx plugin.SubTaskContext, + data *GithubTaskData, + from time.Time, + to time.Time, +) (int, bool, errors.Error) { + q := url.Values{} + q.Set("per_page", "1") + q.Set("page", "1") + q.Set("created", fmt.Sprintf("%s..%s", + from.UTC().Format(githubTimeLayout), + to.UTC().Format(githubTimeLayout))) + path := fmt.Sprintf("repos/%s/actions/runs", data.Options.Name) + + var total int + var is422 bool + var innerErr errors.Error + data.ApiClient.SubmitBlocking(func() errors.Error { + res, getErr := data.ApiClient.Get(path, q, nil) + // If a sibling subtask installed an AfterResponse hook that maps 422 -> + // helper.ErrIgnoreAndContinue, ApiClient.Do already closed res.Body before returning + // the sentinel (api_client.go L389). Recover the 422 signal here without double-closing. + // Sentinel comparison uses `==` to stay consistent with every other ErrIgnoreAndContinue + // call site in devlake (api_client.go:389, api_async_client.go:165, etc.). + if getErr == helper.ErrIgnoreAndContinue && res != nil && res.StatusCode == http.StatusUnprocessableEntity { + is422 = true + return nil + } + if getErr != nil { + innerErr = getErr + return nil + } + if res.StatusCode == http.StatusUnprocessableEntity { + if res.Body != nil { + _ = res.Body.Close() + } + is422 = true + return nil + } + body := &GithubRawRunsResult{} + if e := helper.UnmarshalResponse(res, body); e != nil { + innerErr = e + return nil + } + total = body.TotalCount + return nil + }) + if err := data.ApiClient.WaitAsync(); err != nil { + return 0, false, err + } + if innerErr != nil { + return 0, false, innerErr + } + return total, is422, nil } type GithubRawRunsResult struct { diff --git a/backend/plugins/github/tasks/cicd_run_collector_test.go b/backend/plugins/github/tasks/cicd_run_collector_test.go new file mode 100644 index 00000000000..d69b5c73b59 --- /dev/null +++ b/backend/plugins/github/tasks/cicd_run_collector_test.go @@ -0,0 +1,490 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tasks + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/url" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/apache/incubator-devlake/core/errors" + "github.com/apache/incubator-devlake/core/plugin" + helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api" + "github.com/apache/incubator-devlake/helpers/unithelper" + mockdal "github.com/apache/incubator-devlake/mocks/core/dal" + mockapi "github.com/apache/incubator-devlake/mocks/helpers/pluginhelper/api" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// newTestBuilder constructs a leafWindowBuilder with a stubbed probe for unit testing. +func newTestBuilder(probe probeFunc) *leafWindowBuilder { + mockDal := new(mockdal.Dal) + return &leafWindowBuilder{ + taskCtx: unithelper.DummySubTaskContext(mockDal), + data: &GithubTaskData{Options: &GithubOptions{Name: "o/r"}}, + probe: probe, + } +} + +// ------------------------------------------------------------------------------------------ +// leafWindowBuilder.build unit tests +// ------------------------------------------------------------------------------------------ + +func TestCicdRunBuildLeafWindows_SingleWindowUnderCap(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 500, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 1) + assert.True(t, leaves[0].From.Equal(from)) + assert.True(t, leaves[0].To.Equal(to)) +} + +func TestCicdRunBuildLeafWindows_EmptyWindow(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 0, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + assert.Empty(t, leaves) +} + +// TestCicdRunBuildLeafWindows_InvertedWindowReturnsNoLeaves pins the entry-guard contract: +// when `from > to` (rapid re-sync edge case), build must short-circuit to (nil, nil) +// without probing so no GitHub round-trip is paid on a no-op sync. +func TestCicdRunBuildLeafWindows_InvertedWindowReturnsNoLeaves(t *testing.T) { + var probeCalls int32 + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + atomic.AddInt32(&probeCalls, 1) + return 0, false, nil + }) + + to := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + from := to.Add(2 * time.Second) + + leaves, err := b.build(from, to) + require.Nil(t, err) + assert.Nil(t, leaves) + assert.Equal(t, int32(0), atomic.LoadInt32(&probeCalls)) +} + +func TestCicdRunBuildLeafWindows_BisectsOnCap(t *testing.T) { + var callCount int32 + fullWindow := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).Sub(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, to time.Time) (int, bool, errors.Error) { + atomic.AddInt32(&callCount, 1) + if to.Sub(f) >= fullWindow { + return 1000, false, nil + } + return 500, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) + assert.GreaterOrEqual(t, atomic.LoadInt32(&callCount), int32(3)) + assert.True(t, leaves[0].From.Equal(from)) + assert.True(t, leaves[1].To.Equal(to)) +} + +func TestCicdRunBuildLeafWindows_BisectsOn422(t *testing.T) { + fullWindow := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).Sub(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, to time.Time) (int, bool, errors.Error) { + if to.Sub(f) >= fullWindow { + return 0, true, nil + } + return 100, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) +} + +func TestCicdRunBuildLeafWindows_MinWindowReturnsError(t *testing.T) { + // Only a single-second bucket (from.Unix() == to.Unix()) is truly unbisectable; a 1s-wide + // window bisects into [T, T] and [T+1, T+1] and the recursive call on the inner bucket + // surfaces the error. + t.Run("zero-width saturated window -> error", func(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 1000, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from + + leaves, err := b.build(from, to) + assert.Nil(t, leaves) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "single 1-second bucket") + assert.Contains(t, err.Error(), "Refusing to advance collector state") + }) + + t.Run("1s-wide saturated window bisects then fails on inner single-second bucket", func(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 1000, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from.Add(time.Second) + + leaves, err := b.build(from, to) + assert.Nil(t, leaves) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "single 1-second bucket") + }) +} + +func TestCicdRunBuildLeafWindows_MinWindowOn422ReturnsError(t *testing.T) { + t.Run("zero-width 422 window -> error", func(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 0, true, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from + + leaves, err := b.build(from, to) + assert.Nil(t, leaves) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "single 1-second bucket") + }) + + t.Run("1s-wide 422 window bisects then fails on inner single-second bucket", func(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 0, true, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from.Add(time.Second) + + leaves, err := b.build(from, to) + assert.Nil(t, leaves) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "single 1-second bucket") + }) +} + +func TestCicdRunBuildLeafWindows_NoRightHalfEmptyAtSubSecondWidths(t *testing.T) { + // Regression guard: integer-second bisection keeps both halves inside [from, to] at + // sub-second-tail widths; the old duration-based bisector failed with "right half empty". + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from.Add(1500 * time.Millisecond) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, tt time.Time) (int, bool, errors.Error) { + if f.Equal(from) && tt.Equal(to) { + return 1000, false, nil + } + return 500, false, nil + }) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) + assert.Equal(t, from.Unix(), leaves[0].From.Unix()) + assert.Equal(t, from.Unix(), leaves[0].To.Unix()) + assert.Equal(t, from.Unix()+1, leaves[1].From.Unix()) + assert.True(t, leaves[1].To.Equal(to)) +} + +// TestCicdRunBuildLeafWindows_ProbeErrIgnoreAndContinueRecovery pins the contract that +// `build` treats (total=0, is422=true, err=nil) as a valid saturated signal, which is what +// defaultProbeTotalCount emits when a shared ApiAsyncClient's AfterResponse hook converts +// 422 into helper.ErrIgnoreAndContinue. +func TestCicdRunBuildLeafWindows_ProbeErrIgnoreAndContinueRecovery(t *testing.T) { + fullWindow := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).Sub(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, to time.Time) (int, bool, errors.Error) { + if to.Sub(f) >= fullWindow { + return 0, true, nil + } + return 200, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) + assert.True(t, leaves[0].From.Equal(from)) + assert.True(t, leaves[1].To.Equal(to)) + assert.True(t, leaves[1].From.Equal(leaves[0].To.Add(time.Second))) +} + +func TestCicdRunBuildLeafWindows_ProbeErrorPropagates(t *testing.T) { + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return 0, false, errors.Default.New("boom") + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + assert.Nil(t, leaves) + require.NotNil(t, err) + assert.Contains(t, err.Error(), "boom") +} + +func TestCicdRunBuildLeafWindows_BoundaryCases(t *testing.T) { + // Boundary total_count values at zero-width (unbisectable single-second bucket) and 1s + // (bisectable) widths. + cases := []struct { + total int + is422 bool + expectLeaf bool + expectError bool + description string + }{ + {0, false, false, false, "zero total -> no leaf"}, + {1, false, true, false, "1 total -> one leaf"}, + {999, false, true, false, "999 total -> one leaf"}, + {1000, false, false, true, "1000 -> error (saturated at bucket)"}, + {1001, false, false, true, "1001 -> error (saturated at bucket)"}, + {0, true, false, true, "422 -> error (saturated at bucket)"}, + } + widths := []struct { + name string + duration time.Duration + }{ + {"zero-width (single-second bucket)", 0}, + {"1s-wide (bisectable at second precision)", time.Second}, + } + for _, w := range widths { + for _, c := range cases { + t.Run(w.name+"/"+c.description, func(t *testing.T) { + total := c.total + is422 := c.is422 + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, _, _ time.Time) (int, bool, errors.Error) { + return total, is422, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from.Add(w.duration) + + leaves, err := b.build(from, to) + if c.expectError { + require.NotNil(t, err) + assert.Nil(t, leaves) + assert.Contains(t, err.Error(), "single 1-second bucket") + return + } + require.Nil(t, err) + if c.expectLeaf { + assert.Len(t, leaves, 1) + } else { + assert.Empty(t, leaves) + } + }) + } + } +} + +func TestCicdRunBuildLeafWindows_IntegerSecondSplit(t *testing.T) { + // Parent [T, T+1s] with total=1000 must split into two single-second-bucket leaves + // [T, T] and [T+1, T+1], each with total < FILTERED_SEARCH_CAP. + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := from.Add(time.Second) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, tt time.Time) (int, bool, errors.Error) { + if f.Equal(from) && tt.Equal(to) { + return 1000, false, nil + } + return 500, false, nil + }) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) + + assert.Equal(t, from.Unix(), leaves[0].From.Unix()) + assert.Equal(t, from.Unix(), leaves[0].To.Unix()) + assert.Equal(t, from.Unix()+1, leaves[1].From.Unix()) + assert.Equal(t, from.Unix()+1, leaves[1].To.Unix()) + assert.True(t, leaves[1].From.Equal(leaves[0].To.Add(time.Second))) +} + +func TestCicdRunBuildLeafWindows_BootstrapFromEpoch(t *testing.T) { + // Simulate createdAfter == nil path: caller passes 2018-01-01 as start. + full := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).Sub(time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, to time.Time) (int, bool, errors.Error) { + if to.Sub(f) >= full/2 { + return 1000, false, nil + } + return 500, false, nil + }) + + from := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + assert.GreaterOrEqual(t, len(leaves), 2) +} + +// ------------------------------------------------------------------------------------------ +// Boundary (non-overlap) test +// ------------------------------------------------------------------------------------------ + +func TestCicdRunBuildLeafWindows_BoundaryNonOverlapping(t *testing.T) { + fullWindow := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC).Sub(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + b := newTestBuilder(func(_ plugin.SubTaskContext, _ *GithubTaskData, f, to time.Time) (int, bool, errors.Error) { + if to.Sub(f) >= fullWindow { + return 1000, false, nil + } + return 500, false, nil + }) + + from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + to := time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC) + + leaves, err := b.build(from, to) + require.Nil(t, err) + require.Len(t, leaves, 2) + + assert.True(t, leaves[1].From.Equal(leaves[0].To.Add(time.Second)), + "right.From (%s) must equal left.To+1s (%s)", + leaves[1].From.UTC().Format(time.RFC3339), + leaves[0].To.Add(time.Second).UTC().Format(time.RFC3339)) + + leftStr := leaves[0].To.UTC().Format(githubTimeLayout) + rightStr := leaves[1].From.UTC().Format(githubTimeLayout) + assert.NotEqual(t, leftStr, rightStr, "boundary timestamps must differ by at least 1s") +} + +// ------------------------------------------------------------------------------------------ +// Query hook window application +// ------------------------------------------------------------------------------------------ + +func TestCicdRunQueryHookAppliesWindow(t *testing.T) { + from := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC) + to := time.Date(2024, 6, 7, 8, 9, 10, 0, time.UTC) + reqData := &helper.RequestData{ + Pager: &helper.Pager{Page: 2, Size: 30}, + Input: &TimeWindow{From: from, To: to}, + } + + q, err := buildRunsQuery(reqData) + require.Nil(t, err) + assert.Equal(t, "2024-01-02T03:04:05Z..2024-06-07T08:09:10Z", q.Get("created")) + assert.Equal(t, "2", q.Get("page")) + assert.Equal(t, "30", q.Get("per_page")) + assert.True(t, strings.Contains(q.Get("created"), "..")) +} + +// ------------------------------------------------------------------------------------------ +// Thin integration test +// Verifies: +// - Delete (raw table truncation) is called at most once for a full sync +// - DoGetAsync is called at least once per leaf window (concurrency=1, PageSize=3) +// ------------------------------------------------------------------------------------------ + +func TestCicdRunRegisterCollectorForLeafWindows_SingleDelete(t *testing.T) { + mockDal := new(mockdal.Dal) + notFoundErr := errors.Default.New("record not found") + mockDal.On("First", mock.Anything, mock.Anything).Return(notFoundErr).Once() + mockDal.On("IsErrorNotFound", mock.Anything).Return(true).Maybe() + mockDal.On("Update", mock.Anything, mock.Anything).Return(nil).Maybe() + mockDal.On("AutoMigrate", mock.Anything, mock.Anything).Return(nil).Once() + mockDal.On("Delete", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + mockDal.On("Create", mock.Anything, mock.Anything).Return(nil).Maybe() + + mockCtx := unithelper.DummySubTaskContext(mockDal) + + var getAsyncCount int32 + mockApi := new(mockapi.RateLimitedApiClient) + mockApi.On("DoGetAsync", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + atomic.AddInt32(&getAsyncCount, 1) + res := &http.Response{ + Request: &http.Request{URL: &url.URL{}}, + Body: io.NopCloser(bytes.NewBufferString(`{"total_count":0,"workflow_runs":[]}`)), + } + handler := args.Get(3).(plugin.ApiAsyncCallback) + _ = handler(res) + }) + mockApi.On("NextTick", mock.Anything).Run(func(args mock.Arguments) { + fn := args.Get(0).(func() errors.Error) + assert.Nil(t, fn()) + }) + mockApi.On("HasError").Return(false) + mockApi.On("WaitAsync").Return(nil) + mockApi.On("GetAfterFunction").Return(nil) + mockApi.On("SetAfterFunction", mock.Anything).Return() + + manager, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{ + Ctx: mockCtx, + Params: GithubApiParams{ + ConnectionId: 1, + Name: "apache/incubator-devlake", + }, + Table: RAW_RUN_TABLE, + }) + require.Nil(t, err) + + leaves := []TimeWindow{ + {From: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), To: time.Date(2024, 1, 15, 0, 0, 0, 0, time.UTC)}, + {From: time.Date(2024, 1, 15, 0, 0, 1, 0, time.UTC), To: time.Date(2024, 2, 1, 0, 0, 0, 0, time.UTC)}, + {From: time.Date(2024, 2, 1, 0, 0, 1, 0, time.UTC), To: time.Date(2024, 2, 15, 0, 0, 0, 0, time.UTC)}, + } + + // Force concurrency=1 and PageSize=3 for deterministic DoGetAsync counting; share the + // production Query hook via buildRunsQuery so changes to it stay covered. + iterator := helper.NewQueueIterator() + for i := range leaves { + w := leaves[i] + iterator.Push(&w) + } + err = manager.InitCollector(helper.ApiCollectorArgs{ + ApiClient: mockApi, + Input: iterator, + UrlTemplate: "repos/{{ .Params.Name }}/actions/runs", + Query: buildRunsQuery, + PageSize: 3, + Concurrency: 1, + ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) { + return nil, nil + }, + }) + require.Nil(t, err) + + require.Nil(t, manager.Execute()) + + mockDal.AssertExpectations(t) + assert.GreaterOrEqual(t, atomic.LoadInt32(&getAsyncCount), int32(len(leaves))) +}