From 1401d18739a226f9ec9168b449a762049f9ca6fd Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:40:53 +0200 Subject: [PATCH 1/8] use lazy static rt --- python/src/lib.rs | 60 +++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index bcf62cdfb7..d1ce97e616 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -43,6 +43,7 @@ use deltalake::partitions::PartitionFilter; use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; +use lazy_static::lazy_static; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{PyDict, PyFrozenSet}; @@ -58,6 +59,15 @@ fn rt() -> PyResult { tokio::runtime::Runtime::new().map_err(|err| PyRuntimeError::new_err(err.to_string())) } + +lazy_static! { + #[derive(Debug)] + pub static ref TOKIO_RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new() + .expect("Failed to create a tokio runtime."); +} + + + #[derive(FromPyObject)] enum PartitionFilterValue<'a> { Single(&'a str), @@ -117,7 +127,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; } - let table = rt()?.block_on(builder.load()).map_err(PythonError::from)?; + let table = TOKIO_RT.block_on(builder.load()).map_err(PythonError::from)?; Ok(RawDeltaTable { _table: table, _config: FsConfig { @@ -180,13 +190,13 @@ impl RawDeltaTable { } pub fn load_version(&mut self, version: i64) -> PyResult<()> { - Ok(rt()? + Ok(TOKIO_RT .block_on(self._table.load_version(version)) .map_err(PythonError::from)?) } pub fn get_latest_version(&mut self) -> PyResult { - Ok(rt()? + Ok(TOKIO_RT .block_on(self._table.get_latest_version()) .map_err(PythonError::from)?) } @@ -196,7 +206,7 @@ impl RawDeltaTable { DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")), )?); - Ok(rt()? + Ok(TOKIO_RT .block_on(self._table.load_with_datetime(datetime)) .map_err(PythonError::from)?) } @@ -303,7 +313,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -347,7 +357,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -401,7 +411,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -460,7 +470,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -489,7 +499,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = rt()? 
+ let table = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -517,7 +527,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = rt()? + let table = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -712,7 +722,7 @@ impl RawDeltaTable { } } - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -756,7 +766,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -765,7 +775,7 @@ impl RawDeltaTable { /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. pub fn history(&mut self, limit: Option) -> PyResult> { - let history = rt()? + let history = TOKIO_RT .block_on(self._table.history(limit)) .map_err(PythonError::from)?; Ok(history @@ -776,7 +786,7 @@ impl RawDeltaTable { pub fn update_incremental(&mut self) -> PyResult<()> { #[allow(deprecated)] - Ok(rt()? + Ok(TOKIO_RT .block_on(self._table.update_incremental(None)) .map_err(PythonError::from)?) } @@ -988,7 +998,7 @@ impl RawDeltaTable { predicate: None, }; - rt()? + TOKIO_RT .block_on( CommitBuilder::from( CommitProperties::default().with_metadata( @@ -1022,7 +1032,7 @@ impl RawDeltaTable { } pub fn create_checkpoint(&self) -> PyResult<()> { - rt()? + TOKIO_RT .block_on(create_checkpoint(&self._table)) .map_err(PythonError::from)?; @@ -1030,7 +1040,7 @@ impl RawDeltaTable { } pub fn cleanup_metadata(&self) -> PyResult<()> { - rt()? + TOKIO_RT .block_on(cleanup_metadata(&self._table)) .map_err(PythonError::from)?; @@ -1076,7 +1086,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1104,7 +1114,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = TOKIO_RT .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1354,7 +1364,7 @@ fn batch_distinct(batch: PyArrowType) -> PyResult Date: Tue, 16 Apr 2024 10:41:09 +0200 Subject: [PATCH 2/8] fmt --- python/src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index d1ce97e616..afc6b4bff5 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -59,15 +59,12 @@ fn rt() -> PyResult { tokio::runtime::Runtime::new().map_err(|err| PyRuntimeError::new_err(err.to_string())) } - lazy_static! 
{ #[derive(Debug)] pub static ref TOKIO_RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new() .expect("Failed to create a tokio runtime."); } - - #[derive(FromPyObject)] enum PartitionFilterValue<'a> { Single(&'a str), @@ -127,7 +124,9 @@ impl RawDeltaTable { .map_err(PythonError::from)?; } - let table = TOKIO_RT.block_on(builder.load()).map_err(PythonError::from)?; + let table = TOKIO_RT + .block_on(builder.load()) + .map_err(PythonError::from)?; Ok(RawDeltaTable { _table: table, _config: FsConfig { From f1fa3bd758ba765a92afae46a73b2b06d7ff47e6 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:29:22 +0200 Subject: [PATCH 3/8] use oncelock, refactor rt lifetime --- python/src/filesystem.rs | 13 ++++---- python/src/lib.rs | 64 +++++++++++++++++----------------------- python/src/utils.rs | 11 +++---- 3 files changed, 39 insertions(+), 49 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index b50f738bec..317a2562ba 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -9,7 +9,6 @@ use pyo3::types::{IntoPyDict, PyBytes}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::runtime::Runtime; - use crate::error::PythonError; use crate::utils::{delete_dir, rt, walk_tree}; @@ -25,7 +24,7 @@ pub(crate) struct FsConfig { #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, - pub(crate) rt: Arc, + pub(crate) rt: Arc<&'static Runtime>, pub(crate) config: FsConfig, pub(crate) known_sizes: Option>, } @@ -57,7 +56,7 @@ impl DeltaFileSystemHandler { .object_store(); Ok(Self { inner: storage, - rt: Arc::new(rt()?), + rt: Arc::new(rt()), config: FsConfig { root_url: table_uri.into(), options: options.unwrap_or_default(), @@ -314,7 +313,7 @@ impl DeltaFileSystemHandler { #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, - rt: Arc, + rt: Arc<&'static Runtime>, path: Path, content_length: i64, #[pyo3(get)] @@ -326,7 +325,7 @@ pub struct ObjectInputFile { impl ObjectInputFile { pub async fn try_new( - rt: Arc, + rt: Arc<&'static Runtime>, store: Arc, path: Path, size: Option, @@ -493,7 +492,7 @@ impl ObjectInputFile { #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { store: Arc, - rt: Arc, + rt: Arc<&'static Runtime>, path: Path, writer: Box, multipart_id: MultipartId, @@ -508,7 +507,7 @@ pub struct ObjectOutputStream { impl ObjectOutputStream { pub async fn try_new( - rt: Arc, + rt: Arc<&'static Runtime>, store: Arc, path: Path, max_buffer_size: i64, diff --git a/python/src/lib.rs b/python/src/lib.rs index afc6b4bff5..c916633716 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -43,7 +43,6 @@ use deltalake::partitions::PartitionFilter; use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; -use lazy_static::lazy_static; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{PyDict, PyFrozenSet}; @@ -53,17 +52,8 @@ use crate::error::DeltaProtocolError; use crate::error::PythonError; use crate::filesystem::FsConfig; use crate::schema::schema_to_pyobject; +use crate::utils::rt; -#[inline] -fn rt() -> PyResult { - tokio::runtime::Runtime::new().map_err(|err| PyRuntimeError::new_err(err.to_string())) -} - -lazy_static! 
{ - #[derive(Debug)] - pub static ref TOKIO_RT: tokio::runtime::Runtime = tokio::runtime::Runtime::new() - .expect("Failed to create a tokio runtime."); -} #[derive(FromPyObject)] enum PartitionFilterValue<'a> { @@ -124,7 +114,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; } - let table = TOKIO_RT + let table = rt() .block_on(builder.load()) .map_err(PythonError::from)?; Ok(RawDeltaTable { @@ -189,13 +179,13 @@ impl RawDeltaTable { } pub fn load_version(&mut self, version: i64) -> PyResult<()> { - Ok(TOKIO_RT + Ok(rt() .block_on(self._table.load_version(version)) .map_err(PythonError::from)?) } pub fn get_latest_version(&mut self) -> PyResult { - Ok(TOKIO_RT + Ok(rt() .block_on(self._table.get_latest_version()) .map_err(PythonError::from)?) } @@ -205,7 +195,7 @@ impl RawDeltaTable { DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")), )?); - Ok(TOKIO_RT + Ok(rt() .block_on(self._table.load_with_datetime(datetime)) .map_err(PythonError::from)?) } @@ -312,7 +302,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -356,7 +346,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -410,7 +400,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -469,7 +459,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -498,7 +488,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = TOKIO_RT + let table = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -526,7 +516,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = TOKIO_RT + let table = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -721,7 +711,7 @@ impl RawDeltaTable { } } - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -765,7 +755,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -774,7 +764,7 @@ impl RawDeltaTable { /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. 
pub fn history(&mut self, limit: Option) -> PyResult> { - let history = TOKIO_RT + let history = rt() .block_on(self._table.history(limit)) .map_err(PythonError::from)?; Ok(history @@ -785,7 +775,7 @@ impl RawDeltaTable { pub fn update_incremental(&mut self) -> PyResult<()> { #[allow(deprecated)] - Ok(TOKIO_RT + Ok(rt() .block_on(self._table.update_incremental(None)) .map_err(PythonError::from)?) } @@ -997,7 +987,7 @@ impl RawDeltaTable { predicate: None, }; - TOKIO_RT + rt() .block_on( CommitBuilder::from( CommitProperties::default().with_metadata( @@ -1024,14 +1014,14 @@ impl RawDeltaTable { pub fn get_py_storage_backend(&self) -> PyResult { Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), - rt: Arc::new(rt()?), + rt: Arc::new(rt()), config: self._config.clone(), known_sizes: None, }) } pub fn create_checkpoint(&self) -> PyResult<()> { - TOKIO_RT + rt() .block_on(create_checkpoint(&self._table)) .map_err(PythonError::from)?; @@ -1039,7 +1029,7 @@ impl RawDeltaTable { } pub fn cleanup_metadata(&self) -> PyResult<()> { - TOKIO_RT + rt() .block_on(cleanup_metadata(&self._table)) .map_err(PythonError::from)?; @@ -1085,7 +1075,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1113,7 +1103,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = TOKIO_RT + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1363,7 +1353,7 @@ fn batch_distinct(batch: PyArrowType) -> PyResult PyResult { - Runtime::new().map_err(|_| PyRuntimeError::new_err("Couldn't start a new tokio runtime.")) +pub fn rt() -> &'static Runtime { + static TOKIO_RT: OnceLock = OnceLock::new(); + TOKIO_RT.get_or_init(|| { + Runtime::new().expect("Failed to create a tokio runtime.") + }) } /// walk the "directory" tree along common prefixes in object store From eacb74bf1e22ea3b4ed09a8e10c9cb7f909c18c0 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:29:33 +0200 Subject: [PATCH 4/8] fmt --- python/src/filesystem.rs | 4 +-- python/src/lib.rs | 60 +++++++++++++++++----------------------- python/src/utils.rs | 4 +-- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 317a2562ba..79dc7cf4d3 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; +use crate::error::PythonError; +use crate::utils::{delete_dir, rt, walk_tree}; use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path}; use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; @@ -9,8 +11,6 @@ use pyo3::types::{IntoPyDict, PyBytes}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::runtime::Runtime; -use crate::error::PythonError; -use crate::utils::{delete_dir, rt, walk_tree}; const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; diff --git a/python/src/lib.rs b/python/src/lib.rs index c916633716..2fc348c9f0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -54,7 +54,6 @@ use crate::filesystem::FsConfig; use 
crate::schema::schema_to_pyobject; use crate::utils::rt; - #[derive(FromPyObject)] enum PartitionFilterValue<'a> { Single(&'a str), @@ -114,9 +113,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; } - let table = rt() - .block_on(builder.load()) - .map_err(PythonError::from)?; + let table = rt().block_on(builder.load()).map_err(PythonError::from)?; Ok(RawDeltaTable { _table: table, _config: FsConfig { @@ -987,26 +984,25 @@ impl RawDeltaTable { predicate: None, }; - rt() - .block_on( - CommitBuilder::from( - CommitProperties::default().with_metadata( - custom_metadata - .unwrap_or_default() - .into_iter() - .map(|(k, v)| (k, v.into())), - ), - ) - .with_actions(actions) - .build( - Some(self._table.snapshot().map_err(PythonError::from)?), - self._table.log_store(), - operation, - ) - .map_err(|err| PythonError::from(DeltaTableError::from(err)))? - .into_future(), + rt().block_on( + CommitBuilder::from( + CommitProperties::default().with_metadata( + custom_metadata + .unwrap_or_default() + .into_iter() + .map(|(k, v)| (k, v.into())), + ), ) - .map_err(PythonError::from)?; + .with_actions(actions) + .build( + Some(self._table.snapshot().map_err(PythonError::from)?), + self._table.log_store(), + operation, + ) + .map_err(|err| PythonError::from(DeltaTableError::from(err)))? + .into_future(), + ) + .map_err(PythonError::from)?; Ok(()) } @@ -1021,16 +1017,14 @@ impl RawDeltaTable { } pub fn create_checkpoint(&self) -> PyResult<()> { - rt() - .block_on(create_checkpoint(&self._table)) + rt().block_on(create_checkpoint(&self._table)) .map_err(PythonError::from)?; Ok(()) } pub fn cleanup_metadata(&self) -> PyResult<()> { - rt() - .block_on(cleanup_metadata(&self._table)) + rt().block_on(cleanup_metadata(&self._table)) .map_err(PythonError::from)?; Ok(()) @@ -1464,8 +1458,7 @@ fn write_to_deltalake( .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - rt() - .block_on(builder.into_future()) + rt().block_on(builder.into_future()) .map_err(PythonError::from)?; Ok(()) @@ -1517,8 +1510,7 @@ fn create_deltalake( builder = builder.with_metadata(json_metadata); }; - rt() - .block_on(builder.into_future()) + rt().block_on(builder.into_future()) .map_err(PythonError::from)?; Ok(()) @@ -1569,8 +1561,7 @@ fn write_new_deltalake( builder = builder.with_metadata(json_metadata); }; - rt() - .block_on(builder.into_future()) + rt().block_on(builder.into_future()) .map_err(PythonError::from)?; Ok(()) @@ -1622,8 +1613,7 @@ fn convert_to_deltalake( builder = builder.with_metadata(json_metadata); }; - rt() - .block_on(builder.into_future()) + rt().block_on(builder.into_future()) .map_err(PythonError::from)?; Ok(()) } diff --git a/python/src/utils.rs b/python/src/utils.rs index 38b34eacf5..6d0f69b242 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -8,9 +8,7 @@ use tokio::runtime::Runtime; #[inline] pub fn rt() -> &'static Runtime { static TOKIO_RT: OnceLock = OnceLock::new(); - TOKIO_RT.get_or_init(|| { - Runtime::new().expect("Failed to create a tokio runtime.") - }) + TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime.")) } /// walk the "directory" tree along common prefixes in object store From 6260b881cd8dddce6e2926854d05d6b21b10b0ca Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:34:58 +0200 Subject: [PATCH 5/8] remove unnecessary pointer --- python/src/filesystem.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/python/src/filesystem.rs b/python/src/filesystem.rs index 79dc7cf4d3..6ac2aae965 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -24,7 +24,7 @@ pub(crate) struct FsConfig { #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, - pub(crate) rt: Arc<&'static Runtime>, + pub(crate) rt: &'static Runtime, pub(crate) config: FsConfig, pub(crate) known_sizes: Option>, } @@ -313,7 +313,7 @@ impl DeltaFileSystemHandler { #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, - rt: Arc<&'static Runtime>, + rt: &'static Runtime, path: Path, content_length: i64, #[pyo3(get)] @@ -325,7 +325,7 @@ pub struct ObjectInputFile { impl ObjectInputFile { pub async fn try_new( - rt: Arc<&'static Runtime>, + rt: &'static Runtime, store: Arc, path: Path, size: Option, @@ -492,7 +492,7 @@ impl ObjectInputFile { #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { store: Arc, - rt: Arc<&'static Runtime>, + rt: &'static Runtime, path: Path, writer: Box, multipart_id: MultipartId, @@ -507,7 +507,7 @@ pub struct ObjectOutputStream { impl ObjectOutputStream { pub async fn try_new( - rt: Arc<&'static Runtime>, + rt: &'static Runtime, store: Arc, path: Path, max_buffer_size: i64, From b8860b5fe238f454ae14ba646067824ce720bb64 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:39:02 +0200 Subject: [PATCH 6/8] remove last arcs --- python/src/filesystem.rs | 6 +++--- python/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 6ac2aae965..04e72b8d98 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -56,7 +56,7 @@ impl DeltaFileSystemHandler { .object_store(); Ok(Self { inner: storage, - rt: Arc::new(rt()), + rt: rt(), config: FsConfig { root_url: table_uri.into(), options: options.unwrap_or_default(), @@ -264,7 +264,7 @@ impl DeltaFileSystemHandler { let file = self .rt .block_on(ObjectInputFile::try_new( - Arc::clone(&self.rt), + self.rt, self.inner.clone(), path, size.copied(), @@ -290,7 +290,7 @@ impl DeltaFileSystemHandler { let file = self .rt .block_on(ObjectOutputStream::try_new( - Arc::clone(&self.rt), + &self.rt, self.inner.clone(), path, max_buffer_size, diff --git a/python/src/lib.rs b/python/src/lib.rs index 2fc348c9f0..6082625e92 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1010,7 +1010,7 @@ impl RawDeltaTable { pub fn get_py_storage_backend(&self) -> PyResult { Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), - rt: Arc::new(rt()), + rt: &Arc::new(rt()), config: self._config.clone(), known_sizes: None, }) From cc4bdb93501113ce9ad075d36650ac9b36ec6113 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 17:41:12 +0200 Subject: [PATCH 7/8] >_< clippy --- python/src/filesystem.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 04e72b8d98..9884e5ec8a 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -290,7 +290,7 @@ impl DeltaFileSystemHandler { let file = self .rt .block_on(ObjectOutputStream::try_new( - &self.rt, + self.rt, self.inner.clone(), path, max_buffer_size, From 4a0e47ee1f82cc65c78a97a5722890fb962f0e34 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 16 Apr 2024 18:19:29 +0200 Subject: 
[PATCH 8/8] don't pass in struct --- python/src/filesystem.rs | 57 +++++++++++----------------------------- python/src/lib.rs | 1 - 2 files changed, 16 insertions(+), 42 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 9884e5ec8a..7a3872c660 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -10,7 +10,6 @@ use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyBytes}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::runtime::Runtime; const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; @@ -24,7 +23,6 @@ pub(crate) struct FsConfig { #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, - pub(crate) rt: &'static Runtime, pub(crate) config: FsConfig, pub(crate) known_sizes: Option>, } @@ -56,7 +54,6 @@ impl DeltaFileSystemHandler { .object_store(); Ok(Self { inner: storage, - rt: rt(), config: FsConfig { root_url: table_uri.into(), options: options.unwrap_or_default(), @@ -78,8 +75,7 @@ impl DeltaFileSystemHandler { fn copy_file(&self, src: String, dest: String) -> PyResult<()> { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); - self.rt - .block_on(self.inner.copy(&from_path, &to_path)) + rt().block_on(self.inner.copy(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -91,16 +87,14 @@ impl DeltaFileSystemHandler { fn delete_dir(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - self.rt - .block_on(delete_dir(self.inner.as_ref(), &path)) + rt().block_on(delete_dir(self.inner.as_ref(), &path)) .map_err(PythonError::from)?; Ok(()) } fn delete_file(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - self.rt - .block_on(self.inner.delete(&path)) + rt().block_on(self.inner.delete(&path)) .map_err(PythonError::from)?; Ok(()) } @@ -121,14 +115,13 @@ impl DeltaFileSystemHandler { for file_path in paths { let path = Self::parse_path(&file_path); let listed = py.allow_threads(|| { - self.rt - .block_on(self.inner.list_with_delimiter(Some(&path))) + rt().block_on(self.inner.list_with_delimiter(Some(&path))) .map_err(PythonError::from) })?; // TODO is there a better way to figure out if we are in a directory? 
if listed.objects.is_empty() && listed.common_prefixes.is_empty() { - let maybe_meta = py.allow_threads(|| self.rt.block_on(self.inner.head(&path))); + let maybe_meta = py.allow_threads(|| rt().block_on(self.inner.head(&path))); match maybe_meta { Ok(meta) => { let kwargs = HashMap::from([ @@ -185,10 +178,7 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&base_dir); - let list_result = match self - .rt - .block_on(walk_tree(self.inner.clone(), &path, recursive)) - { + let list_result = match rt().block_on(walk_tree(self.inner.clone(), &path, recursive)) { Ok(res) => Ok(res), Err(ObjectStoreError::NotFound { path, source }) => { if allow_not_found { @@ -248,8 +238,7 @@ impl DeltaFileSystemHandler { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); // TODO check the if not exists semantics - self.rt - .block_on(self.inner.rename(&from_path, &to_path)) + rt().block_on(self.inner.rename(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -261,10 +250,8 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&path); - let file = self - .rt + let file = rt() .block_on(ObjectInputFile::try_new( - self.rt, self.inner.clone(), path, size.copied(), @@ -287,10 +274,8 @@ impl DeltaFileSystemHandler { .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| { v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) }); - let file = self - .rt + let file = rt() .block_on(ObjectOutputStream::try_new( - self.rt, self.inner.clone(), path, max_buffer_size, @@ -313,7 +298,6 @@ impl DeltaFileSystemHandler { #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, - rt: &'static Runtime, path: Path, content_length: i64, #[pyo3(get)] @@ -325,7 +309,6 @@ pub struct ObjectInputFile { impl ObjectInputFile { pub async fn try_new( - rt: &'static Runtime, store: Arc, path: Path, size: Option, @@ -344,7 +327,6 @@ impl ObjectInputFile { // https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083 Ok(Self { store, - rt, path, content_length, closed: false, @@ -455,8 +437,7 @@ impl ObjectInputFile { self.pos += nbytes; let data = if nbytes > 0 { py.allow_threads(|| { - self.rt - .block_on(self.store.get_range(&self.path, range)) + rt().block_on(self.store.get_range(&self.path, range)) .map_err(PythonError::from) })? 
} else { @@ -492,7 +473,6 @@ impl ObjectInputFile { #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { store: Arc, - rt: &'static Runtime, path: Path, writer: Box, multipart_id: MultipartId, @@ -507,7 +487,6 @@ pub struct ObjectOutputStream { impl ObjectOutputStream { pub async fn try_new( - rt: &'static Runtime, store: Arc, path: Path, max_buffer_size: i64, @@ -515,7 +494,6 @@ impl ObjectOutputStream { let (multipart_id, writer) = store.put_multipart(&path).await?; Ok(Self { store, - rt, path, writer, multipart_id, @@ -540,11 +518,10 @@ impl ObjectOutputStream { impl ObjectOutputStream { fn close(&mut self, py: Python<'_>) -> PyResult<()> { self.closed = true; - py.allow_threads(|| match self.rt.block_on(self.writer.shutdown()) { + py.allow_threads(|| match rt().block_on(self.writer.shutdown()) { Ok(_) => Ok(()), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } @@ -596,11 +573,10 @@ impl ObjectOutputStream { let len = data.as_bytes().len() as i64; let py = data.py(); let data = data.as_bytes(); - let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { + let res = py.allow_threads(|| match rt().block_on(self.writer.write_all(data)) { Ok(_) => Ok(len), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } @@ -614,11 +590,10 @@ impl ObjectOutputStream { } fn flush(&mut self, py: Python<'_>) -> PyResult<()> { - py.allow_threads(|| match self.rt.block_on(self.writer.flush()) { + py.allow_threads(|| match rt().block_on(self.writer.flush()) { Ok(_) => Ok(()), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } diff --git a/python/src/lib.rs b/python/src/lib.rs index 6082625e92..a64a5efe84 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1010,7 +1010,6 @@ impl RawDeltaTable { pub fn get_py_storage_backend(&self) -> PyResult { Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), - rt: &Arc::new(rt()), config: self._config.clone(), known_sizes: None, })
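
For reference, the pattern the series converges on (PATCH 3/8 onward, in python/src/utils.rs) is sketched below as a minimal, self-contained program: one process-wide tokio runtime, created lazily through std::sync::OnceLock and handed out as a &'static reference. The rt()/TOKIO_RT names mirror the helper introduced in the patches, but the sketch is not taken verbatim from them; the main() driver and the toy future are illustrative only, and it assumes a tokio dependency with the "rt-multi-thread" feature enabled.

    // One shared runtime for the whole process, built on first use.
    use std::sync::OnceLock;
    use tokio::runtime::Runtime;

    #[inline]
    fn rt() -> &'static Runtime {
        // OnceLock initializes the runtime exactly once; every later call
        // returns the same &'static Runtime, so synchronous entry points can
        // block_on without constructing a fresh runtime or threading an Arc
        // through every struct that needs it.
        static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
        TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
    }

    fn main() {
        // Any synchronous caller (e.g. a pyo3 method) can drive async work
        // on the shared runtime.
        let answer = rt().block_on(async { 21 * 2 });
        assert_eq!(answer, 42);
        // Repeated calls hand back the very same runtime instance.
        assert!(std::ptr::eq(rt(), rt()));
    }

Compared with the lazy_static! approach of PATCH 1/8, OnceLock lives in the standard library, so the lazy_static dependency can be dropped again, and returning &'static Runtime is what allows the later patches to remove the Arc<Runtime> (and then Arc<&'static Runtime>) fields from DeltaFileSystemHandler, ObjectInputFile, and ObjectOutputStream.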