diff --git a/mocks/mock_synchronizer.go b/mocks/mock_synchronizer.go index e4c26aa90..11f777d96 100644 --- a/mocks/mock_synchronizer.go +++ b/mocks/mock_synchronizer.go @@ -83,6 +83,20 @@ func (mr *MockSyncReaderMockRecorder) SubscribeNewHeads() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeNewHeads", reflect.TypeOf((*MockSyncReader)(nil).SubscribeNewHeads)) } +// SubscribePendingTxs mocks base method. +func (m *MockSyncReader) SubscribePendingTxs() sync.PendingTxSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribePendingTxs") + ret0, _ := ret[0].(sync.PendingTxSubscription) + return ret0 +} + +// SubscribePendingTxs indicates an expected call of SubscribePendingTxs. +func (mr *MockSyncReaderMockRecorder) SubscribePendingTxs() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribePendingTxs", reflect.TypeOf((*MockSyncReader)(nil).SubscribePendingTxs)) +} + // SubscribeReorg mocks base method. func (m *MockSyncReader) SubscribeReorg() sync.ReorgSubscription { m.ctrl.T.Helper() diff --git a/rpc/events.go b/rpc/events.go index 7a83ea944..87963551c 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -14,7 +14,8 @@ import ( ) const ( - MaxBlocksBack = 1024 + MaxBlocksBack = 1024 + MaxAddressesInFilter = 1000 // TODO(weiihann): not finalised yet ) type EventsArg struct { @@ -116,6 +117,130 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub return &SubscriptionID{ID: id}, nil } +func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, senderAddr []felt.Felt) (*SubscriptionID, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + if len(senderAddr) > MaxAddressesInFilter { + return nil, ErrTooManyAddressesInFilter + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + pendingTxsSub := h.pendingTxs.Subscribe() + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + pendingTxsSub.Unsubscribe() + }() + + h.processPendingTxs(subscriptionCtx, getDetails != nil && *getDetails, senderAddr, pendingTxsSub, w, id) + }) + + return &SubscriptionID{ID: id}, nil +} + +func (h *Handler) processPendingTxs( + ctx context.Context, + getDetails bool, + senderAddr []felt.Felt, + pendingTxsSub *feed.Subscription[[]core.Transaction], + w jsonrpc.Conn, + id uint64, +) { + for { + select { + case <-ctx.Done(): + return + case pendingTxs := <-pendingTxsSub.Recv(): + filteredTxs := h.filterTxs(pendingTxs, getDetails, senderAddr) + if err := h.sendPendingTxs(w, filteredTxs, id); err != nil { + h.log.Warnw("Error sending pending transactions", "err", err) + return + } + } + } +} + +func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) interface{} { + if getDetails { + return h.filterTxDetails(pendingTxs, senderAddr) + } + return h.filterTxHashes(pendingTxs, senderAddr) +} + +func (h *Handler) filterTxDetails(pendingTxs []core.Transaction, senderAddr []felt.Felt) []*Transaction { + filteredTxs := make([]*Transaction, 0, len(pendingTxs)) + for _, txn := range pendingTxs { + if h.shouldIncludeTx(txn, senderAddr) { + filteredTxs = append(filteredTxs, AdaptTransaction(txn)) + } + } + return filteredTxs +} + +func (h *Handler) filterTxHashes(pendingTxs []core.Transaction, senderAddr []felt.Felt) []felt.Felt { + filteredTxHashes := make([]felt.Felt, 0, len(pendingTxs)) + for _, txn := range pendingTxs { + if h.shouldIncludeTx(txn, senderAddr) { + filteredTxHashes = append(filteredTxHashes, *txn.Hash()) + } + } + return filteredTxHashes +} + +func (h *Handler) shouldIncludeTx(txn core.Transaction, senderAddr []felt.Felt) bool { + if len(senderAddr) == 0 { + return true + } + + // + switch t := txn.(type) { + case *core.InvokeTransaction: + for _, addr := range senderAddr { + if t.SenderAddress.Equal(&addr) { + return true + } + } + case *core.DeclareTransaction: + for _, addr := range senderAddr { + if t.SenderAddress.Equal(&addr) { + return true + } + } + } + + return false +} + +func (h *Handler) sendPendingTxs(w jsonrpc.Conn, result interface{}, id uint64) error { + req := jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionPendingTransactions", + Params: map[string]interface{}{ + "subscription_id": id, + "result": result, + }, + } + + resp, err := json.Marshal(req) + if err != nil { + return err + } + _, err = w.Write(resp) + return err +} + // getStartAndLatestHeaders gets the start and latest header for the subscription func (h *Handler) getStartAndLatestHeaders(blockID *BlockID) (*core.Header, *core.Header, *jsonrpc.Error) { if blockID == nil || blockID.Latest { diff --git a/rpc/events_test.go b/rpc/events_test.go index 648c1fe5b..4e63918a4 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -3,8 +3,6 @@ package rpc_test import ( "context" "fmt" - "io" - "net" "net/http/httptest" "testing" "time" @@ -28,7 +26,8 @@ import ( var emptyCommitments = core.BlockCommitments{} const ( - newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + subscribeResponse = `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}` ) func TestEvents(t *testing.T) { @@ -220,31 +219,17 @@ func TestEvents(t *testing.T) { }) } -type fakeConn struct { - w io.Writer -} - -func (fc *fakeConn) Write(p []byte) (int, error) { - return fc.w.Write(p) -} - -func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { - fc2, ok := other.(*fakeConn) - if !ok { - return false - } - return fc.w == fc2.w -} - type fakeSyncer struct { - newHeads *feed.Feed[*core.Header] - reorgs *feed.Feed[*sync.ReorgData] + newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgData] + pendingTxs *feed.Feed[[]core.Transaction] } func newFakeSyncer() *fakeSyncer { return &fakeSyncer{ - newHeads: feed.New[*core.Header](), - reorgs: feed.New[*sync.ReorgData](), + newHeads: feed.New[*core.Header](), + reorgs: feed.New[*sync.ReorgData](), + pendingTxs: feed.New[[]core.Transaction](), } } @@ -256,6 +241,10 @@ func (fs *fakeSyncer) SubscribeReorg() sync.ReorgSubscription { return sync.ReorgSubscription{Subscription: fs.reorgs.Subscribe()} } +func (fs *fakeSyncer) SubscribePendingTxs() sync.PendingTxSubscription { + return sync.PendingTxSubscription{Subscription: fs.pendingTxs.Subscribe()} +} + func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) { return 0, nil } @@ -264,9 +253,10 @@ func (fs *fakeSyncer) HighestBlockHeader() *core.Header { return nil } -func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { +func TestSubscribeNewHeads(t *testing.T) { t.Parallel() + log := utils.NewNopZapLogger() chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) syncer := newFakeSyncer() handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) @@ -281,53 +271,37 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { // Sleep for a moment just in case. time.Sleep(50 * time.Millisecond) - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribeNewHeads", + Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, + Handler: handler.SubscribeNewHeads, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) + + conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) - // Subscribe without setting the connection on the context. - id, rpcErr := handler.SubscribeNewHeads(ctx, nil) - require.Zero(t, id) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) - // Subscribe correctly. - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - id, rpcErr = handler.SubscribeNewHeads(subCtx, nil) - require.Nil(t, rpcErr) + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`) + require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) + + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(got)) // Simulate a new block syncer.newHeads.Send(testHeader(t)) // Receive a block header. - want := fmt.Sprintf(newHeadsResponse, id.ID) - got := make([]byte, len(want)) - _, err := clientConn.Read(got) + want = fmt.Sprintf(newHeadsResponse, id) + _, headerGot, err := conn.Read(ctx) require.NoError(t, err) - require.Equal(t, want, string(got)) - - // Unsubscribe without setting the connection on the context. - ok, rpcErr := handler.Unsubscribe(ctx, id.ID) - require.Equal(t, jsonrpc.MethodNotFound, rpcErr.Code) - require.False(t, ok) - - // Unsubscribe on correct connection with the incorrect id. - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID+1) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on incorrect connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{}) - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) - require.Equal(t, rpc.ErrSubscriptionNotFound, rpcErr) - require.False(t, ok) - - // Unsubscribe on correct connection with the correct id. - subCtx = context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - ok, rpcErr = handler.Unsubscribe(subCtx, id.ID) - require.Nil(t, rpcErr) - require.True(t, ok) + require.Equal(t, want, string(headerGot)) } func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { @@ -372,15 +346,14 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { handler.WithIDGen(func() uint64 { return firstID }) require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) - want := `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}` - firstWant := fmt.Sprintf(want, firstID) + firstWant := fmt.Sprintf(subscribeResponse, firstID) _, firstGot, err := conn1.Read(ctx) require.NoError(t, err) require.Equal(t, firstWant, string(firstGot)) handler.WithIDGen(func() uint64 { return secondID }) require.NoError(t, conn2.Write(ctx, websocket.MessageText, subscribeMsg)) - secondWant := fmt.Sprintf(want, secondID) + secondWant := fmt.Sprintf(subscribeResponse, secondID) _, secondGot, err := conn2.Read(ctx) require.NoError(t, err) require.Equal(t, secondWant, string(secondGot)) @@ -405,6 +378,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { } func TestSubscribeNewHeadsHistorical(t *testing.T) { + log := utils.NewNopZapLogger() client := feeder.NewTestClient(t, &utils.Mainnet) gw := adaptfeeder.New(client) @@ -432,71 +406,53 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) { // Sleep for a moment just in case. time.Sleep(50 * time.Millisecond) - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribeNewHeads", + Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, + Handler: handler.SubscribeNewHeads, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) - // Subscribe to a block that doesn't exist. - id, rpcErr := handler.SubscribeNewHeads(subCtx, &rpc.BlockID{Number: 1025}) - require.Equal(t, rpc.ErrBlockNotFound, rpcErr) - require.Zero(t, id) + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) - // Subscribe to a block that exists. - id, rpcErr = handler.SubscribeNewHeads(subCtx, &rpc.BlockID{Number: 0}) - require.Nil(t, rpcErr) - require.NotZero(t, id) + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads", "params":{"block":{"block_number":0}}}`) + require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) - // Check block 0 content - want := `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` - want = fmt.Sprintf(want, id.ID) - got := make([]byte, len(want)) - _, err = clientConn.Read(got) + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn.Read(ctx) require.NoError(t, err) require.Equal(t, want, string(got)) + // Check block 0 content + want = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x47c3637b57c2b079b93c61539950c17e868a28f46cdef28f88521067f21e943","parent_hash":"0x0","block_number":0,"new_root":"0x21870ba80540e7831fb21c591ee93481f5ae1bb71ff85a86ddd465be4eddee6","timestamp":1637069048,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, block0Got, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(block0Got)) + // Simulate a new block syncer.newHeads.Send(testHeader(t)) // Check new block content - want = fmt.Sprintf(newHeadsResponse, id.ID) - got = make([]byte, len(want)) - _, err = clientConn.Read(got) + want = fmt.Sprintf(newHeadsResponse, id) + _, newBlockGot, err := conn.Read(ctx) require.NoError(t, err) - require.Equal(t, want, string(got)) -} - -func testHeader(t *testing.T) *core.Header { - t.Helper() - - header := &core.Header{ - Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), - ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), - Number: 2, - GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), - Timestamp: 1637084470, - SequencerAddress: utils.HexToFelt(t, "0x0"), - L1DataGasPrice: &core.GasPrice{ - PriceInFri: utils.HexToFelt(t, "0x0"), - PriceInWei: utils.HexToFelt(t, "0x0"), - }, - GasPrice: utils.HexToFelt(t, "0x0"), - GasPriceSTRK: utils.HexToFelt(t, "0x0"), - L1DAMode: core.Calldata, - ProtocolVersion: "", - } - return header + require.Equal(t, want, string(newBlockGot)) } func TestSubscriptionReorg(t *testing.T) { t.Parallel() + log := utils.NewNopZapLogger() chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) syncer := newFakeSyncer() - handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) + handler := rpc.New(chain, syncer, nil, "", log) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -506,18 +462,28 @@ func TestSubscriptionReorg(t *testing.T) { }() time.Sleep(50 * time.Millisecond) - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribeNewHeads", + Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, + Handler: handler.SubscribeNewHeads, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) + + conn, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) - // Subscribe to new heads which will send a - id, rpcErr := handler.SubscribeNewHeads(subCtx, nil) - require.Nil(t, rpcErr) - require.NotZero(t, id) + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeNewHeads"}`) + require.NoError(t, conn.Write(ctx, websocket.MessageText, subscribeMsg)) + + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(got)) // Simulate a reorg syncer.reorgs.Send(&sync.ReorgData{ @@ -528,10 +494,231 @@ func TestSubscriptionReorg(t *testing.T) { }) // Receive reorg event - want := `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}` - want = fmt.Sprintf(want, id.ID) - got := make([]byte, len(want)) - _, err := clientConn.Read(got) + want = `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, reorgGot, err := conn.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(reorgGot)) +} + +func TestSubscribePendingTxs(t *testing.T) { + t.Parallel() + + log := utils.NewNopZapLogger() + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + syncer := newFakeSyncer() + handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribePendingTransactions", + Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, + Handler: handler.SubscribePendingTxs, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) + + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions"}`) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) + + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn1.Read(ctx) require.NoError(t, err) require.Equal(t, want, string(got)) + + hash1 := new(felt.Felt).SetUint64(1) + addr1 := new(felt.Felt).SetUint64(11) + + hash2 := new(felt.Felt).SetUint64(2) + addr2 := new(felt.Felt).SetUint64(22) + + hash3 := new(felt.Felt).SetUint64(3) + hash4 := new(felt.Felt).SetUint64(4) + hash5 := new(felt.Felt).SetUint64(5) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{TransactionHash: hash1, SenderAddress: addr1}, + &core.DeclareTransaction{TransactionHash: hash2, SenderAddress: addr2}, + &core.DeployTransaction{TransactionHash: hash3}, + &core.DeployAccountTransaction{DeployTransaction: core.DeployTransaction{TransactionHash: hash4}}, + &core.L1HandlerTransaction{TransactionHash: hash5}, + }) + + want = `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":["0x1","0x2","0x3","0x4","0x5"],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) +} + +func TestSubscribePendingTxsFilter(t *testing.T) { + t.Parallel() + + log := utils.NewNopZapLogger() + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + syncer := newFakeSyncer() + handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribePendingTransactions", + Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, + Handler: handler.SubscribePendingTxs, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) + + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"sender_address":["0xb", "0x16"]}}`) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) + + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(got)) + + hash1 := new(felt.Felt).SetUint64(1) + addr1 := new(felt.Felt).SetUint64(11) + + hash2 := new(felt.Felt).SetUint64(2) + addr2 := new(felt.Felt).SetUint64(22) + + hash3 := new(felt.Felt).SetUint64(3) + hash4 := new(felt.Felt).SetUint64(4) + hash5 := new(felt.Felt).SetUint64(5) + + hash6 := new(felt.Felt).SetUint64(6) + addr6 := new(felt.Felt).SetUint64(66) + + hash7 := new(felt.Felt).SetUint64(7) + addr7 := new(felt.Felt).SetUint64(77) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{TransactionHash: hash1, SenderAddress: addr1}, + &core.DeclareTransaction{TransactionHash: hash2, SenderAddress: addr2}, + &core.DeployTransaction{TransactionHash: hash3}, + &core.DeployAccountTransaction{DeployTransaction: core.DeployTransaction{TransactionHash: hash4}}, + &core.L1HandlerTransaction{TransactionHash: hash5}, + &core.InvokeTransaction{TransactionHash: hash6, SenderAddress: addr6}, + &core.DeclareTransaction{TransactionHash: hash7, SenderAddress: addr7}, + }) + + want = `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":["0x1","0x2"],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) +} + +func TestSubscribePendingTxsFullDetails(t *testing.T) { + t.Parallel() + + log := utils.NewNopZapLogger() + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet) + syncer := newFakeSyncer() + handler := rpc.New(chain, syncer, nil, "", log) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + server := jsonrpc.NewServer(1, log) + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribePendingTransactions", + Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, + Handler: handler.SubscribePendingTxs, + })) + ws := jsonrpc.NewWebsocket(server, log) + httpSrv := httptest.NewServer(ws) + + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + subscribeMsg := []byte(`{"jsonrpc":"2.0","id":1,"method":"starknet_subscribePendingTransactions", "params":{"transaction_details": true}}`) + + id := uint64(1) + handler.WithIDGen(func() uint64 { return id }) + require.NoError(t, conn1.Write(ctx, websocket.MessageText, subscribeMsg)) + + want := fmt.Sprintf(subscribeResponse, id) + _, got, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(got)) + + syncer.pendingTxs.Send([]core.Transaction{ + &core.InvokeTransaction{ + TransactionHash: new(felt.Felt).SetUint64(1), + CallData: []*felt.Felt{new(felt.Felt).SetUint64(2)}, + TransactionSignature: []*felt.Felt{new(felt.Felt).SetUint64(3)}, + MaxFee: new(felt.Felt).SetUint64(4), + ContractAddress: new(felt.Felt).SetUint64(5), + Version: new(core.TransactionVersion).SetUint64(3), + EntryPointSelector: new(felt.Felt).SetUint64(6), + Nonce: new(felt.Felt).SetUint64(7), + SenderAddress: new(felt.Felt).SetUint64(8), + ResourceBounds: map[core.Resource]core.ResourceBounds{}, + Tip: 9, + PaymasterData: []*felt.Felt{new(felt.Felt).SetUint64(10)}, + AccountDeploymentData: []*felt.Felt{new(felt.Felt).SetUint64(11)}, + }, + }) + + want = `{"jsonrpc":"2.0","method":"starknet_subscriptionPendingTransactions","params":{"result":[{"transaction_hash":"0x1","type":"INVOKE","version":"0x3","nonce":"0x7","max_fee":"0x4","contract_address":"0x5","sender_address":"0x8","signature":["0x3"],"calldata":["0x2"],"entry_point_selector":"0x6","resource_bounds":{},"tip":"0x9","paymaster_data":["0xa"],"account_deployment_data":["0xb"],"nonce_data_availability_mode":"L1","fee_data_availability_mode":"L1"}],"subscription_id":%d}}` + want = fmt.Sprintf(want, id) + _, pendingTxsGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, want, string(pendingTxsGot)) +} + +func testHeader(t *testing.T) *core.Header { + t.Helper() + + header := &core.Header{ + Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), + Number: 2, + GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), + Timestamp: 1637084470, + SequencerAddress: utils.HexToFelt(t, "0x0"), + L1DataGasPrice: &core.GasPrice{ + PriceInFri: utils.HexToFelt(t, "0x0"), + PriceInWei: utils.HexToFelt(t, "0x0"), + }, + GasPrice: utils.HexToFelt(t, "0x0"), + GasPriceSTRK: utils.HexToFelt(t, "0x0"), + L1DAMode: core.Calldata, + ProtocolVersion: "", + } + return header } diff --git a/rpc/handlers.go b/rpc/handlers.go index 06080a196..b5164c7a0 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -56,7 +56,8 @@ var ( ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"} ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"} - ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: "Cannot go back more than 1024 blocks"} + ErrTooManyAddressesInFilter = &jsonrpc.Error{Code: 67, Message: "Too many addresses in filter sender_address filter"} + ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: "Cannot go back more than 1024 blocks"} // These errors can be only be returned by Juno-specific methods. ErrSubscriptionNotFound = &jsonrpc.Error{Code: 100, Message: "Subscription not found"} @@ -81,9 +82,10 @@ type Handler struct { vm vm.VM log utils.Logger - version string - newHeads *feed.Feed[*core.Header] - reorgs *feed.Feed[*sync.ReorgData] + version string + newHeads *feed.Feed[*core.Header] + reorgs *feed.Feed[*sync.ReorgData] + pendingTxs *feed.Feed[[]core.Transaction] idgen func() uint64 mu stdsync.Mutex // protects subscriptions. @@ -118,6 +120,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V version: version, newHeads: feed.New[*core.Header](), reorgs: feed.New[*sync.ReorgData](), + pendingTxs: feed.New[[]core.Transaction](), subscriptions: make(map[uint64]*subscription), blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize), @@ -154,10 +157,13 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler { func (h *Handler) Run(ctx context.Context) error { newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription reorgsSub := h.syncReader.SubscribeReorg().Subscription + pendingTxsSub := h.syncReader.SubscribePendingTxs().Subscription defer newHeadsSub.Unsubscribe() defer reorgsSub.Unsubscribe() + defer pendingTxsSub.Unsubscribe() feed.Tee(newHeadsSub, h.newHeads) feed.Tee(reorgsSub, h.reorgs) + feed.Tee(pendingTxsSub, h.pendingTxs) <-ctx.Done() for _, sub := range h.subscriptions { @@ -478,11 +484,17 @@ func (h *Handler) MethodsV0_7() ([]jsonrpc.Method, string) { //nolint: funlen, d Name: "starknet_specVersion", Handler: h.SpecVersion, }, + { Name: "starknet_subscribeNewHeads", Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: h.SubscribeNewHeads, }, + { + Name: "starknet_subscribePendingTransactions", + Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, + Handler: h.SubscribePendingTxs, + }, { Name: "juno_unsubscribe", Params: []jsonrpc.Parameter{{Name: "id"}}, diff --git a/sync/sync.go b/sync/sync.go index f89d831f5..b93d4ef8d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -39,6 +39,10 @@ type ReorgSubscription struct { *feed.Subscription[*ReorgData] } +type PendingTxSubscription struct { + *feed.Subscription[[]core.Transaction] +} + // Todo: Since this is also going to be implemented by p2p package we should move this interface to node package // //go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader @@ -47,6 +51,7 @@ type Reader interface { HighestBlockHeader() *core.Header SubscribeNewHeads() HeaderSubscription SubscribeReorg() ReorgSubscription + SubscribePendingTxs() PendingTxSubscription } // This is temporary and will be removed once the p2p synchronizer implements this interface. @@ -68,6 +73,10 @@ func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription { return ReorgSubscription{feed.New[*ReorgData]().Subscribe()} } +func (n *NoopSynchronizer) SubscribePendingTxs() PendingTxSubscription { + return PendingTxSubscription{feed.New[[]core.Transaction]().Subscribe()} +} + // ReorgData represents data about reorganised blocks, starting and ending block number and hash type ReorgData struct { // StartBlockHash is the hash of the first known block of the orphaned chain @@ -89,6 +98,7 @@ type Synchronizer struct { highestBlockHeader atomic.Pointer[core.Header] newHeads *feed.Feed[*core.Header] reorgFeed *feed.Feed[*ReorgData] + pendingTxsFeed *feed.Feed[[]core.Transaction] log utils.SimpleLogger listener EventListener @@ -109,6 +119,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData, log: log, newHeads: feed.New[*core.Header](), reorgFeed: feed.New[*ReorgData](), + pendingTxsFeed: feed.New[[]core.Transaction](), pendingPollInterval: pendingPollInterval, listener: &SelectiveListener{}, readOnlyBlockchain: readOnlyBlockchain, @@ -515,6 +526,9 @@ func (s *Synchronizer) fetchAndStorePending(ctx context.Context) error { return err } + // send the pending transactions to the feed + s.pendingTxsFeed.Send(pendingBlock.Transactions) + s.log.Debugw("Found pending block", "txns", pendingBlock.TransactionCount) return s.blockchain.StorePending(&blockchain.Pending{ Block: pendingBlock, @@ -545,3 +559,9 @@ func (s *Synchronizer) SubscribeReorg() ReorgSubscription { Subscription: s.reorgFeed.Subscribe(), } } + +func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription { + return PendingTxSubscription{ + Subscription: s.pendingTxsFeed.Subscribe(), + } +} diff --git a/sync/sync_test.go b/sync/sync_test.go index 6a694d34e..3f5281b72 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -21,6 +21,8 @@ import ( "go.uber.org/mock/gomock" ) +var emptyCommitments = core.BlockCommitments{} + const timeout = time.Second func TestSyncBlocks(t *testing.T) { @@ -230,3 +232,28 @@ func TestSubscribeNewHeads(t *testing.T) { require.Equal(t, want.Header, got) sub.Unsubscribe() } + +func TestSubscribePendingTxs(t *testing.T) { + t.Parallel() + + client := feeder.NewTestClient(t, &utils.Mainnet) + gw := adaptfeeder.New(client) + + testDB := pebble.NewMemTest(t) + log := utils.NewNopZapLogger() + bc := blockchain.New(testDB, &utils.Mainnet) + synchronizer := sync.New(bc, gw, log, time.Millisecond*100, false) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + + sub := synchronizer.SubscribePendingTxs() + + require.NoError(t, synchronizer.Run(ctx)) + cancel() + + pending, err := bc.Pending() + require.NoError(t, err) + pendingTxs, ok := <-sub.Recv() + require.True(t, ok) + require.Equal(t, pending.Block.Transactions, pendingTxs) + sub.Unsubscribe() +}