Skip to content

Commit

Permalink
object_store: Add utilities for suffix requests
Browse files Browse the repository at this point in the history
  • Loading branch information
clbarnes committed Dec 12, 2023
1 parent f7341d1 commit 03af42f
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 21 deletions.
61 changes: 40 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::util::{concrete_range, HttpRange};
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;

Expand All @@ -40,30 +42,47 @@ pub trait GetClientExt {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
}

pub(crate) fn response_to_get_result<T: GetClient>(
response: Response,
location: &Path,
range: Option<HttpRange>,
) -> Result<GetResult> {
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: concrete_range(range, meta.size),
payload: GetResultPayload::Stream(stream),
meta,
})
}

#[async_trait]
impl<T: GetClient> GetClientExt for T {
    /// Issue a GET request for `location` and convert the response into a
    /// [`GetResult`].
    ///
    /// The requested range is captured before `options` is moved into the
    /// request; the conversion of the response itself is delegated to
    /// [`response_to_get_result`].
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let range = options.range.clone().map(HttpRange::from);
        let response = self.get_request(location, options).await?;
        response_to_get_result::<T>(response, location, range)
    }
}

/// This trait is a bodge (a deliberate stop-gap) to allow suffix requests
/// without breaking the user-facing API.
///
/// See <https://github.com/apache/arrow-rs/issues/4611> for discussion.
#[async_trait]
pub trait GetSuffixClient {
/// Get the last `nbytes` of the resource at `location`.
///
/// NOTE(review): behaviour when `nbytes` is zero or exceeds the resource
/// size is up to each implementation and is not visible here - confirm.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes>;
}
5 changes: 5 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ impl ClientOptions {
}
}

/// Add a suffix `Range` header of the form `bytes=-<nbytes>` to `builder`,
/// i.e. ask for the final `nbytes` bytes of the resource.
pub(crate) fn with_suffix_header(builder: RequestBuilder, nbytes: usize) -> RequestBuilder {
    builder.header(hyper::header::RANGE, format!("bytes=-{nbytes}"))
}

/// Extension trait for values that can have a set of [`GetOptions`] applied
/// to them.
///
/// NOTE(review): implementors are not visible in this excerpt; presumably this
/// is implemented for the HTTP request builder - confirm.
pub trait GetOptionsExt {
/// Consume `self` and return it with `options` applied.
fn with_get_options(self, options: GetOptions) -> Self;
}
Expand Down
136 changes: 136 additions & 0 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
// under the License.

//! Common logic for interacting with remote object stores
use std::{
fmt::Display,
ops::{Range, RangeBounds},
};

use super::Result;
use bytes::Bytes;
use futures::{stream::StreamExt, Stream, TryStreamExt};
Expand Down Expand Up @@ -167,6 +172,116 @@ fn merge_ranges(ranges: &[std::ops::Range<usize>], coalesce: usize) -> Vec<std::
ret
}

// Fully-featured HttpRange etc. implementation here https://github.com/clbarnes/arrow-rs/blob/httpsuff/object_store/src/util.rs

/// A single range in a `Range` request.
///
/// These can be created from [usize] ranges, like
///
/// ```ignore
/// // `HttpRange` is crate-private, so this example is illustrative only.
/// let range1: HttpRange = (50..150).into(); // bytes 50-149
/// let range2: HttpRange = (50..=150).into(); // bytes 50-150
/// let range3: HttpRange = (50..).into(); // bytes 50-
/// let range4: HttpRange = (..150).into(); // bytes 0-149
/// ```
///
/// At present, this is only used internally.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum HttpRange {
/// A range with a given first and last byte, both inclusive.
/// If `first > last`, the range does not select any bytes.
Range {
/// Offset of the first requested byte (0-based).
first: usize,
/// Offset of the last requested byte (inclusive); e.g. the range `0-0` will request 1 byte.
/// To read to the end of the resource, use [`HttpRange::Offset`] instead.
last: usize,
},
/// A range defined only by the first byte requested (requests all remaining bytes).
Offset {
/// Offset of the first byte requested (0-based).
first: usize,
},
/// A range defined as the number of bytes at the end of the resource.
Suffix {
/// Number of bytes requested, counted back from the end of the resource.
nbytes: usize,
},
}

