Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 260 additions & 64 deletions backend/plugins/github/tasks/cicd_run_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models/common"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)
Expand All @@ -36,14 +36,20 @@ func init() {

const RAW_RUN_TABLE = "github_api_runs"

// Although the API accepts a maximum of 100 entries per page, sometimes
// the response body is too large which would lead to request failures
// https://github.com/apache/incubator-devlake/issues/3199
// PAGE_SIZE is below GitHub's 100-item max to avoid oversized response bodies (#3199).
const PAGE_SIZE = 30

type SimpleGithubApiJob struct {
ID int64
CreatedAt common.Iso8601Time `json:"created_at"`
// FILTERED_SEARCH_CAP is GitHub's per-query item cap for `/actions/runs` in filtered mode
// (`created=<from>..<to>`); exceeding it triggers HTTP 422. See #8842.
const FILTERED_SEARCH_CAP = 1000

// githubTimeLayout is the ISO8601 format GitHub expects in the `created` filter.
const githubTimeLayout = "2006-01-02T15:04:05Z"

// TimeWindow is an inclusive-both-ends range for the `/actions/runs` `created=<from>..<to>` query.
type TimeWindow struct {
From time.Time
To time.Time
}

var CollectRunsMeta = plugin.SubTaskMeta{
Expand All @@ -56,75 +62,265 @@ var CollectRunsMeta = plugin.SubTaskMeta{
ProductTables: []string{RAW_RUN_TABLE},
}

// CollectRuns collects GitHub Workflow Runs into the raw table, working around the two
// pagination caps of `/actions/runs`: 40k items in unfiltered mode and 1000 items per
// filtered window. It probes each candidate window with `per_page=1`, bisects recursively
// until every leaf is under FILTERED_SEARCH_CAP, then feeds the leaves to a single
// ApiCollector so the raw table is truncated only once per fullsync.
func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
log := taskCtx.GetLogger()
collector, err := helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.Name,
},
Table: RAW_RUN_TABLE,
logger := taskCtx.GetLogger()

manager, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.Name,
},
ApiClient: data.ApiClient,
CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
PageSize: PAGE_SIZE,
Concurrency: 10,
FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
Query: func(reqData *helper.RequestData, createdAfter *time.Time) (url.Values, errors.Error) {
query := url.Values{}
// GitHub API returns only the first 34 pages (with a size of 30) when specifying status=compleleted, try the following API request to verify the problem.
// https://api.github.com/repos/apache/incubator-devlake/actions/runs?per_page=30&page=35&status=completed
// query.Set("status", "completed")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
return query, nil
},
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
body := &GithubRawRunsResult{}
err := helper.UnmarshalResponse(res, body)
if err != nil {
return nil, err
}
if len(body.WorkflowRuns) == 0 {
return nil, nil
}
// filter out the runs that are not completed
filteredRuns := make([]json.RawMessage, 0)
for _, run := range body.WorkflowRuns {
if run.Status == "completed" {
runJSON, err := json.Marshal(run)
if err != nil {
return nil, errors.Convert(err)
}
filteredRuns = append(filteredRuns, json.RawMessage(runJSON))
} else {
log.Info("Skipping run{id: %d, number: %d} with status %s", run.ID, run.RunNumber, run.Status)
}
}
return filteredRuns, nil
},
},
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
pj := &SimpleGithubApiJob{}
err := json.Unmarshal(item, pj)
Table: RAW_RUN_TABLE,
})
if err != nil {
return err
}

// Normalize both bounds to second precision so `created=<from>..<to>` queries and the
// persisted LatestSuccessStart share the same boundary; without this, every incremental
// sync would re-fetch up to 1s of overlap. For incremental syncs we also advance
// `windowStart` past the previously collected second (inclusive-both-ends), while
// fullsync + TimeAfter keeps the user-specified bound inclusive.
createdAfter := manager.GetSince()
untilPtr := manager.GetUntil()
*untilPtr = untilPtr.Truncate(time.Second)
until := *untilPtr

