Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions charts/flyte-binary/templates/crds/flyte.org_taskactions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ spec:
description: StateJSON is the JSON serialized NodeStatus that was
last sent to the State Service
type: string
systemFailures:
description: |-
SystemFailures counts system-level failures observed during reconciliation —
either Go errors returned from Plugin.Handle (e.g. transient k8s API errors,
admission webhook denials) or plugin transitions reporting a system-retryable
failure (e.g. resource deleted externally). When it exceeds the configured
maximum, the TaskAction is converted to a permanent failure.
format: int32
type: integer
type: object
required:
- spec
Expand Down
9 changes: 9 additions & 0 deletions docker/devbox-bundled/manifests/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7038,6 +7038,15 @@ spec:
description: StateJSON is the JSON serialized NodeStatus that was
last sent to the State Service
type: string
systemFailures:
description: |-
SystemFailures counts system-level failures observed during reconciliation —
either Go errors returned from Plugin.Handle (e.g. transient k8s API errors,
admission webhook denials) or plugin transitions reporting a system-retryable
failure (e.g. resource deleted externally). When it exceeds the configured
maximum, the TaskAction is converted to a permanent failure.
format: int32
type: integer
type: object
required:
- spec
Expand Down
9 changes: 9 additions & 0 deletions docker/devbox-bundled/manifests/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7038,6 +7038,15 @@ spec:
description: StateJSON is the JSON serialized NodeStatus that was
last sent to the State Service
type: string
systemFailures:
description: |-
SystemFailures counts system-level failures observed during reconciliation —
either Go errors returned from Plugin.Handle (e.g. transient k8s API errors,
admission webhook denials) or plugin transitions reporting a system-retryable
failure (e.g. resource deleted externally). When it exceeds the configured
maximum, the TaskAction is converted to a permanent failure.
format: int32
type: integer
type: object
required:
- spec
Expand Down
8 changes: 8 additions & 0 deletions executor/api/v1/taskaction_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ type TaskActionStatus struct {
// +optional
Attempts uint32 `json:"attempts,omitempty"`

// SystemFailures counts system-level failures observed during reconciliation —
// either Go errors returned from Plugin.Handle (e.g. transient k8s API errors,
// admission webhook denials) or plugin transitions reporting a system-retryable
// failure (e.g. resource deleted externally). When it exceeds the configured
// maximum, the TaskAction is converted to a permanent failure.
// +optional
SystemFailures uint32 `json:"systemFailures,omitempty"`

// CacheStatus is the latest observed cache lookup result for this action.
// +optional
CacheStatus core.CatalogCacheStatus `json:"cacheStatus,omitempty"`
Expand Down
9 changes: 9 additions & 0 deletions executor/config/crd/bases/flyte.org_taskactions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ spec:
description: StateJSON is the JSON serialized NodeStatus that was
last sent to the State Service
type: string
systemFailures:
description: |-
SystemFailures counts system-level failures observed during reconciliation —
either Go errors returned from Plugin.Handle (e.g. transient k8s API errors,
admission webhook denials) or plugin transitions reporting a system-retryable
failure (e.g. resource deleted externally). When it exceeds the configured
maximum, the TaskAction is converted to a permanent failure.
format: int32
type: integer
type: object
required:
- spec
Expand Down
6 changes: 6 additions & 0 deletions executor/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
EventsServiceURL: "http://localhost:8090",
CacheServiceURL: "http://localhost:8094",
Cluster: "",
MaxSystemFailures: 3,
GC: GCConfig{
Interval: stdconfig.Duration{Duration: 30 * time.Minute},
MaxTTL: stdconfig.Duration{Duration: 1 * time.Hour},
Expand Down Expand Up @@ -78,6 +79,11 @@ type Config struct {
// Cluster is the cluster identifier attached to action events.
Cluster string `json:"cluster" pflag:",Cluster identifier for action events"`

// MaxSystemFailures bounds consecutive system-level failures (Plugin.Handle Go
// errors and plugin-reported system-retryable failures) before a TaskAction is
// converted to a permanent failure.
MaxSystemFailures uint32 `json:"maxSystemFailures" pflag:",Max consecutive system-level failures before forcing permanent failure"`

// GC configures the garbage collector for terminal TaskActions.
GC GCConfig `json:"gc" pflag:",Garbage collector configuration for terminal TaskActions"`
}
Expand Down
1 change: 1 addition & 0 deletions executor/pkg/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 127 additions & 16 deletions executor/pkg/controller/taskaction_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ const (
TaskActionDefaultRequeueDuration = 10 * time.Second
taskActionFinalizer = "flyte.org/plugin-finalizer"

// DefaultMaxSystemFailures bounds consecutive system errors before the
// TaskAction is forced to PermanentFailure.
DefaultMaxSystemFailures uint32 = 3

// MaxSystemFailuresExceededCode is the ExecutionError.Code stamped on
// TaskActions that hit the system-failure ceiling.
MaxSystemFailuresExceededCode = "MaxSystemFailuresExceeded"

// LabelTerminationStatus marks a TaskAction as terminated for GC discovery.
LabelTerminationStatus = "flyte.org/termination-status"
// LabelCompletedTime records the UTC time (minute precision) when the TaskAction became terminal.
Expand All @@ -77,16 +85,116 @@ const (
// TaskActionReconciler reconciles a TaskAction object
type TaskActionReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
PluginRegistry *plugin.Registry
DataStore *storage.DataStore
SecretManager pluginsCore.SecretManager
ResourceManager pluginsCore.ResourceManager
CatalogClient catalog.AsyncClient
Catalog catalog.Client
eventsClient workflowconnect.EventsProxyServiceClient
cluster string
Scheme *runtime.Scheme
Recorder record.EventRecorder
PluginRegistry *plugin.Registry
DataStore *storage.DataStore
SecretManager pluginsCore.SecretManager
ResourceManager pluginsCore.ResourceManager
CatalogClient catalog.AsyncClient
Catalog catalog.Client
eventsClient workflowconnect.EventsProxyServiceClient
cluster string
MaxSystemFailures uint32
}

// isSystemRetryableFailure reports whether the plugin transition is a
// PhaseRetryableFailure with kind=SYSTEM (as produced by PhaseInfoSystemRetryableFailure).
func isSystemRetryableFailure(phaseInfo pluginsCore.PhaseInfo) bool {
if phaseInfo.Phase() != pluginsCore.PhaseRetryableFailure {
return false
}
execErr := phaseInfo.Err()
return execErr != nil && execErr.GetKind() == core.ExecutionError_SYSTEM
}

func systemErrorFromPhaseInfo(phaseInfo pluginsCore.PhaseInfo) error {
if execErr := phaseInfo.Err(); execErr != nil {
return fmt.Errorf("[%s] %s", execErr.GetCode(), execErr.GetMessage())
}
return fmt.Errorf("system retryable failure")
}

func (r *TaskActionReconciler) maxSystemFailures() uint32 {
if r.MaxSystemFailures == 0 {
return DefaultMaxSystemFailures
}
return r.MaxSystemFailures
}

// resetPluginResource aborts any in-flight plugin resource and clears persisted
// plugin state so the next reconcile starts fresh from PluginPhaseNotStarted.
func (r *TaskActionReconciler) resetPluginResource(
ctx context.Context,
taskAction *flyteorgv1.TaskAction,
p pluginsCore.Plugin,
tCtx pluginsCore.TaskExecutionContext,
) {
if abortErr := p.Abort(ctx, tCtx); abortErr != nil {
log.FromContext(ctx).Error(abortErr, "failed to abort during system-error reset", "plugin", p.GetID())
}
taskAction.Status.PluginState = nil
taskAction.Status.PluginStateVersion = 0
}

// recordSystemError increments Status.SystemFailures and either requeues for
// another attempt or, once the configured threshold is exceeded, converts the
// TaskAction to a permanent failure. It does not touch the underlying plugin
// resource — callers that observed a failure phase should invoke
// resetPluginResource first.
func (r *TaskActionReconciler) recordSystemError(
ctx context.Context,
taskAction *flyteorgv1.TaskAction,
original *flyteorgv1.TaskAction,
pluginID string,
handleErr error,
) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Error(handleErr, "system error from plugin", "plugin", pluginID)
if r.Recorder != nil {
r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(FailedPluginHandle),
"Plugin %q system error: %v", pluginID, handleErr)
}

taskAction.Status.SystemFailures++
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we resetting this back to 0? Suppose we hit 2 system errors and then a success, shouldn't this be reset?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure actually. we don't reset it in v1.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldnt it be an issue for long running tasks that may have transient failures followed by successes in between. Task could silently die after accumulating nonconsecutive maxSystemFailures

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense, let me update it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated it

if taskAction.Status.SystemFailures > r.maxSystemFailures() {
Comment thread
pingsutw marked this conversation as resolved.
execErr := &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: MaxSystemFailuresExceededCode,
Message: fmt.Sprintf("plugin %q failed with system error %d times: %v", pluginID, taskAction.Status.SystemFailures, handleErr),
}
return r.finalizePermanentFailure(ctx, taskAction, original, execErr)
}

if taskActionStatusChanged(original.Status, taskAction.Status) {
if updErr := r.Status().Update(ctx, taskAction); updErr != nil {
logger.Error(updErr, "failed to persist SystemFailures counter")
}
}
return ctrl.Result{RequeueAfter: TaskActionDefaultRequeueDuration}, nil
}