impl HttpRange {
    /// Build an open-ended range starting at byte `first`.
    pub fn new_offset(first: usize) -> Self {
        HttpRange::Offset { first }
    }

    /// Build a closed range covering bytes `first` through `last`, inclusive.
    pub fn new_range(first: usize, last: usize) -> Self {
        HttpRange::Range { first, last }
    }

    /// Build a suffix range covering the final `nbytes` bytes of the resource.
    pub fn new_suffix(nbytes: usize) -> Self {
        HttpRange::Suffix { nbytes }
    }

    /// Index of the first requested byte, or [None] for a suffix range
    /// (whose start depends on the resource size).
    pub fn first_byte(&self) -> Option<usize> {
        match *self {
            Self::Range { first, .. } | Self::Offset { first } => Some(first),
            Self::Suffix { .. } => None,
        }
    }

    /// Index of the last requested byte; [None] for offset and suffix ranges,
    /// which both run to the end of the resource.
    pub fn last_byte(&self) -> Option<usize> {
        if let Self::Range { last, .. } = *self {
            Some(last)
        } else {
            None
        }
    }
}

/// Convert any standard `usize` range into the equivalent [`HttpRange`].
///
/// HTTP byte ranges are inclusive at both ends, so an exclusive end bound is
/// moved down by one and an exclusive start bound is moved up by one (both
/// saturating). A range with no end bound becomes an offset range.
///
/// Note that an empty half-open range cannot be represented: `n..n` becomes
/// `n-(n-1)`, and `0..0` saturates to `0-0`, which selects one byte.
impl<T: RangeBounds<usize>> From<T> for HttpRange {
    fn from(value: T) -> Self {
        use std::ops::Bound;

        let first = match value.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&start) => start,
            Bound::Excluded(&start) => start.saturating_add(1),
        };

        let last = match value.end_bound() {
            // No upper bound: read from `first` to the end of the resource.
            Bound::Unbounded => return HttpRange::new_offset(first),
            Bound::Included(&end) => end,
            Bound::Excluded(&end) => end.saturating_sub(1),
        };

        HttpRange::new_range(first, last)
    }
}

/// Formats the range as the byte-range portion of a `Range` header value,
/// without the leading `bytes=` unit: `first-last`, `first-` or `-nbytes`.
impl Display for HttpRange {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            HttpRange::Suffix { nbytes } => write!(f, "-{nbytes}"),
            HttpRange::Offset { first } => write!(f, "{first}-"),
            HttpRange::Range { first, last } => write!(f, "{first}-{last}"),
        }
    }
}

pub(crate) fn concrete_range(range: Option<HttpRange>, size: usize) -> Range<usize> {
let Some(r) = range else {
return 0..size;
};
let start = r.first_byte().unwrap_or(0);
let end = r
.last_byte()
.map_or(size, |b| b.saturating_add(1).min(size));
start..end
}

#[cfg(test)]
mod tests {
use crate::Error;
Expand Down Expand Up @@ -269,4 +384,25 @@ mod tests {
}
}
}
/// Each variant renders as the byte-range part of a `Range` header value.
#[test]
fn http_range_str() {
    let cases = [
        (HttpRange::new_offset(0), "0-"),
        (HttpRange::new_range(10, 20), "10-20"),
        (HttpRange::new_suffix(10), "-10"),
    ];

    for (range, expected) in cases {
        assert_eq!(range.to_string(), expected);
    }
}

/// Standard library ranges convert to inclusive HTTP byte ranges.
#[test]
fn http_range_from() {
    // Half-open: the exclusive end is moved down by one.
    let half_open = HttpRange::from(10..15);
    assert_eq!(half_open, HttpRange::new_range(10, 14));

    // Closed: both bounds are kept as-is.
    let closed = HttpRange::from(10..=15);
    assert_eq!(closed, HttpRange::new_range(10, 15));

    // No end bound: becomes an offset range.
    let open_ended = HttpRange::from(10..);
    assert_eq!(open_ended, HttpRange::new_offset(10));

    // No start bound: starts at byte zero.
    let from_start = HttpRange::from(..=15);
    assert_eq!(from_start, HttpRange::new_range(0, 15));
}
}

0 comments on commit 03af42f

Please sign in to comment.