diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index b50f738bec..7a3872c660 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; +use crate::error::PythonError; +use crate::utils::{delete_dir, rt, walk_tree}; use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path}; use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; @@ -8,10 +10,6 @@ use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyBytes}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncWrite, AsyncWriteExt}; -use tokio::runtime::Runtime; - -use crate::error::PythonError; -use crate::utils::{delete_dir, rt, walk_tree}; const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; @@ -25,7 +23,6 @@ pub(crate) struct FsConfig { #[derive(Debug, Clone)] pub struct DeltaFileSystemHandler { pub(crate) inner: Arc, - pub(crate) rt: Arc, pub(crate) config: FsConfig, pub(crate) known_sizes: Option>, } @@ -57,7 +54,6 @@ impl DeltaFileSystemHandler { .object_store(); Ok(Self { inner: storage, - rt: Arc::new(rt()?), config: FsConfig { root_url: table_uri.into(), options: options.unwrap_or_default(), @@ -79,8 +75,7 @@ impl DeltaFileSystemHandler { fn copy_file(&self, src: String, dest: String) -> PyResult<()> { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); - self.rt - .block_on(self.inner.copy(&from_path, &to_path)) + rt().block_on(self.inner.copy(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -92,16 +87,14 @@ impl DeltaFileSystemHandler { fn delete_dir(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - self.rt - .block_on(delete_dir(self.inner.as_ref(), &path)) + rt().block_on(delete_dir(self.inner.as_ref(), &path)) .map_err(PythonError::from)?; Ok(()) } fn delete_file(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - self.rt - .block_on(self.inner.delete(&path)) + rt().block_on(self.inner.delete(&path)) .map_err(PythonError::from)?; Ok(()) } @@ -122,14 +115,13 @@ impl DeltaFileSystemHandler { for file_path in paths { let path = Self::parse_path(&file_path); let listed = py.allow_threads(|| { - self.rt - .block_on(self.inner.list_with_delimiter(Some(&path))) + rt().block_on(self.inner.list_with_delimiter(Some(&path))) .map_err(PythonError::from) })?; // TODO is there a better way to figure out if we are in a directory? if listed.objects.is_empty() && listed.common_prefixes.is_empty() { - let maybe_meta = py.allow_threads(|| self.rt.block_on(self.inner.head(&path))); + let maybe_meta = py.allow_threads(|| rt().block_on(self.inner.head(&path))); match maybe_meta { Ok(meta) => { let kwargs = HashMap::from([ @@ -186,10 +178,7 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&base_dir); - let list_result = match self - .rt - .block_on(walk_tree(self.inner.clone(), &path, recursive)) - { + let list_result = match rt().block_on(walk_tree(self.inner.clone(), &path, recursive)) { Ok(res) => Ok(res), Err(ObjectStoreError::NotFound { path, source }) => { if allow_not_found { @@ -249,8 +238,7 @@ impl DeltaFileSystemHandler { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); // TODO check the if not exists semantics - self.rt - .block_on(self.inner.rename(&from_path, &to_path)) + rt().block_on(self.inner.rename(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -262,10 +250,8 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&path); - let file = self - .rt + let file = rt() .block_on(ObjectInputFile::try_new( - Arc::clone(&self.rt), self.inner.clone(), path, size.copied(), @@ -288,10 +274,8 @@ impl DeltaFileSystemHandler { .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| { v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) }); - let file = self - .rt + let file = rt() .block_on(ObjectOutputStream::try_new( - Arc::clone(&self.rt), self.inner.clone(), path, max_buffer_size, @@ -314,7 +298,6 @@ impl DeltaFileSystemHandler { #[derive(Debug, Clone)] pub struct ObjectInputFile { store: Arc, - rt: Arc, path: Path, content_length: i64, #[pyo3(get)] @@ -326,7 +309,6 @@ pub struct ObjectInputFile { impl ObjectInputFile { pub async fn try_new( - rt: Arc, store: Arc, path: Path, size: Option, @@ -345,7 +327,6 @@ impl ObjectInputFile { // https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083 Ok(Self { store, - rt, path, content_length, closed: false, @@ -456,8 +437,7 @@ impl ObjectInputFile { self.pos += nbytes; let data = if nbytes > 0 { py.allow_threads(|| { - self.rt - .block_on(self.store.get_range(&self.path, range)) + rt().block_on(self.store.get_range(&self.path, range)) .map_err(PythonError::from) })? } else { @@ -493,7 +473,6 @@ impl ObjectInputFile { #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { store: Arc, - rt: Arc, path: Path, writer: Box, multipart_id: MultipartId, @@ -508,7 +487,6 @@ pub struct ObjectOutputStream { impl ObjectOutputStream { pub async fn try_new( - rt: Arc, store: Arc, path: Path, max_buffer_size: i64, @@ -516,7 +494,6 @@ impl ObjectOutputStream { let (multipart_id, writer) = store.put_multipart(&path).await?; Ok(Self { store, - rt, path, writer, multipart_id, @@ -541,11 +518,10 @@ impl ObjectOutputStream { impl ObjectOutputStream { fn close(&mut self, py: Python<'_>) -> PyResult<()> { self.closed = true; - py.allow_threads(|| match self.rt.block_on(self.writer.shutdown()) { + py.allow_threads(|| match rt().block_on(self.writer.shutdown()) { Ok(_) => Ok(()), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } @@ -597,11 +573,10 @@ impl ObjectOutputStream { let len = data.as_bytes().len() as i64; let py = data.py(); let data = data.as_bytes(); - let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { + let res = py.allow_threads(|| match rt().block_on(self.writer.write_all(data)) { Ok(_) => Ok(len), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } @@ -615,11 +590,10 @@ impl ObjectOutputStream { } fn flush(&mut self, py: Python<'_>) -> PyResult<()> { - py.allow_threads(|| match self.rt.block_on(self.writer.flush()) { + py.allow_threads(|| match rt().block_on(self.writer.flush()) { Ok(_) => Ok(()), Err(err) => { - self.rt - .block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) + rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id)) .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } diff --git a/python/src/lib.rs b/python/src/lib.rs index bcf62cdfb7..a64a5efe84 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -52,11 +52,7 @@ use crate::error::DeltaProtocolError; use crate::error::PythonError; use crate::filesystem::FsConfig; use crate::schema::schema_to_pyobject; - -#[inline] -fn rt() -> PyResult { - tokio::runtime::Runtime::new().map_err(|err| PyRuntimeError::new_err(err.to_string())) -} +use crate::utils::rt; #[derive(FromPyObject)] enum PartitionFilterValue<'a> { @@ -117,7 +113,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; } - let table = rt()?.block_on(builder.load()).map_err(PythonError::from)?; + let table = rt().block_on(builder.load()).map_err(PythonError::from)?; Ok(RawDeltaTable { _table: table, _config: FsConfig { @@ -180,13 +176,13 @@ impl RawDeltaTable { } pub fn load_version(&mut self, version: i64) -> PyResult<()> { - Ok(rt()? + Ok(rt() .block_on(self._table.load_version(version)) .map_err(PythonError::from)?) } pub fn get_latest_version(&mut self) -> PyResult { - Ok(rt()? + Ok(rt() .block_on(self._table.get_latest_version()) .map_err(PythonError::from)?) } @@ -196,7 +192,7 @@ impl RawDeltaTable { DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")), )?); - Ok(rt()? + Ok(rt() .block_on(self._table.load_with_datetime(datetime)) .map_err(PythonError::from)?) } @@ -303,7 +299,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -347,7 +343,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -401,7 +397,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -460,7 +456,7 @@ impl RawDeltaTable { .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -489,7 +485,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = rt()? + let table = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -517,7 +513,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let table = rt()? + let table = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -712,7 +708,7 @@ impl RawDeltaTable { } } - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -756,7 +752,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -765,7 +761,7 @@ impl RawDeltaTable { /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. pub fn history(&mut self, limit: Option) -> PyResult> { - let history = rt()? + let history = rt() .block_on(self._table.history(limit)) .map_err(PythonError::from)?; Ok(history @@ -776,7 +772,7 @@ impl RawDeltaTable { pub fn update_incremental(&mut self) -> PyResult<()> { #[allow(deprecated)] - Ok(rt()? + Ok(rt() .block_on(self._table.update_incremental(None)) .map_err(PythonError::from)?) } @@ -988,26 +984,25 @@ impl RawDeltaTable { predicate: None, }; - rt()? - .block_on( - CommitBuilder::from( - CommitProperties::default().with_metadata( - custom_metadata - .unwrap_or_default() - .into_iter() - .map(|(k, v)| (k, v.into())), - ), - ) - .with_actions(actions) - .build( - Some(self._table.snapshot().map_err(PythonError::from)?), - self._table.log_store(), - operation, - ) - .map_err(|err| PythonError::from(DeltaTableError::from(err)))? - .into_future(), + rt().block_on( + CommitBuilder::from( + CommitProperties::default().with_metadata( + custom_metadata + .unwrap_or_default() + .into_iter() + .map(|(k, v)| (k, v.into())), + ), ) - .map_err(PythonError::from)?; + .with_actions(actions) + .build( + Some(self._table.snapshot().map_err(PythonError::from)?), + self._table.log_store(), + operation, + ) + .map_err(|err| PythonError::from(DeltaTableError::from(err)))? + .into_future(), + ) + .map_err(PythonError::from)?; Ok(()) } @@ -1015,23 +1010,20 @@ impl RawDeltaTable { pub fn get_py_storage_backend(&self) -> PyResult { Ok(filesystem::DeltaFileSystemHandler { inner: self._table.object_store(), - rt: Arc::new(rt()?), config: self._config.clone(), known_sizes: None, }) } pub fn create_checkpoint(&self) -> PyResult<()> { - rt()? - .block_on(create_checkpoint(&self._table)) + rt().block_on(create_checkpoint(&self._table)) .map_err(PythonError::from)?; Ok(()) } pub fn cleanup_metadata(&self) -> PyResult<()> { - rt()? - .block_on(cleanup_metadata(&self._table)) + rt().block_on(cleanup_metadata(&self._table)) .map_err(PythonError::from)?; Ok(()) @@ -1076,7 +1068,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1104,7 +1096,7 @@ impl RawDeltaTable { .with_commit_properties(CommitProperties::default().with_metadata(json_metadata)); }; - let (table, metrics) = rt()? + let (table, metrics) = rt() .block_on(cmd.into_future()) .map_err(PythonError::from)?; self._table.state = table.state; @@ -1354,7 +1346,7 @@ fn batch_distinct(batch: PyArrowType) -> PyResult PyResult { - Runtime::new().map_err(|_| PyRuntimeError::new_err("Couldn't start a new tokio runtime.")) +pub fn rt() -> &'static Runtime { + static TOKIO_RT: OnceLock = OnceLock::new(); + TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime.")) } /// walk the "directory" tree along common prefixes in object store