Skip to content

Commit

Permalink
Backport - Fix Transactions being cached before successful publish (#…
Browse files Browse the repository at this point in the history
…4277) (#4313)

## Motivation
Closes #4043

## Changes
This change validates a nodes own transactions before publishing them with libp2p. This avoids them being cached and therefore multiple submissions of the same (invalid) transaction will continue to fail instead of returning "in mempool"

## Test Plan
- Updated existing tests
- Added tests that ensure a nodes own transactions are validated first before publishing them via libp2p (and relying on the validation there)

## TODO
<!-- This section should be removed when all items are complete -->
- [x] Explain motivation or link existing issue(s)
- [x] Test changes and document test plan
- [x] Update documentation as needed

## DevOps Notes
<!-- Please uncheck these items as applicable to make DevOps aware of changes that may affect releases -->
- [x] This PR does not require configuration changes (e.g., environment variables, GitHub secrets, VM resources)
- [x] This PR does not affect public APIs
- [x] This PR does not rely on a new version of external services (PoET, elasticsearch, etc.)
- [x] This PR does not make changes to log messages (which monitoring infrastructure may rely on)
  • Loading branch information
fasmat committed Apr 18, 2023
1 parent 08256a0 commit f74acb0
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 133 deletions.
186 changes: 105 additions & 81 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,14 +525,6 @@ func marshalProto(t *testing.T, msg proto.Message) string {

var cfg = config.DefaultTestConfig()

type SyncerMock struct {
startCalled bool
isSynced bool
}

func (s *SyncerMock) IsSynced(context.Context) bool { return s.isSynced }
func (s *SyncerMock) Start(context.Context) { s.startCalled = true }

type ActivationAPIMock struct {
UpdatePoETErr error
}
Expand Down Expand Up @@ -612,15 +604,16 @@ func TestNewServersConfig(t *testing.T) {

func TestNodeService(t *testing.T) {
logtest.SetupGlobal(t)
syncer := SyncerMock{}
ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes()
atxapi := &ActivationAPIMock{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

grpcService := NewNodeService(ctx, &networkMock, meshAPI, &genTime, &syncer, atxapi)
shutDown := launchServer(t, grpcService)
defer shutDown()
grpcService := NewNodeService(ctx, &networkMock, meshAPI, &genTime, syncer, atxapi)
t.Cleanup(launchServer(t, grpcService))

conn := dialGrpc(ctx, t, cfg)
c := pb.NewNodeServiceClient(conn)
Expand All @@ -641,14 +634,18 @@ func TestNodeService(t *testing.T) {

// now try sending bad payloads
_, err = c.Echo(context.Background(), &pb.EchoRequest{Msg: nil})
require.EqualError(t, err, "rpc error: code = InvalidArgument desc = Must include `Msg`")
statusCode := status.Code(err)
require.Equal(t, codes.InvalidArgument, statusCode)
require.Error(t, err)
grpcStatus, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.InvalidArgument, grpcStatus.Code())
require.Equal(t, "Must include `Msg`", grpcStatus.Message())

_, err = c.Echo(context.Background(), &pb.EchoRequest{})
require.EqualError(t, err, "rpc error: code = InvalidArgument desc = Must include `Msg`")
statusCode = status.Code(err)
require.Equal(t, codes.InvalidArgument, statusCode)
require.Error(t, err)
grpcStatus, ok = status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.InvalidArgument, grpcStatus.Code())
require.Equal(t, "Must include `Msg`", grpcStatus.Message())
}},
{"Version", func(t *testing.T) {
logtest.SetupGlobal(t)
Expand Down Expand Up @@ -695,7 +692,6 @@ func TestNodeService(t *testing.T) {
}},
{"SyncStart", func(t *testing.T) {
logtest.SetupGlobal(t)
require.Equal(t, false, syncer.startCalled, "Start() not yet called on syncer")
req := &pb.SyncStartRequest{}
res, err := c.SyncStart(context.Background(), req)
require.Nil(t, res)
Expand Down Expand Up @@ -734,8 +730,7 @@ func TestNodeService(t *testing.T) {
func TestGlobalStateService(t *testing.T) {
logtest.SetupGlobal(t)
svc := NewGlobalStateService(meshAPI, conStateAPI)
shutDown := launchServer(t, svc)
defer shutDown()
t.Cleanup(launchServer(t, svc))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -1016,9 +1011,9 @@ func TestGlobalStateService(t *testing.T) {

func TestSmesherService(t *testing.T) {
logtest.SetupGlobal(t)
svc := NewSmesherService(&PostAPIMock{}, &SmeshingAPIMock{}, 10*time.Millisecond, activation.DefaultPostSetupOpts())
shutDown := launchServer(t, svc)
defer shutDown()
smeshingAPI := &SmeshingAPIMock{}
svc := NewSmesherService(&PostAPIMock{}, smeshingAPI, 10*time.Millisecond, activation.DefaultPostSetupOpts())
t.Cleanup(launchServer(t, svc))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -1142,8 +1137,7 @@ func TestSmesherService(t *testing.T) {
func TestMeshService(t *testing.T) {
logtest.SetupGlobal(t)
grpcService := NewMeshService(meshAPI, conStateAPI, &genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal)
shutDown := launchServer(t, grpcService)
t.Cleanup(func() { shutDown() })
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -1670,14 +1664,16 @@ func TestMeshService(t *testing.T) {
func TestTransactionServiceSubmitUnsync(t *testing.T) {
logtest.SetupGlobal(t)
req := require.New(t)
syncer := &SyncerMock{}
ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(false)
publisher := pubsubmocks.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
txHandler := mocks.NewMockTxValidator(ctrl)
txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil)

grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, syncer)
shutDown := launchServer(t, grpcService)
defer shutDown()
grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, syncer, txHandler)
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand All @@ -1690,40 +1686,73 @@ func TestTransactionServiceSubmitUnsync(t *testing.T) {
// This time, we expect an error, since isSynced is false (by default)
// The node should not allow tx submission when not synced
res, err := c.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{Transaction: serializedTx})
req.EqualError(err, "rpc error: code = FailedPrecondition desc = Cannot submit transaction, node is not in sync yet, try again later")
req.Error(err)
grpcStatus, ok := status.FromError(err)
req.True(ok)
req.Equal(codes.FailedPrecondition, grpcStatus.Code())
req.Equal("Cannot submit transaction, node is not in sync yet, try again later", grpcStatus.Message())
req.Nil(res)

syncer.isSynced = true
syncer.EXPECT().IsSynced(gomock.Any()).Return(true)

// This time, we expect no error, since isSynced is now true
_, err = c.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{Transaction: serializedTx})
req.NoError(err)
// TODO: randomly got an error here, should investigate. Added specific error check above, as this error should have
// happened there first.
// Received unexpected error: "rpc error: code = Unimplemented desc = unknown service spacemesh.v1.TransactionService"
}

func TestTransactionServiceSubmitInvalidTx(t *testing.T) {
logtest.SetupGlobal(t)
req := require.New(t)

ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(true)
publisher := pubsubmocks.NewMockPublisher(ctrl) // publish is not called
txHandler := mocks.NewMockTxValidator(ctrl)
txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(errors.New("failed validation"))

grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, syncer, txHandler)
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg)
c := pb.NewTransactionServiceClient(conn)

serializedTx, err := codec.Encode(globalTx)
req.NoError(err, "error serializing tx")

// When verifying and caching the transaction fails we expect an error
res, err := c.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{Transaction: serializedTx})
req.Error(err)
grpcStatus, ok := status.FromError(err)
req.True(ok)
req.Equal(codes.InvalidArgument, grpcStatus.Code())
req.Equal("Failed to verify transaction", grpcStatus.Message())
req.Nil(res)
}

func TestTransactionService_SubmitNoConcurrency(t *testing.T) {
logtest.SetupGlobal(t)

numTxs := 20

ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(true).Times(numTxs)
publisher := pubsubmocks.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(numTxs)
txHandler := mocks.NewMockTxValidator(ctrl)
txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil).Times(numTxs)

expected := 20
n := 0
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(_ context.Context, _ string, msg []byte) error {
n++
return nil
})
grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, &SyncerMock{isSynced: true})
shutDown := launchServer(t, grpcService)
defer shutDown()
grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, syncer, txHandler)
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn := dialGrpc(ctx, t, cfg)
c := pb.NewTransactionServiceClient(conn)
for i := 0; i < expected; i++ {
for i := 0; i < numTxs; i++ {
res, err := c.SubmitTransaction(ctx, &pb.SubmitTransactionRequest{
Transaction: globalTx.Raw,
})
Expand All @@ -1732,19 +1761,21 @@ func TestTransactionService_SubmitNoConcurrency(t *testing.T) {
require.Equal(t, globalTx.ID.Bytes(), res.Txstate.Id.Id)
require.Equal(t, pb.TransactionState_TRANSACTION_STATE_MEMPOOL, res.Txstate.State)
}
require.Equal(t, expected, n)
}

func TestTransactionService(t *testing.T) {
logtest.SetupGlobal(t)

ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes()
publisher := pubsubmocks.NewMockPublisher(ctrl)

publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, &SyncerMock{isSynced: true})
shutDown := launchServer(t, grpcService)
defer shutDown()
txHandler := mocks.NewMockTxValidator(ctrl)
txHandler.EXPECT().VerifyAndCacheTx(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

grpcService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, syncer, txHandler)
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -2094,8 +2125,7 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) {
t.Cleanup(events.CloseEventReporter)

grpcService := NewMeshService(meshAPI, conStateAPI, &genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal)
shutDown := launchServer(t, grpcService)
defer shutDown()
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -2151,8 +2181,7 @@ func TestAccountDataStream_comprehensive(t *testing.T) {
t.Cleanup(events.CloseEventReporter)

svc := NewGlobalStateService(meshAPI, conStateAPI)
shutDown := launchServer(t, svc)
t.Cleanup(shutDown)
t.Cleanup(launchServer(t, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -2211,8 +2240,7 @@ func TestGlobalStateStream_comprehensive(t *testing.T) {
t.Cleanup(events.CloseEventReporter)

svc := NewGlobalStateService(meshAPI, conStateAPI)
shutDown := launchServer(t, svc)
defer shutDown()
t.Cleanup(launchServer(t, svc))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -2272,8 +2300,7 @@ func TestLayerStream_comprehensive(t *testing.T) {
t.Cleanup(events.CloseEventReporter)

grpcService := NewMeshService(meshAPI, conStateAPI, &genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal)
shutDown := launchServer(t, grpcService)
defer shutDown()
t.Cleanup(launchServer(t, grpcService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -2409,10 +2436,15 @@ func TestMultiService(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

svc1 := NewNodeService(ctx, &networkMock, meshAPI, &genTime, &SyncerMock{}, &ActivationAPIMock{})
ctrl, ctx := gomock.WithContext(ctx, t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes()
atxapi := &ActivationAPIMock{}

svc1 := NewNodeService(ctx, &networkMock, meshAPI, &genTime, syncer, atxapi)
svc2 := NewMeshService(meshAPI, conStateAPI, &genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal)
shutDown := launchServer(t, svc1, svc2)
defer shutDown()
t.Cleanup(shutDown)

conn := dialGrpc(ctx, t, cfg)

Expand Down Expand Up @@ -2451,22 +2483,23 @@ func TestJsonApi(t *testing.T) {
const message = "hello world!"

// we cannot start the gateway service without enabling at least one service
cfg.StartNodeService = false
cfg.StartMeshService = false
shutDown := launchServer(t)
t.Cleanup(shutDown)
payload := marshalProto(t, &pb.EchoRequest{Msg: &pb.SimpleString{Value: message}})
url := fmt.Sprintf("http://127.0.0.1:%d/%s", cfg.JSONServerPort, "v1/node/echo")
_, err := http.Post(url, "application/json", strings.NewReader(payload))
require.Error(t, err)
shutDown()

// enable services and try again
svc1 := NewNodeService(context.Background(), &networkMock, meshAPI, &genTime, &SyncerMock{}, &ActivationAPIMock{})
ctrl := gomock.NewController(t)
syncer := mocks.NewMockSyncer(ctrl)
syncer.EXPECT().IsSynced(gomock.Any()).Return(false).AnyTimes()
atxapi := &ActivationAPIMock{}

svc1 := NewNodeService(context.Background(), &networkMock, meshAPI, &genTime, syncer, atxapi)
svc2 := NewMeshService(meshAPI, conStateAPI, &genTime, layersPerEpoch, types.Hash20{}, layerDuration, layerAvgSize, txsPerProposal)
cfg.StartNodeService = true
cfg.StartMeshService = true
shutDown = launchServer(t, svc1, svc2)
defer shutDown()
t.Cleanup(launchServer(t, svc1, svc2))
time.Sleep(time.Second)

// generate request payload (api input params)
Expand All @@ -2490,8 +2523,7 @@ func TestDebugService(t *testing.T) {
ctrl := gomock.NewController(t)
identity := mocks.NewMockNetworkIdentity(ctrl)
svc := NewDebugService(conStateAPI, identity)
shutDown := launchServer(t, svc)
defer shutDown()
t.Cleanup(launchServer(t, svc))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -2550,16 +2582,9 @@ func TestEventsReceived(t *testing.T) {
events.InitializeReporter()
t.Cleanup(events.CloseEventReporter)

ctrl := gomock.NewController(t)
publisher := pubsubmocks.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(func(_ context.Context, _ string, msg []byte) error {
return nil
})

txService := NewTransactionService(sql.InMemory(), publisher, meshAPI, conStateAPI, &SyncerMock{isSynced: true})
txService := NewTransactionService(sql.InMemory(), nil, meshAPI, conStateAPI, nil, nil)
gsService := NewGlobalStateService(meshAPI, conStateAPI)
shutDown := launchServer(t, txService, gsService)
defer shutDown()
t.Cleanup(launchServer(t, txService, gsService))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -2637,8 +2662,7 @@ func TestTransactionsRewards(t *testing.T) {
events.InitializeReporter()
t.Cleanup(events.CloseEventReporter)

shutDown := launchServer(t, NewGlobalStateService(meshAPI, conStateAPI))
t.Cleanup(shutDown)
t.Cleanup(launchServer(t, NewGlobalStateService(meshAPI, conStateAPI)))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
Expand Down
Loading

0 comments on commit f74acb0

Please sign in to comment.