Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: lazy static runtime in python #2424

Merged
merged 8 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
Expand All @@ -10,9 +12,6 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};

const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -25,7 +24,7 @@ pub(crate) struct FsConfig {
#[derive(Debug, Clone)]
pub struct DeltaFileSystemHandler {
pub(crate) inner: Arc<DynObjectStore>,
pub(crate) rt: Arc<Runtime>,
pub(crate) rt: Arc<&'static Runtime>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a static, I don't think it even needs to hold this as part of the struct. Can just reference the global static in the methods.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you mean not to pass it through the struct?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wjones127 I removed it from the structs

pub(crate) config: FsConfig,
pub(crate) known_sizes: Option<HashMap<String, i64>>,
}
Expand Down Expand Up @@ -57,7 +56,7 @@ impl DeltaFileSystemHandler {
.object_store();
Ok(Self {
inner: storage,
rt: Arc::new(rt()?),
rt: Arc::new(rt()),
config: FsConfig {
root_url: table_uri.into(),
options: options.unwrap_or_default(),
Expand Down Expand Up @@ -314,7 +313,7 @@ impl DeltaFileSystemHandler {
#[derive(Debug, Clone)]
pub struct ObjectInputFile {
store: Arc<DynObjectStore>,
rt: Arc<Runtime>,
rt: Arc<&'static Runtime>,
path: Path,
content_length: i64,
#[pyo3(get)]
Expand All @@ -326,7 +325,7 @@ pub struct ObjectInputFile {

impl ObjectInputFile {
pub async fn try_new(
rt: Arc<Runtime>,
rt: Arc<&'static Runtime>,
store: Arc<DynObjectStore>,
path: Path,
size: Option<i64>,
Expand Down Expand Up @@ -493,7 +492,7 @@ impl ObjectInputFile {
#[pyclass(weakref, module = "deltalake._internal")]
pub struct ObjectOutputStream {
store: Arc<DynObjectStore>,
rt: Arc<Runtime>,
rt: Arc<&'static Runtime>,
path: Path,
writer: Box<dyn AsyncWrite + Send + Unpin>,
multipart_id: MultipartId,
Expand All @@ -508,7 +507,7 @@ pub struct ObjectOutputStream {

impl ObjectOutputStream {
pub async fn try_new(
rt: Arc<Runtime>,
rt: Arc<&'static Runtime>,
store: Arc<DynObjectStore>,
path: Path,
max_buffer_size: i64,
Expand Down
99 changes: 44 additions & 55 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ use crate::error::DeltaProtocolError;
use crate::error::PythonError;
use crate::filesystem::FsConfig;
use crate::schema::schema_to_pyobject;

#[inline]
fn rt() -> PyResult<tokio::runtime::Runtime> {
tokio::runtime::Runtime::new().map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
use crate::utils::rt;

#[derive(FromPyObject)]
enum PartitionFilterValue<'a> {
Expand Down Expand Up @@ -117,7 +113,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;
}

let table = rt()?.block_on(builder.load()).map_err(PythonError::from)?;
let table = rt().block_on(builder.load()).map_err(PythonError::from)?;
Ok(RawDeltaTable {
_table: table,
_config: FsConfig {
Expand Down Expand Up @@ -180,13 +176,13 @@ impl RawDeltaTable {
}

pub fn load_version(&mut self, version: i64) -> PyResult<()> {
Ok(rt()?
Ok(rt()
.block_on(self._table.load_version(version))
.map_err(PythonError::from)?)
}

pub fn get_latest_version(&mut self) -> PyResult<i64> {
Ok(rt()?
Ok(rt()
.block_on(self._table.get_latest_version())
.map_err(PythonError::from)?)
}
Expand All @@ -196,7 +192,7 @@ impl RawDeltaTable {
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
|err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")),
)?);
Ok(rt()?
Ok(rt()
.block_on(self._table.load_with_datetime(datetime))
.map_err(PythonError::from)?)
}
Expand Down Expand Up @@ -303,7 +299,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -347,7 +343,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -401,7 +397,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -460,7 +456,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -489,7 +485,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let table = rt()?
let table = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -517,7 +513,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let table = rt()?
let table = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -712,7 +708,7 @@ impl RawDeltaTable {
}
}

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -756,7 +752,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand All @@ -765,7 +761,7 @@ impl RawDeltaTable {

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()?
let history = rt()
.block_on(self._table.history(limit))
.map_err(PythonError::from)?;
Ok(history
Expand All @@ -776,7 +772,7 @@ impl RawDeltaTable {

pub fn update_incremental(&mut self) -> PyResult<()> {
#[allow(deprecated)]
Ok(rt()?
Ok(rt()
.block_on(self._table.update_incremental(None))
.map_err(PythonError::from)?)
}
Expand Down Expand Up @@ -988,50 +984,47 @@ impl RawDeltaTable {
predicate: None,
};

rt()?
.block_on(
CommitBuilder::from(
CommitProperties::default().with_metadata(
custom_metadata
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into())),
),
)
.with_actions(actions)
.build(
Some(self._table.snapshot().map_err(PythonError::from)?),
self._table.log_store(),
operation,
)
.map_err(|err| PythonError::from(DeltaTableError::from(err)))?
.into_future(),
rt().block_on(
CommitBuilder::from(
CommitProperties::default().with_metadata(
custom_metadata
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.into())),
),
)
.map_err(PythonError::from)?;
.with_actions(actions)
.build(
Some(self._table.snapshot().map_err(PythonError::from)?),
self._table.log_store(),
operation,
)
.map_err(|err| PythonError::from(DeltaTableError::from(err)))?
.into_future(),
)
.map_err(PythonError::from)?;

Ok(())
}

pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
Ok(filesystem::DeltaFileSystemHandler {
inner: self._table.object_store(),
rt: Arc::new(rt()?),
rt: Arc::new(rt()),
config: self._config.clone(),
known_sizes: None,
})
}

pub fn create_checkpoint(&self) -> PyResult<()> {
rt()?
.block_on(create_checkpoint(&self._table))
rt().block_on(create_checkpoint(&self._table))
.map_err(PythonError::from)?;

Ok(())
}

pub fn cleanup_metadata(&self) -> PyResult<()> {
rt()?
.block_on(cleanup_metadata(&self._table))
rt().block_on(cleanup_metadata(&self._table))
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1076,7 +1069,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -1104,7 +1097,7 @@ impl RawDeltaTable {
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

let (table, metrics) = rt()?
let (table, metrics) = rt()
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Expand Down Expand Up @@ -1354,7 +1347,7 @@ fn batch_distinct(batch: PyArrowType<RecordBatch>) -> PyResult<PyArrowType<Recor
let schema = batch.0.schema();
ctx.register_batch("batch", batch.0)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
let batches = rt()?
let batches = rt()
.block_on(async { ctx.table("batch").await?.distinct()?.collect().await })
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

Expand Down Expand Up @@ -1422,7 +1415,7 @@ fn write_to_deltalake(
let save_mode = mode.parse().map_err(PythonError::from)?;

let options = storage_options.clone().unwrap_or_default();
let table = rt()?
let table = rt()
.block_on(DeltaOps::try_from_uri_with_storage_options(
&table_uri, options,
))
Expand Down Expand Up @@ -1465,8 +1458,7 @@ fn write_to_deltalake(
.with_commit_properties(CommitProperties::default().with_metadata(json_metadata));
};

rt()?
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1518,8 +1510,7 @@ fn create_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1570,8 +1561,7 @@ fn write_new_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;

Ok(())
Expand Down Expand Up @@ -1623,8 +1613,7 @@ fn convert_to_deltalake(
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
rt().block_on(builder.into_future())
.map_err(PythonError::from)?;
Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions python/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use deltalake::storage::{ListResult, ObjectStore, ObjectStoreError, ObjectStoreResult, Path};
use futures::future::{join_all, BoxFuture, FutureExt};
use futures::StreamExt;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use tokio::runtime::Runtime;

#[inline]
pub fn rt() -> PyResult<tokio::runtime::Runtime> {
Runtime::new().map_err(|_| PyRuntimeError::new_err("Couldn't start a new tokio runtime."))
pub fn rt() -> &'static Runtime {
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
}

/// walk the "directory" tree along common prefixes in object store
Expand Down
Loading