Catalog V2 (#2071)
* Refactor data types for generic catalog writing
* Simplify schema metadata logic
* Simplify orderbook delta metadata
* Add pyo3 bindings for catalog
* Add query function to catalog
* Add persist to json function and round trip test
* Add max_row_group_size option to parquet writer
* Fix chunk metadata usage for order book delta
* Fix row group size in quotes test data
* Fix quotes tick data test

Requires additional development and testing prior to production use.
twitu authored Dec 18, 2024
1 parent 9e24c24 commit 0952886
Showing 21 changed files with 613 additions and 134 deletions.
28 changes: 28 additions & 0 deletions nautilus_core/Cargo.lock

Generated file; diff not rendered.

4 changes: 2 additions & 2 deletions nautilus_core/adapters/tardis/src/replay.rs
@@ -430,7 +430,7 @@ fn batch_and_write_bars(bars: Vec<Bar>, bar_type: &BarType, date: NaiveDate, pat
};

let filepath = path.join(parquet_filepath_bars(bar_type, date));
match write_batch_to_parquet(&batch, &filepath, None) {
match write_batch_to_parquet(batch, &filepath, None) {
Ok(()) => tracing::info!("File written: {}", filepath.display()),
Err(e) => tracing::error!("Error writing {}: {e:?}", filepath.display()),
}
@@ -463,7 +463,7 @@ fn write_batch(
path: &Path,
) {
let filepath = path.join(parquet_filepath(typename, instrument_id, date));
match write_batch_to_parquet(&batch, &filepath, None) {
match write_batch_to_parquet(batch, &filepath, None) {
Ok(()) => tracing::info!("File written: {}", filepath.display()),
Err(e) => tracing::error!("Error writing {}: {e:?}", filepath.display()),
}
5 changes: 3 additions & 2 deletions nautilus_core/model/src/data/deltas.rs
@@ -28,11 +28,12 @@ use nautilus_core::{

use super::{delta::OrderBookDelta, GetTsInit};
use crate::identifiers::InstrumentId;
use serde::{Deserialize, Serialize};

/// Represents a grouped batch of `OrderBookDelta` updates for an `OrderBook`.
///
/// This type cannot be `repr(C)` due to the `deltas` vec.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
@@ -137,7 +138,7 @@ impl GetTsInit for OrderBookDeltas {
/// dereferenced to `OrderBookDeltas`, providing access to `OrderBookDeltas`'s methods without
/// having to manually access the underlying `OrderBookDeltas` instance.
#[repr(C)]
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
pub struct OrderBookDeltas_API(Box<OrderBookDeltas>);

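The `Serialize`/`Deserialize` derives added above are what let the delta types pass through `serde_json`, which the new JSON persistence path relies on. A minimal round-trip sketch, assuming `serde_json` is available and that all fields of `OrderBookDeltas` are serde-compatible (the helper function is illustrative, not part of this commit):

use nautilus_model::data::deltas::OrderBookDeltas;

// Round-trip a batch of deltas through a JSON string (sketch).
fn json_round_trip(deltas: &OrderBookDeltas) -> OrderBookDeltas {
    let json = serde_json::to_string(deltas).expect("serialization should succeed");
    serde_json::from_str(&json).expect("deserialization should succeed")
}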
30 changes: 29 additions & 1 deletion nautilus_core/model/src/data/mod.rs
@@ -55,7 +55,7 @@ use crate::{
/// Not recommended for storing large amounts of data, as the largest variant is significantly
/// larger (10x) than the smallest.
#[repr(C)]
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum Data {
Delta(OrderBookDelta),
@@ -66,6 +66,34 @@ pub enum Data {
Bar(Bar),
}

macro_rules! impl_try_from_data {
($variant:ident, $type:ty) => {
impl TryFrom<Data> for $type {
type Error = ();

fn try_from(value: Data) -> Result<Self, Self::Error> {
match value {
Data::$variant(x) => Ok(x),
_ => Err(()),
}
}
}
};
}

impl_try_from_data!(Delta, OrderBookDelta);
impl_try_from_data!(Deltas, OrderBookDeltas_API);
impl_try_from_data!(Depth10, OrderBookDepth10);
impl_try_from_data!(Quote, QuoteTick);
impl_try_from_data!(Trade, TradeTick);
impl_try_from_data!(Bar, Bar);

pub fn to_variant<T: TryFrom<Data>>(data: Vec<Data>) -> Vec<T> {
data.into_iter()
.filter_map(|d| T::try_from(d).ok())
.collect()
}

impl Data {
/// Returns the instrument ID for the data.
pub fn instrument_id(&self) -> InstrumentId {
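The `impl_try_from_data!` macro generates a fallible `TryFrom<Data>` conversion for each concrete variant, and `to_variant` uses those conversions to narrow a heterogeneous `Vec<Data>` to a single type, silently discarding every other variant. A minimal usage sketch (the helper function is illustrative):

use nautilus_model::data::quote::QuoteTick;
use nautilus_model::data::{to_variant, Data};

// Keep only the quote ticks from a mixed vector of market data;
// other variants fail the generated TryFrom conversion and are filtered out.
fn quotes_only(mixed: Vec<Data>) -> Vec<QuoteTick> {
    to_variant::<QuoteTick>(mixed)
}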
6 changes: 6 additions & 0 deletions nautilus_core/persistence/Cargo.toml
@@ -17,9 +17,13 @@ nautilus-serialization = { path = "../serialization" }

anyhow = { workspace = true }
futures = { workspace = true }
heck = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
pyo3 = { workspace = true, optional = true }
rand = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
binary-heap-plus = "0.5.0"
@@ -32,6 +36,8 @@ criterion = { workspace = true }
rstest = { workspace = true }
quickcheck = "1"
quickcheck_macros = "1"
tempfile.workspace = true
pretty_assertions = "1.4.1"
[target.'cfg(target_os = "linux")'.dependencies]
procfs = "0.17.0"

193 changes: 193 additions & 0 deletions nautilus_core/persistence/src/backend/catalog.rs
@@ -0,0 +1,193 @@
use itertools::Itertools;
use log::info;
use nautilus_core::nanos::UnixNanos;
use nautilus_model::data::bar::Bar;
use nautilus_model::data::delta::OrderBookDelta;
use nautilus_model::data::depth::OrderBookDepth10;
use nautilus_model::data::quote::QuoteTick;
use nautilus_model::data::trade::TradeTick;
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use serde::Serialize;
use std::path::PathBuf;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use heck::ToSnakeCase;
use nautilus_model::data::{Data, GetTsInit};
use nautilus_serialization::parquet::write_batches_to_parquet;

use super::session::{self, build_query, DataBackendSession, QueryResult};

pub struct ParquetDataCatalog {
base_path: PathBuf,
batch_size: usize,
session: DataBackendSession,
}

impl ParquetDataCatalog {
pub fn new(base_path: PathBuf, batch_size: Option<usize>) -> Self {
let batch_size = batch_size.unwrap_or(5000);
Self {
base_path,
batch_size,
session: session::DataBackendSession::new(batch_size),
}
}

// TODO: fix path creation
fn make_path(&self, type_name: &str, instrument_id: Option<&String>) -> PathBuf {
let mut path = self.base_path.join("data").join(type_name.to_lowercase());

if let Some(id) = instrument_id {
path = path.join(id);
}

std::fs::create_dir_all(&path).expect("Failed to create directory");
let file_path = path.join("data.parquet");
info!("Created directory path: {:?}", file_path);
file_path
}

fn check_ascending_timestamps<T: GetTsInit>(data: &[T], type_name: &str) {
assert!(
data.windows(2).all(|w| w[0].ts_init() <= w[1].ts_init()),
"{} timestamps must be in ascending order",
type_name
);
}

pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> Vec<RecordBatch>
where
T: GetTsInit + EncodeToRecordBatch,
{
data.into_iter()
.chunks(self.batch_size)
.into_iter()
.map(|chunk| {
// Take first element and extract metadata
// SAFETY: Unwrap safe as already checked that `data` not empty
let data = chunk.collect_vec();
let metadata = EncodeToRecordBatch::chunk_metadata(&data);
T::encode_batch(&metadata, &data).expect("Expected to encode batch")
})
.collect()
}

pub fn write_to_json<T>(&self, data: Vec<T>) -> PathBuf
where
T: GetTsInit + Serialize,
{
let type_name = std::any::type_name::<T>().to_snake_case();
ParquetDataCatalog::check_ascending_timestamps(&data, &type_name);

let path = self.make_path(&type_name, None);
let json_path = path.with_extension("json");

info!(
"Writing {} records of {} data to {:?}",
data.len(),
type_name,
json_path
);

let file = std::fs::File::create(&json_path)
.unwrap_or_else(|_| panic!("Failed to create JSON file at {:?}", json_path));

serde_json::to_writer(file, &data)
.unwrap_or_else(|_| panic!("Failed to write {} to JSON", type_name));

json_path
}

pub fn write_to_parquet<T>(&self, data: Vec<T>)
where
T: GetTsInit + EncodeToRecordBatch,
{
let type_name = std::any::type_name::<T>().to_snake_case();
ParquetDataCatalog::check_ascending_timestamps(&data, &type_name);

let batches = self.data_to_record_batches(data);
if let Some(batch) = batches.first() {
let schema = batch.schema();
let instrument_id = schema.metadata.get("instrument_id");
let path = self.make_path(&type_name, instrument_id);

// Write all batches to parquet file
info!(
"Writing {} batches of {} data to {:?}",
batches.len(),
type_name,
path
);
// TODO: Set writer to property to limit max row group size
write_batches_to_parquet(&batches, &path, None, Some(5000))
.unwrap_or_else(|_| panic!("Failed to write {} to parquet", type_name));
}
}

/// Query data loaded in the catalog
pub fn query<T>(
&mut self,
// use instrument_ids or bar_types to query specific subset of the data
instrument_ids: Vec<String>,
start: Option<UnixNanos>,
end: Option<UnixNanos>,
where_clause: Option<&str>,
) -> Result<QueryResult>
where
T: DecodeDataFromRecordBatch,
{
let mut paths = Vec::new();
for instrument_id in instrument_ids.iter() {
paths.push(self.make_path("TODO", Some(instrument_id)));
}

// If no specific instrument_id is selected query all files for the data type
if paths.is_empty() {
paths.push(self.make_path("TODO", None));
}

for path in paths.iter() {
let path = path.to_str().unwrap();
let query = build_query(path, start, end, where_clause);
self.session.add_file::<T>(path, path, Some(&query))?;
}

Ok(self.session.get_query_result())
}

pub fn write_data_enum(&self, data: Vec<Data>) {
let mut delta: Vec<OrderBookDelta> = Vec::new();
let mut depth10: Vec<OrderBookDepth10> = Vec::new();
let mut quote: Vec<QuoteTick> = Vec::new();
let mut trade: Vec<TradeTick> = Vec::new();
let mut bar: Vec<Bar> = Vec::new();

for d in data.iter().cloned() {
match d {
Data::Delta(d) => {
delta.push(d);
}
Data::Depth10(d) => {
depth10.push(d);
}
Data::Quote(d) => {
quote.push(d);
}
Data::Trade(d) => {
trade.push(d);
}
Data::Bar(d) => {
bar.push(d);
}
Data::Deltas(_) => continue,
}
}

self.write_to_parquet(delta);
self.write_to_parquet(depth10);
self.write_to_parquet(quote);
self.write_to_parquet(trade);
self.write_to_parquet(bar);
}
}
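Taken together, the new catalog writes a homogeneous, `ts_init`-ordered vector of data to Parquet and registers the resulting files with the DataFusion-backed session for querying. A minimal round-trip sketch, assuming the module path `nautilus_persistence::backend::catalog` and an illustrative instrument ID; note that `query` still builds its file paths from a `TODO` placeholder in this commit:

use std::path::PathBuf;

use nautilus_model::data::quote::QuoteTick;
use nautilus_persistence::backend::catalog::ParquetDataCatalog;

fn write_and_query(quotes: Vec<QuoteTick>) {
    // Default batch size of 5000 rows when `None` is passed.
    let mut catalog = ParquetDataCatalog::new(PathBuf::from("./catalog"), None);

    // Data must already be sorted by `ts_init`; the writer asserts ascending order.
    catalog.write_to_parquet(quotes);

    // Register the written files and run the generated query.
    let _result = catalog
        .query::<QuoteTick>(vec!["EURUSD.SIM".to_string()], None, None, None)
        .expect("query should succeed");
}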
1 change: 1 addition & 0 deletions nautilus_core/persistence/src/backend/mod.rs
@@ -15,5 +15,6 @@

//! Provides an Apache Parquet backend powered by [DataFusion](https://arrow.apache.org/datafusion).
pub mod catalog;
pub mod kmerge_batch;
pub mod session;