Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/flyteorg/flyte/v2/app/internal/config"
"github.com/flyteorg/flyte/v2/flytestdlib/k8s"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
"github.com/flyteorg/flyte/v2/flytestdlib/utils"
flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app"
flytecore "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
)
Expand Down Expand Up @@ -582,7 +583,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err
}

// buildPodSpec constructs a corev1.PodSpec from an App Spec.
// Supports Container payload only for now; K8sPod support can be added in a follow-up.
// Supports Container and K8sPod payloads.
func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) {
switch p := spec.GetAppPayload().(type) {
case *flyteapp.Spec_Container:
Expand Down Expand Up @@ -612,9 +613,18 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) {
}, nil

case *flyteapp.Spec_Pod:
// K8sPod payloads are not yet supported — the pod spec serialization
// from flyteplugins is needed for a complete implementation.
return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload is not yet supported")
pod := p.Pod
if pod == nil || pod.GetPodSpec() == nil {
return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload has no pod spec")
}
var podSpec corev1.PodSpec
if err := utils.UnmarshalStructToObj(pod.GetPodSpec(), &podSpec); err != nil {
return corev1.PodSpec{}, fmt.Errorf("failed to unmarshal K8sPod spec: %w", err)
}
if podSpec.EnableServiceLinks == nil {
podSpec.EnableServiceLinks = boolPtr(false)
}
return podSpec, nil

default:
return corev1.PodSpec{}, fmt.Errorf("app spec has no payload (container or pod required)")
Expand Down
139 changes: 139 additions & 0 deletions app/internal/k8s/app_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"testing"
"time"

"google.golang.org/protobuf/types/known/structpb"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -740,3 +743,139 @@ func TestKserviceToStatus_Messages(t *testing.T) {
})
}
}


// transformStructToStructPB converts an arbitrary Go object into a *structpb.Struct
// by round-tripping through JSON. It fails the test on any marshaling error.
func transformStructToStructPB(t *testing.T, obj interface{}) *structpb.Struct {
t.Helper()
data, err := json.Marshal(obj)
require.NoError(t, err)
m := make(map[string]interface{})
err = json.Unmarshal(data, &m)
require.NoError(t, err)
s, err := structpb.NewStruct(m)
require.NoError(t, err)
return s
}

func TestBuildPodSpec_Container(t *testing.T) {
spec := &flyteapp.Spec{
AppPayload: &flyteapp.Spec_Container{
Container: &flytecoreapp.Container{
Image: "nginx:latest",
Command: []string{"nginx"},
Args: []string{"-g", "daemon off;"},
Env: []*flytecoreapp.KeyValuePair{
{Key: "FOO", Value: "bar"},
},
Ports: []*flytecoreapp.ContainerPort{
{ContainerPort: 8080, Name: "http"},
},
Resources: &flytecoreapp.Resources{
Requests: []*flytecoreapp.Resources_ResourceEntry{
{Name: flytecoreapp.Resources_CPU, Value: "100m"},
{Name: flytecoreapp.Resources_MEMORY, Value: "128Mi"},
},
},
},
},
}

podSpec, err := buildPodSpec(spec)
require.NoError(t, err)
require.Len(t, podSpec.Containers, 1)
assert.Equal(t, "app", podSpec.Containers[0].Name)
assert.Equal(t, "nginx:latest", podSpec.Containers[0].Image)
assert.Equal(t, []string{"nginx"}, podSpec.Containers[0].Command)
assert.Equal(t, []string{"-g", "daemon off;"}, podSpec.Containers[0].Args)
assert.Equal(t, []corev1.EnvVar{{Name: "FOO", Value: "bar"}}, podSpec.Containers[0].Env)
assert.Equal(t, []corev1.ContainerPort{{ContainerPort: 8080, Name: "http"}}, podSpec.Containers[0].Ports)
assert.Equal(t, "100m", podSpec.Containers[0].Resources.Requests.Cpu().String())
assert.Equal(t, "128Mi", podSpec.Containers[0].Resources.Requests.Memory().String())
assert.NotNil(t, podSpec.EnableServiceLinks)
assert.False(t, *podSpec.EnableServiceLinks)
}

func TestBuildPodSpec_Pod(t *testing.T) {
podSpecMap := map[string]interface{}{
"containers": []map[string]interface{}{
{
"name": "app",
"image": "my-image:v1",
"ports": []map[string]interface{}{
{"containerPort": float64(80), "name": "http"},
},
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": "250m",
"memory": "256Mi",
},
},
},
},
"restartPolicy": "Always",
}

spec := &flyteapp.Spec{
AppPayload: &flyteapp.Spec_Pod{
Pod: &flytecoreapp.K8SPod{
PodSpec: transformStructToStructPB(t, podSpecMap),
},
},
}

podSpec, err := buildPodSpec(spec)
require.NoError(t, err)
require.Len(t, podSpec.Containers, 1)
assert.Equal(t, "app", podSpec.Containers[0].Name)
assert.Equal(t, "my-image:v1", podSpec.Containers[0].Image)
assert.Len(t, podSpec.Containers[0].Ports, 1)
assert.Equal(t, int32(80), podSpec.Containers[0].Ports[0].ContainerPort)
assert.Equal(t, "250m", podSpec.Containers[0].Resources.Requests.Cpu().String())
assert.Equal(t, "256Mi", podSpec.Containers[0].Resources.Requests.Memory().String())
assert.Equal(t, corev1.RestartPolicyAlways, podSpec.RestartPolicy)
assert.NotNil(t, podSpec.EnableServiceLinks)
assert.False(t, *podSpec.EnableServiceLinks)
}

func TestBuildPodSpec_Pod_NilPodSpec(t *testing.T) {
spec := &flyteapp.Spec{
AppPayload: &flyteapp.Spec_Pod{
Pod: &flytecoreapp.K8SPod{},
},
}

_, err := buildPodSpec(spec)
require.Error(t, err)
assert.Contains(t, err.Error(), "has no pod spec")
}

func TestBuildPodSpec_Pod_InvalidJSON(t *testing.T) {
// Create a Struct that cannot be unmarshaled into corev1.PodSpec.
// "containers" must be an array, not a string.
s := &structpb.Struct{
Fields: map[string]*structpb.Value{
"containers": {Kind: &structpb.Value_StringValue{StringValue: "not-an-array"}},
},
}

spec := &flyteapp.Spec{
AppPayload: &flyteapp.Spec_Pod{
Pod: &flytecoreapp.K8SPod{
PodSpec: s,
},
},
}

_, err := buildPodSpec(spec)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to unmarshal K8sPod spec")
}

func TestBuildPodSpec_NoPayload(t *testing.T) {
spec := &flyteapp.Spec{}
_, err := buildPodSpec(spec)
require.Error(t, err)
assert.Contains(t, err.Error(), "app spec has no payload")
}
Loading