From d24a95493b4e1f8f544091ac2e6c62468d05a60f Mon Sep 17 00:00:00 2001
From: IronGauntlets
Date: Wed, 6 Nov 2024 12:34:46 +0000
Subject: [PATCH 1/8] Add subscribeEvents and subscriptionEvent

---
 rpc/events.go      | 203 +++++++++++++++++++++++++++++++++++++++++++++
 rpc/events_test.go |  44 ++++++++++
 rpc/handlers.go    |   7 ++
 3 files changed, 254 insertions(+)

diff --git a/rpc/events.go b/rpc/events.go
index 002c0e077b..6fd10a894e 100644
--- a/rpc/events.go
+++ b/rpc/events.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 
 	"github.com/NethermindEth/juno/blockchain"
+	"github.com/NethermindEth/juno/core"
 	"github.com/NethermindEth/juno/core/felt"
 	"github.com/NethermindEth/juno/jsonrpc"
 )
@@ -44,6 +45,16 @@ type EventsChunk struct {
 	ContinuationToken string `json:"continuation_token,omitempty"`
 }
 
+type EventSubscription struct {
+	From      *felt.Felt    `json:"from_address"`
+	Keys      [][]felt.Felt `json:"keys"`
+	FromBlock *BlockID      `json:"block"`
+}
+
+type SubscriptionID struct {
+	ID uint64 `json:"subscription_id"`
+}
+
 /****************************************************
 		Events Handlers
 *****************************************************/
@@ -112,6 +123,198 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er
 	return true, nil
 }
 
+const subscribeEventsChunkSize = 1024
+
+func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt,
+	blockID *BlockID,
+) (*SubscriptionID, *jsonrpc.Error) {
+	w, ok := jsonrpc.ConnFromContext(ctx)
+	if !ok {
+		return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
+	}
+
+	lenKeys := len(keys)
+	for _, k := range keys {
+		lenKeys += len(k)
+	}
+	if lenKeys > maxEventFilterKeys {
+		return nil, ErrTooManyKeysInFilter
+	}
+
+	var requestedHeader *core.Header
+	headHeader, err := h.bcReader.HeadsHeader()
+	if err != nil {
+		return nil, ErrInternal.CloneWithData(err.Error())
+	}
+
+	if blockID == nil {
+		requestedHeader = headHeader
+	} else {
+		requestedHeader, rpcErr := h.blockHeaderByID(blockID)
+		if rpcErr != nil {
+			return nil, rpcErr
+		}
+
+		// Todo: should the pending block be included in the head count?
+		if requestedHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack {
+			return nil, ErrTooManyBlocksBack
+		}
+	}
+
+	id := h.idgen()
+	subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
+	sub := &subscription{
+		cancel: subscriptionCtxCancel,
+		conn:   w,
+	}
+	h.mu.Lock()
+	h.subscriptions[id] = sub
+	h.mu.Unlock()
+
+	headerSub := h.newHeads.Subscribe()
+	sub.wg.Go(func() {
+		defer func() {
+			h.unsubscribe(sub, id)
+			headerSub.Unsubscribe()
+		}()
+
+		// The specification doesn't enforce ordering of events therefore events from new blocks can be sent before
+		// old blocks.
+ // Todo: DRY + sub.wg.Go(func() { + for { + select { + case <-subscriptionCtx.Done(): + return + case header := <-headerSub.Recv(): + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: header.Number}, + &BlockID{Number: header.Number}, header.Number); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + var cToken *blockchain.ContinuationToken + filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + } + } + } + }) + + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number}, + &BlockID{Number: headHeader.Number}, headHeader.Number); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + var cToken *blockchain.ContinuationToken + filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + } + }) + + return &SubscriptionID{ID: id}, nil +} + +func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error { + for _, event := range events { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Pending block doesn't have a number + var blockNumber *uint64 + if event.BlockHash != nil { + blockNumber = &(event.BlockNumber) + } + emittedEvent := &EmittedEvent{ + BlockNumber: blockNumber, + BlockHash: event.BlockHash, + TransactionHash: event.TransactionHash, + Event: &Event{ + From: event.From, + Keys: event.Keys, + Data: event.Data, + }, + } + + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionEvents", + Params: map[string]any{ + "subscription_id": id, + "result": emittedEvent, + }, + }) + if err != nil { + return err + } + + _, err = w.Write(resp) + return err + } + } + return nil +} + // Events gets the events matching a filter // // It follows the specification defined here: diff --git a/rpc/events_test.go b/rpc/events_test.go index 7f8b483987..f7715c1f31 100644 --- a/rpc/events_test.go +++ 
b/rpc/events_test.go @@ -229,6 +229,50 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { return fc.w == fc2.w } +func TestSubscribeEventsAndUnsubscribe(t *testing.T) { + t.Parallel() + log := utils.NewNopZapLogger() + n := utils.Ptr(utils.Mainnet) + client := feeder.NewTestClient(t, n) + gw := adaptfeeder.New(client) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + chain := blockchain.New(pebble.NewMemTest(t), n) + syncer := sync.New(chain, gw, log, 0, false) + handler := rpc.New(chain, syncer, nil, "", log) + + go func() { + require.NoError(t, handler.Run(ctx)) + }() + // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. + // Sleep for a moment just in case. + time.Sleep(50 * time.Millisecond) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + t.Run("Too many keys in filter", func(t *testing.T) { + keys := make([][]felt.Felt, 1024+1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, nil) + assert.Zero(t, id) + assert.Equal(t, rpc.ErrTooManyKeysInFilter, rpcErr) + }) + + // Todo: use mocks to fix the tests + t.Run("Too many blocks back", func(t *testing.T) { + keys := make([][]felt.Felt, 1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + blockID := &rpc.BlockID{Number: 0} + id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr) + }) +} + func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { t.Parallel() log := utils.NewNopZapLogger() diff --git a/rpc/handlers.go b/rpc/handlers.go index 3ae27684c7..4b5d21e4ee 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -55,12 +55,14 @@ var ( ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"} ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"} ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"} + ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: "Cannot go back more than 1024 blocks"} // These errors can be only be returned by Juno-specific methods. 
ErrSubscriptionNotFound = &jsonrpc.Error{Code: 100, Message: "Subscription not found"} ) const ( + maxBlocksBack = 1024 maxEventChunkSize = 10240 maxEventFilterKeys = 1024 traceCacheSize = 128 @@ -311,6 +313,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen Name: "starknet_specVersion", Handler: h.SpecVersion, }, + { + Name: "starknet_subscribeEvents", + Params: []jsonrpc.Parameter{{Name: "from_address"}, {Name: "keys"}, {Name: "block", Optional: true}}, + Handler: h.SubscribeEvents, + }, { Name: "juno_subscribeNewHeads", Handler: h.SubscribeNewHeads, From 98e2dd35533e67c0f114d77cfb1f0edc534adfa9 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Sun, 10 Nov 2024 23:38:41 +0000 Subject: [PATCH 2/8] Add subscription.go --- rpc/events.go | 199 ------------------------------------------ rpc/subscriptions.go | 203 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 199 deletions(-) create mode 100644 rpc/subscriptions.go diff --git a/rpc/events.go b/rpc/events.go index 6fd10a894e..85ee68f812 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -5,7 +5,6 @@ import ( "encoding/json" "github.com/NethermindEth/juno/blockchain" - "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/jsonrpc" ) @@ -45,12 +44,6 @@ type EventsChunk struct { ContinuationToken string `json:"continuation_token,omitempty"` } -type EventSubscription struct { - From *felt.Felt `json:"from_address"` - Keys [][]felt.Felt `json:"keys"` - FromBlock *BlockID `json:"block"` -} - type SubscriptionID struct { ID uint64 `json:"subscription_id"` } @@ -123,198 +116,6 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er return true, nil } -const subscribeEventsChunkSize = 1024 - -func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt, - blockID *BlockID, -) (*SubscriptionID, *jsonrpc.Error) { - w, ok := jsonrpc.ConnFromContext(ctx) - if !ok { - return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) - } - - lenKeys := len(keys) - for _, k := range keys { - lenKeys += len(k) - } - if lenKeys > maxEventFilterKeys { - return nil, ErrTooManyKeysInFilter - } - - var requestedHeader *core.Header - headHeader, err := h.bcReader.HeadsHeader() - if err != nil { - return nil, ErrInternal.CloneWithData(err.Error()) - } - - if blockID == nil { - requestedHeader = headHeader - } else { - requestedHeader, rpcErr := h.blockHeaderByID(blockID) - if rpcErr != nil { - return nil, rpcErr - } - - // Todo: should the pending block be included in the head count? - if requestedHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { - return nil, ErrTooManyBlocksBack - } - } - - id := h.idgen() - subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) - sub := &subscription{ - cancel: subscriptionCtxCancel, - conn: w, - } - h.mu.Lock() - h.subscriptions[id] = sub - h.mu.Unlock() - - headerSub := h.newHeads.Subscribe() - sub.wg.Go(func() { - defer func() { - h.unsubscribe(sub, id) - headerSub.Unsubscribe() - }() - - // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before - // old blocks. 
- // Todo: DRY - sub.wg.Go(func() { - for { - select { - case <-subscriptionCtx.Done(): - return - case header := <-headerSub.Recv(): - filter, err := h.bcReader.EventFilter(fromAddr, keys) - if err != nil { - h.log.Warnw("Error creating event filter", "err", err) - return - } - defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") - - if err = setEventFilterRange(filter, &BlockID{Number: header.Number}, - &BlockID{Number: header.Number}, header.Number); err != nil { - h.log.Warnw("Error setting event filter range", "err", err) - return - } - - var cToken *blockchain.ContinuationToken - filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - - for cToken != nil { - filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - } - } - } - }) - - filter, err := h.bcReader.EventFilter(fromAddr, keys) - if err != nil { - h.log.Warnw("Error creating event filter", "err", err) - return - } - defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") - - if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number}, - &BlockID{Number: headHeader.Number}, headHeader.Number); err != nil { - h.log.Warnw("Error setting event filter range", "err", err) - return - } - - var cToken *blockchain.ContinuationToken - filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - - for cToken != nil { - filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - } - }) - - return &SubscriptionID{ID: id}, nil -} - -func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error { - for _, event := range events { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // Pending block doesn't have a number - var blockNumber *uint64 - if event.BlockHash != nil { - blockNumber = &(event.BlockNumber) - } - emittedEvent := &EmittedEvent{ - BlockNumber: blockNumber, - BlockHash: event.BlockHash, - TransactionHash: event.TransactionHash, - Event: &Event{ - From: event.From, - Keys: event.Keys, - Data: event.Data, - }, - } - - resp, err := json.Marshal(jsonrpc.Request{ - Version: "2.0", - Method: "starknet_subscriptionEvents", - Params: map[string]any{ - "subscription_id": id, - "result": emittedEvent, - }, - }) - if err != nil { - return err - } - - _, err = w.Write(resp) - return err - } - } - return nil -} - // Events gets the events matching a filter // // It follows the specification defined here: diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go new file mode 100644 index 0000000000..6d95065057 --- /dev/null +++ 
b/rpc/subscriptions.go @@ -0,0 +1,203 @@ +package rpc + +import ( + "context" + "encoding/json" + + "github.com/NethermindEth/juno/blockchain" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/jsonrpc" +) + +const subscribeEventsChunkSize = 1024 + +func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys [][]felt.Felt, + blockID *BlockID, +) (*SubscriptionID, *jsonrpc.Error) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + lenKeys := len(keys) + for _, k := range keys { + lenKeys += len(k) + } + if lenKeys > maxEventFilterKeys { + return nil, ErrTooManyKeysInFilter + } + + var requestedHeader *core.Header + headHeader, err := h.bcReader.HeadsHeader() + if err != nil { + return nil, ErrInternal.CloneWithData(err.Error()) + } + + if blockID == nil { + requestedHeader = headHeader + } else { + requestedHeader, rpcErr := h.blockHeaderByID(blockID) + if rpcErr != nil { + return nil, rpcErr + } + + // Todo: should the pending block be included in the head count? + if requestedHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { + return nil, ErrTooManyBlocksBack + } + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + headerSub := h.newHeads.Subscribe() + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + headerSub.Unsubscribe() + }() + + // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before + // old blocks. + // Todo: DRY + sub.wg.Go(func() { + for { + select { + case <-subscriptionCtx.Done(): + return + case header := <-headerSub.Recv(): + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: header.Number}, + &BlockID{Number: header.Number}, header.Number); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + var cToken *blockchain.ContinuationToken + filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + } + } + } + }) + + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number}, + &BlockID{Number: headHeader.Number}, headHeader.Number); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + var cToken *blockchain.ContinuationToken + 
filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } + + err = sendEvents(subscriptionCtx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + } + }) + + return &SubscriptionID{ID: id}, nil +} + +func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error { + for _, event := range events { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Pending block doesn't have a number + var blockNumber *uint64 + if event.BlockHash != nil { + blockNumber = &(event.BlockNumber) + } + emittedEvent := &EmittedEvent{ + BlockNumber: blockNumber, + BlockHash: event.BlockHash, + TransactionHash: event.TransactionHash, + Event: &Event{ + From: event.From, + Keys: event.Keys, + Data: event.Data, + }, + } + + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionEvents", + Params: map[string]any{ + "subscription_id": id, + "result": emittedEvent, + }, + }) + if err != nil { + return err + } + + _, err = w.Write(resp) + return err + } + } + return nil +} From e75669bd13725c1bba786ea010df185e069e4bc4 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Mon, 11 Nov 2024 00:03:51 +0000 Subject: [PATCH 3/8] DRY processing of Events --- rpc/subscriptions.go | 105 ++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 71 deletions(-) diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 6d95065057..01186e55a2 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -67,98 +67,61 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before // old blocks. 
- // Todo: DRY sub.wg.Go(func() { for { select { case <-subscriptionCtx.Done(): return case header := <-headerSub.Recv(): - filter, err := h.bcReader.EventFilter(fromAddr, keys) - if err != nil { - h.log.Warnw("Error creating event filter", "err", err) - return - } - defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") - - if err = setEventFilterRange(filter, &BlockID{Number: header.Number}, - &BlockID{Number: header.Number}, header.Number); err != nil { - h.log.Warnw("Error setting event filter range", "err", err) - return - } - - var cToken *blockchain.ContinuationToken - filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - - for cToken != nil { - filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - } + h.processEvents(subscriptionCtx, w, id, header.Number, headHeader.Number, fromAddr, keys) } } }) + h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys) + }) - filter, err := h.bcReader.EventFilter(fromAddr, keys) - if err != nil { - h.log.Warnw("Error creating event filter", "err", err) - return - } - defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + return &SubscriptionID{ID: id}, nil +} - if err = setEventFilterRange(filter, &BlockID{Number: requestedHeader.Number}, - &BlockID{Number: headHeader.Number}, headHeader.Number); err != nil { - h.log.Warnw("Error setting event filter range", "err", err) - return - } +func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) { + filter, err := h.bcReader.EventFilter(fromAddr, keys) + if err != nil { + h.log.Warnw("Error creating event filter", "err", err) + return + } + defer h.callAndLogErr(filter.Close, "Error closing event filter in events subscription") + + if err = setEventFilterRange(filter, &BlockID{Number: from}, &BlockID{Number: to}, to); err != nil { + h.log.Warnw("Error setting event filter range", "err", err) + return + } + + var cToken *blockchain.ContinuationToken + filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + if err != nil { + h.log.Warnw("Error filtering events", "err", err) + return + } - var cToken *blockchain.ContinuationToken - filteredEvents, cToken, err := filter.Events(cToken, subscribeEventsChunkSize) + err = sendEvents(ctx, w, filteredEvents, id) + if err != nil { + h.log.Warnw("Error sending events", "err", err) + return + } + + for cToken != nil { + filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) if err != nil { h.log.Warnw("Error filtering events", "err", err) return } - err = sendEvents(subscriptionCtx, w, filteredEvents, id) + err = sendEvents(ctx, w, filteredEvents, id) if err != nil { h.log.Warnw("Error sending events", "err", err) return } - - for cToken != nil { - filteredEvents, cToken, err = filter.Events(cToken, subscribeEventsChunkSize) - if err != nil { - h.log.Warnw("Error filtering events", "err", err) - return - } - - err = 
sendEvents(subscriptionCtx, w, filteredEvents, id) - if err != nil { - h.log.Warnw("Error sending events", "err", err) - return - } - } - }) - - return &SubscriptionID{ID: id}, nil + } } func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.FilteredEvent, id uint64) error { From 71a5c07eede7f4d383d8e8c03475c46e7a2e2a4a Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Mon, 11 Nov 2024 01:49:13 +0000 Subject: [PATCH 4/8] Fix nil pointer panic and wait for headerSub to finish --- rpc/subscriptions.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 01186e55a2..4fe88f21dc 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -3,6 +3,7 @@ package rpc import ( "context" "encoding/json" + "sync" "github.com/NethermindEth/juno/blockchain" "github.com/NethermindEth/juno/core" @@ -37,7 +38,8 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys if blockID == nil { requestedHeader = headHeader } else { - requestedHeader, rpcErr := h.blockHeaderByID(blockID) + var rpcErr *jsonrpc.Error + requestedHeader, rpcErr = h.blockHeaderByID(blockID) if rpcErr != nil { return nil, rpcErr } @@ -67,17 +69,26 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys // The specification doesn't enforce ordering of events therefore events from new blocks can be sent before // old blocks. - sub.wg.Go(func() { + // Todo: see if sub's wg can be used? + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + for { select { case <-subscriptionCtx.Done(): return case header := <-headerSub.Recv(): - h.processEvents(subscriptionCtx, w, id, header.Number, headHeader.Number, fromAddr, keys) + h.processEvents(subscriptionCtx, w, id, header.Number, header.Number, fromAddr, keys) } } - }) + }() + h.processEvents(subscriptionCtx, w, id, requestedHeader.Number, headHeader.Number, fromAddr, keys) + + wg.Wait() }) return &SubscriptionID{ID: id}, nil From 1798e14eedc5eb73ef2fd6648101c88598a917c4 Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Mon, 18 Nov 2024 14:55:21 +0000 Subject: [PATCH 5/8] Stop exiting early --- rpc/subscriptions.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 4fe88f21dc..26c64ab215 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -170,7 +170,9 @@ func sendEvents(ctx context.Context, w jsonrpc.Conn, events []*blockchain.Filter } _, err = w.Write(resp) - return err + if err != nil { + return err + } } } return nil From 1703014287e47e7197eadf26e9cef4487ebabd1f Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Tue, 19 Nov 2024 09:30:19 +0000 Subject: [PATCH 6/8] Fix too many blocks back condition --- rpc/events_test.go | 44 -------------------- rpc/subscriptions.go | 2 +- rpc/subscriptions_test.go | 84 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 45 deletions(-) create mode 100644 rpc/subscriptions_test.go diff --git a/rpc/events_test.go b/rpc/events_test.go index f7715c1f31..7f8b483987 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -229,50 +229,6 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { return fc.w == fc2.w } -func TestSubscribeEventsAndUnsubscribe(t *testing.T) { - t.Parallel() - log := utils.NewNopZapLogger() - n := utils.Ptr(utils.Mainnet) - client := feeder.NewTestClient(t, n) - gw := adaptfeeder.New(client) - ctx, cancel := context.WithCancel(context.Background()) - 
t.Cleanup(cancel) - chain := blockchain.New(pebble.NewMemTest(t), n) - syncer := sync.New(chain, gw, log, 0, false) - handler := rpc.New(chain, syncer, nil, "", log) - - go func() { - require.NoError(t, handler.Run(ctx)) - }() - // Technically, there's a race between goroutine above and the SubscribeNewHeads call down below. - // Sleep for a moment just in case. - time.Sleep(50 * time.Millisecond) - - serverConn, clientConn := net.Pipe() - t.Cleanup(func() { - require.NoError(t, serverConn.Close()) - require.NoError(t, clientConn.Close()) - }) - - t.Run("Too many keys in filter", func(t *testing.T) { - keys := make([][]felt.Felt, 1024+1) - fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) - id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, nil) - assert.Zero(t, id) - assert.Equal(t, rpc.ErrTooManyKeysInFilter, rpcErr) - }) - - // Todo: use mocks to fix the tests - t.Run("Too many blocks back", func(t *testing.T) { - keys := make([][]felt.Felt, 1) - fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) - blockID := &rpc.BlockID{Number: 0} - id, rpcErr := handler.SubscribeEvents(ctx, fromAddr, keys, blockID) - assert.Zero(t, id) - assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr) - }) -} - func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { t.Parallel() log := utils.NewNopZapLogger() diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 26c64ab215..2c9fbdf5ae 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -45,7 +45,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys } // Todo: should the pending block be included in the head count? - if requestedHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { + if headHeader.Number >= maxBlocksBack && requestedHeader.Number <= headHeader.Number-maxBlocksBack { return nil, ErrTooManyBlocksBack } } diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go new file mode 100644 index 0000000000..dc635d732b --- /dev/null +++ b/rpc/subscriptions_test.go @@ -0,0 +1,84 @@ +package rpc_test + +import ( + "context" + "net" + "testing" + + "github.com/NethermindEth/juno/core" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/mocks" + "github.com/NethermindEth/juno/rpc" + "github.com/NethermindEth/juno/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestSubscribeEventsAndUnsubscribe(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + + log := utils.NewNopZapLogger() + handler := rpc.New(mockChain, mockSyncer, nil, "", log) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + t.Run("Too many keys in filter", func(t *testing.T) { + keys := make([][]felt.Felt, 1024+1) + fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil) + assert.Zero(t, id) + assert.Equal(t, rpc.ErrTooManyKeysInFilter, rpcErr) + }) + + t.Run("Too many blocks back", func(t *testing.T) { + keys := make([][]felt.Felt, 1) + fromAddr := 
new(felt.Felt).SetBytes([]byte("from_address")) + blockID := &rpc.BlockID{Number: 0} + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + // Note the end of the window doesn't need to be tested because if a requested number is more than the + // head a block not found error will be returned. This behaviour has been tested in various other test, and we + // don't need to test it here again. + t.Run("head is 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr) + }) + + t.Run("head is more than 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 2024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) + assert.Zero(t, id) + assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr) + }) + }) +} From 0a8c6734e29765995dffe712292ce7870fde340d Mon Sep 17 00:00:00 2001 From: IronGauntlets Date: Tue, 19 Nov 2024 12:26:45 +0000 Subject: [PATCH 7/8] wip --- rpc/subscriptions_test.go | 81 +++++++++++++++++++++++++++++++-------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index dc635d732b..d20a93ad6d 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -2,15 +2,17 @@ package rpc_test import ( "context" + "fmt" "net" "testing" + "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" - "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/jsonrpc" "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" + adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" "github.com/NethermindEth/juno/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,19 +20,16 @@ import ( ) func TestSubscribeEventsAndUnsubscribe(t *testing.T) { - mockCtrl := gomock.NewController(t) - t.Cleanup(mockCtrl.Finish) - - mockChain := mocks.NewMockReader(mockCtrl) - mockSyncer := mocks.NewMockSyncReader(mockCtrl) - log := utils.NewNopZapLogger() - handler := rpc.New(mockChain, mockSyncer, nil, "", log) - - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) t.Run("Too many keys in filter", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := rpc.New(mockChain, mockSyncer, nil, "", log) + keys := make([][]felt.Felt, 1024+1) fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) @@ -40,7 +39,7 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) { require.NoError(t, clientConn.Close()) }) - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, nil) assert.Zero(t, id) @@ -48,6 +47,13 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) { }) t.Run("Too many blocks 
back", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := rpc.New(mockChain, mockSyncer, nil, "", log) + keys := make([][]felt.Felt, 1) fromAddr := new(felt.Felt).SetBytes([]byte("from_address")) blockID := &rpc.BlockID{Number: 0} @@ -58,10 +64,10 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) { require.NoError(t, clientConn.Close()) }) - subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - // Note the end of the window doesn't need to be tested because if a requested number is more than the - // head a block not found error will be returned. This behaviour has been tested in various other test, and we + // Note the end of the window doesn't need to be tested because if requested block number is more than the + // head, a block not found error will be returned. This behaviour has been tested in various other test, and we // don't need to test it here again. t.Run("head is 1024", func(t *testing.T) { mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1024}, nil) @@ -81,4 +87,49 @@ func TestSubscribeEventsAndUnsubscribe(t *testing.T) { assert.Equal(t, rpc.ErrTooManyBlocksBack, rpcErr) }) }) + + t.Run("Events from old blocks and new", func(t *testing.T) { + n := utils.Ptr(utils.Sepolia) + client := feeder.NewTestClient(t, n) + gw := adaptfeeder.New(client) + + b1, err := gw.BlockByNumber(context.Background(), 56377) + require.NoError(t, err) + + // Make a shallow copy of b1 into b2 and b3. Then modify them accordingly. + b2, b3 := new(core.Block), new(core.Block) + b2.Header, b3.Header = new(core.Header), new(core.Header) + *b2.Header, *b3.Header = *b1.Header, *b1.Header + b2.Number = b1.Number + 1 + b3.Number = b2.Number + 1 + fmt.Println(b1.Number, b2.Number, b3.Number) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + fromAddr := b1.Receipts[0].Events[0].From + keys := make([][]felt.Felt, 1) + for _, k := range b1.Receipts[0].Events[0].Keys { + keys[0] = append(keys[0], *k) + } + + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := rpc.New(mockChain, mockSyncer, nil, "", log) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: b2.Number}, nil) + mockChain.EXPECT().BlockHeaderByNumber(b1.Number).Return(b1.Header, nil) + + _, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, &rpc.BlockID{Number: b1.Number}) + require.Nil(t, rpcErr) + + // Check from the conn that the correct id has been passed + }) } From 91a2d9b7b8c1bfc4c0046bbf5b921ad2c4711d5d Mon Sep 17 00:00:00 2001 From: Kirill Date: Wed, 20 Nov 2024 13:59:38 +0400 Subject: [PATCH 8/8] Add websocket shutdown logic --- jsonrpc/websocket.go | 19 +++++++++++++++++-- jsonrpc/websocket_test.go | 2 +- node/http.go | 15 +++++++++++++-- rpc/events_test.go | 2 +- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/jsonrpc/websocket.go b/jsonrpc/websocket.go index b3e436703a..2c1e207a54 100644 --- a/jsonrpc/websocket.go +++ b/jsonrpc/websocket.go @@ -18,14 +18,17 @@ type Websocket struct { log utils.SimpleLogger 
connParams *WebsocketConnParams listener NewRequestListener + + shutdown <-chan struct{} } -func NewWebsocket(rpc *Server, log utils.SimpleLogger) *Websocket { +func NewWebsocket(rpc *Server, shutdown <-chan struct{}, log utils.SimpleLogger) *Websocket { ws := &Websocket{ rpc: rpc, log: log, connParams: DefaultWebsocketConnParams(), listener: &SelectiveListener{}, + shutdown: shutdown, } return ws @@ -54,7 +57,19 @@ func (ws *Websocket) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO include connection information, such as the remote address, in the logs. - wsc := newWebsocketConn(r.Context(), conn, ws.connParams) + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + go func() { + select { + case <-ws.shutdown: + cancel() + case <-ctx.Done(): + // in case websocket connection is closed and server is not in shutdown mode + // we need to release this goroutine from waiting + } + }() + + wsc := newWebsocketConn(ctx, conn, ws.connParams) for { _, wsc.r, err = wsc.conn.Reader(wsc.ctx) diff --git a/jsonrpc/websocket_test.go b/jsonrpc/websocket_test.go index 9baf704ae9..4f60377c02 100644 --- a/jsonrpc/websocket_test.go +++ b/jsonrpc/websocket_test.go @@ -20,7 +20,7 @@ func testConnection(t *testing.T, ctx context.Context, method jsonrpc.Method, li require.NoError(t, rpc.RegisterMethods(method)) // Server - srv := httptest.NewServer(jsonrpc.NewWebsocket(rpc, utils.NewNopZapLogger())) + srv := httptest.NewServer(jsonrpc.NewWebsocket(rpc, nil, utils.NewNopZapLogger())) // Client conn, resp, err := websocket.Dial(ctx, srv.URL, nil) //nolint:bodyclose // websocket package closes resp.Body for us. diff --git a/node/http.go b/node/http.go index 4226564b0a..3df3b00fca 100644 --- a/node/http.go +++ b/node/http.go @@ -52,6 +52,10 @@ func (h *httpService) Run(ctx context.Context) error { } } +func (h *httpService) registerOnShutdown(f func()) { + h.srv.RegisterOnShutdown(f) +} + func makeHTTPService(host string, port uint16, handler http.Handler) *httpService { portStr := strconv.FormatUint(uint64(port), 10) return &httpService{ @@ -108,9 +112,11 @@ func makeRPCOverWebsocket(host string, port uint16, servers map[string]*jsonrpc. listener = makeWSMetrics() } + shutdown := make(chan struct{}) + mux := http.NewServeMux() for path, server := range servers { - wsHandler := jsonrpc.NewWebsocket(server, log) + wsHandler := jsonrpc.NewWebsocket(server, shutdown, log) if listener != nil { wsHandler = wsHandler.WithListener(listener) } @@ -124,7 +130,12 @@ func makeRPCOverWebsocket(host string, port uint16, servers map[string]*jsonrpc. if corsEnabled { handler = cors.Default().Handler(handler) } - return makeHTTPService(host, port, handler) + + service := makeHTTPService(host, port, handler) + service.registerOnShutdown(func() { + close(shutdown) + }) + return service } func makeMetrics(host string, port uint16) *httpService { diff --git a/rpc/events_test.go b/rpc/events_test.go index 7f8b483987..114002cf34 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -347,7 +347,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { Params: []jsonrpc.Parameter{{Name: "id"}}, Handler: handler.Unsubscribe, })) - ws := jsonrpc.NewWebsocket(server, log) + ws := jsonrpc.NewWebsocket(server, nil, log) httpSrv := httptest.NewServer(ws) conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) require.NoError(t, err)
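
Patch 1/8 registers starknet_subscribeEvents in Methods(), and the handler returns MethodNotFound unless a connection is attached to the request context, so the method is only reachable over a WebSocket connection. Below is a minimal, hypothetical client sketch for exercising it by hand: the coder/websocket import (whose Dial/Read/Write API matches the websocket.Dial calls in the tests), the ws://localhost:6061 endpoint, and the from_address value are assumptions, not values taken from this series.

    package main

    import (
        "context"
        "encoding/json"
        "fmt"
        "log"

        "github.com/coder/websocket"
    )

    func main() {
        ctx := context.Background()

        // Placeholder endpoint: point this at the node's configured ws host/port.
        conn, _, err := websocket.Dial(ctx, "ws://localhost:6061", nil)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close(websocket.StatusNormalClosure, "")

        // Parameter names match the Methods() registration: from_address,
        // keys, and an optional block. The address is an arbitrary example.
        req, err := json.Marshal(map[string]any{
            "jsonrpc": "2.0",
            "id":      1,
            "method":  "starknet_subscribeEvents",
            "params": map[string]any{
                "from_address": "0x1234",
                "keys":         [][]string{},
            },
        })
        if err != nil {
            log.Fatal(err)
        }
        if err := conn.Write(ctx, websocket.MessageText, req); err != nil {
            log.Fatal(err)
        }

        // First frame: the {"subscription_id": N} result. Later frames:
        // starknet_subscriptionEvents requests carrying emitted events.
        for {
            _, msg, err := conn.Read(ctx)
            if err != nil {
                log.Fatal(err)
            }
            fmt.Println(string(msg))
        }
    }

Each notification is a JSON-RPC request whose params hold the subscription_id and one emitted event, matching the payload built in sendEvents above.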
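Patch 6/8 rewrites the blocks-back guard so the first operand is the head height rather than the requested height. A standalone sketch with concrete numbers makes the boundary easier to verify (tooManyBlocksBack is a made-up name for illustration; the constant and the comparison mirror SubscribeEvents):

    package main

    import "fmt"

    const maxBlocksBack = 1024

    // tooManyBlocksBack mirrors the guard in SubscribeEvents after patch 6:
    // reject only when the chain is tall enough to have a full window and
    // the requested block falls at or beyond its far edge.
    func tooManyBlocksBack(requested, head uint64) bool {
        return head >= maxBlocksBack && requested <= head-maxBlocksBack
    }

    func main() {
        fmt.Println(tooManyBlocksBack(0, 1024))    // true: at the window edge, rejected (first mocked case)
        fmt.Println(tooManyBlocksBack(1001, 2024)) // false: inside the 1024-block window
        fmt.Println(tooManyBlocksBack(1000, 2024)) // true: one block outside the window
        fmt.Println(tooManyBlocksBack(0, 512))     // false: chain shorter than the window
    }

The head >= maxBlocksBack clause doubles as an underflow guard: headHeader.Number-maxBlocksBack is unsigned, so on a chain younger than 1024 blocks the subtraction is never evaluated.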
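Patch 8/8 threads one shutdown channel through every websocket handler and closes it from registerOnShutdown. The idiom it leans on is that closing a channel is observed by every receiver at once, so a single close fans out to all open connections. A standalone illustration of that idiom, not juno code:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        shutdown := make(chan struct{})
        var wg sync.WaitGroup

        // Three stand-ins for open websocket connections, all parked on the
        // same channel.
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                <-shutdown // receiving from a closed channel unblocks immediately
                fmt.Printf("conn %d: cancelled\n", id)
            }(i)
        }

        close(shutdown) // one close wakes every waiting goroutine
        wg.Wait()
    }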