Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Use ObjectStore instead of AsyncRead in parquet get metadata #15069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 12 additions & 132 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,134 +2,17 @@
//! This is used, for example, by the [parquet2] crate.
//!
//! [parquet2]: https://crates.io/crates/parquet2
use std::io::{self};
use std::pin::Pin;
use std::task::Poll;

use bytes::Bytes;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
use std::sync::Arc;

use object_store::path::Path;
use object_store::MultipartId;
use polars_error::to_compute_err;
use object_store::{MultipartId, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::*;
use super::CloudOptions;
use crate::pl_async::get_runtime;

type OptionalFuture = Option<BoxFuture<'static, std::io::Result<Bytes>>>;

/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
pub struct CloudReader {
// The current position in the stream, it is set by seeking and updated by reading bytes.
pos: u64,
// The total size of the object is required when seeking from the end of the file.
length: Option<u64>,
// Hold an reference to the store in a thread safe way.
object_store: Arc<dyn ObjectStore>,
// The path in the object_store of the current object being read.
path: Path,
// If a read is pending then `active` will point to its future.
active: OptionalFuture,
}

impl CloudReader {
pub fn new(length: Option<u64>, object_store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self {
pos: 0,
length,
object_store,
path,
active: None,
}
}

/// For each read request we create a new future.
async fn read_operation(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
length: usize,
) -> std::task::Poll<std::io::Result<Bytes>> {
let start = self.pos as usize;

// If we already have a future just poll it.
if let Some(fut) = self.active.as_mut() {
return Future::poll(fut.as_mut(), cx);
}

// Create the future.
let future = {
let path = self.path.clone();
let object_store = self.object_store.clone();
// Use an async move block to get our owned objects.
async move {
object_store
.get_range(&path, start..start + length)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("object store error {e:?}"),
)
})
.await
}
};
// Prepare for next read.
self.pos += length as u64;

let mut future = Box::pin(future);

// Need to poll it once to get the pump going.
let polled = Future::poll(future.as_mut(), cx);

// Save for next time.
self.active = Some(future);
polled
}
}

impl AsyncRead for CloudReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
// Use block_on in order to get the future result in this thread and copy the data in the output buffer.
// With this approach we keep ownership of the buffer and we don't have to pass it to the future runtime.
match block_on(self.read_operation(cx, buf.len())) {
Poll::Ready(Ok(bytes)) => {
buf.copy_from_slice(bytes.as_ref());
Poll::Ready(Ok(bytes.len()))
},
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}

impl AsyncSeek for CloudReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_: &mut std::task::Context<'_>,
pos: io::SeekFrom,
) -> std::task::Poll<std::io::Result<u64>> {
match pos {
io::SeekFrom::Start(pos) => self.pos = pos,
io::SeekFrom::End(pos) => {
let length = self.length.ok_or::<io::Error>(io::Error::new(
std::io::ErrorKind::Other,
"Cannot seek from end of stream when length is unknown.",
))?;
self.pos = (length as i64 + pos) as u64
},
io::SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
};
self.active = None;
std::task::Poll::Ready(Ok(self.pos))
}
}

/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart)
/// exposing a synchronous interface which implements `std::io::Write`.
///
Expand All @@ -156,16 +39,13 @@ impl CloudWriter {
object_store: Arc<dyn ObjectStore>,
path: Path,
) -> PolarsResult<Self> {
let build_result = Self::build_writer(&object_store, &path).await;
match build_result {
Err(error) => Err(PolarsError::from(error)),
Ok((multipart_id, writer)) => Ok(CloudWriter {
object_store,
path,
multipart_id,
writer,
}),
}
let (multipart_id, writer) = Self::build_writer(&object_store, &path).await?;
Ok(CloudWriter {
object_store,
path,
multipart_id,
writer,
})
}

/// Constructs a new CloudWriter from a path and an optional set of CloudOptions.
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-io/src/cloud/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use polars_core::error::to_compute_err;
use polars_core::prelude::{polars_ensure, polars_err};
use polars_error::{PolarsError, PolarsResult};
use regex::Regex;
use url::Url;

use super::*;
use super::CloudOptions;

const DELIMITER: char = '/';

Expand Down
21 changes: 8 additions & 13 deletions crates/polars-io/src/cloud/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
//! Interface with cloud storage through the object_store crate.

#[cfg(feature = "cloud")]
use std::borrow::Cow;
mod adaptors;
#[cfg(feature = "cloud")]
use std::sync::Arc;
pub use adaptors::*;

