Skip to content

Commit

Permalink
feat: PRT-1178: Subscriptions phase 1 (without handover) (lavanet#1462)
Browse files Browse the repository at this point in the history
* Enable the subscription again

* Add GetParams() to GenericMessage struct

* Fix tendermint small bug

* RelayProcessor to skip some logic on subscription

* Add subscription management in the provider - some TODOs are left

* Fix the subscription category for supported chains

* Remove unused functions

* Small fix to the provider web socket manager

* Small fixes to the provider

* Move finalization consensus to a new package

* Split the SendRelay into 2 functions: ParseRelay & SendParsedRelay

* Add an utility function for readable hex

* Remove the logic of disconnecting consumers after the end of epoch

* Fix to the sage channel sender

* Use a utility function CreateHashFromParams

* Add some trace logs

* Rename IsSubscription -> IsSubscriptionCategory

* Add LongLastingProvidersStorage

* Add the cancellable context to subscription relays int he rpcconsumer_server

* Add ConsumerWebsocketManager & ConsumerWSSubscriptionManager

* Move the subscription closing logic to a new function

* Rename to make the code more sense

* Implemented timeout of 15 minutes for open subscription

* Add the subscription function tags

* Updated cosmos and ethereum specs with new subscription parse directives

* Implemented unsubscribe logic in the consumer

* Small logs improvements

* Rename CreateSubscriptionKey -> CreateDappKey

* Use hashed params as map keys for subscription context

* Move the websocket replies channel creation to the subscription manager

* Rethink the storing of connected dapps and active subscriptions in the consumer

Which now supports multiple subscriptions in one websocket

* Some small fixes to the Unsubscribe function

* Small  rename

* Implement unsubscribe_all for consumer

* Typos fix

* Improved some logs

* Add a missing lock

* Make the websocket channel writing a little bit safer

* Fix a small bug

* Don't support ubsubscribe_all in Relay flow

* Split the TryRelay into smaller pieces

* Add GetID to RPCInput interface

* Add support for unsubscribe relay in the provider

* Safe channel sender improvements

* Use SafeChannelSender for the subscription in the consumer and fix bugs

* Use formatter to preserve original jsonrpc ID

* Add some comments

* Small fixes

* Rename and fix AnalyzeWebSocketErrorAndWriteMessage

* Add a comment

* Remove done TODO

* Some log improvements

* Provider subscription manager fix

* Move UpdateCU call to start of subscription instead of its end

* Updated subscription CU cost

* Fix finalization consensus test

* Lint fixes

* Add LongLastingProvidersStorage tests

* Add SafeChannelSender tests

* Add 10 seconds timeout to handle hang when waiting for first message from subscription

* Move the first subscription reply verification into the rpcconsumer_server so we can control better the OnSessionFailure call

* Handle bad provider signature better

* Fix a small bug and remove redundant return value

* Fix typos

* Move ProviderNodeSubscriptionManager to chainlib

* Fix to the UnsubscribeAll call

* Create a const for the subscription relay timeout

* Sync the consumer and provider on session failure

* Change the type of replyServer to not be a pointer to interface

* Small fix for the consumer subscription manager

* Fix a small bug in the provider node subscription manager

* Small fix to the consumer subscription manager

* Remove epochs from provider subscription manager

* Small fix to the safe channel sender

* Tiny log fix

* Allow weboscket and http connectors in jsonRPC

* Small log fix to connector.go

* Add websocket server to mock chain lib

* Add tests for provider and consumer subscription manager

* Post merge fixes

* Fix lint errors

* Verify webSocket is up in consumer in protocol integration tests

* Revert "Allow weboscket and http connectors in jsonRPC"

This reverts commit 0a5d142.

* WIP

* changing functionality of parsed directive to be saved on the chain message instead of every time looking for it from scratch

* rename long lasting to active subscription provider storage

* adding purge callback.

* two consumers setup

* fix a bug where 2 consumers wouldn't be able to subscribe.

* fixing test for subscriptions, fixing bug in chain router, and managing the state properly.

* fixing safe channel sender functionality

* fixing provider subscription manager test.

* fix problem with websocket listener in tests.

* fixing test routine condition.

* lint

* fix ws issue on generic chain lib mocks

* fix e2e for jsonrpc.

* removing unused code.

* fix typo

* adding comments

* comment fix

* adding more comments.

* removing spam trace logs

* undoing WIP changes

* adding seen block to rpc consumer's consistency.

* handling same consumer subscription hash.

* rename to a proper convention

* fixing case where two subscriptions at the same time could trigger a race, and a hanging lock.

* improve readability

* adding comments for better readability

* insert sdk address inside the consumer container to avoid unnecessary unmarshlling

* improve json marshalling by using gojson :)

