Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "6c23cfa641ff762cb1082abc2a6edb89eccde42c"
PROTON_COMMIT := "abc30710e2586dcfe32f87536174f59e54f28dc4"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
3 changes: 3 additions & 0 deletions client/jsonschema/job.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
},
"type": "array"
},
"enable_dex_sensor": {
"type": "boolean"
},
"dependencies": {
"items": {
"$ref": "#/$defs/JobSpecDependency"
Expand Down
21 changes: 13 additions & 8 deletions client/local/model/job_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type JobSpec struct {
Hooks []JobSpecHook `yaml:"hooks"`
Dependencies []JobSpecDependency `yaml:"dependencies"`
Metadata *JobSpecMetadata `yaml:"metadata,omitempty"`
Path string `yaml:"-"`

EnableDexSensor bool `yaml:"enable_dex_sensor,omitempty"`
Path string `yaml:"-"`
}

type JobSpecSchedule struct {
Expand Down Expand Up @@ -222,13 +224,14 @@ func (j *JobSpec) ToProto() *pb.JobSpecification {
Config: taskConfig,
Alert: j.Task.Alerts.GetOperatorAlertProto(),
},
Dependencies: j.getProtoJobDependencies(),
Assets: j.Asset,
Hooks: j.getProtoJobSpecHooks(),
Description: j.Description,
Labels: j.Labels,
Behavior: j.getProtoJobSpecBehavior(),
Metadata: j.getProtoJobMetadata(),
Dependencies: j.getProtoJobDependencies(),
Assets: j.Asset,
Hooks: j.getProtoJobSpecHooks(),
Description: j.Description,
Labels: j.Labels,
Behavior: j.getProtoJobSpecBehavior(),
Metadata: j.getProtoJobMetadata(),
EnableDexSensor: j.EnableDexSensor,
}

if js.Version < NewWindowVersion {
Expand Down Expand Up @@ -612,6 +615,8 @@ func ToJobSpec(protoSpec *pb.JobSpecification) *JobSpec {
Hooks: toJobSpecHooks(protoSpec.Hooks),
Dependencies: toJobSpecDependencies(protoSpec.Dependencies),
Metadata: toJobSpecMetadata(protoSpec.Metadata),

EnableDexSensor: protoSpec.EnableDexSensor,
}
}

Expand Down
23 changes: 14 additions & 9 deletions core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ func ToJobProto(jobEntity *job.Job) *pb.JobSpecification {
Version: spec.Task().Version(),
Config: fromConfig(spec.Task().Config()),
},
Dependencies: fromSpecUpstreams(spec.UpstreamSpec()),
Assets: fromAsset(spec.Asset()),
Hooks: fromHooks(spec.Hooks()),
Description: spec.Description(),
Labels: spec.Labels(),
Behavior: fromRetryAndAlerts(spec.Schedule().Retry(), spec.AlertSpecs()),
Metadata: fromMetadata(spec.Metadata()),
Destination: jobEntity.Destination().String(),
Sources: fromResourceURNs(jobEntity.Sources()),
Dependencies: fromSpecUpstreams(spec.UpstreamSpec()),
Assets: fromAsset(spec.Asset()),
Hooks: fromHooks(spec.Hooks()),
Description: spec.Description(),
Labels: spec.Labels(),
Behavior: fromRetryAndAlerts(spec.Schedule().Retry(), spec.AlertSpecs()),
Metadata: fromMetadata(spec.Metadata()),
Destination: jobEntity.Destination().String(),
Sources: fromResourceURNs(jobEntity.Sources()),
EnableDexSensor: jobEntity.Spec().IsDexSensorEnabled(),
}

if spec.Version() == window.NewWindowVersion {
Expand Down Expand Up @@ -151,6 +152,10 @@ func fromJobProto(js *pb.JobSpecification) (*job.Spec, error) {

jobSpecBuilder := job.NewSpecBuilder(version, name, owner, schedule, window, task).WithDescription(js.Description)

if js.EnableDexSensor {
jobSpecBuilder = jobSpecBuilder.WithDexSensor()
}

if js.Labels != nil {
labels := labels.FromMap(js.Labels)
jobSpecBuilder = jobSpecBuilder.WithLabels(labels)
Expand Down
5 changes: 5 additions & 0 deletions core/job/resolver/dex_upstream_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (u *dexUpstreamResolver) BulkResolve(ctx context.Context, jobsWithUpstream
}

func (u *dexUpstreamResolver) Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error) {
if !jobWithUpstream.Job().Spec().IsDexSensorEnabled() {
// skip DEX upstream resolution if dex sensor is not enabled for the job
return jobWithUpstream, nil
}

details, err := u.tenantDetailsGetter.GetDetails(ctx, jobWithUpstream.Job().Tenant())
if err != nil {
return jobWithUpstream, fmt.Errorf("failed to get tenant details for tenant %s: %w", jobWithUpstream.Job().Tenant().String(), err)
Expand Down
4 changes: 2 additions & 2 deletions core/job/resolver/dex_upstream_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestDexUpstreamResolver_Resolve(t *testing.T) {
jobTaskConfig, _ := job.ConfigFrom(map[string]string{"sample_task_key": "sample_value"})
jobTask := job.NewTask(taskName, jobTaskConfig, "", nil)
upstreamSpec, _ := job.NewSpecUpstreamBuilder().WithUpstreamNames([]job.SpecUpstreamName{"external-project/job-B"}).Build()
specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build()
specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).Build()
specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).WithDexSensor().Build()
specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).WithSpecUpstream(upstreamSpec).WithDexSensor().Build()
resourceURNC, err := resource.ParseURN("store://resource-C")
assert.NoError(t, err)
resourceURND, err := resource.ParseURN("store://resource-D")
Expand Down
26 changes: 18 additions & 8 deletions core/job/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ type Spec struct {
windowConfig window.Config
task Task

description string
labels labels.Labels
metadata *Metadata
hooks []*Hook
asset Asset
alertSpecs []*AlertSpec
webhook []*WebhookSpec
upstreamSpec *UpstreamSpec
description string
labels labels.Labels
metadata *Metadata
hooks []*Hook
asset Asset
alertSpecs []*AlertSpec
webhook []*WebhookSpec
upstreamSpec *UpstreamSpec
enableDexSensor bool
}

func (s *Spec) IsDexSensorEnabled() bool {
return s.enableDexSensor
}

func (s *Spec) Version() int {
Expand Down Expand Up @@ -178,6 +183,11 @@ func (s *SpecBuilder) WithLabels(labels labels.Labels) *SpecBuilder {
return s
}

func (s *SpecBuilder) WithDexSensor() *SpecBuilder {
s.spec.enableDexSensor = true
return s
}

func (s *SpecBuilder) WithDescription(description string) *SpecBuilder {
s.spec.description = description
return s
Expand Down
6 changes: 5 additions & 1 deletion internal/store/postgres/job/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Spec struct {
UpdatedAt time.Time
DeletedAt sql.NullTime

EnableDexSensor bool

IsDirty bool
}

Expand Down Expand Up @@ -257,6 +259,8 @@ func toStorageSpec(jobEntity *job.Job) (*Spec, error) {

ProjectName: jobEntity.Tenant().ProjectName().String(),
NamespaceName: jobEntity.Tenant().NamespaceName().String(),

EnableDexSensor: jobEntity.Spec().IsDexSensorEnabled(),
}, nil
}

Expand Down Expand Up @@ -832,7 +836,7 @@ func FromRow(row pgx.Row) (*Spec, error) {
err := row.Scan(&js.ID, &js.State, &js.Name, &js.Version, &js.Owner, &js.Description,
&js.Labels, &js.Schedule, &js.Alert, &js.Webhook, &js.StaticUpstreams, &js.HTTPUpstreams,
&js.TaskName, &js.TaskConfig, &js.WindowSpec, &js.Assets, &js.Hooks, &js.Metadata, &js.Destination, &js.Sources,
&js.ProjectName, &js.NamespaceName, &js.CreatedAt, &js.UpdatedAt, &js.DeletedAt, &js.IsDirty)
&js.ProjectName, &js.NamespaceName, &js.EnableDexSensor, &js.CreatedAt, &js.UpdatedAt, &js.DeletedAt, &js.IsDirty)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, errors.NotFound(job.EntityJob, "job not found")
Expand Down
10 changes: 5 additions & 5 deletions internal/store/postgres/job/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

const (
jobColumnsToStore = `name, version, owner, description, labels, schedule, alert, webhook, static_upstreams, http_upstreams,
task_name, task_config, window_spec, assets, hooks, metadata, destination, sources, project_name, namespace_name, created_at, updated_at`
task_name, task_config, window_spec, assets, hooks, metadata, destination, sources, project_name, namespace_name, dex_sensor, created_at, updated_at`

jobColumns = `id, state, ` + jobColumnsToStore + `, deleted_at, is_dirty`
)
Expand Down Expand Up @@ -80,14 +80,14 @@ func (j JobRepository) triggerInsert(ctx context.Context, jobEntity *job.Job) er

insertJobQuery := `INSERT INTO job (` + jobColumnsToStore + `)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
$17, $18, $19, $20, NOW(), NOW());`
$17, $18, $19, $20, $21, NOW(), NOW());`

tag, err := j.db.Exec(ctx, insertJobQuery,
storageJob.Name, storageJob.Version, storageJob.Owner, storageJob.Description, storageJob.Labels,
storageJob.Schedule, storageJob.Alert, storageJob.Webhook, storageJob.StaticUpstreams, storageJob.HTTPUpstreams,
storageJob.TaskName, storageJob.TaskConfig, storageJob.WindowSpec, storageJob.Assets,
storageJob.Hooks, storageJob.Metadata, storageJob.Destination, storageJob.Sources,
storageJob.ProjectName, storageJob.NamespaceName)
storageJob.ProjectName, storageJob.NamespaceName, storageJob.EnableDexSensor)
if err != nil {
return errors.Wrap(job.EntityJob, "unable to save job spec", err)
}
Expand Down Expand Up @@ -374,7 +374,7 @@ func (j JobRepository) triggerUpdate(ctx context.Context, jobEntity *job.Job) er
UPDATE job SET
version = $1, owner = $2, description = $3, labels = $4, schedule = $5, alert = $6,
static_upstreams = $7, http_upstreams = $8, task_name = $9, task_config = $10,
window_spec = $11, assets = $12, hooks = $13, metadata = $14, destination = $15, sources = $16, webhook = $19,
window_spec = $11, assets = $12, hooks = $13, metadata = $14, destination = $15, sources = $16, webhook = $19, dex_sensor = $20,
updated_at = NOW(), deleted_at = null
WHERE
name = $17 AND
Expand All @@ -386,7 +386,7 @@ WHERE
storageJob.StaticUpstreams, storageJob.HTTPUpstreams, storageJob.TaskName, storageJob.TaskConfig,
storageJob.WindowSpec, storageJob.Assets, storageJob.Hooks, storageJob.Metadata,
storageJob.Destination, storageJob.Sources,
storageJob.Name, storageJob.ProjectName, storageJob.Webhook)
storageJob.Name, storageJob.ProjectName, storageJob.Webhook, storageJob.EnableDexSensor)
if err != nil {
return errors.Wrap(job.EntityJob, "unable to update job spec", err)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table job drop COLUMN if EXISTS dex_sensor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE job
ADD COLUMN if not EXISTS dex_sensor boolean DEFAULT FALSE;
Loading
Loading