Skip to content

Commit

Permalink
add buffer flushing to filesystem write
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Ulmasov <[email protected]>
  • Loading branch information
r3stl355 committed Nov 25, 2023
1 parent fa6c513 commit f620f35
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use tokio::runtime::Runtime;
use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};

/// Default flush threshold for output streams, in bytes (4 MiB).
///
/// Used when the `max_buffer_size` entry is missing from the filesystem
/// options or its value cannot be parsed as an `i64`. Once the number of
/// bytes written since the last flush reaches this threshold, the stream
/// is flushed and the counter is reset.
const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FsConfig {
pub(crate) root_url: String,
Expand Down Expand Up @@ -279,12 +281,20 @@ impl DeltaFileSystemHandler {
#[allow(unused)] metadata: Option<HashMap<String, String>>,
) -> PyResult<ObjectOutputStream> {
let path = Self::parse_path(&path);
let max_buffer_size = self
.config
.options
.get("max_buffer_size")
.map_or(DEFAULT_MAX_BUFFER_SIZE, |v| {
v.parse::<i64>().unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
});
let file = self
.rt
.block_on(ObjectOutputStream::try_new(
Arc::clone(&self.rt),
self.inner.clone(),
path,
max_buffer_size,
))
.map_err(PythonError::from)?;
Ok(file)
Expand Down Expand Up @@ -492,13 +502,16 @@ pub struct ObjectOutputStream {
closed: bool,
#[pyo3(get)]
mode: String,
max_buffer_size: i64,
buffer_size: i64,
}

impl ObjectOutputStream {
pub async fn try_new(
rt: Arc<Runtime>,
store: Arc<DynObjectStore>,
path: Path,
max_buffer_size: i64,
) -> Result<Self, ObjectStoreError> {
let (multipart_id, writer) = store.put_multipart(&path).await?;
Ok(Self {
Expand All @@ -510,6 +523,8 @@ impl ObjectOutputStream {
pos: 0,
closed: false,
mode: "wb".into(),
max_buffer_size: max_buffer_size,
buffer_size: 0,
})
}

Expand Down Expand Up @@ -582,15 +597,21 @@ impl ObjectOutputStream {
let len = data.as_bytes().len() as i64;
let py = data.py();
let data = data.as_bytes();
py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) {
let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) {
Ok(_) => Ok(len),
Err(err) => {
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
})
})?;
self.buffer_size += len;
if self.buffer_size >= self.max_buffer_size {
let _ = self.flush(py);
self.buffer_size = 0;
}
Ok(res)
}

fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
Expand Down

0 comments on commit f620f35

Please sign in to comment.