Skip to content

Commit

Permalink
Merge pull request #3 from multiversx/update-stdout-fields
Browse files Browse the repository at this point in the history
adapt stdout to upstream changes
  • Loading branch information
AdoAdoAdo authored Apr 24, 2024
2 parents c50085c + aa1ff1d commit af394ce
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 30 deletions.
2 changes: 1 addition & 1 deletion factory/wsConnectorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func createWsHost(wsMarshaller marshal.Marshalizer, cfg config.WebSocketConfig)
Mode: cfg.Mode,
RetryDurationInSec: int(cfg.RetryDuration),
BlockingAckOnError: cfg.BlockingAckOnError,
DropMessagesIfNoConnection: false,
DropMessagesIfNoConnection: cfg.DropMessagesIfNoConnection,
AcknowledgeTimeoutInSec: cfg.AcknowledgeTimeoutInSec,
Version: cfg.Version,
},
Expand Down
42 changes: 23 additions & 19 deletions process/firehoseDataProcessor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package process

import (
"encoding/base64"
"encoding/hex"
"fmt"

Expand All @@ -15,9 +16,12 @@ import (
var log = logger.GetOrCreate("firehose")

const (
firehosePrefix = "FIRE"
beginBlockPrefix = "BLOCK_BEGIN"
endBlockPrefix = "BLOCK_END"
firehosePrefix = "FIRE"
blockPrefix = "BLOCK"
initPrefix = "INIT"

protocolReaderVersion = "1.0"
protoMessageType = "type.googleapis.com/proto.OutportBlock"
)

type dataProcessor struct {
Expand Down Expand Up @@ -53,6 +57,8 @@ func NewFirehoseDataProcessor(
outport.TopicSaveBlock: dp.saveBlock,
}

_, _ = fmt.Fprintf(dp.writer, "%s %s %s %s\n", firehosePrefix, initPrefix, protocolReaderVersion, protoMessageType)

return dp, nil
}

Expand Down Expand Up @@ -87,30 +93,28 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error {
return err
}

log.Info("firehose: saving block", "nonce", header.GetNonce(), "hash", outportBlock.BlockData.HeaderHash)
log.Info("saving block", "nonce", header.GetNonce(), "hash", outportBlock.BlockData.HeaderHash)

_, err = fmt.Fprintf(dp.writer, "%s %s %d\n",
firehosePrefix,
beginBlockPrefix,
header.GetNonce(),
)
if err != nil {
return fmt.Errorf("could not write %s prefix , err: %w", beginBlockPrefix, err)
blockNum := header.GetNonce()
parentNum := blockNum - 1
if blockNum == 0 {
parentNum = 0
}
encodedMarshalledData := base64.StdEncoding.EncodeToString(marshalledData)

_, err = fmt.Fprintf(dp.writer, "%s %s %d %s %d %x\n",
_, err = fmt.Fprintf(dp.writer, "%s %s %d %s %d %s %d %d %s\n",
firehosePrefix,
endBlockPrefix,
header.GetNonce(),
blockPrefix,
blockNum,
hex.EncodeToString(outportBlock.BlockData.HeaderHash),
parentNum,
hex.EncodeToString(header.GetPrevHash()),
outportBlock.HighestFinalBlockNonce,
header.GetTimeStamp(),
marshalledData,
encodedMarshalledData,
)
if err != nil {
return fmt.Errorf("could not write %s prefix , err: %w", endBlockPrefix, err)
}

return nil
return err
}

// Close will close the internal writer
Expand Down
36 changes: 26 additions & 10 deletions process/firehoseDataProcessor_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package process

import (
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -126,7 +127,8 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock)
err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.NotNil(t, err)
require.Equal(t, 0, ioWriterCalledCt)
// New does the first write
require.Equal(t, 1, ioWriterCalledCt)
})

t.Run("cannot unmarshall to get header from bytes, should return error", func(t *testing.T) {
Expand Down Expand Up @@ -182,10 +184,12 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

switch ioWriterCalledCt {
case 0:
return 0, err1
case 1:
return 0, nil
case 1:
return 0, err1
case 2:
return 0, nil
case 3:
return 0, err2
}

Expand All @@ -202,12 +206,12 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
require.True(t, strings.Contains(err.Error(), err1.Error()))

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.True(t, strings.Contains(err.Error(), err2.Error()))
require.Nil(t, err)

err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1)
require.Nil(t, err)
require.True(t, errors.Is(err, err2))

require.Equal(t, 5, ioWriterCalledCt)
require.Equal(t, 4, ioWriterCalledCt)
})

t.Run("should work", func(t *testing.T) {
Expand All @@ -227,6 +231,8 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
HeaderBytes: headerBytes,
HeaderType: string(core.ShardHeaderV1),
},

HighestFinalBlockNonce: 0,
}
outportBlockBytes, err := protoMarshaller.Marshal(outportBlock)
require.Nil(t, err)
Expand All @@ -240,11 +246,22 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {

switch ioWriterCalledCt {
case 0:
require.Equal(t, []byte("FIRE BLOCK_BEGIN 1\n"), p)
case 1:
require.Equal(t, []byte(fmt.Sprintf("FIRE BLOCK_END 1 %s 100 %x\n",
num := header.GetNonce()
parentNum := num - 1
libNum := parentNum
encodedMvxBlock := base64.StdEncoding.EncodeToString(outportBlockBytes)

require.Equal(t, []byte(fmt.Sprintf("%s %s %d %s %d %s %d %d %s\n",
firehosePrefix,
blockPrefix,
num,
hex.EncodeToString(outportBlock.BlockData.HeaderHash),
parentNum,
hex.EncodeToString(header.PrevHash),
outportBlockBytes)), p)
libNum,
header.TimeStamp,
encodedMvxBlock)), p)
default:
require.Fail(t, "should not write again")
}
Expand All @@ -258,7 +275,6 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 2, ioWriterCalledCt)
})

}

func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) {
Expand Down

0 comments on commit af394ce

Please sign in to comment.