From 80ce952ea3a63aa8787ddfac39b3326665f6b8f0 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 12 Sep 2023 10:42:49 -0500 Subject: [PATCH 01/11] ADD: Add ARCX publisher --- CHANGELOG.md | 4 ++++ rust/dbn/src/publishers.rs | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a1167f..a2aab45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.10.3 - TBD +### Enhancements +- Added `ARCX.PILLAR.ARCX` publisher + ## 0.10.2 - 2023-09-12 ### Bug fixes - Fixed query range checking in `Metadata::symbol_map_for_date` diff --git a/rust/dbn/src/publishers.rs b/rust/dbn/src/publishers.rs index 22bfb11..fc17994 100644 --- a/rust/dbn/src/publishers.rs +++ b/rust/dbn/src/publishers.rs @@ -406,10 +406,12 @@ pub enum Publisher { DbeqBasicIexg = 41, /// DBEQ Basic - MIAX Pearl DbeqBasicEprl = 42, + /// NYSE Arca Integrated + ArcxPillarArcx = 43, } /// The number of Publisher variants. -pub const PUBLISHER_COUNT: usize = 42; +pub const PUBLISHER_COUNT: usize = 43; impl Publisher { /// Convert a Publisher to its `str` representation. @@ -457,6 +459,7 @@ impl Publisher { Self::DbeqBasicXcis => "DBEQ.BASIC.XCIS", Self::DbeqBasicIexg => "DBEQ.BASIC.IEXG", Self::DbeqBasicEprl => "DBEQ.BASIC.EPRL", + Self::ArcxPillarArcx => "ARCX.PILLAR.ARCX", } } @@ -505,6 +508,7 @@ impl Publisher { Self::DbeqBasicXcis => Venue::Xcis, Self::DbeqBasicIexg => Venue::Iexg, Self::DbeqBasicEprl => Venue::Eprl, + Self::ArcxPillarArcx => Venue::Arcx, } } @@ -553,6 +557,7 @@ impl Publisher { Self::DbeqBasicXcis => Dataset::DbeqBasic, Self::DbeqBasicIexg => Dataset::DbeqBasic, Self::DbeqBasicEprl => Dataset::DbeqBasic, + Self::ArcxPillarArcx => Dataset::ArcxPillar, } } } @@ -616,6 +621,7 @@ impl std::str::FromStr for Publisher { "DBEQ.BASIC.XCIS" => Ok(Self::DbeqBasicXcis), "DBEQ.BASIC.IEXG" => Ok(Self::DbeqBasicIexg), "DBEQ.BASIC.EPRL" => Ok(Self::DbeqBasicEprl), + "ARCX.PILLAR.ARCX" => Ok(Self::ArcxPillarArcx), _ => Err(Error::conversion::(s)), } } From 929791b5d87afafee7590e47b0399549d2c16cd6 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 12 Sep 2023 17:36:16 -0500 Subject: [PATCH 02/11] ADD: Add symbol field support to CSV and JSON --- CHANGELOG.md | 13 +- c/src/text_serialization.rs | 10 +- python/src/transcoder.rs | 3 +- rust/dbn-cli/src/main.rs | 2 +- rust/dbn/src/encode.rs | 65 ++- rust/dbn/src/encode/csv.rs | 728 +------------------------- rust/dbn/src/encode/csv/serialize.rs | 188 +++++++ rust/dbn/src/encode/csv/sync.rs | 652 +++++++++++++++++++++++ rust/dbn/src/encode/dbn/sync.rs | 22 +- rust/dbn/src/encode/json/serialize.rs | 23 + rust/dbn/src/encode/json/sync.rs | 83 ++- rust/dbn/src/lib.rs | 3 +- rust/dbn/src/record_ref.rs | 2 +- 13 files changed, 1028 insertions(+), 766 deletions(-) create mode 100644 rust/dbn/src/encode/csv/serialize.rs create mode 100644 rust/dbn/src/encode/csv/sync.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index a2aab45..870322a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,20 @@ # Changelog -## 0.10.3 - TBD +## 0.11.0 - TBD ### Enhancements +- Added new `EncodeRecordTextExt` trait which is implemented for the CSV and JSON + encoders. 
It adds two methods for encoding a `symbol` field alongside the rest of the
+  record fields, matching the behavior of `map_symbols` in the historical API
+- Added `encode_header` and `encode_header_for_schema` methods to `CsvEncoder` to give
+  more flexibility for encoding CSV headers
+- Implemented `Copy` for `RecordRef` to make it behave more like a reference
 - Added `ARCX.PILLAR.ARCX` publisher
 
+### Breaking changes
+- Split `encode_record_ref` into a safe method with no arguments and an unsafe method
+  with a `ts_out` parameter to reduce `unsafe` usage when not working with live data
+  that may contain `ts_out`
+
 ## 0.10.2 - 2023-09-12
 ### Bug fixes
 - Fixed query range checking in `Metadata::symbol_map_for_date`
diff --git a/c/src/text_serialization.rs b/c/src/text_serialization.rs
index 0b03b39..57f5582 100644
--- a/c/src/text_serialization.rs
+++ b/c/src/text_serialization.rs
@@ -179,10 +179,10 @@ pub unsafe extern "C" fn s_serialize_record(
     let res = match options.encoding {
         TextEncoding::Json => {
             json::Encoder::new(&mut cursor, false, options.pretty_px, options.pretty_ts)
-                .encode_record_ref(record, options.ts_out)
+                .encode_record_ref_ts_out(record, options.ts_out)
         }
         TextEncoding::Csv => csv::Encoder::new(&mut cursor, options.pretty_px, options.pretty_ts)
-            .encode_record_ref(record, options.ts_out),
+            .encode_record_ref_ts_out(record, options.ts_out),
     }
     // null byte
     .and_then(|_| {
@@ -232,10 +232,10 @@ pub unsafe extern "C" fn f_serialize_record(
     let res = match options.encoding {
         TextEncoding::Json => {
             json::Encoder::new(&mut cfile, false, options.pretty_px, options.pretty_ts)
-                .encode_record_ref(record, options.ts_out)
+                .encode_record_ref_ts_out(record, options.ts_out)
         }
         TextEncoding::Csv => csv::Encoder::new(&mut cfile, options.pretty_px, options.pretty_ts)
-            .encode_record_ref(record, options.ts_out),
+            .encode_record_ref_ts_out(record, options.ts_out),
     };
     if res.is_ok() {
         cfile.bytes_written() as i32
@@ -266,5 +266,5 @@ fn serialize_csv_header(
     _rec: &R,
     encoder: &mut csv::Encoder,
 ) -> dbn::Result<()> {
-    encoder.encode_header::()
+    encoder.encode_header::(false)
 }
diff --git a/python/src/transcoder.rs b/python/src/transcoder.rs
index f488acd..c42f0aa 100644
--- a/python/src/transcoder.rs
+++ b/python/src/transcoder.rs
@@ -92,7 +92,8 @@ impl Transcoder {
         loop {
             match decoder.decode_record_ref() {
                 Ok(Some(rec)) => {
-                    unsafe { encoder.encode_record_ref(rec, self.ts_out) }.map_err(to_val_err)?;
+                    unsafe { encoder.encode_record_ref_ts_out(rec, self.ts_out) }
+                        .map_err(to_val_err)?;
                     // keep track of position after last _successful_ decoding to
                     // ensure buffer is left in correct state in the case where one
                     // or more successful decodings is followed by a partial one, i.e.
diff --git a/rust/dbn-cli/src/main.rs b/rust/dbn-cli/src/main.rs
index c16f810..b8a6b23 100644
--- a/rust/dbn-cli/src/main.rs
+++ b/rust/dbn-cli/src/main.rs
@@ -83,7 +83,7 @@ fn write_dbn_frag(
     let mut n = 0;
     while let Some(record) = decoder.decode_record_ref()? {
         // Assume no ts_out for safety
-        match unsafe { encoder.encode_record_ref(record, false) } {
+        match encoder.encode_record_ref(record) {
             // Handle broken pipe as a non-error.
             Err(dbn::Error::Io { source, ..
}) if source.kind() == std::io::ErrorKind::BrokenPipe => diff --git a/rust/dbn/src/encode.rs b/rust/dbn/src/encode.rs index 61c6323..d9e299a 100644 --- a/rust/dbn/src/encode.rs +++ b/rust/dbn/src/encode.rs @@ -24,13 +24,12 @@ pub use self::{ json::Encoder as JsonEncoder, }; -use crate::Error; use crate::{ decode::DecodeDbn, enums::{Compression, Encoding}, record::HasRType, record_ref::RecordRef, - Metadata, Result, + rtype_dispatch, Error, Metadata, Result, }; use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize}; @@ -62,13 +61,21 @@ pub trait EncodeRecord { pub trait EncodeRecordRef { /// Encodes a single DBN [`RecordRef`]. /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>; + + /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see + /// [`record::WithTsOut`](crate::record::WithTsOut)). + /// /// # Safety /// `ts_out` must be `false` if `record` does not have an appended `ts_out`. /// /// # Errors /// This function returns an error if it's unable to write to the underlying writer /// or there's a serialization error. - unsafe fn encode_record_ref(&mut self, record: RecordRef, ts_out: bool) -> Result<()>; + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()>; } /// Trait for types that encode DBN records with a specific record type. @@ -112,7 +119,7 @@ pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { while let Some(record) = decoder.decode_record_ref()? { // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out` // from the metadata header. - unsafe { self.encode_record_ref(record, ts_out) }?; + unsafe { self.encode_record_ref_ts_out(record, ts_out) }?; } self.flush()?; Ok(()) @@ -134,7 +141,7 @@ pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { while let Some(record) = decoder.decode_record_ref()? { // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out` // from the metadata header. - unsafe { self.encode_record_ref(record, ts_out) }?; + unsafe { self.encode_record_ref_ts_out(record, ts_out) }?; i += 1; if i == limit.get() { break; @@ -145,6 +152,30 @@ pub trait EncodeDbn: EncodeRecord + EncodeRecordRef { } } +/// Extension trait for text encodings. +pub trait EncodeRecordTextExt: EncodeRecord + EncodeRecordRef { + /// Encodes a single DBN record of type `R` along with the record's text symbol. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + fn encode_record_with_sym( + &mut self, + record: &R, + symbol: Option<&str>, + ) -> Result<()>; + + /// Encodes a single DBN [`RecordRef`] along with the record's text symbol. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. + fn encode_ref_with_sym(&mut self, record: RecordRef, symbol: Option<&str>) -> Result<()> { + #[allow(clippy::redundant_closure_call)] + rtype_dispatch!(record, |rec| self.encode_record_with_sym(rec, symbol))? + } +} + /// The default Zstandard compression level. 
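/// A value of 0 tells zstd to fall back to its built-in default level (currently 3).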
const ZSTD_COMPRESSION_LEVEL: i32 = 0; @@ -310,8 +341,12 @@ impl<'a, W> EncodeRecordRef for DynEncoder<'a, W> where W: io::Write, { - unsafe fn encode_record_ref(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { - self.0.encode_record_ref(record, ts_out) + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { + self.0.encode_record_ref(record) + } + + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { + self.0.encode_record_ref_ts_out(record, ts_out) } } @@ -360,11 +395,19 @@ impl<'a, W> EncodeRecordRef for DynEncoderImpl<'a, W> where W: io::Write, { - unsafe fn encode_record_ref(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { + match self { + DynEncoderImpl::Dbn(enc) => enc.encode_record_ref(record), + DynEncoderImpl::Csv(enc) => enc.encode_record_ref(record), + DynEncoderImpl::Json(enc) => enc.encode_record_ref(record), + } + } + + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { match self { - DynEncoderImpl::Dbn(enc) => enc.encode_record_ref(record, ts_out), - DynEncoderImpl::Csv(enc) => enc.encode_record_ref(record, ts_out), - DynEncoderImpl::Json(enc) => enc.encode_record_ref(record, ts_out), + DynEncoderImpl::Dbn(enc) => enc.encode_record_ref_ts_out(record, ts_out), + DynEncoderImpl::Csv(enc) => enc.encode_record_ref_ts_out(record, ts_out), + DynEncoderImpl::Json(enc) => enc.encode_record_ref_ts_out(record, ts_out), } } } diff --git a/rust/dbn/src/encode/csv.rs b/rust/dbn/src/encode/csv.rs index e5754a7..692b4c7 100644 --- a/rust/dbn/src/encode/csv.rs +++ b/rust/dbn/src/encode/csv.rs @@ -1,728 +1,6 @@ //! Encoding of DBN records into comma-separated values (CSV). -use std::{io, num::NonZeroU64}; -use streaming_iterator::StreamingIterator; +pub(crate) mod serialize; +mod sync; -use super::{EncodeDbn, EncodeRecord, EncodeRecordRef}; -use crate::{ - decode::DecodeDbn, enums::RType, rtype_ts_out_dispatch, schema_method_dispatch, Error, Result, -}; - -/// Type for encoding files and streams of DBN records in CSV. -/// -/// Note that encoding [`Metadata`](crate::Metadata) in CSV is not supported. -pub struct Encoder -where - W: io::Write, -{ - writer: csv::Writer, - use_pretty_px: bool, - use_pretty_ts: bool, -} - -impl Encoder -where - W: io::Write, -{ - /// Creates a new [`Encoder`] that will write to `writer`. If `use_pretty_px` - /// is `true`, price fields will be serialized as a decimal. If `pretty_ts` is - /// `true`, timestamp fields will be serialized in a ISO8601 datetime string. - pub fn new(writer: W, use_pretty_px: bool, use_pretty_ts: bool) -> Self { - let csv_writer = csv::WriterBuilder::new() - .has_headers(false) // need to write our own custom header - .from_writer(writer); - Self { - writer: csv_writer, - use_pretty_px, - use_pretty_ts, - } - } - - /// Returns a reference to the underlying writer. 
- pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - #[doc(hidden)] - pub fn encode_header(&mut self) -> Result<()> { - R::serialize_header(&mut self.writer)?; - // end of line - self.writer.write_record(None::<&[u8]>)?; - Ok(()) - } -} - -impl EncodeRecord for Encoder -where - W: io::Write, -{ - fn encode_record(&mut self, record: &R) -> Result<()> { - let serialize_res = match (self.use_pretty_px, self.use_pretty_ts) { - (true, true) => record.serialize_to::<_, true, true>(&mut self.writer), - (true, false) => record.serialize_to::<_, true, false>(&mut self.writer), - (false, true) => record.serialize_to::<_, false, true>(&mut self.writer), - (false, false) => record.serialize_to::<_, false, false>(&mut self.writer), - }; - match serialize_res - // write new line - .and_then(|_| self.writer.write_record(None::<&[u8]>)) - { - Ok(()) => Ok(()), - Err(e) => Err(match e.into_kind() { - csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")), - e => Error::encode(format!("Failed to serialize {record:?}: {e:?}")), - }), - } - } - - fn flush(&mut self) -> Result<()> { - self.writer - .flush() - .map_err(|e| Error::io(e, "flushing output")) - } -} - -impl EncodeRecordRef for Encoder -where - W: io::Write, -{ - unsafe fn encode_record_ref(&mut self, record: crate::RecordRef, ts_out: bool) -> Result<()> { - #[allow(clippy::redundant_closure_call)] - rtype_ts_out_dispatch!(record, ts_out, |rec| self.encode_record(rec))? - } -} - -impl EncodeDbn for Encoder -where - W: io::Write, -{ - fn encode_records(&mut self, records: &[R]) -> Result<()> { - self.encode_header::()?; - for record in records { - self.encode_record(record)?; - } - self.flush()?; - Ok(()) - } - - /// Encodes a stream of DBN records. - /// - /// # Errors - /// This function returns an error if it's unable to write to the underlying writer - /// or there's a serialization error. - fn encode_stream( - &mut self, - mut stream: impl StreamingIterator, - ) -> Result<()> { - self.encode_header::()?; - while let Some(record) = stream.next() { - self.encode_record(record)?; - } - self.flush()?; - Ok(()) - } - - /// Encode DBN records directly from a DBN decoder. This implemented outside - /// [`EncodeDbn`] because the CSV encoder has the additional constraint of only - /// being able to encode a single schema in a stream. - /// - /// # Errors - /// This function returns an error if it's unable to write to the underlying writer - /// or there's a serialization error. - fn encode_decoded(&mut self, mut decoder: D) -> Result<()> { - let ts_out = decoder.metadata().ts_out; - if let Some(schema) = decoder.metadata().schema { - schema_method_dispatch!(schema, self, encode_header)?; - let rtype = RType::from(schema); - while let Some(record) = decoder.decode_record_ref()? { - if record.rtype().map_or(true, |r| r != rtype) { - return Err(Error::encode(format!("Schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype()))); - } - // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out` - // from the metadata header. 
- unsafe { self.encode_record_ref(record, ts_out) }?; - } - self.flush()?; - Ok(()) - } else { - Err(Error::encode("Can't encode a CSV with mixed schemas")) - } - } - - fn encode_decoded_with_limit( - &mut self, - mut decoder: D, - limit: NonZeroU64, - ) -> Result<()> { - let ts_out = decoder.metadata().ts_out; - if let Some(schema) = decoder.metadata().schema { - schema_method_dispatch!(schema, self, encode_header)?; - let rtype = RType::from(schema); - let mut i = 0; - while let Some(record) = decoder.decode_record_ref()? { - if record.rtype().map_or(true, |r| r != rtype) { - return Err(Error::encode(format!("Schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype()))); - } - // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out` - // from the metadata header. - unsafe { self.encode_record_ref(record, ts_out) }?; - i += 1; - if i == limit.get() { - break; - } - } - self.flush()?; - Ok(()) - } else { - Err(Error::encode("Can't encode a CSV with mixed schemas")) - } - } -} - -pub(crate) mod serialize { - use std::{ffi::c_char, io}; - - use csv::Writer; - - use crate::{ - enums::{SecurityUpdateAction, UserDefinedInstrument}, - pretty::{fmt_px, fmt_ts}, - record::{c_chars_to_str, BidAskPair, HasRType, RecordHeader, WithTsOut}, - UNDEF_PRICE, UNDEF_TIMESTAMP, - }; - - /// Because of the flat nature of CSVs, there are several limitations in the - /// Rust CSV serde serialization library. This trait helps work around them. - pub trait CsvSerialize { - /// Encode the header to `csv_writer`. - fn serialize_header(csv_writer: &mut Writer) -> csv::Result<()>; - - /// Serialize the object to `csv_writer`. Allows custom behavior that would otherwise - /// cause a runtime error, e.g. serializing a struct with array field. - fn serialize_to( - &self, - csv_writer: &mut Writer, - ) -> csv::Result<()>; - } - - impl CsvSerialize for WithTsOut { - fn serialize_header(csv_writer: &mut Writer) -> csv::Result<()> { - T::serialize_header(csv_writer)?; - csv_writer.write_field("ts_out") - } - - fn serialize_to( - &self, - csv_writer: &mut Writer, - ) -> csv::Result<()> { - self.rec - .serialize_to::(csv_writer)?; - write_ts_field::(csv_writer, self.ts_out) - } - } - - pub trait WriteField { - fn write_header(csv_writer: &mut Writer, name: &str) -> csv::Result<()> { - csv_writer.write_field(name) - } - - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()>; - } - - impl WriteField for RecordHeader { - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()> { - self.serialize_to::(writer) - } - - fn write_header(csv_writer: &mut Writer, _name: &str) -> csv::Result<()> { - Self::serialize_header(csv_writer) - } - } - - impl WriteField for [BidAskPair; N] { - fn write_header(csv_writer: &mut Writer, _name: &str) -> csv::Result<()> { - for i in 0..N { - for f in ["bid_px", "ask_px", "bid_sz", "ask_sz", "bid_ct", "ask_ct"] { - csv_writer.write_field(&format!("{f}_{i:02}"))?; - } - } - Ok(()) - } - - fn write_field( - &self, - writer: &mut csv::Writer, - ) -> csv::Result<()> { - for level in self.iter() { - write_px_field::<_, PRETTY_PX>(writer, level.bid_px)?; - write_px_field::<_, PRETTY_PX>(writer, level.ask_px)?; - writer.write_field(&level.bid_sz.to_string())?; - writer.write_field(&level.ask_sz.to_string())?; - writer.write_field(&level.bid_ct.to_string())?; - writer.write_field(&level.ask_ct.to_string())?; - } - Ok(()) - } - } - macro_rules! 
impl_write_field_for { - ($($ty:ident),+) => { - $( - impl WriteField for $ty { - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()> { - writer.write_field(&self.to_string()) - } - } - )* - }; - } - - impl_write_field_for! {i64, u64, i32, u32, i16, u16, i8, u8, bool} - - impl WriteField for [c_char; N] { - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()> { - writer.write_field(c_chars_to_str(self).unwrap_or_default()) - } - } - - impl WriteField for SecurityUpdateAction { - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()> { - writer.write_field(&(*self as u8 as char).to_string()) - } - } - - impl WriteField for UserDefinedInstrument { - fn write_field( - &self, - writer: &mut Writer, - ) -> csv::Result<()> { - writer.write_field(&(*self as u8 as char).to_string()) - } - } - - pub fn write_px_field( - csv_writer: &mut Writer, - px: i64, - ) -> csv::Result<()> { - if PRETTY_PX { - if px == UNDEF_PRICE { - csv_writer.write_field("") - } else { - csv_writer.write_field(fmt_px(px)) - } - } else { - csv_writer.write_field(px.to_string()) - } - } - - pub fn write_ts_field( - csv_writer: &mut Writer, - ts: u64, - ) -> csv::Result<()> { - if PRETTY_TS { - match ts { - 0 | UNDEF_TIMESTAMP => csv_writer.write_field(""), - ts => csv_writer.write_field(fmt_ts(ts)), - } - } else { - csv_writer.write_field(ts.to_string()) - } - } - - pub fn write_c_char_field( - csv_writer: &mut Writer, - c: c_char, - ) -> csv::Result<()> { - // Handle NUL byte - if c == 0 { - csv_writer.write_field(String::new()) - } else { - csv_writer.write_field((c as u8 as char).to_string()) - } - } -} - -#[cfg(test)] -mod tests { - use std::{array, io::BufWriter, os::raw::c_char}; - - use super::{serialize::write_c_char_field, *}; - use crate::{ - encode::test_data::{VecStream, BID_ASK, RECORD_HEADER}, - enums::{ - InstrumentClass, SecurityUpdateAction, StatType, StatUpdateAction, - UserDefinedInstrument, - }, - record::{ - str_to_c_chars, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, - StatMsg, StatusMsg, TradeMsg, WithTsOut, - }, - }; - - const HEADER_CSV: &str = "1658441851000000000,4,1,323"; - - const BID_ASK_CSV: &str = "372000000000000,372500000000000,10,5,5,2"; - - fn extract_2nd_line(buffer: Vec) -> String { - let output = String::from_utf8(buffer).expect("valid UTF-8"); - let (first, second) = output.split_once('\n').expect("two lines"); - assert!(!first.trim().is_empty()); - second - .trim_end() // remove newline - .to_owned() - } - - #[test] - fn test_mbo_encode_stream() { - let data = vec![MboMsg { - hd: RECORD_HEADER, - order_id: 16, - price: 5500, - size: 3, - flags: 128, - channel_id: 14, - action: 'B' as c_char, - side: 'B' as c_char, - ts_recv: 1658441891000000000, - ts_in_delta: 22_000, - sequence: 1_002_375, - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_stream(VecStream::new(data)) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!("1658441891000000000,{HEADER_CSV},B,B,5500,3,14,16,128,22000,1002375") - ); - } - - #[test] - fn test_mbp1_encode_records() { - let data = vec![Mbp1Msg { - hd: RECORD_HEADER, - price: 5500, - size: 3, - action: 'M' as c_char, - side: 'A' as c_char, - flags: 128, - depth: 9, - ts_recv: 1658441891000000000, - ts_in_delta: 22_000, - sequence: 1_002_375, - levels: [BID_ASK], - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) 
- .encode_records(data.as_slice()) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!( - "1658441891000000000,{HEADER_CSV},M,A,9,5500,3,128,22000,1002375,{BID_ASK_CSV}" - ) - ); - } - - #[test] - fn test_mbp10_encode_stream() { - let data = vec![Mbp10Msg { - hd: RECORD_HEADER, - price: 5500, - size: 3, - action: 'B' as c_char, - side: 'A' as c_char, - flags: 128, - depth: 9, - ts_recv: 1658441891000000000, - ts_in_delta: 22_000, - sequence: 1_002_375, - levels: array::from_fn(|_| BID_ASK.clone()), - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_stream(VecStream::new(data)) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!("1658441891000000000,{HEADER_CSV},B,A,9,5500,3,128,22000,1002375,{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV}") - ); - } - - #[test] - fn test_trade_encode_records() { - let data = vec![TradeMsg { - hd: RECORD_HEADER, - price: 5500, - size: 3, - action: 'B' as c_char, - side: 'B' as c_char, - flags: 128, - depth: 9, - ts_recv: 1658441891000000000, - ts_in_delta: 22_000, - sequence: 1_002_375, - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_records(data.as_slice()) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!("1658441891000000000,{HEADER_CSV},B,B,9,5500,3,128,22000,1002375") - ); - } - - #[test] - fn test_ohlcv_encode_stream() { - let data = vec![OhlcvMsg { - hd: RECORD_HEADER, - open: 5000, - high: 8000, - low: 3000, - close: 6000, - volume: 55_000, - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_stream(VecStream::new(data)) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!(line, format!("{HEADER_CSV},5000,8000,3000,6000,55000")); - } - - #[test] - fn test_status_encode_records() { - let mut group = [0; 21]; - for (i, c) in "group".chars().enumerate() { - group[i] = c as c_char; - } - let data = vec![StatusMsg { - hd: RECORD_HEADER, - ts_recv: 1658441891000000000, - group, - trading_status: 3, - halt_reason: 4, - trading_event: 6, - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_records(data.as_slice()) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!("{HEADER_CSV},1658441891000000000,group,3,4,6") - ); - } - - #[test] - fn test_instrument_def_encode_stream() { - let data = vec![InstrumentDefMsg { - hd: RECORD_HEADER, - ts_recv: 1658441891000000000, - min_price_increment: 100, - display_factor: 1000, - expiration: 1698450000000000000, - activation: 1697350000000000000, - high_limit_price: 1_000_000, - low_limit_price: -1_000_000, - max_price_variation: 0, - trading_reference_price: 500_000, - unit_of_measure_qty: 5, - min_price_increment_amount: 5, - price_ratio: 10, - inst_attrib_value: 10, - underlying_id: 256785, - raw_instrument_id: RECORD_HEADER.instrument_id, - market_depth_implied: 0, - market_depth: 13, - market_segment_id: 0, - max_trade_vol: 10_000, - min_lot_size: 1, - min_lot_size_block: 1000, - min_lot_size_round_lot: 100, - min_trade_vol: 1, - contract_multiplier: 0, - decay_quantity: 0, - original_contract_size: 0, - trading_reference_date: 0, - appl_id: 0, - maturity_year: 0, - decay_start_date: 
0, - channel_id: 4, - currency: str_to_c_chars("USD").unwrap(), - settl_currency: str_to_c_chars("USD").unwrap(), - secsubtype: Default::default(), - raw_symbol: str_to_c_chars("ESZ4 C4100").unwrap(), - group: str_to_c_chars("EW").unwrap(), - exchange: str_to_c_chars("XCME").unwrap(), - asset: str_to_c_chars("ES").unwrap(), - cfi: str_to_c_chars("OCAFPS").unwrap(), - security_type: str_to_c_chars("OOF").unwrap(), - unit_of_measure: str_to_c_chars("IPNT").unwrap(), - underlying: str_to_c_chars("ESZ4").unwrap(), - strike_price_currency: str_to_c_chars("USD").unwrap(), - instrument_class: InstrumentClass::Call as u8 as c_char, - strike_price: 4_100_000_000_000, - match_algorithm: 'F' as c_char, - md_security_trading_status: 2, - main_fraction: 4, - price_display_format: 8, - settl_price_type: 9, - sub_fraction: 23, - underlying_product: 10, - security_update_action: SecurityUpdateAction::Add, - maturity_month: 8, - maturity_day: 9, - maturity_week: 11, - user_defined_instrument: UserDefinedInstrument::No, - contract_multiplier_unit: 0, - flow_schedule_type: 5, - tick_rule: 0, - _reserved2: Default::default(), - _reserved3: Default::default(), - _reserved4: Default::default(), - _reserved5: Default::default(), - _dummy: [0; 3], - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_stream(VecStream::new(data)) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!(line, format!("1658441891000000000,{HEADER_CSV},ESZ4 C4100,A,C,100,1000,1698450000000000000,1697350000000000000,1000000,-1000000,0,500000,5,5,10,10,256785,323,0,13,0,10000,1,1000,100,1,0,0,0,0,0,0,0,4,USD,USD,,EW,XCME,ES,OCAFPS,OOF,IPNT,ESZ4,USD,4100000000000,F,2,4,8,9,23,10,8,9,11,N,0,5,0")); - } - - #[test] - fn test_encode_with_ts_out() { - let data = vec![WithTsOut { - rec: TradeMsg { - hd: RECORD_HEADER, - price: 5500, - size: 3, - action: 'T' as c_char, - side: 'A' as c_char, - flags: 128, - depth: 9, - ts_recv: 1658441891000000000, - ts_in_delta: 22_000, - sequence: 1_002_375, - }, - ts_out: 1678480044000000000, - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_records(data.as_slice()) - .unwrap(); - let lines = String::from_utf8(buffer).expect("valid UTF-8"); - assert_eq!( - lines, - format!("ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags,ts_in_delta,sequence,ts_out\n1658441891000000000,{HEADER_CSV},T,A,9,5500,3,128,22000,1002375,1678480044000000000\n") - ); - } - - #[test] - fn test_imbalance_encode_records() { - let data = vec![ImbalanceMsg { - hd: RECORD_HEADER, - ts_recv: 1, - ref_price: 2, - auction_time: 3, - cont_book_clr_price: 4, - auct_interest_clr_price: 5, - ssr_filling_price: 6, - ind_match_price: 7, - upper_collar: 8, - lower_collar: 9, - paired_qty: 10, - total_imbalance_qty: 11, - market_imbalance_qty: 12, - unpaired_qty: 13, - auction_type: 'B' as c_char, - side: 'A' as c_char, - auction_status: 14, - freeze_status: 15, - num_extensions: 16, - unpaired_side: 'A' as c_char, - significant_imbalance: 'N' as c_char, - _dummy: [0], - }]; - let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, false, false) - .encode_records(data.as_slice()) - .unwrap(); - let line = extract_2nd_line(buffer); - assert_eq!( - line, - format!("1,{HEADER_CSV},2,3,4,5,6,7,8,9,10,11,12,13,B,A,14,15,16,A,N") - ); - } - - #[test] - fn test_stat_encode_stream() { - let data = vec![StatMsg { - hd: 
RECORD_HEADER,
-            ts_recv: 1,
-            ts_ref: 2,
-            price: 3,
-            quantity: 0,
-            sequence: 4,
-            ts_in_delta: 5,
-            stat_type: StatType::OpeningPrice as u16,
-            channel_id: 7,
-            update_action: StatUpdateAction::New as u8,
-            stat_flags: 0,
-            _dummy: Default::default(),
-        }];
-        let mut buffer = Vec::new();
-        let writer = BufWriter::new(&mut buffer);
-        Encoder::new(writer, false, false)
-            .encode_stream(VecStream::new(data))
-            .unwrap();
-        let line = extract_2nd_line(buffer);
-        assert_eq!(line, format!("1,{HEADER_CSV},2,3,0,4,5,1,7,1,0"));
-    }
-
-    #[test]
-    fn test_write_char_nul() {
-        let mut buffer = Vec::new();
-        let mut writer = csv::WriterBuilder::new().from_writer(&mut buffer);
-        write_c_char_field(&mut writer, 0).unwrap();
-        writer.write_field("a").unwrap();
-        writer.flush().unwrap();
-        drop(writer);
-        let s = std::str::from_utf8(buffer.as_slice()).unwrap();
-        assert_eq!(s, ",a");
-    }
-
-    #[test]
-    fn test_writes_header_for_0_records() {}
-}
+pub use sync::Encoder;
diff --git a/rust/dbn/src/encode/csv/serialize.rs b/rust/dbn/src/encode/csv/serialize.rs
new file mode 100644
index 0000000..1255ff9
--- /dev/null
+++ b/rust/dbn/src/encode/csv/serialize.rs
@@ -0,0 +1,188 @@
+use std::{ffi::c_char, io};
+
+use csv::Writer;
+
+use crate::{
+    enums::{SecurityUpdateAction, UserDefinedInstrument},
+    pretty::{fmt_px, fmt_ts},
+    record::{c_chars_to_str, BidAskPair, HasRType, RecordHeader, WithTsOut},
+    UNDEF_PRICE, UNDEF_TIMESTAMP,
+};
+
+/// Because of the flat nature of CSVs, there are several limitations in the
+/// Rust CSV serde serialization library. This trait helps work around them.
+pub trait CsvSerialize {
+    /// Encode the header to `csv_writer`.
+    fn serialize_header(csv_writer: &mut Writer) -> csv::Result<()>;
+
+    /// Serialize the object to `csv_writer`. Allows custom behavior that would otherwise
+    /// cause a runtime error, e.g. serializing a struct with an array field.
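+    /// The `PRETTY_PX` and `PRETTY_TS` const parameters control whether price and
+    /// timestamp fields are written in human-readable form or as raw integers.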
+ fn serialize_to( + &self, + csv_writer: &mut Writer, + ) -> csv::Result<()>; +} + +impl CsvSerialize for WithTsOut { + fn serialize_header(csv_writer: &mut Writer) -> csv::Result<()> { + T::serialize_header(csv_writer)?; + csv_writer.write_field("ts_out") + } + + fn serialize_to( + &self, + csv_writer: &mut Writer, + ) -> csv::Result<()> { + self.rec + .serialize_to::(csv_writer)?; + write_ts_field::(csv_writer, self.ts_out) + } +} + +pub trait WriteField { + fn write_header(csv_writer: &mut Writer, name: &str) -> csv::Result<()> { + csv_writer.write_field(name) + } + + fn write_field( + &self, + writer: &mut Writer, + ) -> csv::Result<()>; +} + +impl WriteField for RecordHeader { + fn write_field( + &self, + writer: &mut Writer, + ) -> csv::Result<()> { + self.serialize_to::(writer) + } + + fn write_header(csv_writer: &mut Writer, _name: &str) -> csv::Result<()> { + Self::serialize_header(csv_writer) + } +} + +impl WriteField for [BidAskPair; N] { + fn write_header(csv_writer: &mut Writer, _name: &str) -> csv::Result<()> { + for i in 0..N { + for f in ["bid_px", "ask_px", "bid_sz", "ask_sz", "bid_ct", "ask_ct"] { + csv_writer.write_field(&format!("{f}_{i:02}"))?; + } + } + Ok(()) + } + + fn write_field( + &self, + writer: &mut csv::Writer, + ) -> csv::Result<()> { + for level in self.iter() { + write_px_field::<_, PRETTY_PX>(writer, level.bid_px)?; + write_px_field::<_, PRETTY_PX>(writer, level.ask_px)?; + writer.write_field(&level.bid_sz.to_string())?; + writer.write_field(&level.ask_sz.to_string())?; + writer.write_field(&level.bid_ct.to_string())?; + writer.write_field(&level.ask_ct.to_string())?; + } + Ok(()) + } +} +macro_rules! impl_write_field_for { + ($($ty:ident),+) => { + $( + impl WriteField for $ty { + fn write_field( + &self, + writer: &mut Writer, + ) -> csv::Result<()> { + writer.write_field(&self.to_string()) + } + } + )* + }; + } + +impl_write_field_for! 
{i64, u64, i32, u32, i16, u16, i8, u8, bool}
+
+impl WriteField for [c_char; N] {
+    fn write_field(
+        &self,
+        writer: &mut Writer,
+    ) -> csv::Result<()> {
+        writer.write_field(c_chars_to_str(self).unwrap_or_default())
+    }
+}
+
+impl WriteField for SecurityUpdateAction {
+    fn write_field(
+        &self,
+        writer: &mut Writer,
+    ) -> csv::Result<()> {
+        writer.write_field(&(*self as u8 as char).to_string())
+    }
+}
+
+impl WriteField for UserDefinedInstrument {
+    fn write_field(
+        &self,
+        writer: &mut Writer,
+    ) -> csv::Result<()> {
+        writer.write_field(&(*self as u8 as char).to_string())
+    }
+}
+
+pub fn write_px_field(
+    csv_writer: &mut Writer,
+    px: i64,
+) -> csv::Result<()> {
+    if PRETTY_PX {
+        if px == UNDEF_PRICE {
+            csv_writer.write_field("")
+        } else {
+            csv_writer.write_field(fmt_px(px))
+        }
+    } else {
+        csv_writer.write_field(px.to_string())
+    }
+}
+
+pub fn write_ts_field(
+    csv_writer: &mut Writer,
+    ts: u64,
+) -> csv::Result<()> {
+    if PRETTY_TS {
+        match ts {
+            0 | UNDEF_TIMESTAMP => csv_writer.write_field(""),
+            ts => csv_writer.write_field(fmt_ts(ts)),
+        }
+    } else {
+        csv_writer.write_field(ts.to_string())
+    }
+}
+
+pub fn write_c_char_field(csv_writer: &mut Writer, c: c_char) -> csv::Result<()> {
+    // Handle NUL byte
+    if c == 0 {
+        csv_writer.write_field(String::new())
+    } else {
+        csv_writer.write_field((c as u8 as char).to_string())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_write_char_nul() {
+        let mut buffer = Vec::new();
+        let mut writer = csv::WriterBuilder::new().from_writer(&mut buffer);
+        write_c_char_field(&mut writer, 0).unwrap();
+        writer.write_field("a").unwrap();
+        writer.flush().unwrap();
+        drop(writer);
+        let s = std::str::from_utf8(buffer.as_slice()).unwrap();
+        assert_eq!(s, ",a");
+    }
+}
diff --git a/rust/dbn/src/encode/csv/sync.rs b/rust/dbn/src/encode/csv/sync.rs
new file mode 100644
index 0000000..7722d26
--- /dev/null
+++ b/rust/dbn/src/encode/csv/sync.rs
@@ -0,0 +1,652 @@
+use std::{io, num::NonZeroU64};
+
+use streaming_iterator::StreamingIterator;
+
+use crate::{
+    decode::DecodeDbn,
+    encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt},
+    enums::{RType, Schema},
+    rtype_dispatch, rtype_ts_out_dispatch, schema_method_dispatch, Error, Result,
+};
+
+/// Type for encoding files and streams of DBN records in CSV.
+///
+/// Note that encoding [`Metadata`](crate::Metadata) in CSV is not supported.
+pub struct Encoder
+where
+    W: io::Write,
+{
+    writer: csv::Writer,
+    use_pretty_px: bool,
+    use_pretty_ts: bool,
+}
+
+impl Encoder
+where
+    W: io::Write,
+{
+    /// Creates a new [`Encoder`] that will write to `writer`. If `use_pretty_px`
+    /// is `true`, price fields will be serialized as a decimal. If `use_pretty_ts` is
+    /// `true`, timestamp fields will be serialized as an ISO8601 datetime string.
+    pub fn new(writer: W, use_pretty_px: bool, use_pretty_ts: bool) -> Self {
+        let csv_writer = csv::WriterBuilder::new()
+            .has_headers(false) // need to write our own custom header
+            .from_writer(writer);
+        Self {
+            writer: csv_writer,
+            use_pretty_px,
+            use_pretty_ts,
+        }
+    }
+
+    /// Returns a reference to the underlying writer.
+    pub fn get_ref(&self) -> &W {
+        self.writer.get_ref()
+    }
+
+    /// Encodes the CSV header for the record type `R`, i.e. the names of each of the
+    /// fields to the output.
+    ///
+    /// If `with_symbol` is `true`, will add a header field for "symbol".
This should + /// only be used with [`Self::encode_record_with_sym()`] and + /// [`Self::encode_ref_with_sym()`], otherwise there will be a mismatch between the + /// number of fields in the header and the body. + /// + /// # Errors + /// This function returns an error if there's an error writing to `writer`. + pub fn encode_header(&mut self, with_symbol: bool) -> Result<()> { + R::serialize_header(&mut self.writer)?; + if with_symbol { + self.writer.write_field("symbol")?; + } + // end of line + self.writer.write_record(None::<&[u8]>)?; + Ok(()) + } + + /// Encodes the CSV header for `schema`, i.e. the names of each of the fields to + /// the output. + /// + /// If `with_symbol` is `true`, will add a header field for "symbol". This should + /// only be used with [`Self::encode_record_with_sym()`] and + /// [`Self::encode_ref_with_sym()`], otherwise there will be a mismatch between the + /// number of fields in the header and the body. + /// + /// # Errors + /// This function returns an error if there's an error writing to `writer`. + pub fn encode_header_for_schema(&mut self, schema: Schema, with_symbol: bool) -> Result<()> { + schema_method_dispatch!(schema, self, encode_header, with_symbol) + } + + fn encode_record_impl(&mut self, record: &R) -> csv::Result<()> { + match (self.use_pretty_px, self.use_pretty_ts) { + (true, true) => record.serialize_to::<_, true, true>(&mut self.writer), + (true, false) => record.serialize_to::<_, true, false>(&mut self.writer), + (false, true) => record.serialize_to::<_, false, true>(&mut self.writer), + (false, false) => record.serialize_to::<_, false, false>(&mut self.writer), + } + } + + fn encode_symbol(&mut self, symbol: Option<&str>) -> csv::Result<()> { + self.writer.write_field(symbol.unwrap_or_default()) + } +} + +impl EncodeRecord for Encoder +where + W: io::Write, +{ + fn encode_record(&mut self, record: &R) -> Result<()> { + match self + .encode_record_impl(record) + // write new line + .and_then(|_| self.writer.write_record(None::<&[u8]>)) + { + Ok(()) => Ok(()), + Err(e) => Err(match e.into_kind() { + csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")), + e => Error::encode(format!("Failed to serialize {record:?}: {e:?}")), + }), + } + } + + fn flush(&mut self) -> Result<()> { + self.writer + .flush() + .map_err(|e| Error::io(e, "flushing output")) + } +} + +impl EncodeRecordRef for Encoder +where + W: io::Write, +{ + fn encode_record_ref(&mut self, record: crate::RecordRef) -> Result<()> { + #[allow(clippy::redundant_closure_call)] + rtype_dispatch!(record, |rec| self.encode_record(rec))? + } + + unsafe fn encode_record_ref_ts_out( + &mut self, + record: crate::RecordRef, + ts_out: bool, + ) -> Result<()> { + #[allow(clippy::redundant_closure_call)] + rtype_ts_out_dispatch!(record, ts_out, |rec| self.encode_record(rec))? + } +} + +impl EncodeDbn for Encoder +where + W: io::Write, +{ + fn encode_records(&mut self, records: &[R]) -> Result<()> { + self.encode_header::(false)?; + for record in records { + self.encode_record(record)?; + } + self.flush()?; + Ok(()) + } + + /// Encodes a stream of DBN records. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. 
+    fn encode_stream(
+        &mut self,
+        mut stream: impl StreamingIterator,
+    ) -> Result<()> {
+        self.encode_header::(false)?;
+        while let Some(record) = stream.next() {
+            self.encode_record(record)?;
+        }
+        self.flush()?;
+        Ok(())
+    }
+
+    /// Encode DBN records directly from a DBN decoder. This is implemented outside
+    /// [`EncodeDbn`] because the CSV encoder has the additional constraint of only
+    /// being able to encode a single schema in a stream.
+    ///
+    /// # Errors
+    /// This function returns an error if it's unable to write to the underlying writer
+    /// or there's a serialization error.
+    fn encode_decoded(&mut self, mut decoder: D) -> Result<()> {
+        let ts_out = decoder.metadata().ts_out;
+        if let Some(schema) = decoder.metadata().schema {
+            schema_method_dispatch!(schema, self, encode_header, false)?;
+            let rtype = RType::from(schema);
+            while let Some(record) = decoder.decode_record_ref()? {
+                if record.rtype().map_or(true, |r| r != rtype) {
+                    return Err(Error::encode(format!("Schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
+                }
+                // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
+                // from the metadata header.
+                unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
+            }
+            self.flush()?;
+            Ok(())
+        } else {
+            Err(Error::encode("Can't encode a CSV with mixed schemas"))
+        }
+    }
+
+    fn encode_decoded_with_limit(
+        &mut self,
+        mut decoder: D,
+        limit: NonZeroU64,
+    ) -> Result<()> {
+        let ts_out = decoder.metadata().ts_out;
+        if let Some(schema) = decoder.metadata().schema {
+            schema_method_dispatch!(schema, self, encode_header, false)?;
+            let rtype = RType::from(schema);
+            let mut i = 0;
+            while let Some(record) = decoder.decode_record_ref()? {
+                if record.rtype().map_or(true, |r| r != rtype) {
+                    return Err(Error::encode(format!("Schema indicated {rtype:?}, but found record with rtype {:?}. Mixed schemas cannot be encoded in CSV.", record.rtype())));
+                }
+                // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
+                // from the metadata header.
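+                // When `ts_out` is true, the dispatch treats `record` as `WithTsOut`
+                // and encodes the extra timestamp as the final field.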
+ unsafe { self.encode_record_ref_ts_out(record, ts_out) }?; + i += 1; + if i == limit.get() { + break; + } + } + self.flush()?; + Ok(()) + } else { + Err(Error::encode("Can't encode a CSV with mixed schemas")) + } + } +} + +impl EncodeRecordTextExt for Encoder +where + W: io::Write, +{ + fn encode_record_with_sym( + &mut self, + record: &R, + symbol: Option<&str>, + ) -> Result<()> { + match self + .encode_record_impl(record) + .and_then(|_| self.encode_symbol(symbol)) + // write new line + .and_then(|_| self.writer.write_record(None::<&[u8]>)) + { + Ok(()) => Ok(()), + Err(e) => Err(match e.into_kind() { + csv::ErrorKind::Io(err) => Error::io(err, format!("serializing {record:?}")), + e => Error::encode(format!("Failed to serialize {record:?}: {e:?}")), + }), + } + } +} + +#[cfg(test)] +mod tests { + use std::{array, io::BufWriter, os::raw::c_char}; + + use super::*; + use crate::{ + encode::test_data::{VecStream, BID_ASK, RECORD_HEADER}, + enums::{ + rtype, InstrumentClass, SecurityUpdateAction, StatType, StatUpdateAction, + UserDefinedInstrument, + }, + record::{ + str_to_c_chars, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, + RecordHeader, StatMsg, StatusMsg, TradeMsg, WithTsOut, + }, + RecordRef, FIXED_PRICE_SCALE, + }; + + const HEADER_CSV: &str = "1658441851000000000,4,1,323"; + + const BID_ASK_CSV: &str = "372000000000000,372500000000000,10,5,5,2"; + + fn extract_2nd_line(buffer: Vec) -> String { + let output = String::from_utf8(buffer).expect("valid UTF-8"); + let (first, second) = output.split_once('\n').expect("two lines"); + assert!(!first.trim().is_empty()); + second + .trim_end() // remove newline + .to_owned() + } + + #[test] + fn test_mbo_encode_stream() { + let data = vec![MboMsg { + hd: RECORD_HEADER, + order_id: 16, + price: 5500, + size: 3, + flags: 128, + channel_id: 14, + action: 'B' as c_char, + side: 'B' as c_char, + ts_recv: 1658441891000000000, + ts_in_delta: 22_000, + sequence: 1_002_375, + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_stream(VecStream::new(data)) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + format!("1658441891000000000,{HEADER_CSV},B,B,5500,3,14,16,128,22000,1002375") + ); + } + + #[test] + fn test_mbp1_encode_records() { + let data = vec![Mbp1Msg { + hd: RECORD_HEADER, + price: 5500, + size: 3, + action: 'M' as c_char, + side: 'A' as c_char, + flags: 128, + depth: 9, + ts_recv: 1658441891000000000, + ts_in_delta: 22_000, + sequence: 1_002_375, + levels: [BID_ASK], + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_records(data.as_slice()) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + format!( + "1658441891000000000,{HEADER_CSV},M,A,9,5500,3,128,22000,1002375,{BID_ASK_CSV}" + ) + ); + } + + #[test] + fn test_mbp10_encode_stream() { + let data = vec![Mbp10Msg { + hd: RECORD_HEADER, + price: 5500, + size: 3, + action: 'B' as c_char, + side: 'A' as c_char, + flags: 128, + depth: 9, + ts_recv: 1658441891000000000, + ts_in_delta: 22_000, + sequence: 1_002_375, + levels: array::from_fn(|_| BID_ASK.clone()), + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_stream(VecStream::new(data)) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + 
format!("1658441891000000000,{HEADER_CSV},B,A,9,5500,3,128,22000,1002375,{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV},{BID_ASK_CSV}") + ); + } + + #[test] + fn test_trade_encode_records() { + let data = vec![TradeMsg { + hd: RECORD_HEADER, + price: 5500, + size: 3, + action: 'B' as c_char, + side: 'B' as c_char, + flags: 128, + depth: 9, + ts_recv: 1658441891000000000, + ts_in_delta: 22_000, + sequence: 1_002_375, + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_records(data.as_slice()) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + format!("1658441891000000000,{HEADER_CSV},B,B,9,5500,3,128,22000,1002375") + ); + } + + #[test] + fn test_ohlcv_encode_stream() { + let data = vec![OhlcvMsg { + hd: RECORD_HEADER, + open: 5000, + high: 8000, + low: 3000, + close: 6000, + volume: 55_000, + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_stream(VecStream::new(data)) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!(line, format!("{HEADER_CSV},5000,8000,3000,6000,55000")); + } + + #[test] + fn test_status_encode_records() { + let mut group = [0; 21]; + for (i, c) in "group".chars().enumerate() { + group[i] = c as c_char; + } + let data = vec![StatusMsg { + hd: RECORD_HEADER, + ts_recv: 1658441891000000000, + group, + trading_status: 3, + halt_reason: 4, + trading_event: 6, + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_records(data.as_slice()) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + format!("{HEADER_CSV},1658441891000000000,group,3,4,6") + ); + } + + #[test] + fn test_instrument_def_encode_stream() { + let data = vec![InstrumentDefMsg { + hd: RECORD_HEADER, + ts_recv: 1658441891000000000, + min_price_increment: 100, + display_factor: 1000, + expiration: 1698450000000000000, + activation: 1697350000000000000, + high_limit_price: 1_000_000, + low_limit_price: -1_000_000, + max_price_variation: 0, + trading_reference_price: 500_000, + unit_of_measure_qty: 5, + min_price_increment_amount: 5, + price_ratio: 10, + inst_attrib_value: 10, + underlying_id: 256785, + raw_instrument_id: RECORD_HEADER.instrument_id, + market_depth_implied: 0, + market_depth: 13, + market_segment_id: 0, + max_trade_vol: 10_000, + min_lot_size: 1, + min_lot_size_block: 1000, + min_lot_size_round_lot: 100, + min_trade_vol: 1, + contract_multiplier: 0, + decay_quantity: 0, + original_contract_size: 0, + trading_reference_date: 0, + appl_id: 0, + maturity_year: 0, + decay_start_date: 0, + channel_id: 4, + currency: str_to_c_chars("USD").unwrap(), + settl_currency: str_to_c_chars("USD").unwrap(), + secsubtype: Default::default(), + raw_symbol: str_to_c_chars("ESZ4 C4100").unwrap(), + group: str_to_c_chars("EW").unwrap(), + exchange: str_to_c_chars("XCME").unwrap(), + asset: str_to_c_chars("ES").unwrap(), + cfi: str_to_c_chars("OCAFPS").unwrap(), + security_type: str_to_c_chars("OOF").unwrap(), + unit_of_measure: str_to_c_chars("IPNT").unwrap(), + underlying: str_to_c_chars("ESZ4").unwrap(), + strike_price_currency: str_to_c_chars("USD").unwrap(), + instrument_class: InstrumentClass::Call as u8 as c_char, + strike_price: 4_100_000_000_000, + match_algorithm: 'F' as c_char, + md_security_trading_status: 2, + main_fraction: 4, + 
price_display_format: 8, + settl_price_type: 9, + sub_fraction: 23, + underlying_product: 10, + security_update_action: SecurityUpdateAction::Add, + maturity_month: 8, + maturity_day: 9, + maturity_week: 11, + user_defined_instrument: UserDefinedInstrument::No, + contract_multiplier_unit: 0, + flow_schedule_type: 5, + tick_rule: 0, + _reserved2: Default::default(), + _reserved3: Default::default(), + _reserved4: Default::default(), + _reserved5: Default::default(), + _dummy: [0; 3], + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_stream(VecStream::new(data)) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!(line, format!("1658441891000000000,{HEADER_CSV},ESZ4 C4100,A,C,100,1000,1698450000000000000,1697350000000000000,1000000,-1000000,0,500000,5,5,10,10,256785,323,0,13,0,10000,1,1000,100,1,0,0,0,0,0,0,0,4,USD,USD,,EW,XCME,ES,OCAFPS,OOF,IPNT,ESZ4,USD,4100000000000,F,2,4,8,9,23,10,8,9,11,N,0,5,0")); + } + + #[test] + fn test_encode_with_ts_out() { + let data = vec![WithTsOut { + rec: TradeMsg { + hd: RECORD_HEADER, + price: 5500, + size: 3, + action: 'T' as c_char, + side: 'A' as c_char, + flags: 128, + depth: 9, + ts_recv: 1658441891000000000, + ts_in_delta: 22_000, + sequence: 1_002_375, + }, + ts_out: 1678480044000000000, + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_records(data.as_slice()) + .unwrap(); + let lines = String::from_utf8(buffer).expect("valid UTF-8"); + assert_eq!( + lines, + format!("ts_recv,ts_event,rtype,publisher_id,instrument_id,action,side,depth,price,size,flags,ts_in_delta,sequence,ts_out\n1658441891000000000,{HEADER_CSV},T,A,9,5500,3,128,22000,1002375,1678480044000000000\n") + ); + } + + #[test] + fn test_imbalance_encode_records() { + let data = vec![ImbalanceMsg { + hd: RECORD_HEADER, + ts_recv: 1, + ref_price: 2, + auction_time: 3, + cont_book_clr_price: 4, + auct_interest_clr_price: 5, + ssr_filling_price: 6, + ind_match_price: 7, + upper_collar: 8, + lower_collar: 9, + paired_qty: 10, + total_imbalance_qty: 11, + market_imbalance_qty: 12, + unpaired_qty: 13, + auction_type: 'B' as c_char, + side: 'A' as c_char, + auction_status: 14, + freeze_status: 15, + num_extensions: 16, + unpaired_side: 'A' as c_char, + significant_imbalance: 'N' as c_char, + _dummy: [0], + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_records(data.as_slice()) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!( + line, + format!("1,{HEADER_CSV},2,3,4,5,6,7,8,9,10,11,12,13,B,A,14,15,16,A,N") + ); + } + + #[test] + fn test_stat_encode_stream() { + let data = vec![StatMsg { + hd: RECORD_HEADER, + ts_recv: 1, + ts_ref: 2, + price: 3, + quantity: 0, + sequence: 4, + ts_in_delta: 5, + stat_type: StatType::OpeningPrice as u16, + channel_id: 7, + update_action: StatUpdateAction::New as u8, + stat_flags: 0, + _dummy: Default::default(), + }]; + let mut buffer = Vec::new(); + let writer = BufWriter::new(&mut buffer); + Encoder::new(writer, false, false) + .encode_stream(VecStream::new(data)) + .unwrap(); + let line = extract_2nd_line(buffer); + assert_eq!(line, format!("1,{HEADER_CSV},2,3,0,4,5,1,7,1,0")); + } + + #[test] + fn test_encode_ref_with_sym() { + let mut buffer = Vec::new(); + const BAR: OhlcvMsg = OhlcvMsg { + hd: RecordHeader::new::(rtype::OHLCV_1H, 10, 9, 0), + open: 175 * FIXED_PRICE_SCALE, + high: 177 * 
FIXED_PRICE_SCALE, + low: 174 * FIXED_PRICE_SCALE, + close: 175 * FIXED_PRICE_SCALE, + volume: 4033445, + }; + let rec_ref = unsafe { RecordRef::unchecked_from_header(&BAR.hd as *const RecordHeader) }; + let mut encoder = Encoder::new(&mut buffer, false, false); + encoder.encode_ref_with_sym(rec_ref, None).unwrap(); + encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap(); + drop(encoder); + let res = String::from_utf8(buffer).unwrap(); + assert_eq!( + res, + "0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,\n\ + 0,34,10,9,175000000000,177000000000,174000000000,175000000000,4033445,AAPL\n" + ); + } + + #[test] + fn test_encode_header_for_schema() { + let mut buffer = Vec::new(); + { + let mut encoder = Encoder::new(&mut buffer, false, false); + encoder + .encode_header_for_schema(Schema::Statistics, false) + .unwrap(); + } + { + let mut encoder = Encoder::new(&mut buffer, false, false); + encoder + .encode_header_for_schema(Schema::Statistics, true) + .unwrap(); + } + + let res = String::from_utf8(buffer).unwrap(); + let (fst_line, snd_line) = res.split_once('\n').unwrap(); + assert!(snd_line.ends_with(",symbol\n")); + let orig_header = snd_line.split_once(",symbol").unwrap().0; + assert_eq!(fst_line, orig_header); + } +} diff --git a/rust/dbn/src/encode/dbn/sync.rs b/rust/dbn/src/encode/dbn/sync.rs index 27b9a83..1e3ec7d 100644 --- a/rust/dbn/src/encode/dbn/sync.rs +++ b/rust/dbn/src/encode/dbn/sync.rs @@ -78,6 +78,10 @@ impl EncodeRecordRef for Encoder where W: io::Write, { + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { + self.record_encoder.encode_record_ref(record) + } + /// Encodes a single DBN record. /// /// # Safety @@ -87,8 +91,8 @@ where /// # Errors /// This function will return an error if it fails to encode `record` to /// `writer`. - unsafe fn encode_record_ref(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { - self.record_encoder.encode_record_ref(record, ts_out) + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> { + self.record_encoder.encode_record_ref_ts_out(record, ts_out) } } @@ -372,6 +376,13 @@ impl EncodeRecordRef for RecordEncoder where W: io::Write, { + fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> { + match self.writer.write_all(record.as_ref()) { + Ok(()) => Ok(()), + Err(e) => Err(Error::io(e, format!("serializing {record:?}"))), + } + } + /// Encodes a single DBN record. /// /// # Safety @@ -381,11 +392,8 @@ where /// # Errors /// This function will return an error if it fails to encode `record` to /// `writer`. - unsafe fn encode_record_ref(&mut self, record: RecordRef, _ts_out: bool) -> Result<()> { - match self.writer.write_all(record.as_ref()) { - Ok(()) => Ok(()), - Err(e) => Err(Error::io(e, format!("serializing {record:?}"))), - } + unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, _ts_out: bool) -> Result<()> { + self.encode_record_ref(record) } } diff --git a/rust/dbn/src/encode/json/serialize.rs b/rust/dbn/src/encode/json/serialize.rs index 25db34e..aaa474d 100644 --- a/rust/dbn/src/encode/json/serialize.rs +++ b/rust/dbn/src/encode/json/serialize.rs @@ -29,6 +29,29 @@ pub fn to_json_string( res } +/// Serializes `obj` to a JSON string with an optional `symbol`. 
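+///
+/// A "symbol" key is always appended to the object; a `None` symbol is serialized
+/// as `null`.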
+pub fn to_json_string_with_sym( + obj: &T, + should_pretty_print: bool, + use_pretty_px: bool, + use_pretty_ts: bool, + symbol: Option<&str>, +) -> String { + let mut res = String::new(); + if should_pretty_print { + let mut pretty = pretty_writer(&mut res); + let mut writer = JsonObjectWriter::new(&mut pretty); + to_json_with_writer(obj, &mut writer, use_pretty_px, use_pretty_ts); + writer.value("symbol", symbol); + } else { + let mut writer = JsonObjectWriter::new(&mut res); + to_json_with_writer(obj, &mut writer, use_pretty_px, use_pretty_ts); + writer.value("symbol", symbol); + } + res.push('\n'); + res +} + fn to_json_with_writer( obj: &T, writer: &mut JsonObjectWriter, diff --git a/rust/dbn/src/encode/json/sync.rs b/rust/dbn/src/encode/json/sync.rs index 0418ddb..2b26d7e 100644 --- a/rust/dbn/src/encode/json/sync.rs +++ b/rust/dbn/src/encode/json/sync.rs @@ -1,9 +1,9 @@ use std::io; -use super::serialize::to_json_string; +use super::serialize::{to_json_string, to_json_string_with_sym}; use crate::{ - encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef}, - rtype_ts_out_dispatch, Error, Metadata, Result, + encode::{DbnEncodable, EncodeDbn, EncodeRecord, EncodeRecordRef, EncodeRecordTextExt}, + rtype_dispatch, rtype_ts_out_dispatch, Error, Metadata, Result, }; /// Type for encoding files and streams of DBN records in newline-delimited JSON (ndjson). @@ -94,7 +94,16 @@ impl EncodeRecordRef for Encoder where W: io::Write, { - unsafe fn encode_record_ref(&mut self, record: crate::RecordRef, ts_out: bool) -> Result<()> { + fn encode_record_ref(&mut self, record: crate::RecordRef) -> Result<()> { + #[allow(clippy::redundant_closure_call)] + rtype_dispatch!(record, |rec| self.encode_record(rec))? + } + + unsafe fn encode_record_ref_ts_out( + &mut self, + record: crate::RecordRef, + ts_out: bool, + ) -> Result<()> { #[allow(clippy::redundant_closure_call)] rtype_ts_out_dispatch!(record, ts_out, |rec| self.encode_record(rec))? 
} @@ -102,6 +111,29 @@ where impl EncodeDbn for Encoder where W: io::Write {} +impl EncodeRecordTextExt for Encoder +where + W: io::Write, +{ + fn encode_record_with_sym( + &mut self, + record: &R, + symbol: Option<&str>, + ) -> Result<()> { + let json = to_json_string_with_sym( + record, + self.should_pretty_print, + self.use_pretty_px, + self.use_pretty_ts, + symbol, + ); + match self.writer.write_all(json.as_bytes()) { + Ok(()) => Ok(()), + Err(e) => Err(Error::io(e, "writing record")), + } + } +} + #[cfg(test)] mod tests { use std::{array, io::BufWriter, num::NonZeroU64, os::raw::c_char}; @@ -114,14 +146,14 @@ mod tests { test_data::{VecStream, BID_ASK, RECORD_HEADER}, }, enums::{ - InstrumentClass, SType, Schema, SecurityUpdateAction, StatType, StatUpdateAction, - UserDefinedInstrument, + rtype, InstrumentClass, SType, Schema, SecurityUpdateAction, StatType, + StatUpdateAction, UserDefinedInstrument, }, record::{ str_to_c_chars, ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, - OhlcvMsg, StatMsg, StatusMsg, TradeMsg, WithTsOut, + OhlcvMsg, RecordHeader, StatMsg, StatusMsg, TradeMsg, WithTsOut, }, - MappingInterval, SymbolMapping, + MappingInterval, RecordRef, SymbolMapping, FIXED_PRICE_SCALE, }; fn write_json_to_string( @@ -134,10 +166,14 @@ mod tests { R: DbnEncodable, { let mut buffer = Vec::new(); - let writer = BufWriter::new(&mut buffer); - Encoder::new(writer, should_pretty_print, use_pretty_px, use_pretty_ts) - .encode_records(records) - .unwrap(); + Encoder::new( + &mut buffer, + should_pretty_print, + use_pretty_px, + use_pretty_ts, + ) + .encode_records(records) + .unwrap(); String::from_utf8(buffer).expect("valid UTF-8") } @@ -595,4 +631,27 @@ mod tests { drop(writer); assert_eq!(buf, r#"{"test":null}"#); } + + #[test] + fn test_encode_ref_with_sym() { + let mut buffer = Vec::new(); + const BAR: OhlcvMsg = OhlcvMsg { + hd: RecordHeader::new::(rtype::OHLCV_1H, 10, 9, 0), + open: 175 * FIXED_PRICE_SCALE, + high: 177 * FIXED_PRICE_SCALE, + low: 174 * FIXED_PRICE_SCALE, + close: 175 * FIXED_PRICE_SCALE, + volume: 4033445, + }; + let rec_ref = unsafe { RecordRef::unchecked_from_header(&BAR.hd as *const RecordHeader) }; + let mut encoder = Encoder::new(&mut buffer, false, false, false); + encoder.encode_ref_with_sym(rec_ref, None).unwrap(); + encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap(); + let res = String::from_utf8(buffer).unwrap(); + assert_eq!( + res, + "{\"hd\":{\"ts_event\":\"0\",\"rtype\":34,\"publisher_id\":10,\"instrument_id\":9},\"open\":\"175000000000\",\"high\":\"177000000000\",\"low\":\"174000000000\",\"close\":\"175000000000\",\"volume\":\"4033445\",\"symbol\":null}\n\ + {\"hd\":{\"ts_event\":\"0\",\"rtype\":34,\"publisher_id\":10,\"instrument_id\":9},\"open\":\"175000000000\",\"high\":\"177000000000\",\"low\":\"174000000000\",\"close\":\"175000000000\",\"volume\":\"4033445\",\"symbol\":\"AAPL\"}\n", + ); + } } diff --git a/rust/dbn/src/lib.rs b/rust/dbn/src/lib.rs index 33e9df7..2e84da3 100644 --- a/rust/dbn/src/lib.rs +++ b/rust/dbn/src/lib.rs @@ -40,8 +40,7 @@ pub mod decode; pub mod encode; pub mod enums; pub mod error; -#[doc(hidden)] -pub mod json_writer; +mod json_writer; pub mod macros; pub mod metadata; pub mod pretty; diff --git a/rust/dbn/src/record_ref.rs b/rust/dbn/src/record_ref.rs index 02088ee..9d3100d 100644 --- a/rust/dbn/src/record_ref.rs +++ b/rust/dbn/src/record_ref.rs @@ -9,7 +9,7 @@ use crate::{ /// A wrapper around a non-owning immutable reference to a DBN record. 
This wrapper /// allows for mixing of record types and schemas, and runtime record polymorphism. -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub struct RecordRef<'a> { ptr: NonNull, /// Associates the object with the lifetime of the memory pointed to by `ptr`. From 8523a2dec56193ac1fc3e0905bc986915939c7b8 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 14 Sep 2023 17:07:12 -0500 Subject: [PATCH 03/11] ADD: Add from_file methods to async decoder --- CHANGELOG.md | 2 ++ rust/dbn/Cargo.toml | 2 +- rust/dbn/src/decode/dbn/async.rs | 51 ++++++++++++++++++++++++++++++-- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 870322a..294f522 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ record fields, matching the behavior of `map_symbols` in the historical API - Added `encode_header` and `encode_header_for_schema` methods to `CsvEncoder` to give more flexibility for encoding CSV headers +- Added `from_file` and `from_zstd_file` functions to `AsyncDbnDecoder` to match + synchronous decoder - Implemented `Copy` for `RecordRef` to make it behave more like a reference - Added `ARCX.PILLAR.ARCX` publisher diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index b43fa36..0ba30da 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -48,7 +48,7 @@ thiserror = "1.0" # date and datetime support time = { version = "0.3", features = ["formatting", "macros"] } # async traits -tokio = { version = "1", features = ["io-util"], optional = true } +tokio = { version = "1", features = ["fs", "io-util"], optional = true } # (de)compression zstd = "0.12" diff --git a/rust/dbn/src/decode/dbn/async.rs b/rust/dbn/src/decode/dbn/async.rs index 429beb8..2985627 100644 --- a/rust/dbn/src/decode/dbn/async.rs +++ b/rust/dbn/src/decode/dbn/async.rs @@ -1,5 +1,10 @@ +use std::path::Path; + use async_compression::tokio::bufread::ZstdDecoder; -use tokio::io::{self, BufReader}; +use tokio::{ + fs::File, + io::{self, BufReader}, +}; use crate::{ decode::FromLittleEndianSlice, @@ -87,7 +92,7 @@ where } } -impl<'a, R> Decoder>> +impl Decoder>> where R: io::AsyncReadExt + Unpin, { @@ -100,7 +105,7 @@ where } } -impl<'a, R> Decoder> +impl Decoder> where R: io::AsyncBufReadExt + Unpin, { @@ -113,6 +118,46 @@ where } } +impl Decoder> { + /// Creates a new async DBN [`Decoder`] from the file at `path`. + /// + /// # Errors + /// This function will return an error if it is unable to read the file at `path` or + /// if it is unable to parse the metadata in the file. + pub async fn from_file(path: impl AsRef) -> crate::Result { + let file = File::open(path.as_ref()).await.map_err(|e| { + crate::Error::io( + e, + format!( + "Error opening DBN file at path '{}'", + path.as_ref().display() + ), + ) + })?; + Self::new(BufReader::new(file)).await + } +} + +impl Decoder>> { + /// Creates a new async DBN [`Decoder`] from the Zstandard-compressed file at `path`. + /// + /// # Errors + /// This function will return an error if it is unable to read the file at `path` or + /// if it is unable to parse the metadata in the file. + pub async fn from_zstd_file(path: impl AsRef) -> crate::Result { + let file = File::open(path.as_ref()).await.map_err(|e| { + crate::Error::io( + e, + format!( + "Error opening Zstandard-compressed DBN file at path '{}'", + path.as_ref().display() + ), + ) + })?; + Self::with_zstd(file).await + } +} + /// An async decoder for files and streams of Databento Binary Encoding (DBN) records. 
pub struct RecordDecoder where From cd2fe342b4cd91e60f021450572a734f68320087 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 19 Sep 2023 09:08:21 -0500 Subject: [PATCH 04/11] ADD: Add `AsyncDbnEncoder` --- CHANGELOG.md | 1 + rust/dbn/src/encode/dbn.rs | 5 +- rust/dbn/src/encode/dbn/async.rs | 95 ++++++++++++++++++++++++++++--- rust/dbn/src/encode/json/async.rs | 11 ++++ 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 294f522..977e6b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Added `from_file` and `from_zstd_file` functions to `AsyncDbnDecoder` to match synchronous decoder - Implemented `Copy` for `RecordRef` to make it behave more like a reference +- Added `AsyncDbnEncoder` for simpler DBN encoding and to match sync API - Added `ARCX.PILLAR.ARCX` publisher ## Breaking changes diff --git a/rust/dbn/src/encode/dbn.rs b/rust/dbn/src/encode/dbn.rs index 910aac1..56a177b 100644 --- a/rust/dbn/src/encode/dbn.rs +++ b/rust/dbn/src/encode/dbn.rs @@ -5,4 +5,7 @@ pub use sync::{Encoder, MetadataEncoder, RecordEncoder}; #[cfg(feature = "async")] mod r#async; #[cfg(feature = "async")] -pub use r#async::{MetadataEncoder as AsyncMetadataEncoder, RecordEncoder as AsyncRecordEncoder}; +pub use r#async::{ + Encoder as AsyncEncoder, MetadataEncoder as AsyncMetadataEncoder, + RecordEncoder as AsyncRecordEncoder, +}; diff --git a/rust/dbn/src/encode/dbn/async.rs b/rust/dbn/src/encode/dbn/async.rs index 417583f..48c7197 100644 --- a/rust/dbn/src/encode/dbn/async.rs +++ b/rust/dbn/src/encode/dbn/async.rs @@ -1,13 +1,73 @@ -use std::{fmt, num::NonZeroU64}; +use std::num::NonZeroU64; use async_compression::tokio::write::ZstdEncoder; use tokio::io; use crate::{ - record::HasRType, record_ref::RecordRef, Error, Metadata, Result, SymbolMapping, DBN_VERSION, - NULL_LIMIT, NULL_RECORD_COUNT, NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP, + encode::DbnEncodable, record_ref::RecordRef, Error, Metadata, Result, SymbolMapping, + DBN_VERSION, NULL_LIMIT, NULL_RECORD_COUNT, NULL_SCHEMA, NULL_STYPE, UNDEF_TIMESTAMP, }; +/// An async encoder for DBN streams. +pub struct Encoder +where + W: io::AsyncWriteExt + Unpin, +{ + record_encoder: RecordEncoder, +} + +impl Encoder +where + W: io::AsyncWriteExt + Unpin, +{ + /// Creates a new async DBN encoder that will write to `writer`. + /// + /// # Errors + /// This function will return an error if it fails to encode `metadata` to + /// `writer`. + pub async fn new(mut writer: W, metadata: &Metadata) -> Result { + MetadataEncoder::new(&mut writer).encode(metadata).await?; + let record_encoder = RecordEncoder::new(writer); + Ok(Self { record_encoder }) + } + + /// Returns a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + self.record_encoder.get_ref() + } + + /// Returns a mutable reference to the underlying writer. + pub fn get_mut(&mut self) -> &mut W { + self.record_encoder.get_mut() + } + + /// Encode a single DBN record of type `R`. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying + /// writer. + pub async fn encode_record(&mut self, record: &R) -> Result<()> { + self.record_encoder.encode(record).await + } + + /// Encodes a single DBN [`RecordRef`]. + /// + /// # Errors + /// This function returns an error if it's unable to write to the underlying writer + /// or there's a serialization error. 
+ pub async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> { + self.record_encoder.encode_ref(record_ref).await + } + + /// Flushes any buffered content to the true output. + /// + /// # Errors + /// This function returns an error if it's unable to flush the underlying writer. + pub async fn flush(&mut self) -> Result<()> { + self.record_encoder.flush().await + } +} + /// An async encoder of DBN records. pub struct RecordEncoder where @@ -31,10 +91,7 @@ where /// # Errors /// This function returns an error if it's unable to write to the underlying /// writer. - pub async fn encode + HasRType + fmt::Debug>( - &mut self, - record: &R, - ) -> Result<()> { + pub async fn encode(&mut self, record: &R) -> Result<()> { match self.writer.write_all(record.as_ref()).await { Ok(()) => Ok(()), Err(e) => Err(Error::io(e, format!("serializing {record:?}"))), @@ -53,6 +110,22 @@ where } } + /// Flushes any buffered content to the true output. + /// + /// # Errors + /// This function returns an error if it's unable to flush the underlying writer. + pub async fn flush(&mut self) -> Result<()> { + self.writer + .flush() + .await + .map_err(|e| Error::io(e, "flushing output".to_owned())) + } + + /// Returns a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.writer + } + /// Returns a mutable reference to the underlying writer. pub fn get_mut(&mut self) -> &mut W { &mut self.writer @@ -281,7 +354,7 @@ mod tests { use super::*; use crate::{ datasets::{GLBX_MDP3, XNAS_ITCH}, - decode::{dbn::MetadataDecoder, FromLittleEndianSlice}, + decode::{dbn::AsyncMetadataDecoder as MetadataDecoder, FromLittleEndianSlice}, enums::{SType, Schema}, MappingInterval, MetadataBuilder, }; @@ -347,6 +420,7 @@ mod tests { dbg!(&buffer); let res = MetadataDecoder::new(&mut buffer.as_slice()) .decode() + .await .unwrap(); dbg!(&res, &metadata); assert_eq!(res, metadata); @@ -421,7 +495,10 @@ mod tests { .encode(&metadata) .await .unwrap(); - let decoded = MetadataDecoder::new(buffer.as_slice()).decode().unwrap(); + let decoded = MetadataDecoder::new(buffer.as_slice()) + .decode() + .await + .unwrap(); assert!(decoded.end.is_none()); assert!(decoded.limit.is_none()); } diff --git a/rust/dbn/src/encode/json/async.rs b/rust/dbn/src/encode/json/async.rs index 374ba3c..cb79986 100644 --- a/rust/dbn/src/encode/json/async.rs +++ b/rust/dbn/src/encode/json/async.rs @@ -104,6 +104,17 @@ where self.encode_record(rec).await })? } + + /// Flushes any buffered content to the true output. + /// + /// # Errors + /// This function returns an error if it's unable to flush the underlying writer. 
+ pub async fn flush(&mut self) -> Result<()> { + self.writer + .flush() + .await + .map_err(|e| Error::io(e, "flushing output")) + } } #[cfg(test)] From 89a8c77e1ac5c0d1678ba51ab3d022c24c9d7840 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 20 Sep 2023 17:47:55 +0000 Subject: [PATCH 05/11] ADD: Add DBN Record enums --- CHANGELOG.md | 8 + rust/dbn/src/encode/csv/sync.rs | 2 +- rust/dbn/src/encode/json/sync.rs | 2 +- rust/dbn/src/lib.rs | 20 ++- rust/dbn/src/publishers.rs | 2 + rust/dbn/src/record.rs | 2 +- rust/dbn/src/record_enum.rs | 255 +++++++++++++++++++++++++++++++ rust/dbn/src/record_ref.rs | 74 ++++++++- 8 files changed, 354 insertions(+), 11 deletions(-) create mode 100644 rust/dbn/src/record_enum.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 977e6b8..6368b1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,13 +11,21 @@ synchronous decoder - Implemented `Copy` for `RecordRef` to make it behave more like a reference - Added `AsyncDbnEncoder` for simpler DBN encoding and to match sync API +- Added `RecordEnum` and `RecordRefEnum` to more easily be able to pattern match on + records of different types - Added `ARCX.PILLAR.ARCX` publisher +- Added `From` DBN records for `RecordRef` +- Added re-exports to the top level of the crate for all enums and records for simpler + imports ## Breaking changes - Split `encode_record_ref` into a safe method with no arguments and an unsafe method with a `ts_out` parameter to reduce `unsafe` usage when not working with live data that may contain `ts_out` +## Bug fixes +- Fixed lifetime on return value from `RecordRef::get_unchecked` + ## 0.10.2 - 2023-09-12 ### Bug fixes - Fixed query range checking in `Metadata::symbol_map_for_date` diff --git a/rust/dbn/src/encode/csv/sync.rs b/rust/dbn/src/encode/csv/sync.rs index 7722d26..0524e52 100644 --- a/rust/dbn/src/encode/csv/sync.rs +++ b/rust/dbn/src/encode/csv/sync.rs @@ -614,7 +614,7 @@ mod tests { close: 175 * FIXED_PRICE_SCALE, volume: 4033445, }; - let rec_ref = unsafe { RecordRef::unchecked_from_header(&BAR.hd as *const RecordHeader) }; + let rec_ref = RecordRef::from(&BAR); let mut encoder = Encoder::new(&mut buffer, false, false); encoder.encode_ref_with_sym(rec_ref, None).unwrap(); encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap(); diff --git a/rust/dbn/src/encode/json/sync.rs b/rust/dbn/src/encode/json/sync.rs index 2b26d7e..2ac4d42 100644 --- a/rust/dbn/src/encode/json/sync.rs +++ b/rust/dbn/src/encode/json/sync.rs @@ -643,7 +643,7 @@ mod tests { close: 175 * FIXED_PRICE_SCALE, volume: 4033445, }; - let rec_ref = unsafe { RecordRef::unchecked_from_header(&BAR.hd as *const RecordHeader) }; + let rec_ref = RecordRef::from(&BAR); let mut encoder = Encoder::new(&mut buffer, false, false, false); encoder.encode_ref_with_sym(rec_ref, None).unwrap(); encoder.encode_ref_with_sym(rec_ref, Some("AAPL")).unwrap(); diff --git a/rust/dbn/src/lib.rs b/rust/dbn/src/lib.rs index 2e84da3..c2d7d7c 100644 --- a/rust/dbn/src/lib.rs +++ b/rust/dbn/src/lib.rs @@ -44,16 +44,28 @@ mod json_writer; pub mod macros; pub mod metadata; pub mod pretty; -/// Enumerations for different data sources, venues, and publishers. 
pub mod publishers; #[cfg(feature = "python")] pub mod python; pub mod record; +mod record_enum; pub mod record_ref; -pub use crate::error::{Error, Result}; -pub use crate::metadata::{MappingInterval, Metadata, MetadataBuilder, SymbolMapping}; -pub use crate::record_ref::RecordRef; +pub use crate::{ + enums::{ + Action, Compression, Encoding, InstrumentClass, MatchAlgorithm, RType, SType, Schema, + SecurityUpdateAction, Side, StatType, StatUpdateAction, UserDefinedInstrument, + }, + error::{Error, Result}, + metadata::{MappingInterval, Metadata, MetadataBuilder, SymbolMapping}, + publishers::{Dataset, Publisher, Venue}, + record::{ + ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatMsg, + StatusMsg, SymbolMappingMsg, SystemMsg, TbboMsg, TradeMsg, + }, + record_enum::{RecordEnum, RecordRefEnum}, + record_ref::RecordRef, +}; /// The current version of the DBN encoding, which is different from the crate version. pub const DBN_VERSION: u8 = 1; diff --git a/rust/dbn/src/publishers.rs b/rust/dbn/src/publishers.rs index fc17994..2ff0861 100644 --- a/rust/dbn/src/publishers.rs +++ b/rust/dbn/src/publishers.rs @@ -1,3 +1,5 @@ +//! Enumerations for different data sources, venues, and publishers. + use std::fmt::{self, Display, Formatter}; use num_enum::{IntoPrimitive, TryFromPrimitive}; diff --git a/rust/dbn/src/record.rs b/rust/dbn/src/record.rs index 37dfca6..edf4489 100644 --- a/rust/dbn/src/record.rs +++ b/rust/dbn/src/record.rs @@ -335,7 +335,7 @@ pub struct OhlcvMsg { pub volume: u64, } -/// Trading status update message. The record of the +/// A trading status update message. The record of the /// [`Status`](crate::enums::Schema::Status) schema. #[doc(hidden)] #[repr(C)] diff --git a/rust/dbn/src/record_enum.rs b/rust/dbn/src/record_enum.rs new file mode 100644 index 0000000..d989a1b --- /dev/null +++ b/rust/dbn/src/record_enum.rs @@ -0,0 +1,255 @@ +use crate::{ + Error, ErrorMsg, ImbalanceMsg, InstrumentDefMsg, MboMsg, Mbp10Msg, Mbp1Msg, OhlcvMsg, RType, + RecordRef, StatMsg, StatusMsg, SymbolMappingMsg, SystemMsg, TradeMsg, +}; + +/// An owned DBN record of flexible type. +#[derive(Debug, Clone)] +pub enum RecordEnum { + /// A market-by-order message. + Mbo(MboMsg), + /// A trade message. + Trade(TradeMsg), + /// A market-by-price message with a book depth of 1. + Mbp1(Mbp1Msg), + /// A market-by-price message with a book depth of 10. + Mbp10(Mbp10Msg), + /// An open, high, low, close, and volume message. + Ohlcv(OhlcvMsg), + /// A trading status message. + Status(StatusMsg), + /// An instrument definition message. + InstrumentDef(InstrumentDefMsg), + /// An auction imbalance message. + Imbalance(ImbalanceMsg), + /// A publisher statistic message. + Stat(StatMsg), + /// An error message from the Databento Live Subscription Gateway (LSG). + Error(ErrorMsg), + /// A symbol mapping message. + SymbolMapping(SymbolMappingMsg), + /// A non-error message from the Databento Live Subscription Gateway (LSG). + System(SystemMsg), +} + +/// An immutable reference to a DBN record of flexible type. Unlike [`RecordRef`], this +/// type allows `match`ing. +#[derive(Debug, Copy, Clone)] +pub enum RecordRefEnum<'a> { + /// A reference to a market-by-order message. + Mbo(&'a MboMsg), + /// A reference to a trade message. + Trade(&'a TradeMsg), + /// A reference to a market-by-price message with a book depth of 1. + Mbp1(&'a Mbp1Msg), + /// A reference to a market-by-price message with a book depth of 10.
+ Mbp10(&'a Mbp10Msg), + /// A reference to an open, high, low, close, and volume message. + Ohlcv(&'a OhlcvMsg), + /// A reference to a trading status message. + Status(&'a StatusMsg), + /// A reference to an instrument definition message. + InstrumentDef(&'a InstrumentDefMsg), + /// A reference to an auction imbalance message. + Imbalance(&'a ImbalanceMsg), + /// A reference to a publisher statistic message. + Stat(&'a StatMsg), + /// A reference to an error message from the Databento Live Subscription Gateway + /// (LSG). + Error(&'a ErrorMsg), + /// A reference to a symbol mapping message. + SymbolMapping(&'a SymbolMappingMsg), + /// A reference to a non-error message from the Databento Live Subscription Gateway + /// (LSG). + System(&'a SystemMsg), +} + +impl<'a> From<&'a RecordEnum> for RecordRefEnum<'a> { + fn from(rec_enum: &'a RecordEnum) -> Self { + match rec_enum { + RecordEnum::Mbo(rec) => Self::Mbo(rec), + RecordEnum::Trade(rec) => Self::Trade(rec), + RecordEnum::Mbp1(rec) => Self::Mbp1(rec), + RecordEnum::Mbp10(rec) => Self::Mbp10(rec), + RecordEnum::Ohlcv(rec) => Self::Ohlcv(rec), + RecordEnum::Status(rec) => Self::Status(rec), + RecordEnum::InstrumentDef(rec) => Self::InstrumentDef(rec), + RecordEnum::Imbalance(rec) => Self::Imbalance(rec), + RecordEnum::Stat(rec) => Self::Stat(rec), + RecordEnum::Error(rec) => Self::Error(rec), + RecordEnum::SymbolMapping(rec) => Self::SymbolMapping(rec), + RecordEnum::System(rec) => Self::System(rec), + } + } +} + +impl<'a> RecordRefEnum<'a> { + /// Converts the reference enum into an owned enum value. + pub fn to_owned(&self) -> RecordEnum { + #[allow(clippy::clone_on_copy)] // required for when trivial_copy feature is disabled + match self { + Self::Mbo(rec) => RecordEnum::from((*rec).clone()), + Self::Trade(rec) => RecordEnum::from((*rec).clone()), + Self::Mbp1(rec) => RecordEnum::from((*rec).clone()), + Self::Mbp10(rec) => RecordEnum::from((*rec).clone()), + Self::Ohlcv(rec) => RecordEnum::from((*rec).clone()), + Self::Status(rec) => RecordEnum::from((*rec).clone()), + Self::InstrumentDef(rec) => RecordEnum::from((*rec).clone()), + Self::Imbalance(rec) => RecordEnum::from((*rec).clone()), + Self::Stat(rec) => RecordEnum::from((*rec).clone()), + Self::Error(rec) => RecordEnum::from((*rec).clone()), + Self::SymbolMapping(rec) => RecordEnum::from((*rec).clone()), + Self::System(rec) => RecordEnum::from((*rec).clone()), + } + } +} + +impl<'a> TryFrom<RecordRef<'a>> for RecordRefEnum<'a> { + type Error = Error; + + fn try_from(rec_ref: RecordRef<'a>) -> Result<Self, Self::Error> { + Ok(unsafe { + #[allow(deprecated)] + match rec_ref.rtype()?
{ + RType::Mbo => RecordRefEnum::Mbo(rec_ref.get_unchecked()), + RType::Mbp0 => RecordRefEnum::Trade(rec_ref.get_unchecked()), + RType::Mbp1 => RecordRefEnum::Mbp1(rec_ref.get_unchecked()), + RType::Mbp10 => RecordRefEnum::Mbp10(rec_ref.get_unchecked()), + RType::OhlcvDeprecated + | RType::Ohlcv1S + | RType::Ohlcv1M + | RType::Ohlcv1H + | RType::Ohlcv1D + | RType::OhlcvEod => RecordRefEnum::Ohlcv(rec_ref.get_unchecked()), + RType::Status => RecordRefEnum::Status(rec_ref.get_unchecked()), + RType::InstrumentDef => RecordRefEnum::InstrumentDef(rec_ref.get_unchecked()), + RType::Imbalance => RecordRefEnum::Imbalance(rec_ref.get_unchecked()), + RType::Statistics => RecordRefEnum::Stat(rec_ref.get_unchecked()), + RType::Error => RecordRefEnum::Error(rec_ref.get_unchecked()), + RType::SymbolMapping => RecordRefEnum::SymbolMapping(rec_ref.get_unchecked()), + RType::System => RecordRefEnum::System(rec_ref.get_unchecked()), + } + }) + } +} + +impl From<MboMsg> for RecordEnum { + fn from(rec: MboMsg) -> Self { + Self::Mbo(rec) + } +} +impl<'a> From<&'a MboMsg> for RecordRefEnum<'a> { + fn from(rec: &'a MboMsg) -> Self { + Self::Mbo(rec) + } +} +impl From<TradeMsg> for RecordEnum { + fn from(rec: TradeMsg) -> Self { + Self::Trade(rec) + } +} +impl<'a> From<&'a TradeMsg> for RecordRefEnum<'a> { + fn from(rec: &'a TradeMsg) -> Self { + Self::Trade(rec) + } +} +impl From<Mbp1Msg> for RecordEnum { + fn from(rec: Mbp1Msg) -> Self { + Self::Mbp1(rec) + } +} +impl<'a> From<&'a Mbp1Msg> for RecordRefEnum<'a> { + fn from(rec: &'a Mbp1Msg) -> Self { + Self::Mbp1(rec) + } +} +impl From<Mbp10Msg> for RecordEnum { + fn from(rec: Mbp10Msg) -> Self { + Self::Mbp10(rec) + } +} +impl<'a> From<&'a Mbp10Msg> for RecordRefEnum<'a> { + fn from(rec: &'a Mbp10Msg) -> Self { + Self::Mbp10(rec) + } +} +impl From<OhlcvMsg> for RecordEnum { + fn from(rec: OhlcvMsg) -> Self { + Self::Ohlcv(rec) + } +} +impl<'a> From<&'a OhlcvMsg> for RecordRefEnum<'a> { + fn from(rec: &'a OhlcvMsg) -> Self { + Self::Ohlcv(rec) + } +} +impl From<StatusMsg> for RecordEnum { + fn from(rec: StatusMsg) -> Self { + Self::Status(rec) + } +} +impl<'a> From<&'a StatusMsg> for RecordRefEnum<'a> { + fn from(rec: &'a StatusMsg) -> Self { + Self::Status(rec) + } +} +impl From<InstrumentDefMsg> for RecordEnum { + fn from(rec: InstrumentDefMsg) -> Self { + Self::InstrumentDef(rec) + } +} +impl<'a> From<&'a InstrumentDefMsg> for RecordRefEnum<'a> { + fn from(rec: &'a InstrumentDefMsg) -> Self { + Self::InstrumentDef(rec) + } +} +impl From<ImbalanceMsg> for RecordEnum { + fn from(rec: ImbalanceMsg) -> Self { + Self::Imbalance(rec) + } +} +impl<'a> From<&'a ImbalanceMsg> for RecordRefEnum<'a> { + fn from(rec: &'a ImbalanceMsg) -> Self { + Self::Imbalance(rec) + } +} +impl From<StatMsg> for RecordEnum { + fn from(rec: StatMsg) -> Self { + Self::Stat(rec) + } +} +impl<'a> From<&'a StatMsg> for RecordRefEnum<'a> { + fn from(rec: &'a StatMsg) -> Self { + Self::Stat(rec) + } +} +impl From<ErrorMsg> for RecordEnum { + fn from(rec: ErrorMsg) -> Self { + Self::Error(rec) + } +} +impl<'a> From<&'a ErrorMsg> for RecordRefEnum<'a> { + fn from(rec: &'a ErrorMsg) -> Self { + Self::Error(rec) + } +} +impl From<SymbolMappingMsg> for RecordEnum { + fn from(rec: SymbolMappingMsg) -> Self { + Self::SymbolMapping(rec) + } +} +impl<'a> From<&'a SymbolMappingMsg> for RecordRefEnum<'a> { + fn from(rec: &'a SymbolMappingMsg) -> Self { + Self::SymbolMapping(rec) + } +} +impl From<SystemMsg> for RecordEnum { + fn from(rec: SystemMsg) -> Self { + Self::System(rec) + } +} +impl<'a> From<&'a SystemMsg> for RecordRefEnum<'a> { + fn from(rec: &'a SystemMsg) -> Self { + Self::System(rec) + } +}
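The motivation for `RecordRefEnum` is plain `match` ergonomics over heterogeneous record streams, which the opaque `RecordRef` cannot offer on its own. A minimal sketch of the intended call pattern, via the `RecordRef::as_enum` method added in the record_ref.rs hunk below; the `describe` function, its choice of match arms, and its output strings are illustrative only, not part of this patch:

```rust
use dbn::{RecordRef, RecordRefEnum};

// Dispatch on the concrete record type behind a `RecordRef`.
// `as_enum` fails only when the header's rtype is unknown.
fn describe(rec: RecordRef) -> dbn::Result<String> {
    Ok(match rec.as_enum()? {
        RecordRefEnum::Mbo(mbo) => format!("MBO event for order {}", mbo.order_id),
        RecordRefEnum::Trade(trade) => format!("trade of size {}", trade.size),
        RecordRefEnum::Ohlcv(bar) => format!("bar closing at {}", bar.close),
        // The remaining variants are elided for brevity.
        _ => "other record type".to_owned(),
    })
}
```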
diff --git a/rust/dbn/src/record_ref.rs b/rust/dbn/src/record_ref.rs index 9d3100d..951203c 100644 --- a/rust/dbn/src/record_ref.rs +++ b/rust/dbn/src/record_ref.rs @@ -5,6 +5,7 @@ use std::{marker::PhantomData, mem, ptr::NonNull}; use crate::{ enums::RType, record::{HasRType, RecordHeader}, + RecordEnum, RecordRefEnum, }; /// A wrapper around a non-owning immutable reference to a DBN record. This wrapper @@ -81,7 +82,7 @@ impl<'a> RecordRef<'a> { /// # Errors /// This function returns an error if the `rtype` field does not /// contain a valid, known [`RType`]. - pub fn rtype(&self) -> crate::error::Result<RType> { + pub fn rtype(&self) -> crate::Result<RType> { self.header().rtype() } @@ -110,6 +111,16 @@ impl<'a> RecordRef<'a> { } } + /// Returns a native Rust enum with a variant for each record type. This allows for + /// pattern `match`ing. + /// + /// # Errors + /// This function returns a conversion error if the rtype does not correspond with + /// any known DBN record type. + pub fn as_enum(&self) -> crate::Result<RecordRefEnum> { + RecordRefEnum::try_from(*self) + } + /// Returns a reference to the underlying record of type `T` without checking if /// this object references a record of type `T`. /// @@ -117,13 +128,30 @@ impl<'a> RecordRef<'a> { /// /// # Safety /// The caller needs to validate this object points to a `T`. - pub unsafe fn get_unchecked<T: HasRType>(&self) -> &T { + pub unsafe fn get_unchecked<T: HasRType>(&self) -> &'a T { debug_assert!(self.has::<T>()); debug_assert!(self.record_size() >= mem::size_of::<T>()); self.ptr.cast::<T>().as_ref() } } +impl<'a, R> From<&'a R> for RecordRef<'a> +where + R: HasRType, +{ + /// Constructs a new reference to a DBN record. + fn from(rec: &'a R) -> Self { + Self { + // Safe: `R` must be a record because it implements `HasRType`. Casting to `mut` + // is required for `NonNull`, but it is never mutated.
+ ptr: unsafe { + NonNull::new_unchecked((rec.header() as *const RecordHeader).cast_mut()) + }, + _marker: PhantomData, + } + } +} + impl<'a> AsRef<[u8]> for RecordRef<'a> { fn as_ref(&self) -> &[u8] { // # Safety @@ -132,6 +160,44 @@ impl<'a> AsRef<[u8]> for RecordRef<'a> { } } +impl<'a> From<&'a RecordEnum> for RecordRef<'a> { + fn from(rec_enum: &'a RecordEnum) -> Self { + match rec_enum { + RecordEnum::Mbo(rec) => Self::from(rec), + RecordEnum::Trade(rec) => Self::from(rec), + RecordEnum::Mbp1(rec) => Self::from(rec), + RecordEnum::Mbp10(rec) => Self::from(rec), + RecordEnum::Ohlcv(rec) => Self::from(rec), + RecordEnum::Status(rec) => Self::from(rec), + RecordEnum::InstrumentDef(rec) => Self::from(rec), + RecordEnum::Imbalance(rec) => Self::from(rec), + RecordEnum::Stat(rec) => Self::from(rec), + RecordEnum::Error(rec) => Self::from(rec), + RecordEnum::SymbolMapping(rec) => Self::from(rec), + RecordEnum::System(rec) => Self::from(rec), + } + } +} + +impl<'a> From<RecordRefEnum<'a>> for RecordRef<'a> { + fn from(rec_enum: RecordRefEnum<'a>) -> Self { + match rec_enum { + RecordRefEnum::Mbo(rec) => Self::from(rec), + RecordRefEnum::Trade(rec) => Self::from(rec), + RecordRefEnum::Mbp1(rec) => Self::from(rec), + RecordRefEnum::Mbp10(rec) => Self::from(rec), + RecordRefEnum::Ohlcv(rec) => Self::from(rec), + RecordRefEnum::Status(rec) => Self::from(rec), + RecordRefEnum::InstrumentDef(rec) => Self::from(rec), + RecordRefEnum::Imbalance(rec) => Self::from(rec), + RecordRefEnum::Stat(rec) => Self::from(rec), + RecordRefEnum::Error(rec) => Self::from(rec), + RecordRefEnum::SymbolMapping(rec) => Self::from(rec), + RecordRefEnum::System(rec) => Self::from(rec), + } + } +} + #[cfg(test)] mod tests { use std::ffi::c_char; @@ -165,7 +231,7 @@ mod tests { #[test] fn test_has_and_get() { - let target = unsafe { RecordRef::new(SOURCE_RECORD.as_ref()) }; + let target = RecordRef::from(&SOURCE_RECORD); assert!(!target.has::()); assert!(!target.has::()); assert!(!target.has::()); @@ -178,7 +244,7 @@ #[test] fn test_as_ref() { - let target = unsafe { RecordRef::new(SOURCE_RECORD.as_ref()) }; + let target = RecordRef::from(&SOURCE_RECORD); let byte_slice = target.as_ref(); assert_eq!(SOURCE_RECORD.record_size(), byte_slice.len()); assert_eq!(target.record_size(), byte_slice.len()); From f2285a48e4bdde7e48569840816d610ade3b8c9b Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 20 Sep 2023 15:31:43 -0500 Subject: [PATCH 06/11] FIX: Add missing `stype_out` check in Metadata --- CHANGELOG.md | 1 + rust/dbn/src/metadata.rs | 29 +++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6368b1f..6ab5863 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ ## Bug fixes - Fixed lifetime on return value from `RecordRef::get_unchecked` +- Fixed missing check for `stype_out` before building `Metadata` symbology maps ## 0.10.2 - 2023-09-12 ### Bug fixes diff --git a/rust/dbn/src/metadata.rs b/rust/dbn/src/metadata.rs index 32d25d8..64f8e17 100644 --- a/rust/dbn/src/metadata.rs +++ b/rust/dbn/src/metadata.rs @@ -95,9 +95,16 @@ impl Metadata { /// time range of the request. Otherwise, [`Self::symbol_map()`] is recommended. /// /// # Errors - /// This function returns an error if it can't parse a symbol into a `u32` - /// instrument ID. + /// This function returns an error if `stype_out` is not [`SType::InstrumentId`] or + /// it can't parse a symbol into a `u32` instrument ID.
It will also return an error + /// if `date` is outside the query range. pub fn symbol_map_for_date(&self, date: time::Date) -> crate::Result> { + if self.stype_out != SType::InstrumentId { + return Err(crate::Error::BadArgument { + param_name: "stype_out".to_owned(), + desc: "Can only create symbol maps with an stype_out of instrument ID".to_owned(), + }); + } let datetime = PrimitiveDateTime::new(date, time!(0:00)).assume_utc(); // need to compare with `end` as a datetime to handle midnight case if date < self.start().date() || self.end().map_or(false, |end| datetime >= end) { @@ -133,9 +140,15 @@ impl Metadata { /// change, [`Self::symbol_map_for_date()`] is recommended. /// /// # Errors - /// This function returns an error if it can't parse a symbol into a `u32` - /// instrument ID. + /// This function returns an error if `stype_out` is not [`SType::InstrumentId`] or + /// it can't parse a symbol into a `u32` instrument ID. pub fn symbol_map(&self) -> crate::Result> { + if self.stype_out != SType::InstrumentId { + return Err(crate::Error::BadArgument { + param_name: "stype_out".to_owned(), + desc: "Can only create symbol maps with an stype_out of instrument ID".to_owned(), + }); + } let mut index = HashMap::new(); for mapping in self.mappings.iter() { for interval in mapping.intervals.iter() { @@ -929,4 +942,12 @@ mod tests { assert_eq!(symbol_map[&(date!(2023 - 07 - 24), 10174)], "TSLA"); assert_eq!(symbol_map[&(date!(2023 - 07 - 25), 10172)], "TSLA"); } + + #[test] + fn test_other_stype_errors() { + let mut target = metadata_w_mappings(); + target.stype_out = SType::RawSymbol; + assert!(target.symbol_map().is_err()); + assert!(target.symbol_map_for_date(date!(2023 - 07 - 31)).is_err()); + } } From 399fb1577aa4a9ec3153b0eb728d99cf6710cdb7 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Thu, 21 Sep 2023 10:23:12 +1000 Subject: [PATCH 07/11] MOD: Upgrade to Python 3.10 syntax --- python/databento_dbn.pyi | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/databento_dbn.pyi b/python/databento_dbn.pyi index e3bc2aa..9c0419a 100644 --- a/python/databento_dbn.pyi +++ b/python/databento_dbn.pyi @@ -5,14 +5,7 @@ from collections.abc import Iterable from collections.abc import Sequence from datetime import datetime from enum import Enum -from typing import ( - Any, - BinaryIO, - ClassVar, - SupportsBytes, - TextIO, - Union, -) +from typing import Any, BinaryIO, ClassVar, SupportsBytes, TextIO, Union FIXED_PRICE_SCALE: int From 7f20a4be3e6140c4ed5e13038c6439d8b9359259 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 20 Sep 2023 15:12:43 -0500 Subject: [PATCH 08/11] FIX: Fix missing CSV header with DBN fragments --- CHANGELOG.md | 6 +++-- rust/dbn-cli/src/main.rs | 15 +++++++++++-- rust/dbn-cli/tests/integration_tests.rs | 28 ++++++++++++++++++++++++ rust/dbn/src/decode.rs | 4 ++-- rust/dbn/src/encode.rs | 29 +++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ab5863..13b5d93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,8 @@ - Added new `EncodeRecordTextExt` trait which is implemented for the CSV and JSON encoders. 
It adds two methods for encoding a `symbol` field alongside the rest of the record fields, matching the behavior of `map_symbols` in the historical API -- Added `encode_header` and `encode_header_for_schema` methods to `CsvEncoder` to give - more flexibility for encoding CSV headers +- Added `encode_header` and `encode_header_for_schema` methods to `CsvEncoder` and + `DynEncoder` to give more flexibility for encoding CSV headers - Added `from_file` and `from_zstd_file` functions to `AsyncDbnDecoder` to match synchronous decoder - Implemented `Copy` for `RecordRef` to make it behave more like a reference @@ -24,6 +24,8 @@ that may contain `ts_out` ## Bug fixes +- Fixed `dbn` CLI not writing CSV header when using `--fragment` and `--zstd-fragment` + flags - Fixed lifetime on return value from `RecordRef::get_unchecked` - Fixed missing check for `stype_out` before building `Metadata` symbology maps diff --git a/rust/dbn-cli/src/main.rs b/rust/dbn-cli/src/main.rs index b8a6b23..d959846 100644 --- a/rust/dbn-cli/src/main.rs +++ b/rust/dbn-cli/src/main.rs @@ -3,9 +3,9 @@ use std::{fs::File, io}; use clap::Parser; use dbn::{ decode::{DbnRecordDecoder, DecodeDbn, DecodeRecordRef, DynDecoder}, - encode::{json, DynEncoder, EncodeDbn, EncodeRecordRef}, + encode::{json, DbnEncodable, DynEncoder, EncodeDbn, EncodeRecordRef}, enums::SType, - MetadataBuilder, + rtype_dispatch, Encoding, MetadataBuilder, }; use dbn_cli::{infer_encoding_and_compression, output_from_args, Args}; @@ -81,7 +81,18 @@ fn write_dbn_frag( args.should_pretty_print, )?; let mut n = 0; + let mut has_written_header = encoding != Encoding::Csv; + fn write_header<T: DbnEncodable>( + _record: &T, + encoder: &mut DynEncoder<Box<dyn io::Write>>, + ) -> dbn::Result<()> { + encoder.encode_header::<T>(false) + } while let Some(record) = decoder.decode_record_ref()? { + if !has_written_header { + rtype_dispatch!(record, write_header, &mut encoder)??; + has_written_header = true; + } // Assume no ts_out for safety match encoder.encode_record_ref(record) { // Handle broken pipe as a non-error. diff --git a/rust/dbn-cli/tests/integration_tests.rs b/rust/dbn-cli/tests/integration_tests.rs index 4f1ec51..d4e4be0 100644 --- a/rust/dbn-cli/tests/integration_tests.rs +++ b/rust/dbn-cli/tests/integration_tests.rs @@ -427,6 +427,20 @@ fn test_fragment() { .stderr(is_empty()); } +#[test] +fn test_writes_csv_header_for_fragment() { + cmd() + .args([ + &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag"), + "--fragment", + "--csv", + ]) + .assert() + .success() + .stdout(contains('\n').count(3)) + .stderr(is_empty()); +} + #[test] fn test_zstd_fragment() { cmd() @@ -441,6 +455,20 @@ fn test_zstd_fragment() { .stderr(is_empty()); } +#[test] +fn test_writes_csv_header_for_zstd_fragment() { + cmd() + .args([ + &format!("{TEST_DATA_PATH}/test_data.definition.dbn.frag.zst"), + "--zstd-fragment", + "--csv", + ]) + .assert() + .success() + .stdout(contains('\n').count(3)) + .stderr(is_empty()); +} + #[test] fn test_limit_updates_metadata() { // Check metadata shows limit = 2 diff --git a/rust/dbn/src/decode.rs b/rust/dbn/src/decode.rs index 4d79bfb..181b280 100644 --- a/rust/dbn/src/decode.rs +++ b/rust/dbn/src/decode.rs @@ -69,8 +69,8 @@ pub trait DecodeDbn: DecodeRecordRef + private::BufferSlice { /// [`Error::Decode`](crate::Error::Decode) will be returned. fn decode_record<T: HasRType>(&mut self) -> crate::Result<Option<&T>>; - /// Tries to convert the decoder into a streaming iterator. This lazily decodes the - /// data. + /// Converts the decoder into a streaming iterator of records of type `T`.
This + /// lazily decodes the data. fn decode_stream<T: HasRType>(self) -> StreamIterDecoder<Self, T> where Self: Sized; diff --git a/rust/dbn/src/encode.rs b/rust/dbn/src/encode.rs index d9e299a..b2dfd63 100644 --- a/rust/dbn/src/encode.rs +++ b/rust/dbn/src/encode.rs @@ -24,6 +24,7 @@ pub use self::{ json::Encoder as JsonEncoder, }; +use crate::Schema; use crate::{ decode::DecodeDbn, enums::{Compression, Encoding}, @@ -322,6 +323,34 @@ where )))), } } + + /// Encodes the CSV header for the record type `R`, i.e. the names of each of the + /// fields to the output. + /// + /// If `with_symbol` is `true`, will add a header field for "symbol". + /// + /// # Errors + /// This function returns an error if there's an error writing to `writer`. + pub fn encode_header<R: DbnEncodable>(&mut self, with_symbol: bool) -> Result<()> { + match &mut self.0 { + DynEncoderImpl::Csv(encoder) => encoder.encode_header::<R>(with_symbol), + _ => Ok(()), + } + } + + /// Encodes the CSV header for `schema`, i.e. the names of each of the fields to + /// the output. + /// + /// If `with_symbol` is `true`, will add a header field for "symbol". + /// + /// # Errors + /// This function returns an error if there's an error writing to `writer`. + pub fn encode_header_for_schema(&mut self, schema: Schema, with_symbol: bool) -> Result<()> { + match &mut self.0 { + DynEncoderImpl::Csv(encoder) => encoder.encode_header_for_schema(schema, with_symbol), + _ => Ok(()), + } + } } impl<'a, W> EncodeRecord for DynEncoder<'a, W> From be391be5450601677a542e81d66d3e0e141cba6e Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 21 Sep 2023 11:51:50 -0500 Subject: [PATCH 09/11] VER: Release DBN 0.11.0 --- CHANGELOG.md | 2 +- Cargo.lock | 10 +++++----- c/Cargo.toml | 2 +- python/Cargo.toml | 2 +- python/pyproject.toml | 2 +- rust/dbn-cli/Cargo.toml | 4 ++-- rust/dbn-macros/Cargo.toml | 2 +- rust/dbn/Cargo.toml | 4 ++-- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b5d93..7683b3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.11.0 - TBD +## 0.11.0 - 2023-09-21 ### Enhancements - Added new `EncodeRecordTextExt` trait which is implemented for the CSV and JSON encoders.
It adds two methods for encoding a `symbol` field alongside the rest of the diff --git a/Cargo.lock b/Cargo.lock index 0af0143..d5906c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,7 +272,7 @@ dependencies = [ [[package]] name = "databento-dbn" -version = "0.10.2" +version = "0.11.0" dependencies = [ "dbn", "pyo3", @@ -281,7 +281,7 @@ dependencies = [ [[package]] name = "dbn" -version = "0.10.2" +version = "0.11.0" dependencies = [ "async-compression", "csv", @@ -300,7 +300,7 @@ dependencies = [ [[package]] name = "dbn-c" -version = "0.10.2" +version = "0.11.0" dependencies = [ "anyhow", "cbindgen", @@ -310,7 +310,7 @@ dependencies = [ [[package]] name = "dbn-cli" -version = "0.10.2" +version = "0.11.0" dependencies = [ "anyhow", "assert_cmd", @@ -324,7 +324,7 @@ dependencies = [ [[package]] name = "dbn-macros" -version = "0.10.2" +version = "0.11.0" dependencies = [ "proc-macro-crate", "proc-macro2", diff --git a/c/Cargo.toml b/c/Cargo.toml index 598f0ef..9b4d68a 100644 --- a/c/Cargo.toml +++ b/c/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dbn-c" authors = ["Databento "] -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "C bindings for working with Databento Binary Encoding (DBN)" license = "Apache-2.0" diff --git a/python/Cargo.toml b/python/Cargo.toml index 5378954..9b67fac 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "databento-dbn" authors = ["Databento "] -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "Python library written in Rust for working with Databento Binary Encoding (DBN)" license = "Apache-2.0" diff --git a/python/pyproject.toml b/python/pyproject.toml index 1f27fbd..9501323 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databento-dbn" -version = "0.10.2" +version = "0.11.0" description = "Python bindings for encoding and decoding Databento Binary Encoding (DBN)" authors = ["Databento "] license = "Apache-2.0" diff --git a/rust/dbn-cli/Cargo.toml b/rust/dbn-cli/Cargo.toml index 774e7b6..84c7e11 100644 --- a/rust/dbn-cli/Cargo.toml +++ b/rust/dbn-cli/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dbn-cli" authors = ["Databento "] -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "Command-line utility for converting Databento Binary Encoding (DBN) files to text-based formats" default-run = "dbn" @@ -17,7 +17,7 @@ path = "src/main.rs" [dependencies] # Databento common DBN library -dbn = { path = "../dbn", version = "=0.10.2", default-features = false } +dbn = { path = "../dbn", version = "=0.11.0", default-features = false } # Error handling anyhow = "1.0.72" diff --git a/rust/dbn-macros/Cargo.toml b/rust/dbn-macros/Cargo.toml index 4d8b3a2..4a11f44 100644 --- a/rust/dbn-macros/Cargo.toml +++ b/rust/dbn-macros/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dbn-macros" authors = ["Databento "] -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "Proc macros for dbn crate" license = "Apache-2.0" diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index 0ba30da..5b085ea 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "dbn" authors = ["Databento "] -version = "0.10.2" +version = "0.11.0" edition = "2021" description = "Library for working with Databento Binary Encoding (DBN)" license = "Apache-2.0" @@ -25,7 +25,7 @@ serde = ["dep:serde", "time/parsing", "time/serde"] trivial_copy = [] [dependencies] -dbn-macros = { version = "=0.10.2", path = "../dbn-macros" }
"../dbn-macros" } +dbn-macros = { version = "=0.11.0", path = "../dbn-macros" } # async (de)compression async-compression = { version = "0.4.1", features = ["tokio", "zstd"], optional = true } From 5946b1505439c94e1df13de3c20e6d557850b0b5 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 21 Sep 2023 14:13:35 -0500 Subject: [PATCH 10/11] ADD: Add OPRA.PILLAR stat types --- CHANGELOG.md | 1 + rust/dbn/src/enums.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7683b3d..6f386a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - Added `From` DBN records for `RecordRef` - Added re-exports to the top level of the crate for all enums and records for simpler imports +- Added `ClosePrice` and `NetChange` `StatType`s used in the `OPRA.PILLAR` dataset ## Breaking changes - Split `encode_record_ref` into a safe method with no arguments and an unsafe method diff --git a/rust/dbn/src/enums.rs b/rust/dbn/src/enums.rs index 7fd095f..9e553ce 100644 --- a/rust/dbn/src/enums.rs +++ b/rust/dbn/src/enums.rs @@ -641,6 +641,11 @@ pub enum StatType { /// The volume-weighted average price (VWAP) for a fixing period. `price` will be /// set. FixingPrice = 10, + /// The last trade price during a trading session. `price` will be set. + ClosePrice = 11, + /// The change in price from the close price of the previous trading session to the + /// most recent trading session. `price` will be set. + NetChange = 12, } /// The type of [`StatMsg`](crate::record::StatMsg) update. From 0546f0f8b5c99a9118d83686bc346fd59961c0c8 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 21 Sep 2023 15:09:46 -0500 Subject: [PATCH 11/11] FIX: Fix changelog levels --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f386a8..e6c2712 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,12 +19,12 @@ imports - Added `ClosePrice` and `NetChange` `StatType`s used in the `OPRA.PILLAR` dataset -## Breaking changes +### Breaking changes - Split `encode_record_ref` into a safe method with no arguments and an unsafe method with a `ts_out` parameter to reduce `unsafe` usage when not working with live data that may contain `ts_out` -## Bug fixes +### Bug fixes - Fixed `dbn` CLI not writing CSV header when using `--fragment` and `--zstd-fragment` flags - Fixed lifetime on return value from `RecordRef::get_unchecked`