Skip to content

Commit 35cf936

Browse files
committed
fix(github): bisect adaptive time windows for workflow runs 40k pagination cap
GitHub's /actions/runs enforces a 40k cap in unfiltered mode and a 1,000-item cap per filtered search, making any repo with >40k workflow runs uncollectable. Switch to filtered mode and recursively bisect time windows at integer-second midpoints, probing via SubmitBlocking to share rate-limit with the main collector and feeding leaves to a single ApiCollector so raw-table Delete fires only once. See PR description for the full design rationale. Closes #8842 Signed-off-by: yamoyamoto <yamo7yamoto@gmail.com>
1 parent a6335ae commit 35cf936

2 files changed

Lines changed: 750 additions & 64 deletions

File tree

backend/plugins/github/tasks/cicd_run_collector.go

Lines changed: 260 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/apache/incubator-devlake/core/errors"
28-
"github.com/apache/incubator-devlake/core/models/common"
28+
"github.com/apache/incubator-devlake/core/log"
2929
"github.com/apache/incubator-devlake/core/plugin"
3030
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
3131
)
@@ -36,14 +36,20 @@ func init() {
3636

3737
const RAW_RUN_TABLE = "github_api_runs"
3838

39-
// Although the API accepts a maximum of 100 entries per page, sometimes
40-
// the response body is too large which would lead to request failures
41-
// https://github.com/apache/incubator-devlake/issues/3199
39+
// PAGE_SIZE is below GitHub's 100-item max to avoid oversized response bodies (#3199).
4240
const PAGE_SIZE = 30
4341

44-
type SimpleGithubApiJob struct {
45-
ID int64
46-
CreatedAt common.Iso8601Time `json:"created_at"`
42+
// FILTERED_SEARCH_CAP is GitHub's per-query item cap for `/actions/runs` in filtered mode
43+
// (`created=<from>..<to>`); exceeding it triggers HTTP 422. See #8842.
44+
const FILTERED_SEARCH_CAP = 1000
45+
46+
// githubTimeLayout is the ISO8601 format GitHub expects in the `created` filter.
47+
const githubTimeLayout = "2006-01-02T15:04:05Z"
48+
49+
// TimeWindow is an inclusive-both-ends range for the `/actions/runs` `created=<from>..<to>` query.
50+
type TimeWindow struct {
51+
From time.Time
52+
To time.Time
4753
}
4854

4955
var CollectRunsMeta = plugin.SubTaskMeta{
@@ -56,75 +62,265 @@ var CollectRunsMeta = plugin.SubTaskMeta{
5662
ProductTables: []string{RAW_RUN_TABLE},
5763
}
5864

65+
// CollectRuns collects GitHub Workflow Runs into the raw table, working around the two
66+
// pagination caps of `/actions/runs`: 40k items in unfiltered mode and 1000 items per
67+
// filtered window. It probes each candidate window with `per_page=1`, bisects recursively
68+
// until every leaf is under FILTERED_SEARCH_CAP, then feeds the leaves to a single
69+
// ApiCollector so the raw table is truncated only once per fullsync.
5970
func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
6071
data := taskCtx.GetData().(*GithubTaskData)
61-
log := taskCtx.GetLogger()
62-
collector, err := helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
63-
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
64-
Ctx: taskCtx,
65-
Params: GithubApiParams{
66-
ConnectionId: data.Options.ConnectionId,
67-
Name: data.Options.Name,
68-
},
69-
Table: RAW_RUN_TABLE,
72+
logger := taskCtx.GetLogger()
73+
74+
manager, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
75+
Ctx: taskCtx,
76+
Params: GithubApiParams{
77+
ConnectionId: data.Options.ConnectionId,
78+
Name: data.Options.Name,
7079
},
71-
ApiClient: data.ApiClient,
72-
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
73-
PageSize: PAGE_SIZE,
74-
Concurrency: 10,
75-
FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
76-
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
77-
Query: func(reqData *helper.RequestData, createdAfter *time.Time) (url.Values, errors.Error) {
78-
query := url.Values{}
79-
// GitHub API returns only the first 34 pages (with a size of 30) when specifying status=completed; try the following API request to verify the problem.
80-
// https://api.github.com/repos/apache/incubator-devlake/actions/runs?per_page=30&page=35&status=completed
81-
// query.Set("status", "completed")
82-
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
83-
query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
84-
return query, nil
85-
},
86-
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
87-
body := &GithubRawRunsResult{}
88-
err := helper.UnmarshalResponse(res, body)
89-
if err != nil {
90-
return nil, err
91-
}
92-
if len(body.WorkflowRuns) == 0 {
93-
return nil, nil
94-
}
95-
// filter out the runs that are not completed
96-
filteredRuns := make([]json.RawMessage, 0)
97-
for _, run := range body.WorkflowRuns {
98-
if run.Status == "completed" {
99-
runJSON, err := json.Marshal(run)
100-
if err != nil {
101-
return nil, errors.Convert(err)
102-
}
103-
filteredRuns = append(filteredRuns, json.RawMessage(runJSON))
104-
} else {
105-
log.Info("Skipping run{id: %d, number: %d} with status %s", run.ID, run.RunNumber, run.Status)
106-
}
107-
}
108-
return filteredRuns, nil
109-
},
110-
},
111-
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
112-
pj := &SimpleGithubApiJob{}
113-
err := json.Unmarshal(item, pj)
80+
Table: RAW_RUN_TABLE,
81+
})
82+
if err != nil {
83+
return err
84+
}
85+
86+
// Normalize both bounds to second precision so `created=<from>..<to>` queries and the
87+
// persisted LatestSuccessStart share the same boundary; without this, every incremental
88+
// sync would re-fetch up to 1s of overlap. For incremental syncs we also advance
89+
// `windowStart` past the previously collected second (inclusive-both-ends), while
90+
// fullsync + TimeAfter keeps the user-specified bound inclusive.
91+
createdAfter := manager.GetSince()
92+
untilPtr := manager.GetUntil()
93+
*untilPtr = untilPtr.Truncate(time.Second)
94+
until := *untilPtr
95+
96+
var windowStart time.Time
97+
if createdAfter != nil {
98+
windowStart = createdAfter.Truncate(time.Second)
99+
if manager.IsIncremental() {
100+
windowStart = windowStart.Add(time.Second)
101+
}
102+
} else {
103+
// 2018-01-01 conservatively predates GitHub Actions' late-2019 GA.
104+
windowStart = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
105+
}
106+
107+
logger.Info("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v)",
108+
windowStart.Format(githubTimeLayout),
109+
until.Format(githubTimeLayout),
110+
manager.IsIncremental())
111+
112+
leafWindows, err := newLeafWindowBuilder(taskCtx, data).build(windowStart, until)
113+
if err != nil {
114+
return err
115+
}
116+
logger.Info("cicd_run_collector: built %d leaf windows for collection", len(leafWindows))
117+
118+
if err := registerCollectorForLeafWindows(manager, data.ApiClient, leafWindows); err != nil {
119+
return err
120+
}
121+
122+
return manager.Execute()
123+
}
124+
125+
// buildRunsQuery assembles the filtered-mode query for a single leaf TimeWindow.
126+
// Shared between registerCollectorForLeafWindows and tests.
127+
func buildRunsQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
128+
w, ok := reqData.Input.(*TimeWindow)
129+
if !ok || w == nil {
130+
return nil, errors.Default.New("cicd_run_collector: Input is not *TimeWindow")
131+
}
132+
q := url.Values{}
133+
q.Set("created", fmt.Sprintf("%s..%s",
134+
w.From.UTC().Format(githubTimeLayout),
135+
w.To.UTC().Format(githubTimeLayout)))
136+
q.Set("page", fmt.Sprintf("%d", reqData.Pager.Page))
137+
q.Set("per_page", fmt.Sprintf("%d", reqData.Pager.Size))
138+
return q, nil
139+
}
140+
141+
// registerCollectorForLeafWindows wires a single ApiCollector whose Input iterator feeds the
142+
// leaf TimeWindows.
143+
func registerCollectorForLeafWindows(
144+
manager *helper.StatefulApiCollector,
145+
apiClient helper.RateLimitedApiClient,
146+
leafWindows []TimeWindow,
147+
) errors.Error {
148+
iterator := helper.NewQueueIterator()
149+
for i := range leafWindows {
150+
w := leafWindows[i]
151+
iterator.Push(&w)
152+
}
153+
return manager.InitCollector(helper.ApiCollectorArgs{
154+
ApiClient: apiClient,
155+
Input: iterator,
156+
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
157+
Query: buildRunsQuery,
158+
PageSize: PAGE_SIZE,
159+
Concurrency: 10,
160+
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
161+
body := &GithubRawRunsResult{}
162+
if err := helper.UnmarshalResponse(res, body); err != nil {
163+
return nil, err
164+
}
165+
if len(body.WorkflowRuns) == 0 {
166+
return nil, nil
167+
}
168+
// Range is already bounded in filtered mode; only keep completed runs.
169+
filtered := make([]json.RawMessage, 0, len(body.WorkflowRuns))
170+
for _, run := range body.WorkflowRuns {
171+
if run.Status != "completed" {
172+
continue
173+
}
174+
runJSON, err := json.Marshal(run)
114175
if err != nil {
115-
return time.Time{}, errors.BadInput.Wrap(err, "failed to unmarshal github run")
176+
return nil, errors.Convert(err)
116177
}
117-
return pj.CreatedAt.ToTime(), nil
118-
},
178+
filtered = append(filtered, json.RawMessage(runJSON))
179+
}
180+
return filtered, nil
119181
},
120182
})
183+
}
184+
185+
// probeFunc signature matches defaultProbeTotalCount so tests can inject a fake.
186+
type probeFunc func(taskCtx plugin.SubTaskContext, data *GithubTaskData, from, to time.Time) (int, bool, errors.Error)
187+
188+
// leafWindowBuilder recursively bisects a [from, to] range until every leaf window fits under
189+
// FILTERED_SEARCH_CAP. The probe function is a field so tests can inject a fake without
190+
// mutating package-level state.
191+
type leafWindowBuilder struct {
192+
taskCtx plugin.SubTaskContext
193+
data *GithubTaskData
194+
probe probeFunc
195+
logger log.Logger
196+
}
197+
198+
func newLeafWindowBuilder(taskCtx plugin.SubTaskContext, data *GithubTaskData) *leafWindowBuilder {
199+
return &leafWindowBuilder{
200+
taskCtx: taskCtx,
201+
data: data,
202+
probe: defaultProbeTotalCount,
203+
logger: taskCtx.GetLogger(),
204+
}
205+
}
206+
207+
// build recursively bisects [from, to] until every leaf has total_count < FILTERED_SEARCH_CAP
208+
// (or the window is a single-second bucket that cannot be split further). Empty windows are
209+
// dropped.
210+
//
211+
// Boundary policy (non-overlapping, full coverage at second precision):
212+
// - left: created=<from>..<mid>
213+
// - right: created=<mid+1s>..<to>
214+
//
215+
// Bisection is done on integer Unix seconds because GitHub's `created` filter is
216+
// second-precision; a single-second bucket (from.Unix() == to.Unix()) is the smallest
217+
// indivisible unit.
218+
func (b *leafWindowBuilder) build(from, to time.Time) ([]TimeWindow, errors.Error) {
219+
if !from.Before(to) && !from.Equal(to) {
220+
return nil, nil
221+
}
121222

223+
total, is422, err := b.probe(b.taskCtx, b.data, from, to)
122224
if err != nil {
123-
return err
225+
return nil, err
226+
}
227+
228+
if total == 0 && !is422 {
229+
return nil, nil
124230
}
125231

126-
return collector.Execute()
232+
if total >= FILTERED_SEARCH_CAP || is422 {
233+
fromSec := from.UTC().Unix()
234+
toSec := to.UTC().Unix()
235+
if fromSec == toSec {
236+
return nil, errors.Default.New(fmt.Sprintf(
237+
"cicd_run_collector: %d runs within a single 1-second bucket at %s; cannot bisect further. "+
238+
"Filtered GitHub search caps at %d items per window, so some runs would be missed. "+
239+
"Refusing to advance collector state.",
240+
total, from.UTC().Format(time.RFC3339), FILTERED_SEARCH_CAP,
241+
))
242+
}
243+
if b.logger != nil {
244+
b.logger.Debug("cicd_run_collector: bisecting [%s, %s] (total=%d, is422=%v)",
245+
from.Format(githubTimeLayout),
246+
to.Format(githubTimeLayout),
247+
total, is422)
248+
}
249+
midSec := (fromSec + toSec) / 2
250+
leftTo := time.Unix(midSec, 0).UTC()
251+
rightFrom := leftTo.Add(time.Second)
252+
left, err := b.build(from, leftTo)
253+
if err != nil {
254+
return nil, err
255+
}
256+
right, err := b.build(rightFrom, to)
257+
if err != nil {
258+
return nil, err
259+
}
260+
return append(left, right...), nil
261+
}
127262

263+
return []TimeWindow{{From: from, To: to}}, nil
264+
}
265+
266+
// defaultProbeTotalCount issues a filtered-mode GET with per_page=1 to learn total_count
267+
// (or detect 422) cheaply. It runs under SubmitBlocking so it shares the rate-limit budget
268+
// with the main collector; DoGetAsync is avoided because that path errors on >=400 before
269+
// the callback, which would hide the 422 we use as a bisection signal.
270+
func defaultProbeTotalCount(
271+
taskCtx plugin.SubTaskContext,
272+
data *GithubTaskData,
273+
from time.Time,
274+
to time.Time,
275+
) (int, bool, errors.Error) {
276+
q := url.Values{}
277+
q.Set("per_page", "1")
278+
q.Set("page", "1")
279+
q.Set("created", fmt.Sprintf("%s..%s",
280+
from.UTC().Format(githubTimeLayout),
281+
to.UTC().Format(githubTimeLayout)))
282+
path := fmt.Sprintf("repos/%s/actions/runs", data.Options.Name)
283+
284+
var total int
285+
var is422 bool
286+
var innerErr errors.Error
287+
data.ApiClient.SubmitBlocking(func() errors.Error {
288+
res, getErr := data.ApiClient.Get(path, q, nil)
289+
// If a sibling subtask installed an AfterResponse hook that maps 422 ->
290+
// helper.ErrIgnoreAndContinue, ApiClient.Do already closed res.Body before returning
291+
// the sentinel (api_client.go L389). Recover the 422 signal here without double-closing.
292+
// Sentinel comparison uses `==` to stay consistent with every other ErrIgnoreAndContinue
293+
// call site in devlake (api_client.go:389, api_async_client.go:165, etc.).
294+
if getErr == helper.ErrIgnoreAndContinue && res != nil && res.StatusCode == http.StatusUnprocessableEntity {
295+
is422 = true
296+
return nil
297+
}
298+
if getErr != nil {
299+
innerErr = getErr
300+
return nil
301+
}
302+
if res.StatusCode == http.StatusUnprocessableEntity {
303+
if res.Body != nil {
304+
_ = res.Body.Close()
305+
}
306+
is422 = true
307+
return nil
308+
}
309+
body := &GithubRawRunsResult{}
310+
if e := helper.UnmarshalResponse(res, body); e != nil {
311+
innerErr = e
312+
return nil
313+
}
314+
total = body.TotalCount
315+
return nil
316+
})
317+
if err := data.ApiClient.WaitAsync(); err != nil {
318+
return 0, false, err
319+
}
320+
if innerErr != nil {
321+
return 0, false, innerErr
322+
}
323+
return total, is422, nil
128324
}
129325

130326
type GithubRawRunsResult struct {

0 commit comments

Comments
 (0)