Skip to content

Commit

Permalink
ADD: Add AsyncDbnDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
threecgreen committed Jul 19, 2023
1 parent 022c6cf commit 255e5c1
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 10 deletions.
2 changes: 1 addition & 1 deletion rust/dbn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ categories = ["encoding"]
default = []
async = ["dep:async-compression", "dep:tokio"]
python = ["dep:pyo3", "dep:strum"]
serde = ["dep:serde"]
serde = ["dep:serde", "time/parsing", "time/serde"]
# Enables deriving the `Copy` trait for records.
trivial_copy = []

Expand Down
118 changes: 117 additions & 1 deletion rust/dbn/src/decode/dbn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,10 @@ mod tests {
}

#[cfg(feature = "async")]
pub use r#async::{MetadataDecoder as AsyncMetadataDecoder, RecordDecoder as AsyncRecordDecoder};
pub use r#async::{
Decoder as AsyncDecoder, MetadataDecoder as AsyncMetadataDecoder,
RecordDecoder as AsyncRecordDecoder,
};

#[cfg(feature = "async")]
mod r#async {
Expand All @@ -732,6 +735,99 @@ mod r#async {
Metadata, Result, DBN_VERSION, METADATA_FIXED_LEN,
};

/// An async decoder for Databento Binary Encoding (DBN), both metadata and records.
pub struct Decoder<R>
where
R: io::AsyncReadExt + Unpin,
{
metadata: Metadata,
decoder: RecordDecoder<R>,
}

impl<R> Decoder<R>
where
R: io::AsyncReadExt + Unpin,
{
/// Creates a new async DBN [`Decoder`] from `reader`.
///
/// # Errors
/// This function will return an error if it is unable to parse the metadata in `reader`.
pub async fn new(mut reader: R) -> crate::Result<Self> {
let metadata = MetadataDecoder::new(&mut reader).decode().await?;
Ok(Self {
metadata,
decoder: RecordDecoder::new(reader),
})
}

/// Returns a mutable reference to the inner reader.
pub fn get_mut(&mut self) -> &mut R {
self.decoder.get_mut()
}

/// Consumes the decoder and returns the inner reader.
pub fn into_inner(self) -> R {
self.decoder.into_inner()
}

/// Returns a reference to the decoded metadata.
pub fn metadata(&self) -> &Metadata {
&self.metadata
}

/// Tries to decode a single record and returns a reference to the record that
/// lasts until the next method call. Returns `None` if `reader` has been
/// exhausted.
///
/// # Errors
/// This function returns an error if the underlying reader returns an
/// error of a kind other than `io::ErrorKind::UnexpectedEof` upon reading.
///
/// If the next record is of a different type than `T`,
/// this function returns an error of kind `io::ErrorKind::InvalidData`.
pub async fn decode_record<'a, T: HasRType + 'a>(&'a mut self) -> Result<Option<&T>> {
self.decoder.decode().await
}

/// Tries to decode a single record and returns a reference to the record that
/// lasts until the next method call. Returns `None` if `reader` has been
/// exhausted.
///
/// # Errors
/// This function returns an error if the underlying reader returns an
/// error of a kind other than `io::ErrorKind::UnexpectedEof` upon reading.
/// It will also return an error if it encounters an invalid record.
pub async fn decode_record_ref(&mut self) -> Result<Option<RecordRef>> {
self.decoder.decode_ref().await
}
}

impl<'a, R> Decoder<ZstdDecoder<BufReader<R>>>
where
R: io::AsyncReadExt + Unpin,
{
/// Creates a new async DBN [`Decoder`] from Zstandard-compressed `reader`.
///
/// # Errors
/// This function will return an error if it is unable to parse the metadata in `reader`.
pub async fn with_zstd(reader: R) -> crate::Result<Self> {
Decoder::new(ZstdDecoder::new(BufReader::new(reader))).await
}
}

impl<'a, R> Decoder<ZstdDecoder<R>>
where
R: io::AsyncBufReadExt + Unpin,
{
/// Creates a new async DBN [`Decoder`] from Zstandard-compressed buffered `reader`.
///
/// # Errors
/// This function will return an error if it is unable to parse the metadata in `reader`.
pub async fn with_zstd_buffer(reader: R) -> crate::Result<Self> {
Decoder::new(ZstdDecoder::new(reader)).await
}
}