// finalizePermanentFailure converts the TaskAction to a terminal PermanentFailure
// with the given ExecutionError and stamps GC labels.
func (r *TaskActionReconciler) finalizePermanentFailure(
ctx context.Context,
taskAction *flyteorgv1.TaskAction,
original *flyteorgv1.TaskAction,
execErr *core.ExecutionError,
) (ctrl.Result, error) {
phaseInfo := pluginsCore.PhaseInfoFailed(pluginsCore.PhasePermanentFailure, execErr, nil)
mapPhaseToConditions(taskAction, phaseInfo)
taskAction.Status.PluginPhase = phaseInfo.Phase().String()
taskAction.Status.PluginPhaseVersion = phaseInfo.Version()
if updErr := r.updateTaskActionStatus(ctx, original, taskAction, phaseInfo); updErr != nil {
return ctrl.Result{}, updErr
}
if isTerminal(taskAction) {
if labelErr := r.ensureTerminalLabels(ctx, taskAction); labelErr != nil {
return ctrl.Result{}, labelErr
}
}
return ctrl.Result{}, nil
}

// NewTaskActionReconciler creates a new TaskActionReconciler
Expand Down Expand Up @@ -203,10 +311,7 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if !cacheShortCircuited {
transition, err = p.Handle(ctx, tCtx)
if err != nil {
logger.Error(err, "plugin Handle failed", "plugin", p.GetID())
r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(FailedPluginHandle),
"Plugin %q Handle failed: %v", p.GetID(), err)
return ctrl.Result{RequeueAfter: TaskActionDefaultRequeueDuration}, nil
return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), err)
}
}

