Skip to content

Commit

Permalink
chore: consollidate-chain-message-data (lavanet#1636)
Browse files Browse the repository at this point in the history
* create protocolMessage class

* fix bug

* fix arg mismatch

* fix test

---------

Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
omerlavanet and ranlavanet authored Aug 21, 2024
1 parent 1654218 commit 5937abe
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 241 deletions.
6 changes: 2 additions & 4 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ type RelaySender interface {
consumerIp string,
analytics *metrics.RelayMetrics,
metadata []pairingtypes.Metadata,
) (ChainMessage, map[string]string, *pairingtypes.RelayPrivateData, error)
) (ProtocolMessage, error)
SendParsedRelay(
ctx context.Context,
dappID string,
consumerIp string,
analytics *metrics.RelayMetrics,
chainMessage ChainMessage,
directiveHeaders map[string]string,
relayRequestData *pairingtypes.RelayPrivateData,
protocolMessage ProtocolMessage,
) (relayResult *common.RelayResult, errRet error)
CreateDappKey(dappID, consumerIp string) string
CancelSubscriptionContext(subscriptionKey string)
Expand Down
141 changes: 96 additions & 45 deletions protocol/chainlib/chainlib_mock.go

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {

metricsData := metrics.NewRelayAnalytics(dappID, cwm.chainId, cwm.apiInterface)

chainMessage, directiveHeaders, relayRequestData, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil)
protocolMessage, err := cwm.relaySender.ParseRelay(webSocketCtx, "", string(msg), cwm.connectionType, dappID, userIp, metricsData, nil)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not parse message", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
Expand All @@ -159,9 +159,9 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, chainMessage, directiveHeaders, relayRequestData, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from subscription", err, utils.LogAttr("GUID", webSocketCtx))
if err == common.SubscriptionNotFoundError {
Expand All @@ -174,15 +174,15 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}
continue
} else if IsFunctionTagOfType(chainMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE_ALL) {
} else if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE_ALL) {
err := cwm.consumerWsSubscriptionManager.UnsubscribeAll(webSocketCtx, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("error unsubscribing from all subscription", err, utils.LogAttr("GUID", webSocketCtx))
}
continue
} else {
// Normal relay over websocket. (not subscription related)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, chainMessage, directiveHeaders, relayRequestData)
relayResult, err := cwm.relaySender.SendParsedRelay(webSocketCtx, dappID, userIp, metricsData, protocolMessage)
if err != nil {
formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not send parsed relay", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
if formatterMsg != nil {
Expand All @@ -202,16 +202,16 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}

// Subscription flow
inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(relayRequestData.ApiInterface) // we use this to preserve the original jsonrpc id
inputFormatter(relayRequestData.Data) // set the extracted jsonrpc id
inputFormatter, outputFormatter := formatter.FormatterForRelayRequestAndResponse(protocolMessage.GetApiCollection().CollectionData.ApiInterface) // we use this to preserve the original jsonrpc id
inputFormatter(protocolMessage.RelayPrivateData().Data) // set the extracted jsonrpc id

reply, subscriptionMsgsChan, err := cwm.consumerWsSubscriptionManager.StartSubscription(webSocketCtx, chainMessage, directiveHeaders, relayRequestData, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
reply, subscriptionMsgsChan, err := cwm.consumerWsSubscriptionManager.StartSubscription(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
if err != nil {
utils.LavaFormatWarning("StartSubscription returned an error", err,
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)

formatterMsg := logger.AnalyzeWebSocketErrorAndGetFormattedMessage(websocketConn.LocalAddr().String(), utils.LavaFormatError("could not start subscription", err), msgSeed, msg, cwm.apiInterface, time.Since(startTime))
Expand Down Expand Up @@ -239,7 +239,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)

for subscriptionMsgReply := range subscriptionMsgsChan {
Expand All @@ -250,7 +250,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("dappID", dappID),
utils.LogAttr("userIp", userIp),
utils.LogAttr("params", chainMessage.GetRPCMessage().GetParams()),
utils.LogAttr("params", protocolMessage.GetRPCMessage().GetParams()),
)
}()
}
Expand Down
Loading

0 comments on commit 5937abe

Please sign in to comment.