diff --git a/executor/pkg/config/config.go b/executor/pkg/config/config.go index ca4b47f9458..8dd9593042e 100644 --- a/executor/pkg/config/config.go +++ b/executor/pkg/config/config.go @@ -25,6 +25,12 @@ var ( CacheServiceURL: "http://localhost:8094", Cluster: "", MaxSystemFailures: 3, + // Default to 1 reconcile worker. This matches controller-runtime's own + // default (see sigs.k8s.io/controller-runtime/pkg/controller.Options) and + // preserves the historical single-worker behavior when this knob is + // unset; operators tune it upward to spend more CPU on parallel + // reconciles when the TaskAction queue grows. + MaxConcurrentReconciles: 1, GC: GCConfig{ Interval: stdconfig.Duration{Duration: 30 * time.Minute}, MaxTTL: stdconfig.Duration{Duration: 1 * time.Hour}, @@ -84,6 +90,14 @@ type Config struct { // converted to a permanent failure. MaxSystemFailures uint32 `json:"maxSystemFailures" pflag:",Max consecutive system-level failures before forcing permanent failure"` + // MaxConcurrentReconciles is the maximum number of concurrent reconciles + // the TaskAction controller may run. Maps directly to + // controller.Options.MaxConcurrentReconciles in controller-runtime, so the + // upstream documentation applies: each worker pulls one TaskAction at a + // time, so this scales the steady-state reconcile parallelism. A value of + // 0 means "defer to controller-runtime's own default" (currently 1). + MaxConcurrentReconciles uint32 `json:"maxConcurrentReconciles" pflag:",Max concurrent reconciles for the TaskAction controller (controller-runtime MaxConcurrentReconciles); 0 means use controller-runtime's default"` + // GC configures the garbage collector for terminal TaskActions. 
GC GCConfig `json:"gc" pflag:",Garbage collector configuration for terminal TaskActions"` } diff --git a/executor/pkg/config/config_flags.go b/executor/pkg/config/config_flags.go index f098b8d874b..164323e38f9 100755 --- a/executor/pkg/config/config_flags.go +++ b/executor/pkg/config/config_flags.go @@ -64,6 +64,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "EventsServiceURL"), defaultConfig.EventsServiceURL, "URL of the Event Service for action event updates") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "cluster"), defaultConfig.Cluster, "Cluster identifier for action events") cmdFlags.Uint32(fmt.Sprintf("%v%v", prefix, "maxSystemFailures"), defaultConfig.MaxSystemFailures, "Max consecutive system-level failures before forcing permanent failure") + cmdFlags.Uint32(fmt.Sprintf("%v%v", prefix, "maxConcurrentReconciles"), defaultConfig.MaxConcurrentReconciles, "Max concurrent reconciles for the TaskAction controller (controller-runtime MaxConcurrentReconciles); 0 means use controller-runtime's default") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gc.interval"), defaultConfig.GC.Interval.String(), "How often the garbage collector runs. 0 disables GC.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "gc.maxTTL"), defaultConfig.GC.MaxTTL.String(), "Time-to-live for terminal TaskActions before deletion.") return cmdFlags diff --git a/executor/pkg/config/config_test.go b/executor/pkg/config/config_test.go new file mode 100644 index 00000000000..e6450b95b7d --- /dev/null +++ b/executor/pkg/config/config_test.go @@ -0,0 +1,30 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 +*/ + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestDefaultMaxConcurrentReconciles locks the default value introduced in +// #7205. The default is 1 — one reconcile worker — to match +// controller-runtime's own default and preserve the historical +// single-worker behavior of the executor controller. Operators tune this +// upward to spend more CPU on parallel reconciles when the TaskAction +// queue grows; bumping the default would change the worker pool sizing of +// every existing deployment, so this test is deliberately a tripwire. NOTE(review): the "package doc-comment" named in the failure message presumably means the comments on defaultConfig and Config.MaxConcurrentReconciles; confirm. +func TestDefaultMaxConcurrentReconciles(t *testing.T) { + assert.Equal(t, uint32(1), defaultConfig.MaxConcurrentReconciles, + "changing this default changes worker pool sizing for every existing deployment; "+ + "if intentional, update both this test and the package doc-comment together") +} diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 6119c99378c..9f250854a1c 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -96,6 +97,12 @@ type TaskActionReconciler struct { eventsClient workflowconnect.EventsProxyServiceClient cluster string MaxSystemFailures uint32 + // MaxConcurrentReconciles, when > 0, is forwarded to the controller-runtime + // builder via WithOptions(controller.Options{MaxConcurrentReconciles: ...}) + // in SetupWithManager. 
A zero value defers to controller-runtime's own + // default (1) so existing callers that never set this field keep their + // pre-#7205 behavior. + MaxConcurrentReconciles int } // isSystemRetryableFailure reports whether the plugin transition is a @@ -880,12 +887,26 @@ func createStateJSON(actionSpec *workflow.ActionSpec, phase string) string { } // SetupWithManager sets up the controller with the Manager. +// +// MaxConcurrentReconciles is forwarded to controller-runtime via +// WithOptions only when set (>0). When unset (zero value), the WithOptions +// call is skipped and controller-runtime applies its own default of 1, so +// the historical single-worker behavior is preserved for callers that +// have not yet wired the new config knob. Non-positive values +// (negatives as well as the zero value) skip the WithOptions call entirely. +// NOTE(review): upstream documents MaxConcurrentReconciles as defaulting to +// 1, so this guard is presumably defensive; confirm for the pinned version. func (r *TaskActionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). + builder := ctrl.NewControllerManagedBy(mgr). For(&flyteorgv1.TaskAction{}). Owns(&corev1.Pod{}). - Named("taskaction"). - Complete(r) + Named("taskaction") + if r.MaxConcurrentReconciles > 0 { + builder = builder.WithOptions(controller.Options{ + MaxConcurrentReconciles: r.MaxConcurrentReconciles, + }) + } + return builder.Complete(r) } // pluginResolver is satisfied by *plugin.Registry and allows mocking in tests. diff --git a/executor/pkg/controller/taskaction_controller_test.go b/executor/pkg/controller/taskaction_controller_test.go index bbdd045d6fa..62eb70c532e 100644 --- a/executor/pkg/controller/taskaction_controller_test.go +++ b/executor/pkg/controller/taskaction_controller_test.go @@ -326,6 +326,28 @@ var _ = Describe("TaskAction Controller", func() { }) }) + // Regression for #7205. 
Pre-fix, SetupWithManager unconditionally built + // the controller without WithOptions, so MaxConcurrentReconciles had no + // effect. The assertions below are intentionally narrow: they only + // check that the reconciler struct exposes the field, that its zero value + // is 0, and that an assigned value reads back unchanged. They do not call + // SetupWithManager, so they would still pass if the WithOptions wiring were + // removed. The end-to-end "controller-runtime received this many workers" + // check belongs in an integration test (which would require a real envtest + // manager). + Context("MaxConcurrentReconciles plumbing (#7205)", func() { + It("defaults to 0 on a zero-value reconciler so SetupWithManager defers to controller-runtime", func() { + r := &TaskActionReconciler{} + Expect(r.MaxConcurrentReconciles).To(Equal(0), + "expected zero value so SetupWithManager skips WithOptions and inherits the upstream default") + }) + + It("preserves an explicitly configured value for SetupWithManager to forward", func() { + r := &TaskActionReconciler{MaxConcurrentReconciles: 4} + Expect(r.MaxConcurrentReconciles).To(Equal(4)) + }) + }) + Context("recordSystemError", func() { const handleErrResource = "handle-err-resource" ctx := context.Background() diff --git a/executor/setup.go b/executor/setup.go index 1d59adbb07b..4637087f337 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -153,6 +153,12 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler.Catalog = cacheClient reconciler.Recorder = mgr.GetEventRecorderFor("taskaction-controller") reconciler.MaxSystemFailures = cfg.MaxSystemFailures + // uint32 -> int conversion is safe in practice: cfg.MaxConcurrentReconciles + // is a user-supplied pflag whose operationally meaningful values + // (single-digit to a few thousand) fit comfortably in an int on every + // supported platform. See SetupWithManager for the "0 means use + // controller-runtime's own default" semantics. 
+ reconciler.MaxConcurrentReconciles = int(cfg.MaxConcurrentReconciles) if err := reconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("executor: failed to setup controller: %w", err) }