diff --git a/factory/wsConnectorFactory.go b/factory/wsConnectorFactory.go index ccffbd7..434ad2b 100644 --- a/factory/wsConnectorFactory.go +++ b/factory/wsConnectorFactory.go @@ -79,7 +79,7 @@ func createWsHost(wsMarshaller marshal.Marshalizer, cfg config.WebSocketConfig) Mode: cfg.Mode, RetryDurationInSec: int(cfg.RetryDuration), BlockingAckOnError: cfg.BlockingAckOnError, - DropMessagesIfNoConnection: false, + DropMessagesIfNoConnection: cfg.DropMessagesIfNoConnection, AcknowledgeTimeoutInSec: cfg.AcknowledgeTimeoutInSec, Version: cfg.Version, }, diff --git a/process/firehoseDataProcessor.go b/process/firehoseDataProcessor.go index 488d375..6006976 100644 --- a/process/firehoseDataProcessor.go +++ b/process/firehoseDataProcessor.go @@ -1,6 +1,7 @@ package process import ( + "encoding/base64" "encoding/hex" "fmt" @@ -15,9 +16,12 @@ import ( var log = logger.GetOrCreate("firehose") const ( - firehosePrefix = "FIRE" - beginBlockPrefix = "BLOCK_BEGIN" - endBlockPrefix = "BLOCK_END" + firehosePrefix = "FIRE" + blockPrefix = "BLOCK" + initPrefix = "INIT" + + protocolReaderVersion = "1.0" + protoMessageType = "type.googleapis.com/proto.OutportBlock" ) type dataProcessor struct { @@ -53,6 +57,8 @@ func NewFirehoseDataProcessor( outport.TopicSaveBlock: dp.saveBlock, } + _, _ = fmt.Fprintf(dp.writer, "%s %s %s %s\n", firehosePrefix, initPrefix, protocolReaderVersion, protoMessageType) + return dp, nil } @@ -87,30 +93,28 @@ func (dp *dataProcessor) saveBlock(marshalledData []byte) error { return err } - log.Info("firehose: saving block", "nonce", header.GetNonce(), "hash", outportBlock.BlockData.HeaderHash) + log.Info("saving block", "nonce", header.GetNonce(), "hash", outportBlock.BlockData.HeaderHash) - _, err = fmt.Fprintf(dp.writer, "%s %s %d\n", - firehosePrefix, - beginBlockPrefix, - header.GetNonce(), - ) - if err != nil { - return fmt.Errorf("could not write %s prefix , err: %w", beginBlockPrefix, err) + blockNum := header.GetNonce() + parentNum := blockNum - 1 + if blockNum == 0 { + parentNum = 0 } + encodedMarshalledData := base64.StdEncoding.EncodeToString(marshalledData) - _, err = fmt.Fprintf(dp.writer, "%s %s %d %s %d %x\n", + _, err = fmt.Fprintf(dp.writer, "%s %s %d %s %d %s %d %d %s\n", firehosePrefix, - endBlockPrefix, - header.GetNonce(), + blockPrefix, + blockNum, + hex.EncodeToString(outportBlock.BlockData.HeaderHash), + parentNum, hex.EncodeToString(header.GetPrevHash()), + outportBlock.HighestFinalBlockNonce, header.GetTimeStamp(), - marshalledData, + encodedMarshalledData, ) - if err != nil { - return fmt.Errorf("could not write %s prefix , err: %w", endBlockPrefix, err) - } - return nil + return err } // Close will close the internal writer diff --git a/process/firehoseDataProcessor_test.go b/process/firehoseDataProcessor_test.go index 4046b9f..3e291f5 100644 --- a/process/firehoseDataProcessor_test.go +++ b/process/firehoseDataProcessor_test.go @@ -1,6 +1,7 @@ package process import ( + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -126,7 +127,8 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { outportBlockBytes, _ := protoMarshaller.Marshal(outportBlock) err := fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) require.NotNil(t, err) - require.Equal(t, 0, ioWriterCalledCt) + // New does the first write + require.Equal(t, 1, ioWriterCalledCt) }) t.Run("cannot unmarshall to get header from bytes, should return error", func(t *testing.T) { @@ -182,10 +184,12 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { switch ioWriterCalledCt { case 0: - return 0, err1 - case 1: return 0, nil + case 1: + return 0, err1 case 2: + return 0, nil + case 3: return 0, err2 } @@ -202,12 +206,12 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { require.True(t, strings.Contains(err.Error(), err1.Error())) err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) - require.True(t, strings.Contains(err.Error(), err2.Error())) + require.Nil(t, err) err = fi.ProcessPayload(outportBlockBytes, outportcore.TopicSaveBlock, 1) - require.Nil(t, err) + require.True(t, errors.Is(err, err2)) - require.Equal(t, 5, ioWriterCalledCt) + require.Equal(t, 4, ioWriterCalledCt) }) t.Run("should work", func(t *testing.T) { @@ -227,6 +231,8 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { HeaderBytes: headerBytes, HeaderType: string(core.ShardHeaderV1), }, + + HighestFinalBlockNonce: 0, } outportBlockBytes, err := protoMarshaller.Marshal(outportBlock) require.Nil(t, err) @@ -240,11 +246,22 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { switch ioWriterCalledCt { case 0: - require.Equal(t, []byte("FIRE BLOCK_BEGIN 1\n"), p) case 1: - require.Equal(t, []byte(fmt.Sprintf("FIRE BLOCK_END 1 %s 100 %x\n", + num := header.GetNonce() + parentNum := num - 1 + libNum := parentNum + encodedMvxBlock := base64.StdEncoding.EncodeToString(outportBlockBytes) + + require.Equal(t, []byte(fmt.Sprintf("%s %s %d %s %d %s %d %d %s\n", + firehosePrefix, + blockPrefix, + num, + hex.EncodeToString(outportBlock.BlockData.HeaderHash), + parentNum, hex.EncodeToString(header.PrevHash), - outportBlockBytes)), p) + libNum, + header.TimeStamp, + encodedMvxBlock)), p) default: require.Fail(t, "should not write again") } @@ -258,7 +275,6 @@ func TestFirehoseIndexer_SaveBlock(t *testing.T) { require.Nil(t, err) require.Equal(t, 2, ioWriterCalledCt) }) - } func TestFirehoseIndexer_NoOperationFunctions(t *testing.T) {