Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions dataproxy/service/dataproxy_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/flyteorg/flyte/v2/dataproxy/config"
"github.com/flyteorg/flyte/v2/dataproxy/logs"
"github.com/flyteorg/flyte/v2/flytestdlib/logger"
"github.com/flyteorg/flyte/v2/flytestdlib/storage"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common"
flyteIdlCore "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
"github.com/flyteorg/flyte/v2/dataproxy/logs"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy/dataproxyconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project"
Expand Down Expand Up @@ -470,6 +470,10 @@ func (s *Service) GetActionData(
ctx context.Context,
req *connect.Request[dataproxy.GetActionDataRequest],
) (*connect.Response[dataproxy.GetActionDataResponse], error) {
if err := req.Msg.Validate(); err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

actionId := req.Msg.GetActionId()

urisResp, err := s.runClient.GetActionDataURIs(ctx, connect.NewRequest(&workflow.GetActionDataURIsRequest{
Expand All @@ -495,11 +499,17 @@ func (s *Service) GetActionData(
}
logger.Infof(groupCtx, "GetActionData: reading inputs from %s", inputRef)
if err := s.dataStore.ReadProtobuf(groupCtx, inputRef, resp.Inputs); err != nil {
logger.Errorf(groupCtx, "GetActionData: failed to read inputs from %s: %v", inputRef, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read inputs from %s: %w", inputRef, err))
if !storage.IsNotFound(err) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return an error if the input is not found?

Copy link
Copy Markdown
Contributor Author

@BarryWu0812 BarryWu0812 Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/flyteorg/flyte/pull/7284/changes#diff-5698a432711a8b677ea5e6d843f3a66a0c8cff4ebfaa6d262b844456364b46fbL11-L790

I saw the original logic is to skip the error that input is not found, so I kept the same logic.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, i see

logger.Errorf(groupCtx, "GetActionData: failed to read inputs from %s: %v", inputRef, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read inputs from %s: %w", inputRef, err))
}
} else {
logger.Debugf(groupCtx, "Read %d input literals and %d action contexts", len(resp.Inputs.Literals), len(resp.Inputs.Context))
}
return nil
})
} else {
logger.Warnf(ctx, "Action %s has empty InputURI", req.Msg.ActionId.Name)
}

if urisResp.Msg.GetOutputsUri() != "" {
Expand All @@ -508,11 +518,16 @@ func (s *Service) GetActionData(
logger.Infof(groupCtx, "GetActionData: reading outputs from %s", outputRef)
var inputsOrOutputs task.Inputs
if err := s.dataStore.ReadProtobuf(groupCtx, outputRef, &inputsOrOutputs); err != nil {
logger.Errorf(groupCtx, "GetActionData: failed to read outputs from %s: %v", outputRef, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read outputs from %s: %w", outputRef, err))
}
resp.Outputs = &task.Outputs{
Literals: inputsOrOutputs.GetLiterals(),
if !storage.IsNotFound(err) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

logger.Errorf(groupCtx, "GetActionData: failed to read outputs from %s: %v", outputRef, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read outputs from %s: %w", outputRef, err))
}
logger.Debugf(groupCtx, "Outputs not found at %s (action may not have finished)", urisResp.Msg.GetOutputsUri())
} else {
resp.Outputs = &task.Outputs{
Literals: inputsOrOutputs.GetLiterals(),
}
logger.Debugf(groupCtx, "Read %d output literals", len(resp.Outputs.Literals))
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion dataproxy/service/dataproxy_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy/dataproxyconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project"
projectMocks "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project/projectconnect/mocks"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/task"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow"
projectMocks "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project/projectconnect/mocks"
workflowMocks "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow/workflowconnect/mocks"
)

Expand Down
128 changes: 29 additions & 99 deletions runs/service/run_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"time"

"connectrpc.com/connect"
"github.com/flyteorg/flyte/v2/flytestdlib/app"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand All @@ -25,6 +23,8 @@ import (
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/actions/actionsconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/dataproxy/dataproxyconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/project/projectconnect"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/task"
"github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow"
Expand All @@ -39,12 +39,20 @@ import (
type RunService struct {
repo interfaces.Repository
actionsClient actionsconnect.ActionsServiceClient
dataProxyClient actionDataClient
projectClient projectconnect.ProjectServiceClient
storagePrefix string
dataStore *storage.DataStore
abortReconciler *AbortReconciler
}

type actionDataClient interface {
GetActionData(
ctx context.Context,
req *connect.Request[dataproxy.GetActionDataRequest],
) (*connect.Response[dataproxy.GetActionDataResponse], error)
}

const (
runIDLength = 20
runStringFormat = "r%s"
Expand Down Expand Up @@ -105,10 +113,19 @@ func (s *RunService) WatchGroups(ctx context.Context, req *connect.Request[workf
}

// NewRunService creates a new RunService instance
func NewRunService(repo interfaces.Repository, actionsClient actionsconnect.ActionsServiceClient, projectClient projectconnect.ProjectServiceClient, storagePrefix string, dataStore *storage.DataStore, reconciler *AbortReconciler) *RunService {
func NewRunService(
repo interfaces.Repository,
actionsClient actionsconnect.ActionsServiceClient,
dataProxyClient dataproxyconnect.DataProxyServiceClient,
projectClient projectconnect.ProjectServiceClient,
storagePrefix string,
dataStore *storage.DataStore,
reconciler *AbortReconciler,
) *RunService {
return &RunService{
repo: repo,
actionsClient: actionsClient,
dataProxyClient: dataProxyClient,
projectClient: projectClient,
storagePrefix: storagePrefix,
dataStore: dataStore,
Expand Down Expand Up @@ -746,7 +763,7 @@ func lastAttemptIsTerminal(attempts []*workflow.ActionAttempt) bool {
return IsTerminalPhase(last.GetPhase())
}

// GetActionData gets input and output data for an action by reading from storage.
// GetActionData keeps backward compatibility by delegating data reads to DataProxy.
func (s *RunService) GetActionData(
ctx context.Context,
req *connect.Request[workflow.GetActionDataRequest],
Expand All @@ -758,107 +775,20 @@ func (s *RunService) GetActionData(
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

// Get action from DB for storage URIs
action, err := s.repo.ActionRepo().GetAction(ctx, req.Msg.ActionId)
if err != nil {
logger.Errorf(ctx, "Failed to get action: %v", err)
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("action not found: %w", err))
if s.dataProxyClient == nil {
return nil, connect.NewError(connect.CodeUnavailable, fmt.Errorf("dataproxy client is not configured"))
}

inputURI, _ := extractStorageURIs(action.ActionSpec)

info := &workflow.RunInfo{}
if err := proto.Unmarshal(action.DetailedInfo, info); err != nil {
dpResp, err := s.dataProxyClient.GetActionData(ctx, connect.NewRequest(&dataproxy.GetActionDataRequest{
ActionId: req.Msg.GetActionId(),
}))
if err != nil {
return nil, err
}

resp := &workflow.GetActionDataResponse{
Inputs: &task.Inputs{},
Outputs: &task.Outputs{},
}

// Read inputs from storage
group, groupCtx := errgroup.WithContext(ctx)
if inputURI != "" {
group.Go(func() error {
inputRef := storage.DataReference(inputURI)
logger.Debugf(groupCtx, "Reading inputs from: %s", inputRef)
if err := s.dataStore.ReadProtobuf(groupCtx, inputRef, resp.Inputs); err != nil {
if !storage.IsNotFound(err) {
logger.Errorf(groupCtx, "Failed to read inputs from %s: %v", inputRef, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read inputs: %w", err))
}
logger.Debugf(groupCtx, "Inputs not found at %s", inputRef)
} else {
logger.Debugf(groupCtx, "Read %d input literals and %d action contexts", len(resp.Inputs.Literals), len(resp.Inputs.Context))
}
return nil
})
} else {
logger.Warnf(ctx, "Action %s has empty InputURI", req.Msg.ActionId.Name)
}

// Read outputs from storage (only present if action succeeded)
if action.Phase == int32(common.ActionPhase_ACTION_PHASE_SUCCEEDED) {
group.Go(func() error {
// There are no attempts for trace actions, so we can skip the attempt validation
var attempts []*workflow.ActionAttempt
var err error
if workflow.ActionType(action.ActionType) == workflow.ActionType_ACTION_TYPE_TRACE {
if info.GetOutputsUri() == "" {
return nil
}
logger.Debugf(groupCtx, "Reading outputs from: %s", info.GetOutputsUri())

outputMap := &core.LiteralMap{}
if err := s.dataStore.ReadProtobuf(groupCtx, storage.DataReference(info.GetOutputsUri()), outputMap); err != nil {
if !storage.IsNotFound(err) {
logger.Errorf(groupCtx, "Failed to read outputs from %s: %v", info.GetOutputsUri(), err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read outputs: %w", err))
}
logger.Debugf(groupCtx, "Outputs not found at %s (action may not have finished)", info.GetOutputsUri())
} else {
resp.Outputs = literalMapToOutputs(outputMap)
logger.Debugf(groupCtx, "Read %d output literals", len(resp.Outputs.Literals))
}

return nil
}

// Default to "task" action types
attempts, err = s.getAttempts(groupCtx, req.Msg.GetActionId())
if err != nil {
return err
}

if len(attempts) == 0 {
return app.NewServerError(codes.NotFound, "outputs not available, no attempts for action")
}

outputUri := attempts[len(attempts)-1].GetOutputs().GetOutputUri()
if outputUri == "" {
return app.NewServerError(codes.NotFound, "outputs not available")
}

logger.Debugf(groupCtx, "Reading outputs from: %s", outputUri)
outputMap := &core.LiteralMap{}
if err := s.dataStore.ReadProtobuf(groupCtx, storage.DataReference(outputUri), outputMap); err != nil {
if !storage.IsNotFound(err) {
logger.Errorf(groupCtx, "Failed to read outputs from %s: %v", outputUri, err)
return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to read outputs: %w", err))
}
logger.Debugf(groupCtx, "Outputs not found at %s (action may not have finished)", outputUri)
} else {
resp.Outputs = literalMapToOutputs(outputMap)
logger.Debugf(groupCtx, "Read %d output literals", len(resp.Outputs.Literals))
}

return nil
})
}

if err := group.Wait(); err != nil {
return nil, err
Inputs: dpResp.Msg.GetInputs(),
Outputs: dpResp.Msg.GetOutputs(),
}

logger.Infof(ctx, "Retrieved action data for: %s (inputs=%d, outputs=%d)",
Expand Down
Loading
Loading