feat: lazy static runtime in python (#2424)
# Description
As suggested by @wjones127, this creates a lazy static runtime. It supersedes PR #1950.
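
With a lazy static runtime, the Tokio runtime is created once, on first use, and lives for the remainder of the process; callers obtain it through a free function instead of threading an `Arc<Runtime>` through every handler and file object. Below is a minimal sketch of the pattern, assuming a `std::sync::OnceLock`-based helper (the committed `rt()` lives in `crate::utils`; this body is illustrative, not the exact implementation):

```rust
use std::sync::OnceLock;

use tokio::runtime::Runtime;

/// Process-wide Tokio runtime, created lazily on the first call.
pub fn rt() -> &'static Runtime {
    // OnceLock guarantees the init closure runs at most once, even under
    // concurrent first calls; every later call returns the cached runtime.
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
    RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime"))
}
```

Because `rt()` hands out a `&'static Runtime`, the `rt: Arc<Runtime>` fields and constructor parameters in the diff below can be dropped, and every `self.rt.block_on(...)` call site collapses to `rt().block_on(...)`.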
ion-elgreco authored Apr 16, 2024
1 parent aa8f4d5 commit 9736522
Showing 3 changed files with 65 additions and 104 deletions.
62 changes: 18 additions & 44 deletions python/src/filesystem.rs
@@ -1,17 +1,15 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use crate::error::PythonError;
+use crate::utils::{delete_dir, rt, walk_tree};
 use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
 use deltalake::DeltaTableBuilder;
 use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
 use pyo3::prelude::*;
 use pyo3::types::{IntoPyDict, PyBytes};
 use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::runtime::Runtime;
-
-use crate::error::PythonError;
-use crate::utils::{delete_dir, rt, walk_tree};
 
 const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;

@@ -25,7 +23,6 @@ pub(crate) struct FsConfig {
 #[derive(Debug, Clone)]
 pub struct DeltaFileSystemHandler {
     pub(crate) inner: Arc<DynObjectStore>,
-    pub(crate) rt: Arc<Runtime>,
     pub(crate) config: FsConfig,
     pub(crate) known_sizes: Option<HashMap<String, i64>>,
 }
@@ -57,7 +54,6 @@ impl DeltaFileSystemHandler {
             .object_store();
         Ok(Self {
             inner: storage,
-            rt: Arc::new(rt()?),
             config: FsConfig {
                 root_url: table_uri.into(),
                 options: options.unwrap_or_default(),
@@ -79,8 +75,7 @@ impl DeltaFileSystemHandler {
     fn copy_file(&self, src: String, dest: String) -> PyResult<()> {
         let from_path = Self::parse_path(&src);
         let to_path = Self::parse_path(&dest);
-        self.rt
-            .block_on(self.inner.copy(&from_path, &to_path))
+        rt().block_on(self.inner.copy(&from_path, &to_path))
             .map_err(PythonError::from)?;
         Ok(())
     }
@@ -92,16 +87,14 @@ impl DeltaFileSystemHandler {
 
     fn delete_dir(&self, path: String) -> PyResult<()> {
         let path = Self::parse_path(&path);
-        self.rt
-            .block_on(delete_dir(self.inner.as_ref(), &path))
+        rt().block_on(delete_dir(self.inner.as_ref(), &path))
             .map_err(PythonError::from)?;
         Ok(())
     }
 
     fn delete_file(&self, path: String) -> PyResult<()> {
         let path = Self::parse_path(&path);
-        self.rt
-            .block_on(self.inner.delete(&path))
+        rt().block_on(self.inner.delete(&path))
             .map_err(PythonError::from)?;
         Ok(())
     }
@@ -122,14 +115,13 @@ impl DeltaFileSystemHandler {
         for file_path in paths {
             let path = Self::parse_path(&file_path);
             let listed = py.allow_threads(|| {
-                self.rt
-                    .block_on(self.inner.list_with_delimiter(Some(&path)))
+                rt().block_on(self.inner.list_with_delimiter(Some(&path)))
                     .map_err(PythonError::from)
             })?;
 
             // TODO is there a better way to figure out if we are in a directory?
             if listed.objects.is_empty() && listed.common_prefixes.is_empty() {
-                let maybe_meta = py.allow_threads(|| self.rt.block_on(self.inner.head(&path)));
+                let maybe_meta = py.allow_threads(|| rt().block_on(self.inner.head(&path)));
                 match maybe_meta {
                     Ok(meta) => {
                         let kwargs = HashMap::from([
@@ -186,10 +178,7 @@ impl DeltaFileSystemHandler {
         };
 
         let path = Self::parse_path(&base_dir);
-        let list_result = match self
-            .rt
-            .block_on(walk_tree(self.inner.clone(), &path, recursive))
-        {
+        let list_result = match rt().block_on(walk_tree(self.inner.clone(), &path, recursive)) {
             Ok(res) => Ok(res),
             Err(ObjectStoreError::NotFound { path, source }) => {
                 if allow_not_found {
@@ -249,8 +238,7 @@ impl DeltaFileSystemHandler {
         let from_path = Self::parse_path(&src);
         let to_path = Self::parse_path(&dest);
         // TODO check the if not exists semantics
-        self.rt
-            .block_on(self.inner.rename(&from_path, &to_path))
+        rt().block_on(self.inner.rename(&from_path, &to_path))
             .map_err(PythonError::from)?;
         Ok(())
     }
@@ -262,10 +250,8 @@ impl DeltaFileSystemHandler {
         };
 
         let path = Self::parse_path(&path);
-        let file = self
-            .rt
+        let file = rt()
            .block_on(ObjectInputFile::try_new(
-                Arc::clone(&self.rt),
                self.inner.clone(),
                path,
                size.copied(),
@@ -288,10 +274,8 @@ impl DeltaFileSystemHandler {
             .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| {
                 v.parse::<i64>().unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
             });
-        let file = self
-            .rt
+        let file = rt()
            .block_on(ObjectOutputStream::try_new(
-                Arc::clone(&self.rt),
                self.inner.clone(),
                path,
                max_buffer_size,
@@ -314,7 +298,6 @@ impl DeltaFileSystemHandler {
 #[derive(Debug, Clone)]
 pub struct ObjectInputFile {
     store: Arc<DynObjectStore>,
-    rt: Arc<Runtime>,
     path: Path,
     content_length: i64,
     #[pyo3(get)]
@@ -326,7 +309,6 @@ pub struct ObjectInputFile {
 
 impl ObjectInputFile {
     pub async fn try_new(
-        rt: Arc<Runtime>,
         store: Arc<DynObjectStore>,
         path: Path,
         size: Option<i64>,
@@ -345,7 +327,6 @@ impl ObjectInputFile {
         // https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083
         Ok(Self {
             store,
-            rt,
             path,
             content_length,
             closed: false,
@@ -456,8 +437,7 @@ impl ObjectInputFile {
         self.pos += nbytes;
         let data = if nbytes > 0 {
             py.allow_threads(|| {
-                self.rt
-                    .block_on(self.store.get_range(&self.path, range))
+                rt().block_on(self.store.get_range(&self.path, range))
                     .map_err(PythonError::from)
             })?
         } else {
@@ -493,7 +473,6 @@ impl ObjectInputFile {
 #[pyclass(weakref, module = "deltalake._internal")]
 pub struct ObjectOutputStream {
     store: Arc<DynObjectStore>,
-    rt: Arc<Runtime>,
     path: Path,
     writer: Box<dyn AsyncWrite + Send + Unpin>,
     multipart_id: MultipartId,
@@ -508,15 +487,13 @@ pub struct ObjectOutputStream {
 
 impl ObjectOutputStream {
     pub async fn try_new(
-        rt: Arc<Runtime>,
         store: Arc<DynObjectStore>,
         path: Path,
         max_buffer_size: i64,
     ) -> Result<Self, ObjectStoreError> {
         let (multipart_id, writer) = store.put_multipart(&path).await?;
         Ok(Self {
             store,
-            rt,
             path,
             writer,
             multipart_id,
@@ -541,11 +518,10 @@ impl ObjectOutputStream {
 impl ObjectOutputStream {
     fn close(&mut self, py: Python<'_>) -> PyResult<()> {
         self.closed = true;
-        py.allow_threads(|| match self.rt.block_on(self.writer.shutdown()) {
+        py.allow_threads(|| match rt().block_on(self.writer.shutdown()) {
             Ok(_) => Ok(()),
             Err(err) => {
-                self.rt
-                    .block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
+                rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
                     .map_err(PythonError::from)?;
                 Err(PyIOError::new_err(err.to_string()))
             }
@@ -597,11 +573,10 @@ impl ObjectOutputStream {
         let len = data.as_bytes().len() as i64;
         let py = data.py();
         let data = data.as_bytes();
-        let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) {
+        let res = py.allow_threads(|| match rt().block_on(self.writer.write_all(data)) {
             Ok(_) => Ok(len),
             Err(err) => {
-                self.rt
-                    .block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
+                rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
                     .map_err(PythonError::from)?;
                 Err(PyIOError::new_err(err.to_string()))
             }
@@ -615,11 +590,10 @@ impl ObjectOutputStream {
     }
 
     fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
-        py.allow_threads(|| match self.rt.block_on(self.writer.flush()) {
+        py.allow_threads(|| match rt().block_on(self.writer.flush()) {
             Ok(_) => Ok(()),
             Err(err) => {
-                self.rt
-                    .block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
+                rt().block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
                     .map_err(PythonError::from)?;
                 Err(PyIOError::new_err(err.to_string()))
             }