var windowStart time.Time
if createdAfter != nil {
windowStart = createdAfter.Truncate(time.Second)
if manager.IsIncremental() {
windowStart = windowStart.Add(time.Second)
}
} else {
// 2018-01-01 conservatively predates GitHub Actions' late-2019 GA.
windowStart = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
}

logger.Info("cicd_run_collector: collecting workflow runs in [%s, %s] (incremental=%v)",
windowStart.Format(githubTimeLayout),
until.Format(githubTimeLayout),
manager.IsIncremental())

leafWindows, err := newLeafWindowBuilder(taskCtx, data).build(windowStart, until)
if err != nil {
return err
}
logger.Info("cicd_run_collector: built %d leaf windows for collection", len(leafWindows))

if err := registerCollectorForLeafWindows(manager, data.ApiClient, leafWindows); err != nil {
return err
}

return manager.Execute()
}

// buildRunsQuery assembles the filtered-mode query for a single leaf TimeWindow.
// Shared between registerCollectorForLeafWindows and tests.
func buildRunsQuery(reqData *helper.RequestData) (url.Values, errors.Error) {
w, ok := reqData.Input.(*TimeWindow)
if !ok || w == nil {
return nil, errors.Default.New("cicd_run_collector: Input is not *TimeWindow")
}
q := url.Values{}
q.Set("created", fmt.Sprintf("%s..%s",
w.From.UTC().Format(githubTimeLayout),
w.To.UTC().Format(githubTimeLayout)))
q.Set("page", fmt.Sprintf("%d", reqData.Pager.Page))
q.Set("per_page", fmt.Sprintf("%d", reqData.Pager.Size))
return q, nil
}

// registerCollectorForLeafWindows wires a single ApiCollector whose Input iterator feeds the
// leaf TimeWindows.
func registerCollectorForLeafWindows(
manager *helper.StatefulApiCollector,
apiClient helper.RateLimitedApiClient,
leafWindows []TimeWindow,
) errors.Error {
iterator := helper.NewQueueIterator()
for i := range leafWindows {
w := leafWindows[i]
iterator.Push(&w)
}
return manager.InitCollector(helper.ApiCollectorArgs{
ApiClient: apiClient,
Input: iterator,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
Query: buildRunsQuery,
PageSize: PAGE_SIZE,
Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
body := &GithubRawRunsResult{}
if err := helper.UnmarshalResponse(res, body); err != nil {
return nil, err
}
if len(body.WorkflowRuns) == 0 {
return nil, nil
}
// Range is already bounded in filtered mode; only keep completed runs.
filtered := make([]json.RawMessage, 0, len(body.WorkflowRuns))
for _, run := range body.WorkflowRuns {
if run.Status != "completed" {
continue
}
runJSON, err := json.Marshal(run)
if err != nil {
return time.Time{}, errors.BadInput.Wrap(err, "failed to unmarshal github run")
return nil, errors.Convert(err)
}
return pj.CreatedAt.ToTime(), nil
},
filtered = append(filtered, json.RawMessage(runJSON))
}
return filtered, nil
},
})
}

// probeFunc signature matches defaultProbeTotalCount so tests can inject a fake.
type probeFunc func(taskCtx plugin.SubTaskContext, data *GithubTaskData, from, to time.Time) (int, bool, errors.Error)

// leafWindowBuilder recursively bisects a [from, to] range until every leaf window fits under
// FILTERED_SEARCH_CAP. The probe function is a field so tests can inject a fake without
// mutating package-level state.
type leafWindowBuilder struct {
taskCtx plugin.SubTaskContext
data *GithubTaskData
probe probeFunc
logger log.Logger
}

func newLeafWindowBuilder(taskCtx plugin.SubTaskContext, data *GithubTaskData) *leafWindowBuilder {
return &leafWindowBuilder{
taskCtx: taskCtx,
data: data,
probe: defaultProbeTotalCount,
logger: taskCtx.GetLogger(),
}
}