* remove unused code

* change json marshaling package

* improve flow on rpcconsumer server.

* improved encoding on rpc consumer server

* mistakenly forgotten unlock.

* Small log fix

* Pass also the ws connection to the provider in the setup_providers.sh script

* Add websocket subscription test in e2e

* fix mock tests

* add replace channel method to safe channel sender

* fix test

* use replace channel instead of close

* add  more documentation

* fix sub manager test

* Fix lint

* fix pending subscription race issue and add tests

* Checking for errors when writing to file in e2e

* Add websocket response to log

* Allow more than 2 websocket in e2e test for subscriptions

* fix nil deref on a race between read and close connection

* Remove log

* add comments

* remove log spam and make addon print better

* fix ws message bug

* Leftovers from committed WIP

* Small code cleaning

* Post merge fix

* Small test fix

* fix: PRT-1178: Subscription phase 1 unsubscribe fix (lavanet#1575)

* Fix init_chain command for macOS

* Fix the bug with unsubscribing from jsonrpc

* Send error message to user when subscription not found

* Updated the ethereum spec to match the fix

* Delete the dappkey from connected dapps when empty

* Test fix

* Fix lint issues

* Make the logs clear to investigate test fail on GH

* Add some logs for the SDK to trace the E2E test failure

* Attempt to fix test by removing go routines

* Another attempt to fix the protocol tests

* Small lint fix

* merged

* Adding better search func for requirements

* fix unused else

* adding consumer guid for subscription requests from multiple consumer processes with the same key

* consumer websocket manager fixes

* fix 7 more comments

* fixing more comments :)

* set all id parsing in chain message.

* fixing more comments

* merged v2 changes

* merge conflict fixes

* terminate connection in handleNewNodeMessage on error

* fixing missing websocket id for unique dapp key for websocket subscriptions from the same user.

* another review  bites the dust

* another one bites the dust

* logs

* fixing a bug in unsubscribe brackets

* do not purge providers if they have more than one subscription

* solving issue after issue. I love to call it, Hero mode.

* solve the safe channel sender issue when sending a message and waiting the entire time until the consumer responds

* remove log.

* fix lint

* lint v2

* lint v3

* lint v4

* fixing context issue reading headers on subscription.

* nil deref fix

* fix initialize redundancy

* adding multiple unique id on same dapp etc..

* fixing pending race for more than one pending subscriptions

* adding a test to validate the queue mechanism

* fixed.

---------

Co-authored-by: Ran Mishael <[email protected]>
Co-authored-by: Omer <[email protected]>
  • Loading branch information
3 people authored Aug 6, 2024
1 parent 6abf929 commit e942fcd
Show file tree
Hide file tree
Showing 97 changed files with 7,326 additions and 1,287 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/lava.yml
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,14 @@ jobs:

