From 25bed0cd788f4f07533ac528174e001fb1877f42 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 7 Sep 2023 12:19:45 +0200 Subject: [PATCH 1/3] Fix for the google wrappers support in unary calls and tests for it --- grpc/client_test.go | 96 +++++++- grpc/testdata/wrappers_testing/service.go | 60 +++++ grpc/testdata/wrappers_testing/test.pb.go | 98 ++++++++ grpc/testdata/wrappers_testing/test.proto | 17 ++ .../testdata/wrappers_testing/test_grpc.pb.go | 210 ++++++++++++++++++ lib/netext/grpcext/conn.go | 2 +- 6 files changed, 478 insertions(+), 5 deletions(-) create mode 100644 grpc/testdata/wrappers_testing/service.go create mode 100644 grpc/testdata/wrappers_testing/test.pb.go create mode 100644 grpc/testdata/wrappers_testing/test.proto create mode 100644 grpc/testdata/wrappers_testing/test_grpc.pb.go diff --git a/grpc/client_test.go b/grpc/client_test.go index a7f87a4..4f8585c 100644 --- a/grpc/client_test.go +++ b/grpc/client_test.go @@ -9,26 +9,29 @@ import ( "strings" "testing" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/wrapperspb" "github.com/golang/protobuf/ptypes/any" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "go.k6.io/k6/lib/testutils/httpmultibin" grpcanytesting "go.k6.io/k6/lib/testutils/httpmultibin/grpc_any_testing" "go.k6.io/k6/lib/testutils/httpmultibin/grpc_testing" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" + "go.k6.io/k6/metrics" "google.golang.org/grpc/metadata" v1alphagrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" grpcstats "google.golang.org/grpc/stats" - "google.golang.org/grpc/status" xk6grpc "github.com/grafana/xk6-grpc/grpc" + "github.com/grafana/xk6-grpc/grpc/testdata/wrappers_testing" "github.com/grafana/xk6-grpc/lib/netext/grpcext" - "go.k6.io/k6/metrics" ) func TestClient(t *testing.T) { @@ -737,6 +740,91 @@ func TestClient(t *testing.T) { err: "no gRPC connection", }, }, + { + name: "Wrappers", + setup: func(hb *httpmultibin.HTTPMultiBin) { + srv := wrappers_testing.Register(hb.ServerGRPC) + + srv.TestStringImplementation = func(_ context.Context, sv *wrappers.StringValue) (*wrappers.StringValue, error) { + return &wrapperspb.StringValue{ + Value: "hey " + sv.Value, + }, nil + } + }, + initString: codeBlock{ + code: ` + const client = new grpc.Client(); + client.load([], "../grpc/testdata/wrappers_testing/test.proto"); + `, + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + + let respString = client.invoke("grpc.wrappers.testing.Service/TestString", "John") + if (respString.message !== "hey John") { + throw new Error("expected to get 'hey John', but got a " + respString.message) + } + `, + }, + }, + { + name: "WrappersWithReflection", + setup: func(hb *httpmultibin.HTTPMultiBin) { + reflection.Register(hb.ServerGRPC) + + srv := wrappers_testing.Register(hb.ServerGRPC) + + srv.TestIntegerImplementation = func(_ context.Context, iv *wrappers.Int64Value) (*wrappers.Int64Value, error) { + return &wrappers.Int64Value{ + Value: 2 * iv.Value, + }, nil + } + + srv.TestStringImplementation = func(_ context.Context, sv *wrappers.StringValue) (*wrappers.StringValue, error) { + return &wrapperspb.StringValue{ + Value: "hey " + sv.Value, + }, nil + } + + srv.TestBooleanImplementation = func(_ context.Context, bv *wrappers.BoolValue) (*wrappers.BoolValue, error) { + return &wrapperspb.BoolValue{ + Value: bv.Value != true, + }, nil + } + + srv.TestDoubleImplementation = func(_ context.Context, bv *wrappers.DoubleValue) (*wrappers.DoubleValue, error) { + return &wrapperspb.DoubleValue{ + Value: bv.Value * 2, + }, nil + } + }, + initString: codeBlock{ + code: ` + const client = new grpc.Client(); + `, + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR", {reflect: true}); + + let respString = client.invoke("grpc.wrappers.testing.Service/TestString", "John") + if (respString.message !== "hey John") { + throw new Error("expected to get 'hey John', but got a " + respString.message) + } + + let respInt = client.invoke("grpc.wrappers.testing.Service/TestInteger", "3") + if (respInt.message !== "6") { + throw new Error("expected to get '6', but got a " + respInt.message) + } + + let respDouble = client.invoke("grpc.wrappers.testing.Service/TestDouble", "2.7") + if (respDouble.message !== 5.4) { + throw new Error("expected to get '5.4', but got a " + respDouble.message) + } + `, + }, + }, } for _, tt := range tests { diff --git a/grpc/testdata/wrappers_testing/service.go b/grpc/testdata/wrappers_testing/service.go new file mode 100644 index 0000000..b11b5ed --- /dev/null +++ b/grpc/testdata/wrappers_testing/service.go @@ -0,0 +1,60 @@ +package wrappers_testing + +import ( + context "context" + + wrappers "github.com/golang/protobuf/ptypes/wrappers" + grpc "google.golang.org/grpc" +) + +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative test.proto + +// Register registers a test service that could be used for the testing gRPC wrappers +func Register(r grpc.ServiceRegistrar) *service { + s := &service{} + + RegisterServiceServer(r, s) + + return s +} + +type service struct { + UnimplementedServiceServer + + TestStringImplementation func(context.Context, *wrappers.StringValue) (*wrappers.StringValue, error) + TestIntegerImplementation func(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error) + TestBooleanImplementation func(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error) + TestDoubleImplementation func(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) +} + +func (s *service) TestString(ctx context.Context, in *wrappers.StringValue) (*wrappers.StringValue, error) { + if s.TestStringImplementation != nil { + return s.TestStringImplementation(ctx, in) + } + + return s.UnimplementedServiceServer.TestString(ctx, in) +} + +func (s *service) TestInteger(ctx context.Context, in *wrappers.Int64Value) (*wrappers.Int64Value, error) { + if s.TestIntegerImplementation != nil { + return s.TestIntegerImplementation(ctx, in) + } + + return s.UnimplementedServiceServer.TestInteger(ctx, in) +} + +func (s *service) TestBoolean(ctx context.Context, in *wrappers.BoolValue) (*wrappers.BoolValue, error) { + if s.TestBooleanImplementation != nil { + return s.TestBooleanImplementation(ctx, in) + } + + return s.UnimplementedServiceServer.TestBoolean(ctx, in) +} + +func (s *service) TestDouble(ctx context.Context, in *wrappers.DoubleValue) (*wrappers.DoubleValue, error) { + if s.TestBooleanImplementation != nil { + return s.TestDoubleImplementation(ctx, in) + } + + return s.UnimplementedServiceServer.TestDouble(ctx, in) +} diff --git a/grpc/testdata/wrappers_testing/test.pb.go b/grpc/testdata/wrappers_testing/test.pb.go new file mode 100644 index 0000000..591cb13 --- /dev/null +++ b/grpc/testdata/wrappers_testing/test.pb.go @@ -0,0 +1,98 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.12.4 +// source: test.proto + +package wrappers_testing + +import ( + wrappers "github.com/golang/protobuf/ptypes/wrappers" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x15, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x74, 0x65, 0x73, 0x74, + 0x69, 0x6e, 0x67, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x32, 0xad, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x48, 0x0a, 0x0a, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x1c, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1c, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, + 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x47, 0x0a, 0x0b, 0x54, 0x65, 0x73, + 0x74, 0x49, 0x6e, 0x74, 0x65, 0x67, 0x65, 0x72, 0x12, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x36, 0x34, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, + 0x75, 0x65, 0x12, 0x45, 0x0a, 0x0b, 0x54, 0x65, 0x73, 0x74, 0x42, 0x6f, 0x6f, 0x6c, 0x65, 0x61, + 0x6e, 0x12, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x48, 0x0a, 0x0a, 0x54, 0x65, 0x73, + 0x74, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x42, 0x14, 0x5a, 0x12, 0x2e, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, + 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var file_test_proto_goTypes = []interface{}{ + (*wrappers.StringValue)(nil), // 0: google.protobuf.StringValue + (*wrappers.Int64Value)(nil), // 1: google.protobuf.Int64Value + (*wrappers.BoolValue)(nil), // 2: google.protobuf.BoolValue + (*wrappers.DoubleValue)(nil), // 3: google.protobuf.DoubleValue +} +var file_test_proto_depIdxs = []int32{ + 0, // 0: grpc.wrappers.testing.Service.TestString:input_type -> google.protobuf.StringValue + 1, // 1: grpc.wrappers.testing.Service.TestInteger:input_type -> google.protobuf.Int64Value + 2, // 2: grpc.wrappers.testing.Service.TestBoolean:input_type -> google.protobuf.BoolValue + 3, // 3: grpc.wrappers.testing.Service.TestDouble:input_type -> google.protobuf.DoubleValue + 0, // 4: grpc.wrappers.testing.Service.TestString:output_type -> google.protobuf.StringValue + 1, // 5: grpc.wrappers.testing.Service.TestInteger:output_type -> google.protobuf.Int64Value + 2, // 6: grpc.wrappers.testing.Service.TestBoolean:output_type -> google.protobuf.BoolValue + 3, // 7: grpc.wrappers.testing.Service.TestDouble:output_type -> google.protobuf.DoubleValue + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/grpc/testdata/wrappers_testing/test.proto b/grpc/testdata/wrappers_testing/test.proto new file mode 100644 index 0000000..31dcb69 --- /dev/null +++ b/grpc/testdata/wrappers_testing/test.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package grpc.wrappers.testing; + +// this proto contains service that helps tests some of well-known types or wrappers +// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/wrappers.proto + +import "google/protobuf/wrappers.proto"; + +option go_package ="./wrappers_testing"; + +service Service { + rpc TestString(google.protobuf.StringValue) returns (google.protobuf.StringValue); + rpc TestInteger(google.protobuf.Int64Value) returns (google.protobuf.Int64Value); + rpc TestBoolean(google.protobuf.BoolValue) returns (google.protobuf.BoolValue); + rpc TestDouble(google.protobuf.DoubleValue) returns (google.protobuf.DoubleValue); +} diff --git a/grpc/testdata/wrappers_testing/test_grpc.pb.go b/grpc/testdata/wrappers_testing/test_grpc.pb.go new file mode 100644 index 0000000..0a231c6 --- /dev/null +++ b/grpc/testdata/wrappers_testing/test_grpc.pb.go @@ -0,0 +1,210 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package wrappers_testing + +import ( + context "context" + wrappers "github.com/golang/protobuf/ptypes/wrappers" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ServiceClient is the client API for Service service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ServiceClient interface { + TestString(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*wrappers.StringValue, error) + TestInteger(ctx context.Context, in *wrappers.Int64Value, opts ...grpc.CallOption) (*wrappers.Int64Value, error) + TestBoolean(ctx context.Context, in *wrappers.BoolValue, opts ...grpc.CallOption) (*wrappers.BoolValue, error) + TestDouble(ctx context.Context, in *wrappers.DoubleValue, opts ...grpc.CallOption) (*wrappers.DoubleValue, error) +} + +type serviceClient struct { + cc grpc.ClientConnInterface +} + +func NewServiceClient(cc grpc.ClientConnInterface) ServiceClient { + return &serviceClient{cc} +} + +func (c *serviceClient) TestString(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*wrappers.StringValue, error) { + out := new(wrappers.StringValue) + err := c.cc.Invoke(ctx, "/grpc.wrappers.testing.Service/TestString", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *serviceClient) TestInteger(ctx context.Context, in *wrappers.Int64Value, opts ...grpc.CallOption) (*wrappers.Int64Value, error) { + out := new(wrappers.Int64Value) + err := c.cc.Invoke(ctx, "/grpc.wrappers.testing.Service/TestInteger", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *serviceClient) TestBoolean(ctx context.Context, in *wrappers.BoolValue, opts ...grpc.CallOption) (*wrappers.BoolValue, error) { + out := new(wrappers.BoolValue) + err := c.cc.Invoke(ctx, "/grpc.wrappers.testing.Service/TestBoolean", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *serviceClient) TestDouble(ctx context.Context, in *wrappers.DoubleValue, opts ...grpc.CallOption) (*wrappers.DoubleValue, error) { + out := new(wrappers.DoubleValue) + err := c.cc.Invoke(ctx, "/grpc.wrappers.testing.Service/TestDouble", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ServiceServer is the server API for Service service. +// All implementations must embed UnimplementedServiceServer +// for forward compatibility +type ServiceServer interface { + TestString(context.Context, *wrappers.StringValue) (*wrappers.StringValue, error) + TestInteger(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error) + TestBoolean(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error) + TestDouble(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) + mustEmbedUnimplementedServiceServer() +} + +// UnimplementedServiceServer must be embedded to have forward compatible implementations. +type UnimplementedServiceServer struct { +} + +func (UnimplementedServiceServer) TestString(context.Context, *wrappers.StringValue) (*wrappers.StringValue, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestString not implemented") +} +func (UnimplementedServiceServer) TestInteger(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestInteger not implemented") +} +func (UnimplementedServiceServer) TestBoolean(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestBoolean not implemented") +} +func (UnimplementedServiceServer) TestDouble(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestDouble not implemented") +} +func (UnimplementedServiceServer) mustEmbedUnimplementedServiceServer() {} + +// UnsafeServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ServiceServer will +// result in compilation errors. +type UnsafeServiceServer interface { + mustEmbedUnimplementedServiceServer() +} + +func RegisterServiceServer(s grpc.ServiceRegistrar, srv ServiceServer) { + s.RegisterService(&Service_ServiceDesc, srv) +} + +func _Service_TestString_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrappers.StringValue) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServiceServer).TestString(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.wrappers.testing.Service/TestString", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServiceServer).TestString(ctx, req.(*wrappers.StringValue)) + } + return interceptor(ctx, in, info, handler) +} + +func _Service_TestInteger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrappers.Int64Value) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServiceServer).TestInteger(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.wrappers.testing.Service/TestInteger", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServiceServer).TestInteger(ctx, req.(*wrappers.Int64Value)) + } + return interceptor(ctx, in, info, handler) +} + +func _Service_TestBoolean_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrappers.BoolValue) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServiceServer).TestBoolean(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.wrappers.testing.Service/TestBoolean", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServiceServer).TestBoolean(ctx, req.(*wrappers.BoolValue)) + } + return interceptor(ctx, in, info, handler) +} + +func _Service_TestDouble_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(wrappers.DoubleValue) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServiceServer).TestDouble(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/grpc.wrappers.testing.Service/TestDouble", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServiceServer).TestDouble(ctx, req.(*wrappers.DoubleValue)) + } + return interceptor(ctx, in, info, handler) +} + +// Service_ServiceDesc is the grpc.ServiceDesc for Service service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Service_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.wrappers.testing.Service", + HandlerType: (*ServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "TestString", + Handler: _Service_TestString_Handler, + }, + { + MethodName: "TestInteger", + Handler: _Service_TestInteger_Handler, + }, + { + MethodName: "TestBoolean", + Handler: _Service_TestBoolean_Handler, + }, + { + MethodName: "TestDouble", + Handler: _Service_TestDouble_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "test.proto", +} diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index e63c820..d2d382d 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -169,7 +169,7 @@ func (c *Conn) Invoke( // rather than the desired: // {"x":6,"y":4,"z":0} raw, _ := marshaler.Marshal(resp) - msg := make(map[string]interface{}) + var msg interface{} _ = json.Unmarshal(raw, &msg) response.Message = msg } From d5e187f8b01766e6fd91fc16b29ac0b7ee8ae011 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 7 Sep 2023 13:46:02 +0200 Subject: [PATCH 2/3] google wrappers support for streams --- grpc/stream.go | 11 +-- grpc/stream_test.go | 78 ++++++++++++++++++- grpc/testdata/wrappers_testing/service.go | 9 +++ grpc/testdata/wrappers_testing/test.pb.go | 26 ++++--- grpc/testdata/wrappers_testing/test.proto | 2 + .../testdata/wrappers_testing/test_grpc.pb.go | 73 ++++++++++++++++- lib/netext/grpcext/stream.go | 8 +- 7 files changed, 186 insertions(+), 21 deletions(-) diff --git a/grpc/stream.go b/grpc/stream.go index 5b43fc2..e4ce159 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "reflect" "sync" "time" @@ -140,7 +141,7 @@ func (s *stream) loop() { } } -func (s *stream) queueMessage(msg map[string]interface{}) { +func (s *stream) queueMessage(msg interface{}) { metrics.PushIfNotDone(s.vu.Context(), s.vu.State().Samples, metrics.Sample{ TimeSeries: metrics.TimeSeries{ Metric: s.instanceMetrics.StreamsMessagesReceived, @@ -184,10 +185,6 @@ func (s *stream) readData(wg *sync.WaitGroup) { return } - if len(msg) > 0 { - s.queueMessage(msg) - } - if isRegularClosing(err) { s.logger.WithError(err).Debug("stream is cancelled/finished") @@ -197,6 +194,10 @@ func (s *stream) readData(wg *sync.WaitGroup) { return } + + if msg != nil || !reflect.ValueOf(msg).IsNil() { + s.queueMessage(msg) + } } } diff --git a/grpc/stream_test.go b/grpc/stream_test.go index 5fff7a8..f96135d 100644 --- a/grpc/stream_test.go +++ b/grpc/stream_test.go @@ -2,14 +2,21 @@ package grpc_test import ( "context" + "errors" + "io" + "strings" "testing" "time" - "github.com/grafana/xk6-grpc/grpc/testutils/grpcservice" + "github.com/dop251/goja" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + + "github.com/grafana/xk6-grpc/grpc/testdata/wrappers_testing" + "github.com/grafana/xk6-grpc/grpc/testutils/grpcservice" ) func TestStream_InvalidHeader(t *testing.T) { @@ -287,3 +294,72 @@ func (s *featureExplorerStub) ListFeatures(rect *grpcservice.Rectangle, stream g return status.Errorf(codes.Unimplemented, "method ListFeatures not implemented") } + +func TestStream_Wrappers(t *testing.T) { + t.Parallel() + + ts := newTestState(t) + + stub := wrappers_testing.Register(ts.httpBin.ServerGRPC) + stub.TestStreamImplementation = func(stream wrappers_testing.Service_TestStreamServer) error { + result := "" + + for { + msg, err := stream.Recv() + if errors.Is(err, io.EOF) { + return stream.SendAndClose(&wrappers.StringValue{ + Value: strings.TrimRight(result, " "), + }) + } + + if err != nil { + return err + } + + result += msg.Value + " " + } + } + + replace := func(code string) (goja.Value, error) { + return ts.VU.Runtime().RunString(ts.httpBin.Replacer.Replace(code)) + } + + initString := codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../grpc/testdata/wrappers_testing/test.proto");`, + } + vuString := codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + let stream = new grpc.Stream(client, "grpc.wrappers.testing.Service/TestStream"); + stream.on('data', function (data) { + call('Result: ' + data); + }) + + stream.write('Hey'); + stream.write('John'); + stream.end(); + + stream.on('error', function (e) { + call('Code: ' + e.code + ' Message: ' + e.message); + }); + `, + } + + val, err := replace(initString.code) + assertResponse(t, initString, err, val, ts) + + ts.ToVUContext() + + val, err = replace(vuString.code) + + ts.EventLoop.WaitOnRegistered() + + assertResponse(t, vuString, err, val, ts) + + assert.Equal(t, ts.callRecorder.Recorded(), []string{ + "Result: Hey John", + }, + ) +} diff --git a/grpc/testdata/wrappers_testing/service.go b/grpc/testdata/wrappers_testing/service.go index b11b5ed..ed44777 100644 --- a/grpc/testdata/wrappers_testing/service.go +++ b/grpc/testdata/wrappers_testing/service.go @@ -25,6 +25,7 @@ type service struct { TestIntegerImplementation func(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error) TestBooleanImplementation func(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error) TestDoubleImplementation func(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) + TestStreamImplementation func(Service_TestStreamServer) error } func (s *service) TestString(ctx context.Context, in *wrappers.StringValue) (*wrappers.StringValue, error) { @@ -58,3 +59,11 @@ func (s *service) TestDouble(ctx context.Context, in *wrappers.DoubleValue) (*wr return s.UnimplementedServiceServer.TestDouble(ctx, in) } + +func (s *service) TestStream(stream Service_TestStreamServer) error { + if s.TestStreamImplementation != nil { + return s.TestStreamImplementation(stream) + } + + return s.UnimplementedServiceServer.TestStream(stream) +} diff --git a/grpc/testdata/wrappers_testing/test.pb.go b/grpc/testdata/wrappers_testing/test.pb.go index 591cb13..6b9eaf6 100644 --- a/grpc/testdata/wrappers_testing/test.pb.go +++ b/grpc/testdata/wrappers_testing/test.pb.go @@ -27,7 +27,7 @@ var file_test_proto_rawDesc = []byte{ 0x70, 0x63, 0x2e, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x32, 0xad, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x6f, 0x74, 0x6f, 0x32, 0xf9, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x48, 0x0a, 0x0a, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1c, 0x2e, 0x67, 0x6f, @@ -46,9 +46,13 @@ var file_test_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, 0x56, 0x61, - 0x6c, 0x75, 0x65, 0x42, 0x14, 0x5a, 0x12, 0x2e, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, - 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x6c, 0x75, 0x65, 0x12, 0x4a, 0x0a, 0x0a, 0x54, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x1a, + 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x28, 0x01, 0x42, + 0x14, 0x5a, 0x12, 0x2e, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x5f, 0x74, 0x65, + 0x73, 0x74, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_test_proto_goTypes = []interface{}{ @@ -62,12 +66,14 @@ var file_test_proto_depIdxs = []int32{ 1, // 1: grpc.wrappers.testing.Service.TestInteger:input_type -> google.protobuf.Int64Value 2, // 2: grpc.wrappers.testing.Service.TestBoolean:input_type -> google.protobuf.BoolValue 3, // 3: grpc.wrappers.testing.Service.TestDouble:input_type -> google.protobuf.DoubleValue - 0, // 4: grpc.wrappers.testing.Service.TestString:output_type -> google.protobuf.StringValue - 1, // 5: grpc.wrappers.testing.Service.TestInteger:output_type -> google.protobuf.Int64Value - 2, // 6: grpc.wrappers.testing.Service.TestBoolean:output_type -> google.protobuf.BoolValue - 3, // 7: grpc.wrappers.testing.Service.TestDouble:output_type -> google.protobuf.DoubleValue - 4, // [4:8] is the sub-list for method output_type - 0, // [0:4] is the sub-list for method input_type + 0, // 4: grpc.wrappers.testing.Service.TestStream:input_type -> google.protobuf.StringValue + 0, // 5: grpc.wrappers.testing.Service.TestString:output_type -> google.protobuf.StringValue + 1, // 6: grpc.wrappers.testing.Service.TestInteger:output_type -> google.protobuf.Int64Value + 2, // 7: grpc.wrappers.testing.Service.TestBoolean:output_type -> google.protobuf.BoolValue + 3, // 8: grpc.wrappers.testing.Service.TestDouble:output_type -> google.protobuf.DoubleValue + 0, // 9: grpc.wrappers.testing.Service.TestStream:output_type -> google.protobuf.StringValue + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/grpc/testdata/wrappers_testing/test.proto b/grpc/testdata/wrappers_testing/test.proto index 31dcb69..0f4f6a9 100644 --- a/grpc/testdata/wrappers_testing/test.proto +++ b/grpc/testdata/wrappers_testing/test.proto @@ -14,4 +14,6 @@ service Service { rpc TestInteger(google.protobuf.Int64Value) returns (google.protobuf.Int64Value); rpc TestBoolean(google.protobuf.BoolValue) returns (google.protobuf.BoolValue); rpc TestDouble(google.protobuf.DoubleValue) returns (google.protobuf.DoubleValue); + + rpc TestStream(stream google.protobuf.StringValue) returns (google.protobuf.StringValue); } diff --git a/grpc/testdata/wrappers_testing/test_grpc.pb.go b/grpc/testdata/wrappers_testing/test_grpc.pb.go index 0a231c6..b65db18 100644 --- a/grpc/testdata/wrappers_testing/test_grpc.pb.go +++ b/grpc/testdata/wrappers_testing/test_grpc.pb.go @@ -23,6 +23,7 @@ type ServiceClient interface { TestInteger(ctx context.Context, in *wrappers.Int64Value, opts ...grpc.CallOption) (*wrappers.Int64Value, error) TestBoolean(ctx context.Context, in *wrappers.BoolValue, opts ...grpc.CallOption) (*wrappers.BoolValue, error) TestDouble(ctx context.Context, in *wrappers.DoubleValue, opts ...grpc.CallOption) (*wrappers.DoubleValue, error) + TestStream(ctx context.Context, opts ...grpc.CallOption) (Service_TestStreamClient, error) } type serviceClient struct { @@ -69,6 +70,40 @@ func (c *serviceClient) TestDouble(ctx context.Context, in *wrappers.DoubleValue return out, nil } +func (c *serviceClient) TestStream(ctx context.Context, opts ...grpc.CallOption) (Service_TestStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Service_ServiceDesc.Streams[0], "/grpc.wrappers.testing.Service/TestStream", opts...) + if err != nil { + return nil, err + } + x := &serviceTestStreamClient{stream} + return x, nil +} + +type Service_TestStreamClient interface { + Send(*wrappers.StringValue) error + CloseAndRecv() (*wrappers.StringValue, error) + grpc.ClientStream +} + +type serviceTestStreamClient struct { + grpc.ClientStream +} + +func (x *serviceTestStreamClient) Send(m *wrappers.StringValue) error { + return x.ClientStream.SendMsg(m) +} + +func (x *serviceTestStreamClient) CloseAndRecv() (*wrappers.StringValue, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(wrappers.StringValue) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // ServiceServer is the server API for Service service. // All implementations must embed UnimplementedServiceServer // for forward compatibility @@ -77,6 +112,7 @@ type ServiceServer interface { TestInteger(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error) TestBoolean(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error) TestDouble(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) + TestStream(Service_TestStreamServer) error mustEmbedUnimplementedServiceServer() } @@ -96,6 +132,9 @@ func (UnimplementedServiceServer) TestBoolean(context.Context, *wrappers.BoolVal func (UnimplementedServiceServer) TestDouble(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error) { return nil, status.Errorf(codes.Unimplemented, "method TestDouble not implemented") } +func (UnimplementedServiceServer) TestStream(Service_TestStreamServer) error { + return status.Errorf(codes.Unimplemented, "method TestStream not implemented") +} func (UnimplementedServiceServer) mustEmbedUnimplementedServiceServer() {} // UnsafeServiceServer may be embedded to opt out of forward compatibility for this service. @@ -181,6 +220,32 @@ func _Service_TestDouble_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Service_TestStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ServiceServer).TestStream(&serviceTestStreamServer{stream}) +} + +type Service_TestStreamServer interface { + SendAndClose(*wrappers.StringValue) error + Recv() (*wrappers.StringValue, error) + grpc.ServerStream +} + +type serviceTestStreamServer struct { + grpc.ServerStream +} + +func (x *serviceTestStreamServer) SendAndClose(m *wrappers.StringValue) error { + return x.ServerStream.SendMsg(m) +} + +func (x *serviceTestStreamServer) Recv() (*wrappers.StringValue, error) { + m := new(wrappers.StringValue) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Service_ServiceDesc is the grpc.ServiceDesc for Service service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -205,6 +270,12 @@ var Service_ServiceDesc = grpc.ServiceDesc{ Handler: _Service_TestDouble_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "TestStream", + Handler: _Service_TestStream_Handler, + ClientStreams: true, + }, + }, Metadata: "test.proto", } diff --git a/lib/netext/grpcext/stream.go b/lib/netext/grpcext/stream.go index b4ab4cb..ac9e567 100644 --- a/lib/netext/grpcext/stream.go +++ b/lib/netext/grpcext/stream.go @@ -29,7 +29,7 @@ var ErrCanceled = errors.New("canceled by client (k6)") // ReceiveConverted receives a converted message from the stream // if the stream has been closed successfully, it returns io.EOF // if the stream has been cancelled, it returns ErrCanceled -func (s *Stream) ReceiveConverted() (map[string]interface{}, error) { +func (s *Stream) ReceiveConverted() (interface{}, error) { raw, err := s.receive() if err != nil && !errors.Is(err, io.EOF) { return nil, err @@ -60,7 +60,7 @@ func (s *Stream) receive() (*dynamicpb.Message, error) { return nil, err } -// convert converts the message to the map[string]interface{} format +// convert converts the message to the interface{} // which could be returned to the JS // there is a lot of marshaling/unmarshaling here, but if we just pass the dynamic message // the default Marshaller would be used, which would strip any zero/default values from the JSON. @@ -78,7 +78,7 @@ func (s *Stream) receive() (*dynamicpb.Message, error) { // {"x":6,"y":4} // rather than the desired: // {"x":6,"y":4,"z":0} -func (s *Stream) convert(msg *dynamicpb.Message) (map[string]interface{}, error) { +func (s *Stream) convert(msg *dynamicpb.Message) (interface{}, error) { // TODO(olegbespalov): add the test that checks that message is not nil raw, err := s.marshaler.Marshal(msg) @@ -86,7 +86,7 @@ func (s *Stream) convert(msg *dynamicpb.Message) (map[string]interface{}, error) return nil, fmt.Errorf("failed to marshal the message: %w", err) } - back := make(map[string]interface{}) + var back interface{} err = json.Unmarshal(raw, &back) if err != nil { From d99c9bf203c7580c860a2bc9cdd60671feeeeb29 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Thu, 7 Sep 2023 15:54:45 +0200 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com> --- grpc/testdata/wrappers_testing/service.go | 2 +- lib/netext/grpcext/conn.go | 22 +++++----------------- lib/netext/grpcext/stream.go | 6 +++--- 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/grpc/testdata/wrappers_testing/service.go b/grpc/testdata/wrappers_testing/service.go index ed44777..52e38e0 100644 --- a/grpc/testdata/wrappers_testing/service.go +++ b/grpc/testdata/wrappers_testing/service.go @@ -53,7 +53,7 @@ func (s *service) TestBoolean(ctx context.Context, in *wrappers.BoolValue) (*wra } func (s *service) TestDouble(ctx context.Context, in *wrappers.DoubleValue) (*wrappers.DoubleValue, error) { - if s.TestBooleanImplementation != nil { + if s.TestDoubleImplementation != nil { return s.TestDoubleImplementation(ctx, in) } diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index d2d382d..5e9f9e0 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -154,23 +154,11 @@ func (c *Conn) Invoke( } if resp != nil { - // (rogchap) there is a lot of marshaling/unmarshaling here, but if we just pass the dynamic message - // the default Marshaller would be used, which would strip any zero/default values from the JSON. - // eg. given this message: - // message Point { - // double x = 1; - // double y = 2; - // double z = 3; - // } - // and a value like this: - // msg := Point{X: 6, Y: 4, Z: 0} - // would result in JSON output: - // {"x":6,"y":4} - // rather than the desired: - // {"x":6,"y":4,"z":0} - raw, _ := marshaler.Marshal(resp) - var msg interface{} - _ = json.Unmarshal(raw, &msg) + msg, err := convert(marshaler, resp) + if err != nil { + return nil, fmt.Errorf("unable to convert response object to JSON: %w", err) + } + response.Message = msg } return &response, nil diff --git a/lib/netext/grpcext/stream.go b/lib/netext/grpcext/stream.go index ac9e567..681a990 100644 --- a/lib/netext/grpcext/stream.go +++ b/lib/netext/grpcext/stream.go @@ -35,7 +35,7 @@ func (s *Stream) ReceiveConverted() (interface{}, error) { return nil, err } - msg, errConv := s.convert(raw) + msg, errConv := convert(s.marshaler, raw) if errConv != nil { return nil, errConv } @@ -78,10 +78,10 @@ func (s *Stream) receive() (*dynamicpb.Message, error) { // {"x":6,"y":4} // rather than the desired: // {"x":6,"y":4,"z":0} -func (s *Stream) convert(msg *dynamicpb.Message) (interface{}, error) { +func convert(marshaler protojson.MarshalOptions, msg *dynamicpb.Message) (interface{}, error) { // TODO(olegbespalov): add the test that checks that message is not nil - raw, err := s.marshaler.Marshal(msg) + raw, err := marshaler.Marshal(msg) if err != nil { return nil, fmt.Errorf("failed to marshal the message: %w", err) }