From f620f35d16738fe99f952b33bd94d9b3f3b64321 Mon Sep 17 00:00:00 2001 From: Nikolay Ulmasov Date: Sat, 25 Nov 2023 21:31:55 +0000 Subject: [PATCH] add buffer flushing to filesystem write Signed-off-by: Nikolay Ulmasov --- python/src/filesystem.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index a8bfb6668a..81aa87587f 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -13,6 +13,8 @@ use tokio::runtime::Runtime; use crate::error::PythonError; use crate::utils::{delete_dir, rt, walk_tree}; +const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; + #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct FsConfig { pub(crate) root_url: String, @@ -279,12 +281,20 @@ impl DeltaFileSystemHandler { #[allow(unused)] metadata: Option>, ) -> PyResult { let path = Self::parse_path(&path); + let max_buffer_size = self + .config + .options + .get("max_buffer_size") + .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| { + v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) + }); let file = self .rt .block_on(ObjectOutputStream::try_new( Arc::clone(&self.rt), self.inner.clone(), path, + max_buffer_size, )) .map_err(PythonError::from)?; Ok(file) @@ -492,6 +502,8 @@ pub struct ObjectOutputStream { closed: bool, #[pyo3(get)] mode: String, + max_buffer_size: i64, + buffer_size: i64, } impl ObjectOutputStream { @@ -499,6 +511,7 @@ impl ObjectOutputStream { rt: Arc, store: Arc, path: Path, + max_buffer_size: i64, ) -> Result { let (multipart_id, writer) = store.put_multipart(&path).await?; Ok(Self { @@ -510,6 +523,8 @@ impl ObjectOutputStream { pos: 0, closed: false, mode: "wb".into(), + max_buffer_size: max_buffer_size, + buffer_size: 0, }) } @@ -582,7 +597,7 @@ impl ObjectOutputStream { let len = data.as_bytes().len() as i64; let py = data.py(); let data = data.as_bytes(); - py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { + let res = py.allow_threads(|| match self.rt.block_on(self.writer.write_all(data)) { Ok(_) => Ok(len), Err(err) => { self.rt @@ -590,7 +605,13 @@ impl ObjectOutputStream { .map_err(PythonError::from)?; Err(PyIOError::new_err(err.to_string())) } - }) + })?; + self.buffer_size += len; + if self.buffer_size >= self.max_buffer_size { + let _ = self.flush(py); + self.buffer_size = 0; + } + Ok(res) } fn flush(&mut self, py: Python<'_>) -> PyResult<()> {