Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
return resolve.FetchConfiguration{}
}

dataSource, err = grpcdatasource.NewDataSource(p.grpcClient, grpcdatasource.DataSourceConfig{
dataSource, err = grpcdatasource.NewDataSource(grpcdatasource.NewGRPCTransport(p.grpcClient), grpcdatasource.DataSourceConfig{
Operation: &opDocument,
Definition: p.config.schemaConfiguration.upstreamSchemaAst,
Mapping: p.config.grpc.Mapping,
Expand Down
11 changes: 5 additions & 6 deletions v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/wundergraph/astjson"
Expand Down Expand Up @@ -44,7 +43,7 @@ var _ resolve.DataSource = (*DataSource)(nil)
// transforms the responses back to GraphQL format.
type DataSource struct {
plan *RPCExecutionPlan
cc grpc.ClientConnInterface
transport RPCTransport
rc *RPCCompiler
mapping *GRPCMapping
federationConfigs plan.FederationFieldConfigurations
Expand All @@ -68,8 +67,8 @@ type DataSourceConfig struct {
Disabled bool
}

// NewDataSource creates a new gRPC datasource
func NewDataSource(client grpc.ClientConnInterface, config DataSourceConfig) (*DataSource, error) {
// NewDataSource creates a new datasource with the given RPCTransport.
func NewDataSource(transport RPCTransport, config DataSourceConfig) (*DataSource, error) {
planner, err := NewPlanner(config.SubgraphName, config.Mapping, config.FederationConfigs)
if err != nil {
return nil, err
Expand All @@ -81,7 +80,7 @@ func NewDataSource(client grpc.ClientConnInterface, config DataSourceConfig) (*D

return &DataSource{
plan: plan,
cc: client,
transport: transport,
rc: config.Compiler,
mapping: config.Mapping,
definition: config.Definition,
Expand Down Expand Up @@ -152,7 +151,7 @@ func (d *DataSource) Load(ctx context.Context, headers http.Header, input []byte
builder := newJSONBuilder(item.Arena, d.mapping, variables)
errGrp.Go(func() error {
// Invoke the gRPC method - this will populate serviceCall.Output
err := d.cc.Invoke(errGrpCtx, serviceCall.MethodFullName(), serviceCall.Input, serviceCall.Output)
err := d.transport.Invoke(errGrpCtx, serviceCall.MethodFullName(), serviceCall.Input, serviceCall.Output)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func Test_DataSource_Load_WithEntity_Calls(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -617,7 +617,7 @@ func Test_DataSource_Load_WithEntity_Calls_WithCompositeTypes(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1406,7 +1406,7 @@ func Test_DataSource_Load_WithEntity_Calls_And_Requires(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1821,7 +1821,7 @@ func Test_DataSource_Load_WithEntity_Calls_And_Requires_And_FieldResolvers(t *te
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1965,7 +1965,7 @@ func Test_DataSource_Load_WithEntity_Calls_And_Requires_AbstractTypes(t *testing
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func Test_DataSource_Load_NullMetrics_NestedResolversNotInvoked(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -191,7 +191,7 @@ func Test_DataSource_Load_NullCategory_FieldResolversNotInvoked(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -226,7 +226,7 @@ func Test_DataSource_Load_ArgumentLessFieldResolversCalled(t *testing.T) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -267,7 +267,7 @@ func Test_DataSource_Load_NullCategory_ArgumentLessFieldResolversNotInvoked(t *t
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down
36 changes: 18 additions & 18 deletions v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Benchmark_DataSource_Load(b *testing.B) {
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(b), testMapping())
require.NoError(b, err)

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -81,7 +81,7 @@ func Benchmark_DataSource_Load_WithFieldArguments(b *testing.B) {

const subgraphName = "Products"

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: subgraphName,
Expand Down Expand Up @@ -192,7 +192,7 @@ func Test_DataSource_Load(t *testing.T) {
}

mi := mockInterface{}
ds, err := NewDataSource(mi, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(mi), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -247,7 +247,7 @@ func Test_DataSource_Load_WithMockService(t *testing.T) {
}

// 2. Create a datasource with the real gRPC client connection
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -337,7 +337,7 @@ func Test_DataSource_Load_WithRecursiveInputType(t *testing.T) {
t.Fatalf("failed to compile proto: %v", err)
}

ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -388,7 +388,7 @@ func Test_DataSource_Load_WithMockService_WithResponseMapping(t *testing.T) {
}

// 2. Create a datasource with the real gRPC client connection
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -489,7 +489,7 @@ func Test_DataSource_Load_WithGrpcError(t *testing.T) {
}

// 3. Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -837,7 +837,7 @@ func Test_DataSource_Load_WithAnimalInterface(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func Test_Datasource_Load_WithUnionTypes(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func Test_DataSource_Load_WithCategoryQueries(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1323,7 +1323,7 @@ func Test_DataSource_Load_WithTotalCalculation(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1413,7 +1413,7 @@ func Test_DataSource_Load_WithTypename(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -1882,7 +1882,7 @@ func Test_DataSource_Load_WithAliases(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -2260,7 +2260,7 @@ func Test_DataSource_Load_WithNullableFieldsType(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -3561,7 +3561,7 @@ func Test_DataSource_Load_WithNestedLists(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -4795,7 +4795,7 @@ func Test_Datasource_Load_WithFieldResolvers(t *testing.T) {
}

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -5004,7 +5004,7 @@ func Test_Datasource_Load_WithHeaders(t *testing.T) {
require.NoError(t, err)

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down Expand Up @@ -5055,7 +5055,7 @@ func Test_Datasource_Load_PreservesExistingContextMetadata(t *testing.T) {
require.NoError(t, err)

// Create the datasource
ds, err := NewDataSource(conn, DataSourceConfig{
ds, err := NewDataSource(NewGRPCTransport(conn), DataSourceConfig{
Operation: &queryDoc,
Definition: &schemaDoc,
SubgraphName: "Products",
Expand Down
36 changes: 36 additions & 0 deletions v2/pkg/engine/datasource/grpc_datasource/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package grpcdatasource

import (
"context"
"errors"

"google.golang.org/grpc"
protoref "google.golang.org/protobuf/reflect/protoreflect"
)

// RPCTransport abstracts the transport protocol for RPC calls.
// Both gRPC and Connect protocol implement this interface.
type RPCTransport interface {
Invoke(ctx context.Context, methodFullName string, input, output protoref.Message) error
}

// grpcTransport wraps grpc.ClientConnInterface to implement RPCTransport.
type grpcTransport struct {
cc grpc.ClientConnInterface
}

// NewGRPCTransport creates an RPCTransport that delegates to a gRPC ClientConnInterface.
func NewGRPCTransport(cc grpc.ClientConnInterface) RPCTransport {
return &grpcTransport{cc: cc}
}

func (t *grpcTransport) Invoke(ctx context.Context, method string, input, output protoref.Message) error {
if t.cc == nil {
return errors.New("grpc transport: nil client connection")
}
// grpc.ClientConnInterface.Invoke accepts (ctx, method, args any, reply any, opts ...grpc.CallOption).
// protoref.Message satisfies the any constraint; variadic opts can be omitted.
// This wrapper intentionally does not forward grpc.CallOption, as RPCTransport
// is protocol-agnostic. The existing grpc_datasource code does not use any CallOption at the Invoke site.
return t.cc.Invoke(ctx, method, input, output)
}
54 changes: 54 additions & 0 deletions v2/pkg/engine/datasource/grpc_datasource/transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package grpcdatasource

import (
"context"
"testing"

"github.com/stretchr/testify/require"
protoref "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"

"github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest"
)

// newTestCompiler builds an RPCCompiler bound to the grpctest fixture.
// It is shared by every transport-level test in this package.
func newTestCompiler(t *testing.T) *RPCCompiler {
t.Helper()
compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), testMapping())
require.NoError(t, err)
return compiler
}

// findMessageDesc resolves a fully-qualified message name from the compiled
// proto document. Used by tests to construct dynamicpb.Message instances
// for transport.Invoke calls without depending on the generated Go types.
func findMessageDesc(t *testing.T, compiler *RPCCompiler, fullName string) protoref.MessageDescriptor {
t.Helper()
for _, m := range compiler.doc.Messages {
if string(m.Desc.FullName()) == fullName {
return m.Desc
}
}
t.Fatalf("message %q not found in proto document", fullName)
return nil
}

// TestGRPCTransport_Invoke is a smoke test for the gRPC RPCTransport
// implementation; it goes through the data source's mockInterface (defined
// in grpc_datasource_test.go) so the assertion is just that Invoke returns
// no error for a well-formed request.
func TestGRPCTransport_Invoke(t *testing.T) {
mi := mockInterface{}
transport := NewGRPCTransport(mi)

compiler := newTestCompiler(t)
reqDesc := findMessageDesc(t, compiler, "productv1.QueryComplexFilterTypeRequest")
respDesc := findMessageDesc(t, compiler, "productv1.QueryComplexFilterTypeResponse")

inputMsg := dynamicpb.NewMessage(reqDesc)
outputMsg := dynamicpb.NewMessage(respDesc)

err := transport.Invoke(context.Background(), "/productv1.ProductService/QueryComplexFilterType", inputMsg, outputMsg)
require.NoError(t, err)
}
Loading