report-tests-results:
runs-on: ubuntu-latest
needs: [test-consensus, test-protocol, test-protocol-e2e, test-payment-e2e] # test-sdk-e2e,
needs:
[
test-consensus,
test-protocol,
test-protocol-e2e,
# test-sdk-e2e,
test-payment-e2e,
]
if: always()
steps:
- name: Download Artifacts
Expand Down
26 changes: 13 additions & 13 deletions config/provider_examples/avalanch_internal_paths_example.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# this example show cases how you can setup Avalanche
# this example show cases how you can setup Avalanche
endpoints:
- api-interface: jsonrpc
chain-id: AVAX
network-address: 127.0.0.1:2221
node-urls:
- url: ws://127.0.0.1:3333/C/rpc/ws
internal-path: "/C/rpc" # c chain like specified in the spec
- url: https://127.0.0.1:3334/C/avax
internal-path: "/C/avax" # c/avax like specified in the spec
- url: https://127.0.0.1:3335/X
internal-path: "/X" # x chain like specified in the spec
- url: https://127.0.0.1:3336/P
internal-path: "/P" # p chain like specified in the spec
- api-interface: jsonrpc
chain-id: AVAX
network-address: 127.0.0.1:2221
node-urls:
- url: ws://127.0.0.1:3333/C/rpc/ws
internal-path: "/C/rpc" # c chain like specified in the spec
- url: https://127.0.0.1:3334/C/avax
internal-path: "/C/avax" # c/avax like specified in the spec
- url: https://127.0.0.1:3335/X
internal-path: "/X" # x chain like specified in the spec
- url: https://127.0.0.1:3336/P
internal-path: "/P" # p chain like specified in the spec
21 changes: 15 additions & 6 deletions cookbook/specs/ethereum.json
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@
"category": {
"deterministic": false,
"local": true,
"subscription": true,
"subscription": false,
"stateful": 0
},
"extra_compute_units": 0
Expand Down Expand Up @@ -833,7 +833,7 @@
],
"parser_func": "DEFAULT"
},
"compute_units": 10,
"compute_units": 1000,
"enabled": true,
"category": {
"deterministic": false,
Expand Down Expand Up @@ -874,7 +874,7 @@
"category": {
"deterministic": false,
"local": true,
"subscription": true,
"subscription": false,
"stateful": 0
},
"extra_compute_units": 0
Expand All @@ -883,16 +883,16 @@
"name": "eth_unsubscribe",
"block_parsing": {
"parser_arg": [
""
"latest"
],
"parser_func": "EMPTY"
"parser_func": "DEFAULT"
},
"compute_units": 10,
"enabled": true,
"category": {
"deterministic": false,
"local": true,
"subscription": false,
"subscription": true,
"stateful": 0
},
"extra_compute_units": 0
Expand Down Expand Up @@ -1045,6 +1045,15 @@
"encoding": "hex"
},
"api_name": "eth_getBlockByNumber"
},
{
"function_tag": "SUBSCRIBE",
"api_name": "eth_subscribe"
},
{
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"eth_unsubscribe\",\"params\":[\"%s\"],\"id\":1}",
"function_tag": "UNSUBSCRIBE",
"api_name": "eth_unsubscribe"
}
],
"verifications": [
Expand Down
6 changes: 3 additions & 3 deletions cookbook/specs/fantom.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"category": {
"deterministic": false,
"local": true,
"subscription": false,
"subscription": true,
"stateful": 0
},
"extra_compute_units": 0
Expand Down Expand Up @@ -387,7 +387,7 @@
"category": {
"deterministic": false,
"local": true,
"subscription": true,
"subscription": false,
"stateful": 0
},
"extra_compute_units": 0
Expand Down Expand Up @@ -477,7 +477,7 @@
"category": {
"deterministic": false,
"local": true,
"subscription": true,
"subscription": false,
"stateful": 0
},
"extra_compute_units": 0
Expand Down
32 changes: 23 additions & 9 deletions cookbook/specs/tendermint.json
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,11 @@
"name": "subscribe",
"block_parsing": {
"parser_arg": [
""
"latest"
],
"parser_func": "EMPTY"
"parser_func": "DEFAULT"
},
"compute_units": 10,
"compute_units": 1000,
"enabled": true,
"category": {
"deterministic": false,
Expand Down Expand Up @@ -521,16 +521,16 @@
"name": "unsubscribe",
"block_parsing": {
"parser_arg": [
""
"latest"
],
"parser_func": "EMPTY"
"parser_func": "DEFAULT"
},
"compute_units": 10,
"enabled": true,
"category": {
"deterministic": false,
"local": true,
"subscription": false,
"subscription": true,
"stateful": 0
},
"extra_compute_units": 0
Expand All @@ -539,16 +539,16 @@
"name": "unsubscribe_all",
"block_parsing": {
"parser_arg": [
""
"latest"
],
"parser_func": "EMPTY"
"parser_func": "DEFAULT"
},
"compute_units": 10,
"enabled": true,
"category": {
"deterministic": false,
"local": true,
"subscription": false,
"subscription": true,
"stateful": 0
},
"extra_compute_units": 0
Expand Down Expand Up @@ -618,6 +618,20 @@
"encoding": "base64"
},
"api_name": "earliest_block"
},
{
"function_tag": "SUBSCRIBE",
"api_name": "subscribe"
},
{
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"unsubscribe\",\"params\":%s,\"id\":1}",
"function_tag": "UNSUBSCRIBE",
"api_name": "unsubscribe"
},
{
"function_template": "{\"jsonrpc\":\"2.0\",\"method\":\"unsubscribe_all\",\"params\":[],\"id\":1}",
"function_tag": "UNSUBSCRIBE_ALL",
"api_name": "unsubscribe_all"
}
],
"verifications": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class StateBadgeQuery {

// fetchPairing fetches pairing for all chainIDs we support
public async fetchPairing(): Promise<number> {
Logger.debug("Fetching pairing started");
Logger.debug("Fetching pairing from badge started");

let timeLeftToNextPairing;
let virtualEpoch;
Expand Down Expand Up @@ -110,7 +110,7 @@ export class StateBadgeQuery {
this.virtualEpoch = virtualEpoch;
this.currentEpoch = currentEpoch;

Logger.debug("Fetching pairing ended");
Logger.debug("Fetching pairing from badge ended", timeLeftToNextPairing);

return timeLeftToNextPairing;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class StateChainQuery {
// fetchPairing fetches pairing for all chainIDs we support
public async fetchPairing(): Promise<number> {
try {
Logger.debug("Fetching pairing started");
Logger.debug("Fetching pairing from chain started");
// Save time till next epoch
let timeLeftToNextPairing;
let currentEpoch;
Expand Down Expand Up @@ -154,7 +154,7 @@ export class StateChainQuery {
this.currentEpoch = currentEpoch;
this.downtimeParams = downtimeParams;

Logger.debug("Fetching pairing ended");
Logger.debug("Fetching pairing from chain ended", timeLeftToNextPairing);

// Return timeLeftToNextPairing
return timeLeftToNextPairing;
Expand Down
3 changes: 3 additions & 0 deletions proto/lavanet/lava/spec/api_collection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ enum FUNCTION_TAG {
SET_LATEST_IN_BODY = 4;
VERIFICATION = 5;
GET_EARLIEST_BLOCK = 6;
SUBSCRIBE = 7;
UNSUBSCRIBE = 8;
UNSUBSCRIBE_ALL = 9;
}

enum PARSER_TYPE {
Expand Down
10 changes: 6 additions & 4 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,14 @@ func (bcp *BaseChainParser) SeparateAddonsExtensions(supported []string) (addons
if supportedToCheck == "" {
continue
}
if bcp.isExtension(supportedToCheck) {
if bcp.isExtension(supportedToCheck) || supportedToCheck == WebSocketExtension {
extensions = append(extensions, supportedToCheck)
continue
}
// neither is an error
err = utils.LavaFormatError("invalid supported to check, is neither an addon or an extension", err, utils.Attribute{Key: "spec", Value: bcp.spec.Index}, utils.Attribute{Key: "supported", Value: supportedToCheck})
err = utils.LavaFormatError("invalid supported to check, is neither an addon or an extension", err,
utils.Attribute{Key: "spec", Value: bcp.spec.Index},
utils.Attribute{Key: "supported", Value: supportedToCheck})
}
}
return addons, extensions, err
Expand Down Expand Up @@ -252,15 +254,15 @@ func (bcp *BaseChainParser) Construct(spec spectypes.Spec, internalPaths map[str
bcp.extensionParser.SetConfiguredExtensions(extensionParser.GetConfiguredExtensions())
}

func (bcp *BaseChainParser) GetParsingByTag(tag spectypes.FUNCTION_TAG) (parsing *spectypes.ParseDirective, collectionData *spectypes.CollectionData, existed bool) {
func (bcp *BaseChainParser) GetParsingByTag(tag spectypes.FUNCTION_TAG) (parsing *spectypes.ParseDirective, apiCollection *spectypes.ApiCollection, existed bool) {
bcp.rwLock.RLock()
defer bcp.rwLock.RUnlock()

val, ok := bcp.taggedApis[tag]
if !ok {
return nil, nil, false
}
return val.Parsing, &val.ApiCollection.CollectionData, ok
return val.Parsing, val.ApiCollection, ok
}

func (bcp *BaseChainParser) ExtensionParsing(addon string, parsedMessageArg *baseChainMessageContainer, extensionInfo extensionslib.ExtensionInfo) {
Expand Down
7 changes: 5 additions & 2 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,12 @@ func (cf *ChainFetcher) ChainFetcherMetadata() []pairingtypes.Metadata {
}

func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error) {
parsing, collectionData, ok := cf.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM)
parsing, apiCollection, ok := cf.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCKNUM)
tagName := spectypes.FUNCTION_TAG_GET_BLOCKNUM.String()
if !ok {
return spectypes.NOT_APPLICABLE, utils.LavaFormatError(tagName+" tag function not found", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}
collectionData := apiCollection.CollectionData
var craftData *CraftData
if parsing.FunctionTemplate != "" {
path := parsing.ApiName
Expand Down Expand Up @@ -321,11 +322,13 @@ func (cf *ChainFetcher) constructRelayData(conectionType string, path string, da
}

func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error) {
parsing, collectionData, ok := cf.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM)
parsing, apiCollection, ok := cf.chainParser.GetParsingByTag(spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM)
tagName := spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM.String()
if !ok {
return "", utils.LavaFormatError(tagName+" tag function not found", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}
collectionData := apiCollection.CollectionData

if parsing.FunctionTemplate == "" {
return "", utils.LavaFormatError(tagName+" missing function template", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}
Expand Down
Loading

0 comments on commit e942fcd

Please sign in to comment.