Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update multiversx deps to latest release #2

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/connector/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
retry_duration = 5
# This flag specifies if we should send an acknowledge signal upon recieving data
with_acknowledge = true
# The duration in seconds to wait for an acknowledgement message
acknowledge_timeout_in_sec = 5
# Signals if in case of data payload processing error, we should send the ack signal or not. If you want to block
# incoming data in case of a local error, this should be set to true.
blocking_ack_on_error = false
blocking_ack_on_error = true
# This flag specifies if we should drop messages if there is no connection to the host
drop_messages_if_no_connection = false
version = 1
15 changes: 9 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ type Config struct {

// WebSocketConfig holds web sockets config
type WebSocketConfig struct {
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
Url string `toml:"url"`
MarshallerType string `toml:"marshaller_type"`
Mode string `toml:"mode"`
RetryDuration uint32 `toml:"retry_duration"`
WithAcknowledge bool `toml:"with_acknowledge"`
AcknowledgeTimeoutInSec int `toml:"acknowledge_timeout_in_sec"`
BlockingAckOnError bool `toml:"blocking_ack_on_error"`
DropMessagesIfNoConnection bool `toml:"drop_messages_if_no_connection"` // Set to `true` to drop messages if there is no active WebSocket connection to send to.
Version uint32 `toml:"version"` // Defines the payload version.
}
3 changes: 3 additions & 0 deletions factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-core-go/marshal/factory"
logger "github.com/multiversx/mx-chain-logger-go"

"github.com/multiversx/mx-chain-ws-connector-template-go/config"
"github.com/multiversx/mx-chain-ws-connector-template-go/process"
)
Expand Down Expand Up @@ -79,6 +80,8 @@ func createWsHost(wsMarshaller marshal.Marshalizer, cfg config.WebSocketConfig)
RetryDurationInSec: int(cfg.RetryDuration),
BlockingAckOnError: cfg.BlockingAckOnError,
DropMessagesIfNoConnection: false,
raduchis marked this conversation as resolved.
Show resolved Hide resolved
AcknowledgeTimeoutInSec: cfg.AcknowledgeTimeoutInSec,
Version: cfg.Version,
},
Marshaller: wsMarshaller,
Log: log,
Expand Down
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
module github.com/multiversx/mx-chain-ws-connector-template-go

go 1.17
go 1.20

require (
github.com/multiversx/mx-chain-communication-go v1.0.1
github.com/multiversx/mx-chain-core-go v1.2.4
github.com/multiversx/mx-chain-logger-go v1.0.11
github.com/stretchr/testify v1.8.2
github.com/urfave/cli v1.22.13
github.com/multiversx/mx-chain-communication-go v1.0.12
github.com/multiversx/mx-chain-core-go v1.2.18
github.com/multiversx/mx-chain-logger-go v1.0.13
github.com/stretchr/testify v1.8.4
github.com/urfave/cli v1.22.14
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
golang.org/x/sys v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
140 changes: 21 additions & 119 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion process/firehoseDataProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewFirehoseDataProcessor(
}

// ProcessPayload will process the received payload only for TopicSaveBlock, otherwise ignores it.
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string) error {
func (dp *dataProcessor) ProcessPayload(payload []byte, topic string, _ uint32) error {
operationHandler, found := dp.operationHandlers[topic]
if !found {
return nil
Expand Down
33 changes: 17 additions & 16 deletions process/firehoseDataProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"
outportcore "github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
"github.com/stretchr/testify/require"

"github.com/multiversx/mx-chain-ws-connector-template-go/testscommon"
)

var protoMarshaller = &marshal.GogoProtoMarshalizer{}
Expand Down Expand Up @@ -86,14 +87,14 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(nil, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)

outportBlock := createOutportBlock()
outportBlock.BlockData = nil
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errNilOutportBlockData, err)
})

Expand All @@ -102,7 +103,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock)
err := fi.ProcessPayload([]byte("invalid payload"), outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
})

Expand All @@ -123,7 +124,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
require.Equal(t, 0, ioWriterCalledCt)
})
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
}

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), marshaller)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Equal(t, errUnmarshal, err)
})

Expand Down Expand Up @@ -197,13 +198,13 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
outportBlock := createOutportBlock()
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)

err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err1.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err2.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)

require.Equal(t, 5, ioWriterCalledCt)
Expand Down Expand Up @@ -253,7 +254,7 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(ioWriter, createContainer(), protoMarshaller)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock)
err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)
require.Equal(t, 2, ioWriterCalledCt)
})
Expand All @@ -265,12 +266,12 @@ func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) {

fi, _ := NewFirehoseDataProcessor(&testscommon.IoWriterStub{}, createContainer(), protoMarshaller)

require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic"))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock))
require.Nil(t, fi.ProcessPayload([]byte("payload"), "random topic", 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveRoundsInfo, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsRating, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveValidatorsPubKeys, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicSaveAccounts, 1))
require.Nil(t, fi.ProcessPayload([]byte("payload"), outportcore.TopicFinalizedBlock, 1))
}

func TestFirehoseIndexer_Close(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type WSConnector interface {

// DataProcessor defines a payload processor for incoming ws data
type DataProcessor interface {
ProcessPayload(payload []byte, topic string) error
ProcessPayload(payload []byte, topic string, version uint32) error
Close() error
IsInterfaceNil() bool
}
Expand Down
Loading