diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 2cd958780cb..849e9940486 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -52,10 +52,11 @@ const ( ) type metrics struct { - pluginPanics labeled.Counter - unsupportedTaskType labeled.Counter - pluginExecutionLatency labeled.StopWatch - pluginQueueLatency labeled.StopWatch + pluginPanics labeled.Counter + unsupportedTaskType labeled.Counter + pluginExecutionLatency labeled.StopWatch + pluginQueueLatency labeled.StopWatch + pluginInitializeLatency labeled.StopWatch // TODO We should have a metric to capture custom state size scope promutils.Scope @@ -553,6 +554,14 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Emit the initializing latency if the task has just transitioned from Initializing to Running. + if ts.PluginPhase == pluginCore.PhaseInitializing && + pluginTrns.pInfo.Phase() == pluginCore.PhaseRunning { + if !ts.LastPhaseUpdatedAt.IsZero() { + t.metrics.pluginInitializeLatency.Observe(ctx, ts.LastPhaseUpdatedAt, time.Now()) + } + } + if pluginTrns.pInfo.Phase() == ts.PluginPhase { if pluginTrns.pInfo.Version() == ts.PluginPhaseVersion { logger.Debugf(ctx, "p+Version previously seen .. no event will be sent") @@ -800,12 +809,20 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex } // STEP 6: Persist the plugin state + // Only refresh LastPhaseUpdatedAt when the phase itself changes; intra-phase + // version bumps (e.g. emitted by MaybeUpdatePhaseVersion when a plugin's + // reason string changes) must not reset it, otherwise duration metrics like + // plugin_queue_latency lose their start anchor and undercount. 
+ lastPhaseUpdatedAt := ts.LastPhaseUpdatedAt + if ts.PluginPhase != pluginTrns.pInfo.Phase() { + lastPhaseUpdatedAt = time.Now() + } err = nCtx.NodeStateWriter().PutTaskNodeState(handler.TaskNodeState{ PluginState: pluginTrns.pluginState, PluginStateVersion: pluginTrns.pluginStateVersion, PluginPhase: pluginTrns.pInfo.Phase(), PluginPhaseVersion: pluginTrns.pInfo.Version(), - LastPhaseUpdatedAt: time.Now(), + LastPhaseUpdatedAt: lastPhaseUpdatedAt, PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI, CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(), }) @@ -1060,11 +1077,12 @@ func New(ctx context.Context, kubeClient executors.Client, kubeClientset kuberne pluginsForType: make(map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin), taskMetricsMap: make(map[MetricKey]*taskMetrics), metrics: &metrics{ - pluginPanics: labeled.NewCounter("plugin_panic", "Task plugin panicked when trying to execute a Handler.", scope), - unsupportedTaskType: labeled.NewCounter("unsupported_tasktype", "No Handler plugin configured for Handler type", scope), - pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latency", "Time taken to invoke plugin for one round", time.Microsecond, scope), - pluginQueueLatency: labeled.NewStopWatch("plugin_queue_latency", "Time spent by plugin in queued phase", time.Microsecond, scope), - scope: scope, + pluginPanics: labeled.NewCounter("plugin_panic", "Task plugin panicked when trying to execute a Handler.", scope), + unsupportedTaskType: labeled.NewCounter("unsupported_tasktype", "No Handler plugin configured for Handler type", scope), + pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latency", "Time taken to invoke plugin for one round", time.Microsecond, scope), + pluginQueueLatency: labeled.NewStopWatch("plugin_queue_latency", "Time spent by plugin in queued phase", time.Microsecond, scope), + pluginInitializeLatency: labeled.NewStopWatch("plugin_initialize_latency", "Time spent by 
plugin in initializing phase", time.Microsecond, scope), + scope: scope, }, pluginScope: scope.NewSubScope("plugin"), kubeClient: kubeClient, diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 8f51627fb8c..d36e0eac048 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -1313,3 +1314,264 @@ func Test_task_Handle_ValidateOutputErr(t *testing.T) { assert.NoError(t, err) assert.False(t, result.IsRecoverable) } + +// Test_task_Handle_LastPhaseUpdatedAt verifies that the persisted +// LastPhaseUpdatedAt is only refreshed when the plugin's reported phase +// actually changes. Intra-phase version bumps (which MaybeUpdatePhaseVersion +// produces when a plugin's reason string changes — e.g. as a Pending pod +// progresses through scheduler reasons during autoscaling) must preserve +// the timestamp. +// +// The plugin_queue_latency and plugin_initialize_latency metrics anchor +// off this timestamp; if version bumps reset it, both metrics undercount +// and only capture the final segment before exiting the phase. 
+func Test_task_Handle_LastPhaseUpdatedAt(t *testing.T) { // table-driven: every case below runs exactly one Handle round
+	inputs := &core.LiteralMap{
+		Literals: map[string]*core.Literal{
+			"foo": coreutils.MustMakeLiteral("bar"),
+		},
+	}
+
+	createNodeContext := func( // builds a fully mocked NodeExecutionContext seeded with the given prior task state
+		startingPhase pluginCore.Phase,
+		startingVersion uint32,
+		startingLastPhaseUpdatedAt time.Time,
+		pluginResp fakeplugins.NextPhaseState,
+		recorder interfaces.EventRecorder,
+		s *taskNodeStateHolder,
+	) *nodeMocks.NodeExecutionContext {
+		wfExecID := &core.WorkflowExecutionIdentifier{
+			Project: "project",
+			Domain:  "domain",
+			Name:    "name",
+		}
+		nodeID := "n1"
+
+		nm := &nodeMocks.NodeExecutionMetadata{}
+		nm.EXPECT().GetAnnotations().Return(map[string]string{})
+		nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
+			NodeId:      nodeID,
+			ExecutionId: wfExecID,
+		})
+		nm.EXPECT().GetK8sServiceAccount().Return("service-account")
+		nm.EXPECT().GetLabels().Return(map[string]string{})
+		nm.EXPECT().GetNamespace().Return("namespace")
+		nm.EXPECT().GetOwnerID().Return(types.NamespacedName{Namespace: "namespace", Name: "name"})
+		nm.EXPECT().GetOwnerReference().Return(v12.OwnerReference{
+			Kind: "sample",
+			Name: "name",
+		})
+		nm.EXPECT().IsInterruptible().Return(false)
+
+		tk := &core.TaskTemplate{ // task template handed back by the mocked TaskReader.Read below
+			Id:   &core.Identifier{ResourceType: core.ResourceType_TASK, Project: "proj", Domain: "dom", Version: "ver"},
+			Type: "test", // same key the Handler's defaultPlugins map is populated with further down
+			Metadata: &core.TaskMetadata{
+				Discoverable: false,
+			},
+			Interface: &core.TypedInterface{
+				Inputs: &core.VariableMap{
+					Variables: map[string]*core.Variable{
+						"foo": {
+							Type: &core.LiteralType{
+								Type: &core.LiteralType_Simple{
+									Simple: core.SimpleType_STRING,
+								},
+							},
+						},
+					},
+				},
+				Outputs: &core.VariableMap{
+					Variables: map[string]*core.Variable{
+						"x": {
+							Type: &core.LiteralType{
+								Type: &core.LiteralType_Simple{
+									Simple: core.SimpleType_BOOLEAN,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		taskID := &core.Identifier{}
+		tr := &nodeMocks.TaskReader{}
+		tr.EXPECT().GetTaskID().Return(taskID)
+		tr.EXPECT().GetTaskType().Return("test")
+		tr.EXPECT().Read(mock.Anything).Return(tk, nil)
+
+		ns := &flyteMocks.ExecutableNodeStatus{}
+		ns.EXPECT().GetDataDir().Return("data-dir")
+		ns.EXPECT().GetOutputDir().Return("data-dir")
+
+		res := &v1.ResourceRequirements{}
+		n := &flyteMocks.ExecutableNode{}
+		ma := 5
+		n.EXPECT().GetRetryStrategy().Return(&v1alpha1.RetryStrategy{MinAttempts: &ma})
+		n.EXPECT().GetResources().Return(res)
+
+		ir := &ioMocks.InputReader{}
+		ir.EXPECT().GetInputPath().Return("input")
+		ir.EXPECT().Get(mock.Anything).Return(inputs, nil)
+		nCtx := &nodeMocks.NodeExecutionContext{}
+		nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
+		nCtx.EXPECT().Node().Return(n)
+		nCtx.EXPECT().InputReader().Return(ir)
+		ds, err := storage.NewDataStore( // data store configured with storage.TypeMemory
+			&storage.Config{
+				Type: storage.TypeMemory,
+			},
+			promutils.NewTestScope(),
+		)
+		assert.NoError(t, err)
+		nCtx.EXPECT().DataStore().Return(ds)
+		nCtx.EXPECT().CurrentAttempt().Return(uint32(1))
+		nCtx.EXPECT().TaskReader().Return(tr)
+		nCtx.EXPECT().NodeStatus().Return(ns)
+		nCtx.EXPECT().NodeID().Return(nodeID)
+		nCtx.EXPECT().EventsRecorder().Return(recorder)
+		nCtx.EXPECT().EnqueueOwnerFunc().Return(nil)
+
+		nCtx.EXPECT().RawOutputPrefix().Return("s3://sandbox/")
+		nCtx.EXPECT().OutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"}))
+
+		executionContext := &mocks.ExecutionContext{}
+		executionContext.EXPECT().GetExecutionConfig().Return(v1alpha1.ExecutionConfig{})
+		executionContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
+		executionContext.EXPECT().GetParentInfo().Return(nil)
+		executionContext.EXPECT().IncrementParallelism().Return(1)
+		nCtx.EXPECT().ExecutionContext().Return(executionContext)
+
+		st := bytes.NewBuffer([]byte{})
+		cod := codex.GobStateCodec{}
+		assert.NoError(t, cod.Encode(pluginResp, st)) // the scripted plugin response becomes the stored PluginState bytes
+		nr := &nodeMocks.NodeStateReader{}
+		nr.EXPECT().GetTaskNodeState().Return(handler.TaskNodeState{ // prior state the Handler reads at the start of the round
+			PluginState:        st.Bytes(),
+			PluginPhase:        startingPhase,
+			PluginPhaseVersion: startingVersion,
+			LastPhaseUpdatedAt: startingLastPhaseUpdatedAt,
+		})
+		nCtx.EXPECT().NodeStateReader().Return(nr)
+		nCtx.EXPECT().NodeStateWriter().Return(s) // state written through the NodeStateWriter goes to s; assertions read it back via state.s
+		return nCtx
+	}
+
+	noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope())
+	referenceTime := time.Now().Add(-time.Hour) // one hour in the past, so a refreshed timestamp is strictly After it
+
+	type args struct {
+		startingPhase   pluginCore.Phase
+		startingVersion uint32
+		nextPhase       pluginCore.Phase
+		nextVersion     uint32
+	}
+
+	tests := []struct {
+		name               string
+		args               args
+		timestampPreserved bool // true: persisted LastPhaseUpdatedAt must still equal referenceTime; false: must be later
+	}{
+		{
+			name: "queued same phase, version bump preserves timestamp",
+			args: args{
+				startingPhase:   pluginCore.PhaseQueued,
+				startingVersion: 0,
+				nextPhase:       pluginCore.PhaseQueued,
+				nextVersion:     1,
+			},
+			timestampPreserved: true,
+		},
+		{
+			name: "queued -> initializing refreshes timestamp",
+			args: args{
+				startingPhase:   pluginCore.PhaseQueued,
+				startingVersion: 0,
+				nextPhase:       pluginCore.PhaseInitializing,
+				nextVersion:     0,
+			},
+			timestampPreserved: false,
+		},
+		{
+			name: "initializing same phase, version bump preserves timestamp",
+			args: args{
+				startingPhase:   pluginCore.PhaseInitializing,
+				startingVersion: 0,
+				nextPhase:       pluginCore.PhaseInitializing,
+				nextVersion:     1,
+			},
+			timestampPreserved: true,
+		},
+		{
+			name: "initializing -> running refreshes timestamp",
+			args: args{
+				startingPhase:   pluginCore.PhaseInitializing,
+				startingVersion: 0,
+				nextPhase:       pluginCore.PhaseRunning,
+				nextVersion:     0,
+			},
+			timestampPreserved: false,
+		},
+		{
+			name: "queued -> running refreshes timestamp",
+			args: args{
+				startingPhase:   pluginCore.PhaseQueued,
+				startingVersion: 0,
+				nextPhase:       pluginCore.PhaseRunning,
+				nextVersion:     0,
+			},
+			timestampPreserved: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			scope := promutils.NewTestScope() // new scope per subtest; presumably avoids duplicate metric registration — confirm
+			state := &taskNodeStateHolder{}
+			ev := &fakeBufferedEventRecorder{}
+			nCtx := createNodeContext(
+				tt.args.startingPhase,
+				tt.args.startingVersion,
+				referenceTime, // every case starts from the same prior LastPhaseUpdatedAt
+				fakeplugins.NextPhaseState{
+					Phase:        tt.args.nextPhase,
+					PhaseVersion: tt.args.nextVersion,
+					TaskInfo:     &pluginCore.TaskInfo{},
+				},
+				ev,
+				state,
+			)
+			tk := Handler{
+				cfg: &config.Config{MaxErrorMessageLength: 100, MaxPluginPhaseVersions: 100000},
+				defaultPlugins: map[pluginCore.TaskType]pluginCore.Plugin{
+					"test": fakeplugins.NewPhaseBasedPlugin(), // NOTE(review): presumably reports the NextPhaseState decoded from PluginState — confirm in fakeplugins
+				},
+				pluginScope:     scope,
+				catalog:         &pluginCatalogMocks.Client{},
+				resourceManager: noopRm,
+				taskMetricsMap:  make(map[MetricKey]*taskMetrics),
+				metrics: &metrics{
+					pluginPanics:            labeled.NewCounter("plugin_panic", "x", scope),
+					unsupportedTaskType:     labeled.NewCounter("unsupported_tasktype", "x", scope),
+					pluginExecutionLatency:  labeled.NewStopWatch("plugin_exec_latency", "x", time.Microsecond, scope),
+					pluginQueueLatency:      labeled.NewStopWatch("plugin_queue_latency", "x", time.Microsecond, scope),
+					pluginInitializeLatency: labeled.NewStopWatch("plugin_initialize_latency", "x", time.Microsecond, scope),
+					scope:                   scope,
+				},
+				eventConfig:  eventConfig,
+				agentService: &agent.AgentService{},
+			}
+			_, err := tk.Handle(context.TODO(), nCtx) // single round; only the state captured in `state` is inspected, the transition is discarded
+			assert.NoError(t, err)
+
+			if tt.timestampPreserved {
+				assert.Equal(t, referenceTime, state.s.LastPhaseUpdatedAt, // same-phase round: the seeded timestamp must come back unchanged
+					"LastPhaseUpdatedAt must be preserved across intra-phase version bumps")
+			} else {
+				assert.True(t, state.s.LastPhaseUpdatedAt.After(referenceTime), // phase changed: only checks "later than the seed", not an exact value
+					"LastPhaseUpdatedAt must refresh on phase change (got %v, expected after %v)",
+					state.s.LastPhaseUpdatedAt, referenceTime)
+			}
+		})
+	}
+}
diff --git a/go.mod b/go.mod
index fdb92eeca4a..ee44b565d75 100644
--- a/go.mod
+++ b/go.mod
@@ -5,21 +5,14 @@ go 1.26.0
 require (
 	github.com/flyteorg/flyte/datacatalog v0.0.0-00010101000000-000000000000
 	github.com/flyteorg/flyte/flyteadmin v0.0.0-00010101000000-000000000000
-	github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
-	github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000 github.com/golang/glog v1.2.5 - github.com/kubeflow/spark-operator v0.0.0-20250325114751-1905be6e1dbd github.com/prometheus/client_golang v1.23.0 github.com/spf13/cobra v1.9.1 github.com/spf13/pflag v1.0.10 - github.com/stretchr/testify v1.11.1 golang.org/x/sync v0.20.0 - google.golang.org/protobuf v1.36.11 gorm.io/driver/postgres v1.5.3 - k8s.io/api v0.34.1 - k8s.io/apimachinery v0.34.1 sigs.k8s.io/controller-runtime v0.22.4 ) @@ -89,6 +82,8 @@ require ( github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fatih/color v1.18.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 // indirect + github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000 // indirect github.com/flyteorg/stow v0.3.12 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect @@ -143,6 +138,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/kubeflow/spark-operator v0.0.0-20250325114751-1905be6e1dbd // indirect github.com/kubeflow/training-operator v1.8.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect @@ -199,6 +195,7 @@ require ( github.com/spf13/viper v1.21.0 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/stretchr/testify v1.11.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect @@ -243,6 +240,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect 
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/grpc v1.80.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect @@ -256,7 +254,9 @@ require ( gorm.io/driver/sqlite v1.5.4 // indirect gorm.io/gorm v1.25.4 // indirect gorm.io/plugin/opentelemetry v0.1.4 // indirect + k8s.io/api v0.34.1 // indirect k8s.io/apiextensions-apiserver v0.34.1 // indirect + k8s.io/apimachinery v0.34.1 // indirect k8s.io/client-go v0.34.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250814151709-d7b6acb124c3 // indirect