#[cfg(feature = "cloud")]
use object_store::local::LocalFileSystem;
mod polars_object_store;
#[cfg(feature = "cloud")]
use object_store::ObjectStore;
#[cfg(feature = "cloud")]
use polars_core::prelude::{polars_bail, PolarsError, PolarsResult};
pub use polars_object_store::*;

#[cfg(feature = "cloud")]
mod adaptors;
#[cfg(feature = "cloud")]
mod glob;
#[cfg(feature = "cloud")]
mod object_store_setup;
pub mod options;
pub use glob::*;

#[cfg(feature = "cloud")]
pub use adaptors::*;
#[cfg(feature = "cloud")]
pub use glob::*;
mod object_store_setup;
#[cfg(feature = "cloud")]
pub use object_store_setup::*;

pub mod options;
pub use options::*;
25 changes: 17 additions & 8 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
pub use options::*;
use polars_error::to_compute_err;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
use url::Url;

use super::*;
use super::{parse_url, CloudLocation, CloudOptions, CloudType};

/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
Expand Down Expand Up @@ -35,7 +38,14 @@ fn url_to_key(url: &Url) -> String {
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
pub async fn build_object_store(
url: &str,
#[cfg_attr(
not(any(feature = "aws", feature = "gcp", feature = "azure")),
allow(unused_variables)
)]
options: Option<&CloudOptions>,
) -> BuildResult {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

Expand All @@ -48,9 +58,8 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
}
}

let options = options
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();

let cloud_type = CloudType::from_url(&parsed)?;
let store = match cloud_type {
Expand Down Expand Up @@ -93,7 +102,7 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
{
let store = object_store::http::HttpBuilder::new()
.with_url(url)
.with_client_options(get_client_options())
.with_client_options(super::get_client_options())
.build()?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
Expand Down
14 changes: 6 additions & 8 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use object_store::gcp::GoogleCloudStorageBuilder;
pub use object_store::gcp::GoogleConfigKey;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
use object_store::ClientOptions;
#[cfg(feature = "cloud")]
use object_store::ObjectStore;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
use object_store::{BackoffConfig, RetryConfig};
#[cfg(feature = "aws")]
Expand Down Expand Up @@ -226,9 +224,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for AWS.
/// Build the [`object_store::ObjectStore`] implementation for AWS.
#[cfg(feature = "aws")]
pub async fn build_aws(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub async fn build_aws(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.aws.as_ref();
let mut builder = AmazonS3Builder::from_env().with_url(url);
if let Some(options) = options {
Expand Down Expand Up @@ -329,9 +327,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for Azure.
/// Build the [`object_store::ObjectStore`] implementation for Azure.
#[cfg(feature = "azure")]
pub fn build_azure(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub fn build_azure(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.azure.as_ref();
let mut builder = MicrosoftAzureBuilder::from_env();
if let Some(options) = options {
Expand Down Expand Up @@ -363,9 +361,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for GCP.
/// Build the [`object_store::ObjectStore`] implementation for GCP.
#[cfg(feature = "gcp")]
pub fn build_gcp(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub fn build_gcp(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.gcp.as_ref();
let mut builder = GoogleCloudStorageBuilder::from_env();
if let Some(options) = options {
Expand Down
61 changes: 61 additions & 0 deletions crates/polars-io/src/cloud/polars_object_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};

use crate::pl_async::{
tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};

/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
/// concurrent requests for the entire application.
#[derive(Debug, Clone)]
pub struct PolarsObjectStore(Arc<dyn ObjectStore>);

impl PolarsObjectStore {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self(store)
}

pub async fn get(&self, path: &Path) -> PolarsResult<Bytes> {
tune_with_concurrency_budget(1, || async {
self.0
.get(path)
.await
.map_err(to_compute_err)?
.bytes()
.await
.map_err(to_compute_err)
})
.await
}

pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Bytes> {
tune_with_concurrency_budget(1, || self.0.get_range(path, range))
.await
.map_err(to_compute_err)
}

pub async fn get_ranges(
&self,
path: &Path,
ranges: &[Range<usize>],
) -> PolarsResult<Vec<Bytes>> {
tune_with_concurrency_budget(
(ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32),
|| self.0.get_ranges(path, ranges),
)
.await
.map_err(to_compute_err)
}

/// Fetch the metadata of the parquet file, do not memoize it.
pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
with_concurrency_budget(1, || self.0.head(path))
.await
.map_err(to_compute_err)
}
}
Loading
Loading