// build recursively bisects [from, to] until every leaf has total_count < FILTERED_SEARCH_CAP
// (or the window is a single-second bucket that cannot be split further). Empty windows are
// dropped.
//
// Boundary policy (non-overlapping, full coverage at second precision):
// - left: created=<from>..<mid>
// - right: created=<mid+1s>..<to>
//
// Bisection is done on integer Unix seconds because GitHub's `created` filter is
// second-precision; a single-second bucket (from.Unix() == to.Unix()) is the smallest
// indivisible unit.
func (b *leafWindowBuilder) build(from, to time.Time) ([]TimeWindow, errors.Error) {
if !from.Before(to) && !from.Equal(to) {
return nil, nil
}

total, is422, err := b.probe(b.taskCtx, b.data, from, to)
if err != nil {
return err
return nil, err
}

if total == 0 && !is422 {
return nil, nil
}

return collector.Execute()
if total >= FILTERED_SEARCH_CAP || is422 {
fromSec := from.UTC().Unix()
toSec := to.UTC().Unix()
if fromSec == toSec {
return nil, errors.Default.New(fmt.Sprintf(
"cicd_run_collector: %d runs within a single 1-second bucket at %s; cannot bisect further. "+
"Filtered GitHub search caps at %d items per window, so some runs would be missed. "+
"Refusing to advance collector state.",
total, from.UTC().Format(time.RFC3339), FILTERED_SEARCH_CAP,
))
}
if b.logger != nil {
b.logger.Debug("cicd_run_collector: bisecting [%s, %s] (total=%d, is422=%v)",
from.Format(githubTimeLayout),
to.Format(githubTimeLayout),
total, is422)
}
midSec := (fromSec + toSec) / 2
leftTo := time.Unix(midSec, 0).UTC()
rightFrom := leftTo.Add(time.Second)
left, err := b.build(from, leftTo)
if err != nil {
return nil, err
}
right, err := b.build(rightFrom, to)
if err != nil {
return nil, err
}
return append(left, right...), nil
}

return []TimeWindow{{From: from, To: to}}, nil
}

// defaultProbeTotalCount issues a filtered-mode GET with per_page=1 to learn total_count
// (or detect 422) cheaply. It runs under SubmitBlocking so it shares the rate-limit budget
// with the main collector; DoGetAsync is avoided because that path errors on >=400 before
// the callback, which would hide the 422 we use as a bisection signal.
func defaultProbeTotalCount(
taskCtx plugin.SubTaskContext,
data *GithubTaskData,
from time.Time,
to time.Time,
) (int, bool, errors.Error) {
q := url.Values{}
q.Set("per_page", "1")
q.Set("page", "1")
q.Set("created", fmt.Sprintf("%s..%s",
from.UTC().Format(githubTimeLayout),
to.UTC().Format(githubTimeLayout)))
path := fmt.Sprintf("repos/%s/actions/runs", data.Options.Name)

var total int
var is422 bool
var innerErr errors.Error
data.ApiClient.SubmitBlocking(func() errors.Error {
res, getErr := data.ApiClient.Get(path, q, nil)
// If a sibling subtask installed an AfterResponse hook that maps 422 ->
// helper.ErrIgnoreAndContinue, ApiClient.Do already closed res.Body before returning
// the sentinel (api_client.go L389). Recover the 422 signal here without double-closing.
// Sentinel comparison uses `==` to stay consistent with every other ErrIgnoreAndContinue
// call site in devlake (api_client.go:389, api_async_client.go:165, etc.).
if getErr == helper.ErrIgnoreAndContinue && res != nil && res.StatusCode == http.StatusUnprocessableEntity {
is422 = true
return nil
}
if getErr != nil {
innerErr = getErr
return nil
}
if res.StatusCode == http.StatusUnprocessableEntity {
if res.Body != nil {
_ = res.Body.Close()
}
is422 = true
return nil
}
body := &GithubRawRunsResult{}
if e := helper.UnmarshalResponse(res, body); e != nil {
innerErr = e
return nil
}
total = body.TotalCount
return nil
})
if err := data.ApiClient.WaitAsync(); err != nil {
return 0, false, err
}
if innerErr != nil {
return 0, false, innerErr
}
return total, is422, nil
}

type GithubRawRunsResult struct {
Expand Down
Loading