Skip to content

Commit

Permalink
add version into rpc header (#212)
Browse files Browse the repository at this point in the history
* add version into rpc header

* Update yarpc.go

* fix build
  • Loading branch information
jc-fireball authored Aug 22, 2017
1 parent 60eb6b6 commit 6024915
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 43 deletions.
6 changes: 6 additions & 0 deletions connectors/yarpc/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pkg/errors"
"github.com/uber-go/dosa"
dosarpc "github.com/uber/dosa-idl/.gen/dosa"
rpc "go.uber.org/yarpc"
)

// RawValueAsInterface converts a value from the wire to an object implementing the interface
Expand Down Expand Up @@ -332,3 +333,8 @@ func fieldValueMapFromClientMap(values map[string]dosa.FieldValue) (dosarpc.Fiel
}
return fields, nil
}

// VersionHeader returns the rpc style version header
func VersionHeader() rpc.CallOption {
return rpc.WithHeader(_version, dosa.VERSION)
}
36 changes: 19 additions & 17 deletions connectors/yarpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
)

const (
_version = "version"
_defaultServiceName = "dosa-gateway"
errCodeNotFound int32 = 404
errCodeAlreadyExists int32 = 409
Expand Down Expand Up @@ -219,7 +220,8 @@ func (c *Connector) CreateIfNotExists(ctx context.Context, ei *dosa.EntityInfo,
Ref: entityInfoToSchemaRef(ei),
EntityValues: ev,
}
err = c.Client.CreateIfNotExists(ctx, &createRequest)

err = c.Client.CreateIfNotExists(ctx, &createRequest, VersionHeader())
if err != nil {
if be, ok := err.(*dosarpc.BadRequestError); ok {
if be.ErrorCode != nil && *be.ErrorCode == errCodeAlreadyExists {
Expand All @@ -240,7 +242,7 @@ func (c *Connector) Upsert(ctx context.Context, ei *dosa.EntityInfo, values map[
Ref: entityInfoToSchemaRef(ei),
EntityValues: ev,
}
return c.Client.Upsert(ctx, &upsertRequest)
return c.Client.Upsert(ctx, &upsertRequest, VersionHeader())
}

// Read reads a single entity
Expand Down Expand Up @@ -275,7 +277,7 @@ func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[stri
FieldsToRead: rpcMinimumFields,
}

response, err := c.Client.Read(ctx, readRequest)
response, err := c.Client.Read(ctx, readRequest, VersionHeader())
if err != nil {
if be, ok := err.(*dosarpc.BadRequestError); ok {
if be.ErrorCode != nil && *be.ErrorCode == errCodeNotFound {
Expand Down Expand Up @@ -319,7 +321,7 @@ func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []m
FieldsToRead: rpcMinimumFields,
}

response, err := c.Client.MultiRead(ctx, request)
response, err := c.Client.MultiRead(ctx, request, VersionHeader())
if err != nil {
return nil, errors.Wrap(err, "YARPC MultiRead failed")
}
Expand Down Expand Up @@ -372,7 +374,7 @@ func (c *Connector) Remove(ctx context.Context, ei *dosa.EntityInfo, keys map[st
KeyValues: rpcFields,
}

err := c.Client.Remove(ctx, removeRequest)
err := c.Client.Remove(ctx, removeRequest, VersionHeader())
if err != nil {
return errors.Wrap(err, "YARPC Remove failed")
}
Expand All @@ -391,7 +393,7 @@ func (c *Connector) RemoveRange(ctx context.Context, ei *dosa.EntityInfo, column
Conditions: rpcConditions,
}

if err := c.Client.RemoveRange(ctx, request); err != nil {
if err := c.Client.RemoveRange(ctx, request, VersionHeader()); err != nil {
return errors.Wrap(err, "YARPC RemoveRange failed")
}
return nil
Expand All @@ -417,7 +419,7 @@ func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnCondit
Conditions: rpcConditions,
FieldsToRead: rpcMinimumFields,
}
response, err := c.Client.Range(ctx, &rangeRequest)
response, err := c.Client.Range(ctx, &rangeRequest, VersionHeader())
if err != nil {
return nil, "", errors.Wrap(err, "YARPC Range failed")
}
Expand Down Expand Up @@ -467,7 +469,7 @@ func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields
Limit: &limit32,
FieldsToRead: rpcMinimumFields,
}
response, err := c.Client.Scan(ctx, &scanRequest)
response, err := c.Client.Scan(ctx, &scanRequest, VersionHeader())
if err != nil {
return nil, "", errors.Wrap(err, "YARPC Scan failed")
}
Expand All @@ -488,7 +490,7 @@ func (c *Connector) CheckSchema(ctx context.Context, scope, namePrefix string, e
}
csr := dosarpc.CheckSchemaRequest{EntityDefs: rpcEntityDefinition, Scope: &scope, NamePrefix: &namePrefix}

response, err := c.Client.CheckSchema(ctx, &csr)
response, err := c.Client.CheckSchema(ctx, &csr, VersionHeader())

if err != nil {
return dosa.InvalidVersion, wrapError(err, "YARPC CheckSchema failed", scope, c.Config.ServiceName)
Expand All @@ -510,7 +512,7 @@ func (c *Connector) UpsertSchema(ctx context.Context, scope, namePrefix string,
EntityDefs: rpcEds,
}

response, err := c.Client.UpsertSchema(ctx, request)
response, err := c.Client.UpsertSchema(ctx, request, VersionHeader())
if err != nil {
return nil, wrapError(err, "YARPC UpsertSchema failed", scope, c.Config.ServiceName)
}
Expand All @@ -533,7 +535,7 @@ func (c *Connector) UpsertSchema(ctx context.Context, scope, namePrefix string,
// CheckSchemaStatus checks the status of specific version of schema
func (c *Connector) CheckSchemaStatus(ctx context.Context, scope, namePrefix string, version int32) (*dosa.SchemaStatus, error) {
request := dosarpc.CheckSchemaStatusRequest{Scope: &scope, NamePrefix: &namePrefix, Version: &version}
response, err := c.Client.CheckSchemaStatus(ctx, &request)
response, err := c.Client.CheckSchemaStatus(ctx, &request, VersionHeader())

if err != nil {
return nil, wrapError(err, "YARPC ChecksShemaStatus failed", scope, c.Config.ServiceName)
Expand All @@ -560,8 +562,8 @@ func (c *Connector) CreateScope(ctx context.Context, scope string) error {
Name: &scope,
}

if err := c.Client.CreateScope(ctx, request); err != nil {
return wrapError(err, "YARPC CreateScope failed", scope, c.Config.ServiceName)
if err := c.Client.CreateScope(ctx, request, VersionHeader()); err != nil {
return errors.Wrap(err, "YARPC CreateScope failed")
}

return nil
Expand All @@ -573,8 +575,8 @@ func (c *Connector) TruncateScope(ctx context.Context, scope string) error {
Name: &scope,
}

if err := c.Client.TruncateScope(ctx, request); err != nil {
return wrapError(err, "YARPC TruncateScope failed", scope, c.Config.ServiceName)
if err := c.Client.TruncateScope(ctx, request, VersionHeader()); err != nil {
return errors.Wrap(err, "YARPC TruncateScope failed")
}

return nil
Expand All @@ -586,8 +588,8 @@ func (c *Connector) DropScope(ctx context.Context, scope string) error {
Name: &scope,
}

if err := c.Client.DropScope(ctx, request); err != nil {
return wrapError(err, "YARPC DropScope failed", scope, c.Config.ServiceName)
if err := c.Client.DropScope(ctx, request, VersionHeader()); err != nil {
return errors.Wrap(err, "YARPC DropScope failed")
}

return nil
Expand Down
54 changes: 28 additions & 26 deletions connectors/yarpc/yarpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
drpc "github.com/uber/dosa-idl/.gen/dosa"
"github.com/uber/dosa-idl/.gen/dosa/dosatest"
tchan "github.com/uber/tchannel-go"
yarpc2 "go.uber.org/yarpc"
)

var testEi = &dosa.EntityInfo{
Expand Down Expand Up @@ -195,7 +196,7 @@ func TestYaRPCClient_Read(t *testing.T) {
}

// we expect a single call to Read, and we return back two fields, f1 which is in the typemap and another field that is not
mockedClient.EXPECT().Read(ctx, readRequest).Return(&drpc.ReadResponse{drpc.FieldValueMap{
mockedClient.EXPECT().Read(ctx, readRequest, gomock.Any()).Return(&drpc.ReadResponse{drpc.FieldValueMap{
"c1": {ElemValue: &drpc.RawValue{Int64Value: testutil.TestInt64Ptr(1)}},
"fieldNotInSchema": {ElemValue: &drpc.RawValue{Int64Value: testutil.TestInt64Ptr(5)}},
"c2": {ElemValue: &drpc.RawValue{DoubleValue: testutil.TestFloat64Ptr(2.2)}},
Expand All @@ -221,7 +222,7 @@ func TestYaRPCClient_Read(t *testing.T) {
assert.Empty(t, values["fieldNotInSchema"]) // the unknown field is not present

errCode := int32(404)
mockedClient.EXPECT().Read(ctx, readRequest).Return(nil, &drpc.BadRequestError{ErrorCode: &errCode})
mockedClient.EXPECT().Read(ctx, readRequest, gomock.Any()).Return(nil, &drpc.BadRequestError{ErrorCode: &errCode})
_, err = sut.Read(ctx, testEi, map[string]dosa.FieldValue{"f1": dosa.FieldValue(int64(5))}, []string{"f1"})
assert.True(t, dosa.ErrorIsNotFound(err))

Expand Down Expand Up @@ -338,7 +339,7 @@ func TestYaRPCClient_MultiRead(t *testing.T) {
}

for _, d := range data {
mockedClient.EXPECT().MultiRead(ctx, d.Request).Return(d.Response, d.ResponseErr)
mockedClient.EXPECT().MultiRead(ctx, d.Request, gomock.Any()).Return(d.Response, d.ResponseErr)
// perform the multi read
values, err := sut.MultiRead(ctx, testEi, []map[string]dosa.FieldValue{{"f1": dosa.FieldValue(int64(5))}, {"f2": dosa.FieldValue(int64(6))}}, []string{"f1"})
if d.ResponseErr == nil {
Expand Down Expand Up @@ -409,7 +410,7 @@ func TestYaRPCClient_CreateIfNotExists(t *testing.T) {
outFields[item.Name] = &drpc.Value{ElemValue: rv}
}

mockedClient.EXPECT().CreateIfNotExists(ctx, &drpc.CreateRequest{Ref: &testRPCSchemaRef, EntityValues: outFields})
mockedClient.EXPECT().CreateIfNotExists(ctx, &drpc.CreateRequest{Ref: &testRPCSchemaRef, EntityValues: outFields}, gomock.Any())

// create the YaRPCClient and give it the mocked RPC interface
// see https://en.wiktionary.org/wiki/SUT for the reason this is called sut
Expand All @@ -420,7 +421,7 @@ func TestYaRPCClient_CreateIfNotExists(t *testing.T) {
assert.Nil(t, err)

errCode := int32(409)
mockedClient.EXPECT().CreateIfNotExists(ctx, &drpc.CreateRequest{Ref: &testRPCSchemaRef, EntityValues: outFields}).Return(
mockedClient.EXPECT().CreateIfNotExists(ctx, &drpc.CreateRequest{Ref: &testRPCSchemaRef, EntityValues: outFields}, gomock.Any()).Return(
&drpc.BadRequestError{ErrorCode: &errCode},
)

Expand Down Expand Up @@ -483,7 +484,7 @@ func TestYaRPCClient_Upsert(t *testing.T) {
mockedClient.EXPECT().Upsert(ctx, &drpc.UpsertRequest{
Ref: &testRPCSchemaRef,
EntityValues: outFields,
})
}, gomock.Any())

// create the YaRPCClient and give it the mocked RPC interface
// see https://en.wiktionary.org/wiki/SUT for the reason this is called sut
Expand Down Expand Up @@ -527,7 +528,7 @@ func TestClient_CheckSchema(t *testing.T) {
EntityDefs: []*drpc.EntityDefinition{yarpc.EntityDefinitionToThrift(&ed.EntityDefinition)},
}
v := int32(1)
mockedClient.EXPECT().CheckSchema(ctx, gomock.Any()).Do(func(_ context.Context, request *drpc.CheckSchemaRequest) {
mockedClient.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.CheckSchemaRequest, opts yarpc2.CallOption) {
assert.Equal(t, expectedRequest, request)
}).Return(&drpc.CheckSchemaResponse{Version: &v}, nil)

Expand All @@ -551,7 +552,7 @@ func TestClient_CheckSchemaStatus(t *testing.T) {
Version: &version,
}

mockedClient.EXPECT().CheckSchemaStatus(ctx, gomock.Any()).Do(func(_ context.Context, request *drpc.CheckSchemaStatusRequest) {
mockedClient.EXPECT().CheckSchemaStatus(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.CheckSchemaStatusRequest, opts yarpc2.CallOption) {
assert.Equal(t, expectedRequest, request)
}).Return(&drpc.CheckSchemaStatusResponse{Version: &version}, nil)

Expand All @@ -577,14 +578,14 @@ func TestClient_UpsertSchema(t *testing.T) {
EntityDefs: []*drpc.EntityDefinition{yarpc.EntityDefinitionToThrift(&ed.EntityDefinition)},
}
v := int32(1)
mockedClient.EXPECT().UpsertSchema(ctx, gomock.Any()).Do(func(_ context.Context, request *drpc.UpsertSchemaRequest) {
mockedClient.EXPECT().UpsertSchema(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.UpsertSchemaRequest, option yarpc2.CallOption) {
assert.Equal(t, expectedRequest, request)
}).Return(&drpc.UpsertSchemaResponse{Version: &v}, nil)
result, err := sut.UpsertSchema(ctx, sp, prefix, []*dosa.EntityDefinition{&ed.EntityDefinition})
assert.NoError(t, err)
assert.Equal(t, &dosa.SchemaStatus{Version: v}, result)

mockedClient.EXPECT().UpsertSchema(ctx, gomock.Any()).Return(nil, errors.New("test error"))
mockedClient.EXPECT().UpsertSchema(ctx, gomock.Any(), gomock.Any()).Return(nil, errors.New("test error"))
_, err = sut.UpsertSchema(ctx, sp, prefix, []*dosa.EntityDefinition{&ed.EntityDefinition})
assert.Error(t, err)
assert.Contains(t, err.Error(), "test error")
Expand All @@ -594,12 +595,13 @@ func TestClient_CreateScope(t *testing.T) {
// build a mock RPC client
ctrl := gomock.NewController(t)
mockedClient := dosatest.NewMockClient(ctrl)

sut := yarpc.Connector{Client: mockedClient, Config: testCfg}
mockedClient.EXPECT().CreateScope(ctx, gomock.Any()).Return(nil)
mockedClient.EXPECT().CreateScope(ctx, gomock.Any(), gomock.Any()).Return(nil)
err := sut.CreateScope(ctx, "scope")
assert.NoError(t, err)

mockedClient.EXPECT().CreateScope(ctx, gomock.Any()).Return(errors.New("test error"))
mockedClient.EXPECT().CreateScope(ctx, gomock.Any(), gomock.Any()).Return(errors.New("test error"))
err = sut.CreateScope(ctx, "scope")
assert.Error(t, err)
assert.Contains(t, err.Error(), "test error")
Expand All @@ -611,11 +613,11 @@ func TestClient_TruncateScope(t *testing.T) {
mockedClient := dosatest.NewMockClient(ctrl)
sut := yarpc.Connector{Client: mockedClient, Config: testCfg}

mockedClient.EXPECT().TruncateScope(ctx, gomock.Any()).Return(nil)
mockedClient.EXPECT().TruncateScope(ctx, gomock.Any(), gomock.Any()).Return(nil)
err := sut.TruncateScope(ctx, "scope")
assert.NoError(t, err)

mockedClient.EXPECT().TruncateScope(ctx, gomock.Any()).Return(errors.New("test error"))
mockedClient.EXPECT().TruncateScope(ctx, gomock.Any(), gomock.Any()).Return(errors.New("test error"))
err = sut.TruncateScope(ctx, "scope")
assert.Error(t, err)
assert.Contains(t, err.Error(), "test error")
Expand All @@ -627,11 +629,11 @@ func TestClient_DropScope(t *testing.T) {
mockedClient := dosatest.NewMockClient(ctrl)
sut := yarpc.Connector{Client: mockedClient, Config: testCfg}

mockedClient.EXPECT().DropScope(ctx, gomock.Any()).Return(nil)
mockedClient.EXPECT().DropScope(ctx, gomock.Any(), gomock.Any()).Return(nil)
err := sut.DropScope(ctx, "scope")
assert.NoError(t, err)

mockedClient.EXPECT().DropScope(ctx, gomock.Any()).Return(errors.New("test error"))
mockedClient.EXPECT().DropScope(ctx, gomock.Any(), gomock.Any()).Return(errors.New("test error"))
err = sut.DropScope(ctx, "scope")
assert.Error(t, err)
assert.Contains(t, err.Error(), "test error")
Expand All @@ -656,7 +658,7 @@ func TestConnector_Range(t *testing.T) {
sut := yarpc.Connector{Client: mockedClient, Config: testCfg}

// successful call, return results
mockedClient.EXPECT().Range(ctx, gomock.Any()).Do(func(_ context.Context, request *drpc.RangeRequest) {
mockedClient.EXPECT().Range(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.RangeRequest, opts yarpc2.CallOption) {
assert.Equal(t, map[string]struct{}{"c1": {}}, request.FieldsToRead)
assert.Equal(t, testLimit, *request.Limit)
assert.Equal(t, testRPCSchemaRef, *request.Ref)
Expand Down Expand Up @@ -699,7 +701,7 @@ func TestConnector_Range(t *testing.T) {
testutil.AssertEqForPointer(testAssert(t), float64(2.2), values[0]["c2"])

// perform a not found request
mockedClient.EXPECT().Range(ctx, gomock.Any()).
mockedClient.EXPECT().Range(ctx, gomock.Any(), gomock.Any()).
Return(nil, &dosa.ErrNotFound{}).Times(1)
values, token, err = sut.Range(ctx, testEi, map[string][]*dosa.Condition{"c2": {&dosa.Condition{
Value: float64(3.3),
Expand All @@ -711,7 +713,7 @@ func TestConnector_Range(t *testing.T) {
assert.True(t, dosa.ErrorIsNotFound(err))

// perform a generic error request
mockedClient.EXPECT().Range(ctx, gomock.Any()).
mockedClient.EXPECT().Range(ctx, gomock.Any(), gomock.Any()).
Return(nil, errors.New("test error")).Times(1)
values, token, err = sut.Range(ctx, testEi, map[string][]*dosa.Condition{"c2": {&dosa.Condition{
Value: float64(3.3),
Expand Down Expand Up @@ -743,7 +745,7 @@ func TestConnector_RemoveRange(t *testing.T) {
field := drpc.Field{&fieldName, &drpc.Value{ElemValue: &drpc.RawValue{Int64Value: testutil.TestInt64Ptr(10)}}}
op := drpc.OperatorEq

mockedClient.EXPECT().RemoveRange(ctx, gomock.Any()).Do(func(_ context.Context, request *drpc.RemoveRangeRequest) {
mockedClient.EXPECT().RemoveRange(ctx, gomock.Any(), gomock.Any()).Do(func(_ context.Context, request *drpc.RemoveRangeRequest, option yarpc2.CallOption) {
assert.Equal(t, testRPCSchemaRef, *request.Ref)
assert.Equal(t, len(request.Conditions), 1)
condition := request.Conditions[0]
Expand All @@ -761,7 +763,7 @@ func TestConnector_RemoveRange(t *testing.T) {
assert.NoError(t, err)

// perform a generic error request
mockedClient.EXPECT().RemoveRange(ctx, gomock.Any()).Return(errors.New("test error")).Times(1)
mockedClient.EXPECT().RemoveRange(ctx, gomock.Any(), gomock.Any()).Return(errors.New("test error")).Times(1)
err = sut.RemoveRange(ctx, testEi, map[string][]*dosa.Condition{"c2": {&dosa.Condition{
Value: 3.3,
Op: dosa.Eq,
Expand Down Expand Up @@ -796,8 +798,8 @@ func TestConnector_Scan(t *testing.T) {
FieldsToRead: map[string]struct{}{"c1": {}},
}
// successful call, return results
mockedClient.EXPECT().Scan(ctx, sr).
Do(func(_ context.Context, r *drpc.ScanRequest) {
mockedClient.EXPECT().Scan(ctx, sr, gomock.Any()).
Do(func(_ context.Context, r *drpc.ScanRequest, option yarpc2.CallOption) {
assert.Equal(t, sr, r)
}).
Return(&drpc.ScanResponse{
Expand All @@ -811,10 +813,10 @@ func TestConnector_Scan(t *testing.T) {
NextToken: &responseToken,
}, nil)
// failed call, return error
mockedClient.EXPECT().Scan(ctx, gomock.Any()).
mockedClient.EXPECT().Scan(ctx, gomock.Any(), gomock.Any()).
Return(nil, errors.New("test error")).Times(1)
// no results, make sure error is exact
mockedClient.EXPECT().Scan(ctx, gomock.Any()).
mockedClient.EXPECT().Scan(ctx, gomock.Any(), gomock.Any()).
Return(nil, &dosa.ErrNotFound{})

// Prepare the dosa client interface using the mocked RPC layer
Expand Down Expand Up @@ -856,7 +858,7 @@ func TestConnector_Remove(t *testing.T) {
}

// we expect a single call to Read, and we return back two fields, f1 which is in the typemap and another field that is not
mockedClient.EXPECT().Remove(ctx, removeRequest).Return(nil)
mockedClient.EXPECT().Remove(ctx, removeRequest, gomock.Any()).Return(nil)

// Prepare the dosa client interface using the mocked RPC layer
sut := yarpc.Connector{Client: mockedClient, Config: testCfg}
Expand Down
Loading

0 comments on commit 6024915

Please sign in to comment.