diff --git a/actions/k8s/client.go b/actions/k8s/client.go index cf9ce2a4a5f..5a752ffb1e6 100644 --- a/actions/k8s/client.go +++ b/actions/k8s/client.go @@ -43,13 +43,16 @@ type ActionUpdate struct { ErrorState *executorv1.ErrorState } -const labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded" +const ( + labelTerminalStatusRecorded = "flyte.org/terminal-status-recorded" + // flyteNamespace is the single Kubernetes namespace used for all Flyte resources. + flyteNamespace = "flyte" +) // ActionsClient handles all etcd/K8s TaskAction CR operations for the Actions service. type ActionsClient struct { k8sClient client.WithWatch sharedCache ctrlcache.Cache - namespace string bufferSize int runClient workflowconnect.InternalRunServiceClient // recordedFilter deduplicates RecordAction calls across watch reconnects. @@ -71,14 +74,13 @@ type ActionsClient struct { } // NewActionsClient creates a new Kubernetes-based actions client. -func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, namespace string, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient { +func NewActionsClient(k8sClient client.WithWatch, sharedCache ctrlcache.Cache, bufferSize int, numWorkers int, runClient workflowconnect.InternalRunServiceClient, recordFilterSize int, scope promutils.Scope) *ActionsClient { if numWorkers <= 0 { numWorkers = 1 } c := &ActionsClient{ k8sClient: k8sClient, sharedCache: sharedCache, - namespace: namespace, bufferSize: bufferSize, numWorkers: numWorkers, runClient: runClient, @@ -109,14 +111,13 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run switch action.GetSpec().(type) { case *actions.Action_Task: taskActionName := buildTaskActionName(actionID) - namespace := buildNamespace(actionID.Run) - if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, namespace); err != nil { - return fmt.Errorf("failed to ensure namespace %s: %w", namespace, err) + if err := k8sutil.EnsureNamespaceExists(ctx, c.k8sClient, flyteNamespace); err != nil { + return fmt.Errorf("failed to ensure namespace %s: %w", flyteNamespace, err) } taskAction := &executorv1.TaskAction{ ObjectMeta: metav1.ObjectMeta{ Name: taskActionName, - Namespace: namespace, + Namespace: flyteNamespace, Labels: map[string]string{ "flyte.org/project": actionID.Run.Project, "flyte.org/domain": actionID.Run.Domain, @@ -138,7 +139,7 @@ func (c *ActionsClient) Enqueue(ctx context.Context, action *actions.Action, run parentName := buildTaskActionName(parentID) parent := &executorv1.TaskAction{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: namespace}, parent); err != nil { + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: parentName, Namespace: flyteNamespace}, parent); err != nil { return fmt.Errorf("failed to get parent TaskAction %s: %w", parentName, err) } parentTaskAction = parent @@ -214,7 +215,7 @@ func (c *ActionsClient) AbortAction(ctx context.Context, actionID *common.Action logger.Infof(ctx, "Aborting action %s (reason: %v)", taskActionName, reason) taskAction := &executorv1.TaskAction{} - if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: buildNamespace(actionID.Run)}, taskAction); err != nil { + if err := c.k8sClient.Get(ctx, client.ObjectKey{Name: taskActionName, Namespace: flyteNamespace}, taskAction); err != nil { return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err) } @@ -233,7 +234,7 @@ func (c *ActionsClient) GetState(ctx context.Context, actionID *common.ActionIde taskAction := &executorv1.TaskAction{} if err := c.k8sClient.Get(ctx, client.ObjectKey{ Name: taskActionName, - Namespace: buildNamespace(actionID.Run), + Namespace: flyteNamespace, }, taskAction); err != nil { return "", fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err) } @@ -249,7 +250,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde taskAction := &executorv1.TaskAction{} if err := c.k8sClient.Get(ctx, client.ObjectKey{ Name: taskActionName, - Namespace: buildNamespace(actionID.Run), + Namespace: flyteNamespace, }, taskAction); err != nil { return fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err) } @@ -279,7 +280,7 @@ func (c *ActionsClient) PutState(ctx context.Context, actionID *common.ActionIde func (c *ActionsClient) ListRunActions(ctx context.Context, runID *common.RunIdentifier) ([]*executorv1.TaskAction, error) { taskActionList := &executorv1.TaskActionList{} listOpts := []client.ListOption{ - client.InNamespace(buildNamespace(runID)), + client.InNamespace(flyteNamespace), client.MatchingLabels{ "flyte.org/project": runID.Project, "flyte.org/domain": runID.Domain, @@ -303,7 +304,7 @@ func (c *ActionsClient) ListChildActions(ctx context.Context, parentActionID *co // List all TaskActions in the same run taskActionList := &executorv1.TaskActionList{} listOpts := []client.ListOption{ - client.InNamespace(buildNamespace(parentActionID.Run)), + client.InNamespace(flyteNamespace), client.MatchingLabels{ "flyte.org/project": parentActionID.Run.Project, "flyte.org/domain": parentActionID.Run.Domain, @@ -340,7 +341,7 @@ func (c *ActionsClient) GetTaskAction(ctx context.Context, actionID *common.Acti taskAction := &executorv1.TaskAction{} if err := c.k8sClient.Get(ctx, client.ObjectKey{ Name: taskActionName, - Namespace: buildNamespace(actionID.Run), + Namespace: flyteNamespace, }, taskAction); err != nil { return nil, fmt.Errorf("failed to get TaskAction %s: %w", taskActionName, err) } @@ -393,7 +394,7 @@ func (c *ActionsClient) StartWatching(ctx context.Context) error { } c.mu.Unlock() - logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", c.namespace, c.numWorkers) + logger.Infof(ctx, "Starting TaskAction watcher for namespace: %s (workers: %d)", flyteNamespace, c.numWorkers) if c.sharedCache == nil { return fmt.Errorf("shared cache is required for TaskAction informer") @@ -724,11 +725,6 @@ func buildTaskActionName(actionID *common.ActionIdentifier) string { return fmt.Sprintf("%s-%s", actionID.Run.Name, actionID.Name) } -// buildNamespace returns the Kubernetes namespace for a run: "-". -func buildNamespace(runID *common.RunIdentifier) string { - return fmt.Sprintf("%s-%s", runID.Project, runID.Domain) -} - // buildOutputUri computes the action-specific output URI from the TaskAction spec. // It uses the same path structure as the executor's ComputeActionOutputPath so that // the SDK can find outputs written by the executor. diff --git a/actions/k8s/client_test.go b/actions/k8s/client_test.go index cb8e284cf0a..22f61a97ed5 100644 --- a/actions/k8s/client_test.go +++ b/actions/k8s/client_test.go @@ -186,22 +186,8 @@ func TestBuildTaskActionName(t *testing.T) { }) } -func TestBuildNamespace(t *testing.T) { - t.Run("combines project and domain", func(t *testing.T) { - runID := &common.RunIdentifier{ - Project: "flytesnacks", - Domain: "development", - } - assert.Equal(t, "flytesnacks-development", buildNamespace(runID)) - }) - - t.Run("different project and domain", func(t *testing.T) { - runID := &common.RunIdentifier{ - Project: "myproject", - Domain: "production", - } - assert.Equal(t, "myproject-production", buildNamespace(runID)) - }) +func TestFlyteNamespace(t *testing.T) { + assert.Equal(t, "flyte", flyteNamespace) } func TestExtractTaskCacheKey(t *testing.T) { diff --git a/actions/setup.go b/actions/setup.go index 6f20d863403..b6ce2bc57d7 100644 --- a/actions/setup.go +++ b/actions/setup.go @@ -32,14 +32,13 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { actionsClient := actionsk8s.NewActionsClient( sc.K8sClient, sc.K8sCache, - sc.Namespace, cfg.WatchBufferSize, cfg.WatchWorkers, runClient, cfg.RecordFilterSize, sc.Scope, ) - logger.Infof(ctx, "Actions K8s client initialized for namespace: %s", sc.Namespace) + logger.Infof(ctx, "Actions K8s client initialized") if err := actionsClient.StartWatching(ctx); err != nil { return fmt.Errorf("actions: failed to start TaskAction watcher: %w", err)