From 255e5c10778b9009519b37904ec4d84dbbc602b8 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 18 Jul 2023 07:52:33 -0500 Subject: [PATCH] ADD: Add `AsyncDbnDecoder` --- rust/dbn/Cargo.toml | 2 +- rust/dbn/src/decode/dbn.rs | 118 +++++++++++++++++++++++++++++++++++- rust/dbn/src/decode/mod.rs | 2 +- rust/dbn/src/encode/json.rs | 11 ++-- rust/dbn/src/metadata.rs | 29 ++++++++- 5 files changed, 152 insertions(+), 10 deletions(-) diff --git a/rust/dbn/Cargo.toml b/rust/dbn/Cargo.toml index 461286b..d527d70 100644 --- a/rust/dbn/Cargo.toml +++ b/rust/dbn/Cargo.toml @@ -14,7 +14,7 @@ categories = ["encoding"] default = [] async = ["dep:async-compression", "dep:tokio"] python = ["dep:pyo3", "dep:strum"] -serde = ["dep:serde"] +serde = ["dep:serde", "time/parsing", "time/serde"] # Enables deriving the `Copy` trait for records. trivial_copy = [] diff --git a/rust/dbn/src/decode/dbn.rs b/rust/dbn/src/decode/dbn.rs index 84966ab..e898530 100644 --- a/rust/dbn/src/decode/dbn.rs +++ b/rust/dbn/src/decode/dbn.rs @@ -717,7 +717,10 @@ mod tests { } #[cfg(feature = "async")] -pub use r#async::{MetadataDecoder as AsyncMetadataDecoder, RecordDecoder as AsyncRecordDecoder}; +pub use r#async::{ + Decoder as AsyncDecoder, MetadataDecoder as AsyncMetadataDecoder, + RecordDecoder as AsyncRecordDecoder, +}; #[cfg(feature = "async")] mod r#async { @@ -732,6 +735,99 @@ mod r#async { Metadata, Result, DBN_VERSION, METADATA_FIXED_LEN, }; + /// An async decoder for Databento Binary Encoding (DBN), both metadata and records. + pub struct Decoder + where + R: io::AsyncReadExt + Unpin, + { + metadata: Metadata, + decoder: RecordDecoder, + } + + impl Decoder + where + R: io::AsyncReadExt + Unpin, + { + /// Creates a new async DBN [`Decoder`] from `reader`. + /// + /// # Errors + /// This function will return an error if it is unable to parse the metadata in `reader`. + pub async fn new(mut reader: R) -> crate::Result { + let metadata = MetadataDecoder::new(&mut reader).decode().await?; + Ok(Self { + metadata, + decoder: RecordDecoder::new(reader), + }) + } + + /// Returns a mutable reference to the inner reader. + pub fn get_mut(&mut self) -> &mut R { + self.decoder.get_mut() + } + + /// Consumes the decoder and returns the inner reader. + pub fn into_inner(self) -> R { + self.decoder.into_inner() + } + + /// Returns a reference to the decoded metadata. + pub fn metadata(&self) -> &Metadata { + &self.metadata + } + + /// Tries to decode a single record and returns a reference to the record that + /// lasts until the next method call. Returns `None` if `reader` has been + /// exhausted. + /// + /// # Errors + /// This function returns an error if the underlying reader returns an + /// error of a kind other than `io::ErrorKind::UnexpectedEof` upon reading. + /// + /// If the next record is of a different type than `T`, + /// this function returns an error of kind `io::ErrorKind::InvalidData`. + pub async fn decode_record<'a, T: HasRType + 'a>(&'a mut self) -> Result> { + self.decoder.decode().await + } + + /// Tries to decode a single record and returns a reference to the record that + /// lasts until the next method call. Returns `None` if `reader` has been + /// exhausted. + /// + /// # Errors + /// This function returns an error if the underlying reader returns an + /// error of a kind other than `io::ErrorKind::UnexpectedEof` upon reading. + /// It will also return an error if it encounters an invalid record. + pub async fn decode_record_ref(&mut self) -> Result> { + self.decoder.decode_ref().await + } + } + + impl<'a, R> Decoder>> + where + R: io::AsyncReadExt + Unpin, + { + /// Creates a new async DBN [`Decoder`] from Zstandard-compressed `reader`. + /// + /// # Errors + /// This function will return an error if it is unable to parse the metadata in `reader`. + pub async fn with_zstd(reader: R) -> crate::Result { + Decoder::new(ZstdDecoder::new(BufReader::new(reader))).await + } + } + + impl<'a, R> Decoder> + where + R: io::AsyncBufReadExt + Unpin, + { + /// Creates a new async DBN [`Decoder`] from Zstandard-compressed buffered `reader`. + /// + /// # Errors + /// This function will return an error if it is unable to parse the metadata in `reader`. + pub async fn with_zstd_buffer(reader: R) -> crate::Result { + Decoder::new(ZstdDecoder::new(reader)).await + } + } + /// An async decoder for files and streams of Databento Binary Encoding (DBN) records. pub struct RecordDecoder where @@ -821,6 +917,26 @@ mod r#async { } } + impl RecordDecoder>> + where + R: io::AsyncReadExt + Unpin, + { + /// Creates a new async DBN [`RecordDecoder`] from a Zstandard-compressed `reader`. + pub fn with_zstd(reader: R) -> Self { + RecordDecoder::new(ZstdDecoder::new(BufReader::new(reader))) + } + } + + impl RecordDecoder> + where + R: io::AsyncBufReadExt + Unpin, + { + /// Creates a new async DBN [`RecordDecoder`] from a Zstandard-compressed buffered `reader`. + pub fn with_zstd_buffer(reader: R) -> Self { + RecordDecoder::new(ZstdDecoder::new(reader)) + } + } + impl From> for RecordDecoder where R: io::AsyncReadExt + Unpin, diff --git a/rust/dbn/src/decode/mod.rs b/rust/dbn/src/decode/mod.rs index 4808b1f..71ed9a9 100644 --- a/rust/dbn/src/decode/mod.rs +++ b/rust/dbn/src/decode/mod.rs @@ -299,7 +299,7 @@ mod tests { } #[cfg(feature = "async")] -pub use r#async::DynReader as AsyncDynReader; +pub use self::{dbn::AsyncDecoder as AsyncDbnDecoder, r#async::DynReader as AsyncDynReader}; #[cfg(feature = "async")] mod r#async { diff --git a/rust/dbn/src/encode/json.rs b/rust/dbn/src/encode/json.rs index 3ab46d5..fda25e5 100644 --- a/rust/dbn/src/encode/json.rs +++ b/rust/dbn/src/encode/json.rs @@ -92,8 +92,6 @@ where pub(crate) mod serialize { use std::ffi::c_char; - use time::format_description::FormatItem; - use crate::json_writer::{JsonObjectWriter, NULL}; use crate::pretty::{fmt_px, fmt_ts}; use crate::UNDEF_TIMESTAMP; @@ -413,10 +411,13 @@ pub(crate) mod serialize { key: &str, date: &time::Date, ) { - const DATE_FORMAT: &[FormatItem<'static>] = - time::macros::format_description!("[year]-[month]-[day]"); if PRETTY_TS { - writer.value(key, &date.format(DATE_FORMAT).unwrap_or_default()); + writer.value( + key, + &date + .format(crate::metadata::DATE_FORMAT) + .unwrap_or_default(), + ); } else { let mut date_int = date.year() as u32 * 10_000; date_int += date.month() as u32 * 100; diff --git a/rust/dbn/src/metadata.rs b/rust/dbn/src/metadata.rs index f452541..36b41ca 100644 --- a/rust/dbn/src/metadata.rs +++ b/rust/dbn/src/metadata.rs @@ -6,6 +6,8 @@ use std::num::NonZeroU64; // of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780 #[cfg(not(feature = "python"))] pub use dbn_macros::MockPyo3; +#[cfg(feature = "serde")] +use serde::Deserialize; use crate::enums::{SType, Schema}; use crate::record::as_u8_slice; @@ -295,6 +297,7 @@ impl Default for MetadataBuilder { /// A raw symbol and its symbol mappings for different time ranges within the query range. #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Deserialize))] #[cfg_attr(feature = "python", derive(pyo3::FromPyObject))] pub struct SymbolMapping { /// The symbol assigned by publisher. @@ -305,11 +308,33 @@ pub struct SymbolMapping { /// The resolved symbol for a date range. #[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(Deserialize))] pub struct MappingInterval { - /// The UTC start date of interval. + /// The UTC start date of interval (inclusive). + #[cfg_attr( + feature = "serde", + serde(rename = "d0", deserialize_with = "deserialize_date") + )] pub start_date: time::Date, - /// The UTC end date of interval. + /// The UTC end date of interval (exclusive). + #[cfg_attr( + feature = "serde", + serde(rename = "d1", deserialize_with = "deserialize_date") + )] pub end_date: time::Date, /// The resolved symbol for this interval. + #[cfg_attr(feature = "serde", serde(rename = "s"))] pub symbol: String, } + +/// The date format used for date strings when serializing [`Metadata`]. +pub const DATE_FORMAT: &[time::format_description::FormatItem<'static>] = + time::macros::format_description!("[year]-[month]-[day]"); + +#[cfg(feature = "serde")] +fn deserialize_date<'de, D: serde::Deserializer<'de>>( + deserializer: D, +) -> Result { + let date_str = String::deserialize(deserializer)?; + time::Date::parse(&date_str, DATE_FORMAT).map_err(serde::de::Error::custom) +}