/// An async decoder for files and streams of Databento Binary Encoding (DBN) records.
pub struct RecordDecoder<R>
where
Expand Down Expand Up @@ -821,6 +917,26 @@ mod r#async {
}
}

impl<R> RecordDecoder<ZstdDecoder<BufReader<R>>>
where
R: io::AsyncReadExt + Unpin,
{
/// Creates a new async DBN [`RecordDecoder`] from a Zstandard-compressed `reader`.
pub fn with_zstd(reader: R) -> Self {
RecordDecoder::new(ZstdDecoder::new(BufReader::new(reader)))
}
}

impl<R> RecordDecoder<ZstdDecoder<R>>
where
R: io::AsyncBufReadExt + Unpin,
{
/// Creates a new async DBN [`RecordDecoder`] from a Zstandard-compressed buffered `reader`.
pub fn with_zstd_buffer(reader: R) -> Self {
RecordDecoder::new(ZstdDecoder::new(reader))
}
}

impl<R> From<MetadataDecoder<R>> for RecordDecoder<R>
where
R: io::AsyncReadExt + Unpin,
Expand Down
2 changes: 1 addition & 1 deletion rust/dbn/src/decode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ mod tests {
}

#[cfg(feature = "async")]
pub use r#async::DynReader as AsyncDynReader;
pub use self::{dbn::AsyncDecoder as AsyncDbnDecoder, r#async::DynReader as AsyncDynReader};

#[cfg(feature = "async")]
mod r#async {
Expand Down
11 changes: 6 additions & 5 deletions rust/dbn/src/encode/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ where
pub(crate) mod serialize {
use std::ffi::c_char;

use time::format_description::FormatItem;

use crate::json_writer::{JsonObjectWriter, NULL};
use crate::pretty::{fmt_px, fmt_ts};
use crate::UNDEF_TIMESTAMP;
Expand Down Expand Up @@ -413,10 +411,13 @@ pub(crate) mod serialize {
key: &str,
date: &time::Date,
) {
const DATE_FORMAT: &[FormatItem<'static>] =
time::macros::format_description!("[year]-[month]-[day]");
if PRETTY_TS {
writer.value(key, &date.format(DATE_FORMAT).unwrap_or_default());
writer.value(
key,
&date
.format(crate::metadata::DATE_FORMAT)
.unwrap_or_default(),
);
} else {
let mut date_int = date.year() as u32 * 10_000;
date_int += date.month() as u32 * 100;
Expand Down
29 changes: 27 additions & 2 deletions rust/dbn/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::num::NonZeroU64;
// of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780
#[cfg(not(feature = "python"))]
pub use dbn_macros::MockPyo3;
#[cfg(feature = "serde")]
use serde::Deserialize;

use crate::enums::{SType, Schema};
use crate::record::as_u8_slice;
Expand Down Expand Up @@ -295,6 +297,7 @@ impl Default for MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {

/// A raw symbol and its symbol mappings for different time ranges within the query range.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
#[cfg_attr(feature = "python", derive(pyo3::FromPyObject))]
pub struct SymbolMapping {
/// The symbol assigned by publisher.
Expand All @@ -305,11 +308,33 @@ pub struct SymbolMapping {

/// The resolved symbol for a date range.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
pub struct MappingInterval {
/// The UTC start date of interval.
/// The UTC start date of interval (inclusive).
#[cfg_attr(
feature = "serde",
serde(rename = "d0", deserialize_with = "deserialize_date")
)]
pub start_date: time::Date,
/// The UTC end date of interval.
/// The UTC end date of interval (exclusive).
#[cfg_attr(
feature = "serde",
serde(rename = "d1", deserialize_with = "deserialize_date")
)]
pub end_date: time::Date,
/// The resolved symbol for this interval.
#[cfg_attr(feature = "serde", serde(rename = "s"))]
pub symbol: String,
}

/// The date format used for date strings when serializing [`Metadata`].
pub const DATE_FORMAT: &[time::format_description::FormatItem<'static>] =
time::macros::format_description!("[year]-[month]-[day]");

#[cfg(feature = "serde")]
fn deserialize_date<'de, D: serde::Deserializer<'de>>(
deserializer: D,
) -> Result<time::Date, D::Error> {
let date_str = String::deserialize(deserializer)?;
time::Date::parse(&date_str, DATE_FORMAT).map_err(serde::de::Error::custom)
}

0 comments on commit 255e5c1

Please sign in to comment.