Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3c29c9d
feat: initial implementation of estimated finish time
deryrahman Feb 9, 2026
203fa46
refactor: use existing function from sla predictor service
deryrahman Feb 9, 2026
22c83f2
feat: store estimated duration to db
deryrahman Feb 9, 2026
da0c03d
fix: lint
deryrahman Feb 9, 2026
6bde8fe
feat: update proto
deryrahman Feb 9, 2026
de2e950
feat: add handler to generate estimated finish time
deryrahman Feb 9, 2026
6a8fe59
feat: integrate job finish time estimator service
deryrahman Feb 9, 2026
e57aa59
feat: safety check for nil variables
deryrahman Feb 10, 2026
72d998d
test: add test cases for job estimator finish time
deryrahman Feb 10, 2026
63756df
test: add test cases for generate estimation on handler
deryrahman Feb 10, 2026
4966e65
fix: linter
deryrahman Feb 10, 2026
027a52e
fix: test case
deryrahman Feb 10, 2026
5253808
feat: skip disabled job
deryrahman Feb 11, 2026
e863789
refactor: estimated -> estimator + enrich api response
deryrahman Feb 12, 2026
ffd85d2
refactor: remove unecessary vars
deryrahman Feb 12, 2026
c9723ec
feat: create table if not exist
deryrahman Feb 13, 2026
5180284
refactor: update the log using fmt.Sprintf
deryrahman Feb 13, 2026
77afb2e
feat: when job is not started yet use buffer time as well
deryrahman Feb 18, 2026
e3858c2
feat: when job doesn't have sufficient run to estimate duration
deryrahman Feb 18, 2026
989ac8f
feat: when job already finished, it should be checked first
deryrahman Feb 18, 2026
3e179e3
refactor: adjust the calculation + fix test cases
deryrahman Feb 18, 2026
7193fc9
feat: use different config for job expectator detail
deryrahman Feb 18, 2026
1d221da
feat: integrate buffer duration config
deryrahman Feb 18, 2026
c802ede
feat: use different key to distinguished actual finish time
deryrahman Feb 18, 2026
53b4469
fix: new server config
deryrahman Feb 18, 2026
17f3f36
feat: update proto commit
deryrahman Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "8e1c00250e4a3ef62086f513d52c43c790946eff"
PROTON_COMMIT := "ccb9ecd951b224d1466494fb4241a6223821e4b5"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
58 changes: 58 additions & 0 deletions core/scheduler/handler/v1beta1/job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type JobSLAPredictorService interface {
IdentifySLABreaches(ctx context.Context, projectName tenant.ProjectName, jobNames []scheduler.JobName, labels map[string]string, reqConfig service.JobSLAPredictorRequestConfig) (map[scheduler.JobName]map[scheduler.JobName]*service.JobState, error)
}

type JobExpectatorService interface {
GenerateExpectedFinishTimes(ctx context.Context, projectName tenant.ProjectName, jobNames []scheduler.JobName, labels map[string]string, referenceTime time.Time, scheduleRangeInHours time.Duration) (map[scheduler.JobSchedule]service.FinishTimeDetail, error)
}

type JobRunService interface {
JobRunInput(context.Context, tenant.ProjectName, scheduler.JobName, scheduler.RunConfig) (*scheduler.ExecutorInput, error)
UpdateJobState(context.Context, *scheduler.Event) error
Expand Down Expand Up @@ -73,6 +77,7 @@ type JobRunHandler struct {
jobLineageService JobLineageService
jobSLAPredictorService JobSLAPredictorService
thirdPartySensorService ThirdPartySensorService
jobExpectatorService JobExpectatorService

pb.UnimplementedJobRunServiceServer
}
Expand Down Expand Up @@ -574,6 +579,57 @@ func (h JobRunHandler) GetJobRunLineageSummary(ctx context.Context, req *pb.GetJ
return toJobRunLineageSummaryResponse(jobRunLineages), nil
}

// GenerateExpectedFinishTime generates expected finish time for jobs based on their schedule in the given range
func (h JobRunHandler) GenerateExpectedFinishTime(ctx context.Context, req *pb.GenerateExpectedFinishTimeRequest) (*pb.GenerateExpectedFinishTimeResponse, error) {
projectName, err := tenant.ProjectNameFrom(req.GetProjectName())
if err != nil {
h.l.Error("error adapting project name [%s]: %s", req.GetProjectName(), err)
return nil, errors.GRPCErr(err, "unable to adapt project name")
}

jobNames := []scheduler.JobName{}
for _, jn := range req.GetJobNames() {
jobName, err := scheduler.JobNameFrom(jn)
if err != nil {
h.l.Error("error adapting job name [%s]: %s", jn, err)
Comment thread
deryrahman marked this conversation as resolved.
Outdated
return nil, errors.GRPCErr(err, "unable to adapt job name")
}
jobNames = append(jobNames, jobName)
}

referenceTime := time.Now().UTC()
if req.GetReferenceTime() != nil && req.GetReferenceTime().IsValid() {
referenceTime = req.GetReferenceTime().AsTime().UTC()
}
scheduleRangeInHours := time.Duration(req.GetScheduledRangeInHours()) * time.Hour

jobsWithFinishTime, err := h.jobExpectatorService.GenerateExpectedFinishTimes(ctx, projectName, jobNames, req.GetJobLabels(), referenceTime, scheduleRangeInHours)
if err != nil {
h.l.Error("error generating expected finish times: %s", err)
return nil, errors.GRPCErr(err, "unable to generate expected finish times")
}

response := &pb.GenerateExpectedFinishTimeResponse{
InprogressJobs: make(map[string]*pb.FinishTimeDetailResponse),
FinishedJobs: make(map[string]*pb.FinishTimeDetailResponse),
}
for jobSchedule, jobWithFinishTime := range jobsWithFinishTime {
finishTimeDetail := &pb.FinishTimeDetailResponse{
ScheduledAt: timestamppb.New(jobSchedule.ScheduledAt),
ExpectedFinishTime: timestamppb.New(jobWithFinishTime.FinishTime),
Comment thread
deryrahman marked this conversation as resolved.
Outdated
}

switch jobWithFinishTime.Status {
case service.FinishTimeStatusFinished:
response.FinishedJobs[jobSchedule.JobName.String()] = finishTimeDetail
case service.FinishTimeStatusInprogress:
response.InprogressJobs[jobSchedule.JobName.String()] = finishTimeDetail
}
}

return response, nil
}

func NewJobRunHandler(
l log.Logger,
service JobRunService,
Expand All @@ -582,6 +638,7 @@ func NewJobRunHandler(
jobLineageService JobLineageService,
jobSLAPredictorService JobSLAPredictorService,
thirdPartySensorService ThirdPartySensorService,
jobExpectatorService JobExpectatorService,
) *JobRunHandler {
return &JobRunHandler{
l: l,
Expand All @@ -591,5 +648,6 @@ func NewJobRunHandler(
jobLineageService: jobLineageService,
jobSLAPredictorService: jobSLAPredictorService,
thirdPartySensorService: thirdPartySensorService,
jobExpectatorService: jobExpectatorService,
}
}
Loading
Loading