Skip to content

Commit

Permalink
fix: websocket client connection issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyasbhat0 committed Oct 28, 2024
1 parent cb58a51 commit 3346913
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
3 changes: 2 additions & 1 deletion directory/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ func TestDownloadProviderMetadata(t *testing.T) {
if err != nil {
t.FailNow()
}

// nolint:staticcheck
if metadata == nil {
t.FailNow()
}

// nolint:staticcheck
if metadata.Version == "" {
t.FailNow()
}
Expand Down
58 changes: 42 additions & 16 deletions sentinel/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"strings"
"syscall"

"cosmossdk.io/errors"
"github.com/gogo/protobuf/proto"

"github.com/arkeonetwork/arkeo/common"
"github.com/arkeonetwork/arkeo/common/cosmos"

"github.com/cometbft/cometbft/libs/log"

tmlog "github.com/cometbft/cometbft/libs/log"
tmclient "github.com/cometbft/cometbft/rpc/client/http"
tmCoreTypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
Expand All @@ -23,6 +25,8 @@ import (
"github.com/arkeonetwork/arkeo/x/arkeo/types"
)

var numOfWebSocketClients = 2

func subscribe(client *tmclient.HTTP, logger log.Logger, query string) <-chan tmCoreTypes.ResultEvent {
out, err := client.Subscribe(context.Background(), "", query)
if err != nil {
Expand All @@ -32,26 +36,46 @@ func subscribe(client *tmclient.HTTP, logger log.Logger, query string) <-chan tm
return out
}

func (p Proxy) EventListener(host string) {
logger := p.logger
client, err := tmclient.New(fmt.Sprintf("tcp://%s", host), "/websocket")
func NewTendermintClient(baseURL string) (*tmclient.HTTP, error) {
client, err := tmclient.New(baseURL, "/websocket")
if err != nil {
logger.Error("failure to create websocket client", "error", err)
panic(err)
return nil, errors.Wrapf(err, "error creating websocket client")
}
logger := tmlog.NewTMLogger(tmlog.NewSyncWriter(os.Stdout))
client.SetLogger(logger)
err = client.Start()
if err != nil {
logger.Error("Failed to start a client", "err", err)
os.Exit(1)
}
defer client.Stop() // nolint

// create a unified channel for receiving events
return client, nil
}

func (p Proxy) EventListener(host string) {
logger := p.logger

logger.Info("starting realtime indexing using /websocket")

// as maximum allowed connection is 5 per ws client(cometbft) we split this into 2 client to handle 3 connection each
clients := make([]*tmclient.HTTP, numOfWebSocketClients)

for i := 0; i < numOfWebSocketClients; i++ {
client, err := NewTendermintClient(fmt.Sprintf("tcp://%s", host))
if err != nil {
panic(fmt.Sprintf("error creating tm client for %s: %+v", host, err))
}
if err = client.Start(); err != nil {
panic(fmt.Sprintf("error starting ws client: %s: %+v", host, err))
}
defer func() {
if err := client.Stop(); err != nil {
logger.Error("Failed to stop the client", "error", err)
}
}()
clients[i] = client
}

// Create a unified channel for receiving events
eventChan := make(chan tmCoreTypes.ResultEvent, 1000)

subscribeToEvents := func(queries ...string) {
// Function to subscribe to events for a given client
subscribeToEvents := func(client *tmclient.HTTP, queries ...string) {
for _, query := range queries {
out := subscribe(client, logger, query)

Expand All @@ -60,7 +84,6 @@ func (p Proxy) EventListener(host string) {
select {
case result := <-out:
eventChan <- result

case <-client.Quit():
return
}
Expand All @@ -69,11 +92,14 @@ func (p Proxy) EventListener(host string) {
}
}

// subscribe to events
go subscribeToEvents(
// Subscribe to events for each client
go subscribeToEvents(clients[0],
"tm.event = 'NewBlock'",
"tm.event = 'Tx' AND message.action='/arkeo.arkeo.MsgOpenContract'",
"tm.event = 'Tx' AND message.action='/arkeo.arkeo.MsgCloseContract'",
)

go subscribeToEvents(clients[1],
"tm.event = 'Tx' AND message.action='/arkeo.arkeo.MsgClaimContractIncome'",
"tm.event = 'Tx' AND message.action='/arkeo.arkeo.MsgBondProvider'",
"tm.event = 'Tx' AND message.action='/arkeo.arkeo.MsgModProvider'",
Expand Down

0 comments on commit 3346913

Please sign in to comment.