diff --git a/process/firehoseDataProcessor.go b/process/firehoseDataProcessor.go index f4c2a64..1ae2b58 100644 --- a/process/firehoseDataProcessor.go +++ b/process/firehoseDataProcessor.go @@ -18,6 +18,10 @@ var log = logger.GetOrCreate("firehose") const ( firehosePrefix = "FIRE" blockPrefix = "BLOCK" + initPrefix = "INIT" + + protocolReaderVersion = "1.0" + protoMessageType = "type.googleapis.com/bstream.pb.sf.bstream.v1.OutportBlock" ) type dataProcessor struct { @@ -53,6 +57,8 @@ func NewFirehoseDataProcessor( outport.TopicSaveBlock: dp.saveBlock, } + _, _ = fmt.Fprintf(dp.writer, "%s %s %s %s\n", firehosePrefix, initPrefix, protocolReaderVersion, protoMessageType) + return dp, nil }