Skip to content

Commit

Permalink
Merge pull request #2 from multiversx/update-dependencies
Browse files Browse the repository at this point in the history
update multiversx deps to latest release
  • Loading branch information
AdoAdoAdo authored Mar 11, 2024
2 parents f193e7b + 444df14 commit c50085c
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 154 deletions.
7 changes: 6 additions & 1 deletion cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
retry_duration = 5
# This flag specifies if we should send an acknowledge signal upon recieving data
with_acknowledge = true
# The duration in seconds to wait for an acknowledgement message
acknowledge_timeout_in_sec = 5
# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = false
blocking_ack_on_error = true
# This flag specifies if we should drop messages if there is no connection to the host
drop_messages_if_no_connection = false
version = 1
15 changes: 9 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ type Config struct {

// WebSocketConfig holds web sockets config
type WebSocketConfig struct {
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
AcknowledgeTimeoutInSec int `toml:"acknowledge_timeout_in_sec"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
DropMessagesIfNoConnection bool `toml:"drop_messages_if_no_connection"` // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 `toml:"version"` // Defines the payload version.
}
3 changes: 3 additions & 0 deletions factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-core-go/marshal/factory"
logger "github.com/multiversx/mx-chain-logger-go"

"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
)
Expand Down Expand Up @@ -79,6 +80,8 @@ func createWsHost(wsMarshaller marshal.Marshalizer, cfg config.WebSocketConfig)
RetryDurationInSec: int(cfg.RetryDuration),
BlockingAckOnError: cfg.BlockingAckOnError,
DropMessagesIfNoConnection: false,
AcknowledgeTimeoutInSec: cfg.AcknowledgeTimeoutInSec,
Version: cfg.Version,
},
Marshaller: wsMarshaller,
Log: log,
Expand Down
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
module github.com/multiversx/mx-chain-ws-connector-template-go

go 1.17
go 1.20

require (
github.com/multiversx/mx-chain-communication-go v1.0.1
github.com/multiversx/mx-chain-core-go v1.2.4
github.com/multiversx/mx-chain-logger-go v1.0.11
github.com/stretchr/testify v1.8.2
github.com/urfave/cli v1.22.13
github.com/multiversx/mx-chain-communication-go v1.0.12
github.com/multiversx/mx-chain-core-go v1.2.18
github.com/multiversx/mx-chain-logger-go v1.0.13
github.com/stretchr/testify v1.8.4
github.com/urfave/cli v1.22.14
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
golang.org/x/sys v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
140 changes: 21 additions & 119 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion process/firehoseDataProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewFirehoseDataProcessor(
}

// ProcessPayload will process the received payload only for TopicSaveBlock, otherwise ignores it.
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string) error {
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error {
operationHandler, found := dp.operationHandlers[topic]
if !found {
return nil
Expand Down
33 changes: 17 additions & 16 deletions process/firehoseDataProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"
outportcore "github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
"github.com/stretchr/testify/require"

"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
)

var protoMarshaller = &marshal.GogoProtoMarshalizer{}
Expand Down Expand Up @@ -86,14 +87,14 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)

outportBlock := createOutportBlock()
outportBlock.BlockData = nil
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)
})

Expand All @@ -102,7 +103,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock)
err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
})

Expand All @@ -123,7 +124,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
require.Equal(t, 0, ioWriterCalledCt)
})
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
}

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), marshaller)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errUnmarshal, err)
})

Expand Down Expand Up @@ -197,13 +198,13 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
outportBlock := createOutportBlock()
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err1.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err2.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)

require.Equal(t, 5, ioWriterCalledCt)
Expand Down Expand Up @@ -253,7 +254,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)
require.Equal(t, 2, ioWriterCalledCt)
})
Expand All @@ -265,12 +266,12 @@ func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic"))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock))
require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic", 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1))
}

func TestFirehoseIndexer_Close(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type WSConnector interface {

// DataProcessor defines a payload processor for incoming ws data
type DataProcessor interface {
ProcessPayload(payload []byte, topic string) error
ProcessPayload(payload []byte, topic string, version uint32) error
Close() error
IsInterfaceNil() bool
}
Expand Down

0 comments on commit c50085c

Please sign in to comment.