diff --git a/charts/flyte-binary/templates/crds/flyte.org_taskactions.yaml b/charts/flyte-binary/templates/crds/flyte.org_taskactions.yaml index d73c7e8a871..bffc5164d8a 100644 --- a/charts/flyte-binary/templates/crds/flyte.org_taskactions.yaml +++ b/charts/flyte-binary/templates/crds/flyte.org_taskactions.yaml @@ -296,6 +296,15 @@ spec: description: StateJSON is the JSON serialized NodeStatus that was last sent to the State Service type: string + systemFailures: + description: |- + SystemFailures counts system-level failures observed during reconciliation — + either Go errors returned from Plugin.Handle (e.g. transient k8s API errors, + admission webhook denials) or plugin transitions reporting a system-retryable + failure (e.g. resource deleted externally). When it exceeds the configured + maximum, the TaskAction is converted to a permanent failure. + format: int32 + type: integer type: object required: - spec diff --git a/docker/devbox-bundled/manifests/complete.yaml b/docker/devbox-bundled/manifests/complete.yaml index 899c9315892..317edbc42ba 100644 --- a/docker/devbox-bundled/manifests/complete.yaml +++ b/docker/devbox-bundled/manifests/complete.yaml @@ -7038,6 +7038,15 @@ spec: description: StateJSON is the JSON serialized NodeStatus that was last sent to the State Service type: string + systemFailures: + description: |- + SystemFailures counts system-level failures observed during reconciliation — + either Go errors returned from Plugin.Handle (e.g. transient k8s API errors, + admission webhook denials) or plugin transitions reporting a system-retryable + failure (e.g. resource deleted externally). When it exceeds the configured + maximum, the TaskAction is converted to a permanent failure. + format: int32 + type: integer type: object required: - spec diff --git a/docker/devbox-bundled/manifests/dev.yaml b/docker/devbox-bundled/manifests/dev.yaml index 5335a610509..faea15c0d25 100644 --- a/docker/devbox-bundled/manifests/dev.yaml +++ b/docker/devbox-bundled/manifests/dev.yaml @@ -7038,6 +7038,15 @@ spec: description: StateJSON is the JSON serialized NodeStatus that was last sent to the State Service type: string + systemFailures: + description: |- + SystemFailures counts system-level failures observed during reconciliation — + either Go errors returned from Plugin.Handle (e.g. transient k8s API errors, + admission webhook denials) or plugin transitions reporting a system-retryable + failure (e.g. resource deleted externally). When it exceeds the configured + maximum, the TaskAction is converted to a permanent failure. + format: int32 + type: integer type: object required: - spec diff --git a/executor/api/v1/taskaction_types.go b/executor/api/v1/taskaction_types.go index c15a15981ef..43aa8c05ea5 100644 --- a/executor/api/v1/taskaction_types.go +++ b/executor/api/v1/taskaction_types.go @@ -262,6 +262,14 @@ type TaskActionStatus struct { // +optional Attempts uint32 `json:"attempts,omitempty"` + // SystemFailures counts system-level failures observed during reconciliation — + // either Go errors returned from Plugin.Handle (e.g. transient k8s API errors, + // admission webhook denials) or plugin transitions reporting a system-retryable + // failure (e.g. resource deleted externally). When it exceeds the configured + // maximum, the TaskAction is converted to a permanent failure. + // +optional + SystemFailures uint32 `json:"systemFailures,omitempty"` + // CacheStatus is the latest observed cache lookup result for this action. // +optional CacheStatus core.CatalogCacheStatus `json:"cacheStatus,omitempty"` diff --git a/executor/config/crd/bases/flyte.org_taskactions.yaml b/executor/config/crd/bases/flyte.org_taskactions.yaml index d73c7e8a871..bffc5164d8a 100644 --- a/executor/config/crd/bases/flyte.org_taskactions.yaml +++ b/executor/config/crd/bases/flyte.org_taskactions.yaml @@ -296,6 +296,15 @@ spec: description: StateJSON is the JSON serialized NodeStatus that was last sent to the State Service type: string + systemFailures: + description: |- + SystemFailures counts system-level failures observed during reconciliation — + either Go errors returned from Plugin.Handle (e.g. transient k8s API errors, + admission webhook denials) or plugin transitions reporting a system-retryable + failure (e.g. resource deleted externally). When it exceeds the configured + maximum, the TaskAction is converted to a permanent failure. + format: int32 + type: integer type: object required: - spec diff --git a/executor/pkg/config/config.go b/executor/pkg/config/config.go index 64e3ffe3952..ca4b47f9458 100644 --- a/executor/pkg/config/config.go +++ b/executor/pkg/config/config.go @@ -24,6 +24,7 @@ var ( EventsServiceURL: "http://localhost:8090", CacheServiceURL: "http://localhost:8094", Cluster: "", + MaxSystemFailures: 3, GC: GCConfig{ Interval: stdconfig.Duration{Duration: 30 * time.Minute}, MaxTTL: stdconfig.Duration{Duration: 1 * time.Hour}, @@ -78,6 +79,11 @@ type Config struct { // Cluster is the cluster identifier attached to action events. Cluster string `json:"cluster" pflag:",Cluster identifier for action events"` + // MaxSystemFailures bounds consecutive system-level failures (Plugin.Handle Go + // errors and plugin-reported system-retryable failures) before a TaskAction is + // converted to a permanent failure. + MaxSystemFailures uint32 `json:"maxSystemFailures" pflag:",Max consecutive system-level failures before forcing permanent failure"` + // GC configures the garbage collector for terminal TaskActions. GC GCConfig `json:"gc" pflag:",Garbage collector configuration for terminal TaskActions"` } diff --git a/executor/pkg/config/config_flags.go b/executor/pkg/config/config_flags.go index f7378733153..f098b8d874b 100755 --- a/executor/pkg/config/config_flags.go +++ b/executor/pkg/config/config_flags.go @@ -63,6 +63,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "enableHTTP2"), defaultConfig.EnableHTTP2, "Enable HTTP/2 for metrics and webhook servers") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "EventsServiceURL"), defaultConfig.EventsServiceURL, "URL of the Event Service for action event updates") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "cluster"), defaultConfig.Cluster, "Cluster identifier for action events") + cmdFlags.Uint32(fmt.Sprintf("%v%v", prefix, "maxSystemFailures"), defaultConfig.MaxSystemFailures, "Max consecutive system-level failures before forcing permanent failure") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gc.interval"), defaultConfig.GC.Interval.String(), "How often the garbage collector runs. 0 disables GC.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gc.maxTTL"), defaultConfig.GC.MaxTTL.String(), "Time-to-live for terminal TaskActions before deletion.") return cmdFlags diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 02447566e3f..6119c99378c 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -55,6 +55,14 @@ const ( TaskActionDefaultRequeueDuration = 10 * time.Second taskActionFinalizer = "flyte.org/plugin-finalizer" + // DefaultMaxSystemFailures bounds consecutive system errors before the + // TaskAction is forced to PermanentFailure. + DefaultMaxSystemFailures uint32 = 3 + + // MaxSystemFailuresExceededCode is the ExecutionError.Code stamped on + // TaskActions that hit the system-failure ceiling. + MaxSystemFailuresExceededCode = "MaxSystemFailuresExceeded" + // LabelTerminationStatus marks a TaskAction as terminated for GC discovery. LabelTerminationStatus = "flyte.org/termination-status" // LabelCompletedTime records the UTC time (minute precision) when the TaskAction became terminal. @@ -77,16 +85,116 @@ const ( // TaskActionReconciler reconciles a TaskAction object type TaskActionReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - PluginRegistry *plugin.Registry - DataStore *storage.DataStore - SecretManager pluginsCore.SecretManager - ResourceManager pluginsCore.ResourceManager - CatalogClient catalog.AsyncClient - Catalog catalog.Client - eventsClient workflowconnect.EventsProxyServiceClient - cluster string + Scheme *runtime.Scheme + Recorder record.EventRecorder + PluginRegistry *plugin.Registry + DataStore *storage.DataStore + SecretManager pluginsCore.SecretManager + ResourceManager pluginsCore.ResourceManager + CatalogClient catalog.AsyncClient + Catalog catalog.Client + eventsClient workflowconnect.EventsProxyServiceClient + cluster string + MaxSystemFailures uint32 +} + +// isSystemRetryableFailure reports whether the plugin transition is a +// PhaseRetryableFailure with kind=SYSTEM (as produced by PhaseInfoSystemRetryableFailure). +func isSystemRetryableFailure(phaseInfo pluginsCore.PhaseInfo) bool { + if phaseInfo.Phase() != pluginsCore.PhaseRetryableFailure { + return false + } + execErr := phaseInfo.Err() + return execErr != nil && execErr.GetKind() == core.ExecutionError_SYSTEM +} + +func systemErrorFromPhaseInfo(phaseInfo pluginsCore.PhaseInfo) error { + if execErr := phaseInfo.Err(); execErr != nil { + return fmt.Errorf("[%s] %s", execErr.GetCode(), execErr.GetMessage()) + } + return fmt.Errorf("system retryable failure") +} + +func (r *TaskActionReconciler) maxSystemFailures() uint32 { + if r.MaxSystemFailures == 0 { + return DefaultMaxSystemFailures + } + return r.MaxSystemFailures +} + +// resetPluginResource aborts any in-flight plugin resource and clears persisted +// plugin state so the next reconcile starts fresh from PluginPhaseNotStarted. +func (r *TaskActionReconciler) resetPluginResource( + ctx context.Context, + taskAction *flyteorgv1.TaskAction, + p pluginsCore.Plugin, + tCtx pluginsCore.TaskExecutionContext, +) { + if abortErr := p.Abort(ctx, tCtx); abortErr != nil { + log.FromContext(ctx).Error(abortErr, "failed to abort during system-error reset", "plugin", p.GetID()) + } + taskAction.Status.PluginState = nil + taskAction.Status.PluginStateVersion = 0 +} + +// recordSystemError increments Status.SystemFailures and either requeues for +// another attempt or, once the configured threshold is exceeded, converts the +// TaskAction to a permanent failure. It does not touch the underlying plugin +// resource — callers that observed a failure phase should invoke +// resetPluginResource first. +func (r *TaskActionReconciler) recordSystemError( + ctx context.Context, + taskAction *flyteorgv1.TaskAction, + original *flyteorgv1.TaskAction, + pluginID string, + handleErr error, +) (ctrl.Result, error) { + logger := log.FromContext(ctx) + logger.Error(handleErr, "system error from plugin", "plugin", pluginID) + if r.Recorder != nil { + r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(FailedPluginHandle), + "Plugin %q system error: %v", pluginID, handleErr) + } + + taskAction.Status.SystemFailures++ + if taskAction.Status.SystemFailures > r.maxSystemFailures() { + execErr := &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: MaxSystemFailuresExceededCode, + Message: fmt.Sprintf("plugin %q failed with system error %d times: %v", pluginID, taskAction.Status.SystemFailures, handleErr), + } + return r.finalizePermanentFailure(ctx, taskAction, original, execErr) + } + + if taskActionStatusChanged(original.Status, taskAction.Status) { + if updErr := r.Status().Update(ctx, taskAction); updErr != nil { + logger.Error(updErr, "failed to persist SystemFailures counter") + } + } + return ctrl.Result{RequeueAfter: TaskActionDefaultRequeueDuration}, nil +} + +// finalizePermanentFailure converts the TaskAction to a terminal PermanentFailure +// with the given ExecutionError and stamps GC labels. +func (r *TaskActionReconciler) finalizePermanentFailure( + ctx context.Context, + taskAction *flyteorgv1.TaskAction, + original *flyteorgv1.TaskAction, + execErr *core.ExecutionError, +) (ctrl.Result, error) { + phaseInfo := pluginsCore.PhaseInfoFailed(pluginsCore.PhasePermanentFailure, execErr, nil) + mapPhaseToConditions(taskAction, phaseInfo) + taskAction.Status.PluginPhase = phaseInfo.Phase().String() + taskAction.Status.PluginPhaseVersion = phaseInfo.Version() + if updErr := r.updateTaskActionStatus(ctx, original, taskAction, phaseInfo); updErr != nil { + return ctrl.Result{}, updErr + } + if isTerminal(taskAction) { + if labelErr := r.ensureTerminalLabels(ctx, taskAction); labelErr != nil { + return ctrl.Result{}, labelErr + } + } + return ctrl.Result{}, nil } // NewTaskActionReconciler creates a new TaskActionReconciler @@ -203,10 +311,7 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) if !cacheShortCircuited { transition, err = p.Handle(ctx, tCtx) if err != nil { - logger.Error(err, "plugin Handle failed", "plugin", p.GetID()) - r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(FailedPluginHandle), - "Plugin %q Handle failed: %v", p.GetID(), err) - return ctrl.Result{RequeueAfter: TaskActionDefaultRequeueDuration}, nil + return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), err) } } @@ -218,8 +323,18 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Map transition phase to TaskAction conditions phaseInfo := transition.Info() - // In-place task restart: when a recoverable failure occurs, restart the pod within the - // same TaskAction rather than relying on the runs service to create a new TaskAction. + if !cacheShortCircuited && isSystemRetryableFailure(phaseInfo) { + r.resetPluginResource(ctx, taskAction, p, tCtx) + return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), systemErrorFromPhaseInfo(phaseInfo)) + } + + // Reset the consecutive system-failure counter on any non-system-error + // transition so transient blips earlier in the lifecycle don't accumulate + // toward the permanent-failure threshold. + taskAction.Status.SystemFailures = 0 + + // In-place task restart on USER-kind retryable failure: bump Status.Attempts + // and relaunch the pod under the same TaskAction. var restartAttempts uint32 if !cacheShortCircuited && phaseInfo.Phase() == pluginsCore.PhaseRetryableFailure { currentAttempts := observedAttempts(taskAction) @@ -627,6 +742,7 @@ func taskActionStatusChanged(oldStatus, newStatus flyteorgv1.TaskActionStatus) b oldStatus.PluginPhase != newStatus.PluginPhase || oldStatus.PluginPhaseVersion != newStatus.PluginPhaseVersion || oldStatus.Attempts != newStatus.Attempts || + oldStatus.SystemFailures != newStatus.SystemFailures || oldStatus.CacheStatus != newStatus.CacheStatus { return true } diff --git a/executor/pkg/controller/taskaction_controller_test.go b/executor/pkg/controller/taskaction_controller_test.go index aaac2187bc4..bbdd045d6fa 100644 --- a/executor/pkg/controller/taskaction_controller_test.go +++ b/executor/pkg/controller/taskaction_controller_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + stderrors "errors" "sync" "time" @@ -40,6 +41,27 @@ import ( "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow" ) +var errFakeWebhookDenied = stderrors.New(`admission webhook denied: secret "test1" not found`) + +// fakePlugin is a minimal Plugin implementation for unit tests. +type fakePlugin struct { + id string + abortCalls int +} + +func (f *fakePlugin) GetID() string { return f.id } +func (f *fakePlugin) GetProperties() pluginsCore.PluginProperties { return pluginsCore.PluginProperties{} } +func (f *fakePlugin) Handle(_ context.Context, _ pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { + return pluginsCore.UnknownTransition, nil +} +func (f *fakePlugin) Abort(_ context.Context, _ pluginsCore.TaskExecutionContext) error { + f.abortCalls++ + return nil +} +func (f *fakePlugin) Finalize(_ context.Context, _ pluginsCore.TaskExecutionContext) error { + return nil +} + // fakeEventsClient is a no-op implementation of EventsProxyServiceClient for tests. type fakeEventsClient struct{} @@ -292,6 +314,148 @@ var _ = Describe("TaskAction Controller", func() { }) }) + Context("maxSystemFailures", func() { + It("returns the default when MaxSystemFailures is zero", func() { + r := &TaskActionReconciler{} + Expect(r.maxSystemFailures()).To(Equal(DefaultMaxSystemFailures)) + }) + + It("returns the configured value when set", func() { + r := &TaskActionReconciler{MaxSystemFailures: 7} + Expect(r.maxSystemFailures()).To(Equal(uint32(7))) + }) + }) + + Context("recordSystemError", func() { + const handleErrResource = "handle-err-resource" + ctx := context.Background() + nn := types.NamespacedName{Name: handleErrResource, Namespace: "default"} + + BeforeEach(func() { + resource := &flyteorgv1.TaskAction{ + ObjectMeta: metav1.ObjectMeta{Name: handleErrResource, Namespace: "default"}, + Spec: flyteorgv1.TaskActionSpec{ + RunName: "test-run", + Project: "test-project", + Domain: "test-domain", + ActionName: "test-action", + InputURI: "/tmp/input", + RunOutputBase: "/tmp/output", + TaskType: "python-task", + TaskTemplate: buildTaskTemplateBytes("python-task", "python:3.11"), + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + AfterEach(func() { + resource := &flyteorgv1.TaskAction{} + if err := k8sClient.Get(ctx, nn, resource); err == nil { + resource.Finalizers = nil + _ = k8sClient.Update(ctx, resource) + _ = k8sClient.Delete(ctx, resource) + } + }) + + It("increments SystemFailures and requeues without consuming user retries", func() { + r := &TaskActionReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: record.NewFakeRecorder(10), + MaxSystemFailures: 3, + } + ta := &flyteorgv1.TaskAction{} + Expect(k8sClient.Get(ctx, nn, ta)).To(Succeed()) + original := ta.DeepCopy() + startingAttempts := ta.Status.Attempts + + res, err := r.recordSystemError(ctx, ta, original, "pod", errFakeWebhookDenied) + Expect(err).NotTo(HaveOccurred()) + Expect(res.RequeueAfter).To(Equal(TaskActionDefaultRequeueDuration)) + Expect(ta.Status.SystemFailures).To(Equal(uint32(1))) + Expect(ta.Status.Attempts).To(Equal(startingAttempts), "user retry budget must not be consumed by system errors") + Expect(isTerminal(ta)).To(BeFalse()) + + persisted := &flyteorgv1.TaskAction{} + Expect(k8sClient.Get(ctx, nn, persisted)).To(Succeed()) + Expect(persisted.Status.SystemFailures).To(Equal(uint32(1))) + }) + + It("converts to PermanentFailure once the threshold is exceeded", func() { + r := &TaskActionReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + Recorder: record.NewFakeRecorder(10), + eventsClient: &fakeEventsClient{}, + MaxSystemFailures: 2, + } + ta := &flyteorgv1.TaskAction{} + Expect(k8sClient.Get(ctx, nn, ta)).To(Succeed()) + ta.Status.SystemFailures = 2 + Expect(k8sClient.Status().Update(ctx, ta)).To(Succeed()) + Expect(k8sClient.Get(ctx, nn, ta)).To(Succeed()) + original := ta.DeepCopy() + + res, err := r.recordSystemError(ctx, ta, original, "pod", errFakeWebhookDenied) + Expect(err).NotTo(HaveOccurred()) + Expect(res.RequeueAfter).To(BeZero(), "terminal — should not requeue") + Expect(ta.Status.SystemFailures).To(Equal(uint32(3))) + Expect(ta.Status.ErrorState).NotTo(BeNil()) + Expect(ta.Status.ErrorState.Code).To(Equal(MaxSystemFailuresExceededCode)) + Expect(ta.Status.ErrorState.Kind).To(Equal("SYSTEM")) + Expect(ta.Status.ErrorState.Message).To(ContainSubstring("admission webhook")) + Expect(isTerminal(ta)).To(BeTrue()) + }) + }) + + Context("resetPluginResource", func() { + It("aborts the plugin and clears persisted plugin state", func() { + r := &TaskActionReconciler{} + ta := &flyteorgv1.TaskAction{} + ta.Status.PluginState = []byte("stale") + ta.Status.PluginStateVersion = 1 + + fp := &fakePlugin{id: "pod"} + r.resetPluginResource(context.Background(), ta, fp, nil) + + Expect(fp.abortCalls).To(Equal(1)) + Expect(ta.Status.PluginState).To(BeNil()) + Expect(ta.Status.PluginStateVersion).To(Equal(uint8(0))) + }) + }) + + Context("isSystemRetryableFailure", func() { + It("is true for PhaseInfoSystemRetryableFailure (PhaseRetryableFailure + kind SYSTEM)", func() { + info := pluginsCore.PhaseInfoSystemRetryableFailure("ResourceDeletedExternally", "node lost", nil) + Expect(isSystemRetryableFailure(info)).To(BeTrue()) + }) + + It("is false for a user-kind retryable failure", func() { + info := pluginsCore.PhaseInfoRetryableFailure("OOMKilled", "container OOMKilled", nil) + Expect(isSystemRetryableFailure(info)).To(BeFalse()) + }) + + It("is false for a permanent failure", func() { + info := pluginsCore.PhaseInfoFailure("BadInput", "invalid spec", nil) + Expect(isSystemRetryableFailure(info)).To(BeFalse()) + }) + + It("is false for a running phase", func() { + info := pluginsCore.PhaseInfoRunning(0, nil) + Expect(isSystemRetryableFailure(info)).To(BeFalse()) + }) + }) + + Context("systemErrorFromPhaseInfo", func() { + It("formats code and message from the ExecutionError", func() { + info := pluginsCore.PhaseInfoSystemRetryableFailure("ResourceDeletedExternally", "node lost", nil) + err := systemErrorFromPhaseInfo(info) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("ResourceDeletedExternally")) + Expect(err.Error()).To(ContainSubstring("node lost")) + }) + }) + Context("errorStateFromExecError", func() { It("returns nil for nil input", func() { Expect(errorStateFromExecError(nil)).To(BeNil()) diff --git a/executor/setup.go b/executor/setup.go index 78f5889ae59..1d59adbb07b 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -152,6 +152,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler.CatalogClient = asyncCatalogClient reconciler.Catalog = cacheClient reconciler.Recorder = mgr.GetEventRecorderFor("taskaction-controller") + reconciler.MaxSystemFailures = cfg.MaxSystemFailures if err := reconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("executor: failed to setup controller: %w", err) }