Expand All @@ -218,8 +323,13 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Map transition phase to TaskAction conditions
phaseInfo := transition.Info()

// In-place task restart: when a recoverable failure occurs, restart the pod within the
// same TaskAction rather than relying on the runs service to create a new TaskAction.
if !cacheShortCircuited && isSystemRetryableFailure(phaseInfo) {
r.resetPluginResource(ctx, taskAction, p, tCtx)
return r.recordSystemError(ctx, taskAction, originalTaskActionInstance, p.GetID(), systemErrorFromPhaseInfo(phaseInfo))
}

// In-place task restart on USER-kind retryable failure: bump Status.Attempts
// and relaunch the pod under the same TaskAction.
var restartAttempts uint32
if !cacheShortCircuited && phaseInfo.Phase() == pluginsCore.PhaseRetryableFailure {
currentAttempts := observedAttempts(taskAction)
Expand Down Expand Up @@ -627,6 +737,7 @@ func taskActionStatusChanged(oldStatus, newStatus flyteorgv1.TaskActionStatus) b
oldStatus.PluginPhase != newStatus.PluginPhase ||
oldStatus.PluginPhaseVersion != newStatus.PluginPhaseVersion ||
oldStatus.Attempts != newStatus.Attempts ||
oldStatus.SystemFailures != newStatus.SystemFailures ||
oldStatus.CacheStatus != newStatus.CacheStatus {
return true
}
Expand Down
Loading
Loading