Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 53.0.0-dev dev branch to main #6126

Merged
merged 14 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,29 @@ impl Buffer {
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
/// allocated memory region.
impl<T: AsRef<[u8]>> From<T> for Buffer {
fn from(p: T) -> Self {
// allocate aligned memory buffer
let slice = p.as_ref();
let len = slice.len();
let mut buffer = MutableBuffer::new(len);
buffer.extend_from_slice(slice);
buffer.into()
/// Note that here we deliberately do not implement
/// `impl<T: AsRef<[u8]>> From<T> for Buffer`
/// As it would accept `Buffer::from(vec![...])` that would cause an unexpected copy.
/// Instead, we ask user to be explicit when copying is occurring, e.g., `Buffer::from(vec![...].to_byte_slice())`.
/// For zero-copy conversion, user should use `Buffer::from_vec(vec![...])`.
///
/// Since we removed impl for `AsRef<u8>`, we added the following three specific implementations to reduce API breakage.
/// See <https://github.com/apache/arrow-rs/issues/6033> for more discussion on this.
impl From<&[u8]> for Buffer {
fn from(p: &[u8]) -> Self {
Self::from_slice_ref(p)
}
}

impl<const N: usize> From<[u8; N]> for Buffer {
fn from(p: [u8; N]) -> Self {
Self::from_slice_ref(p)
}
}

impl<const N: usize> From<&[u8; N]> for Buffer {
fn from(p: &[u8; N]) -> Self {
Self::from_slice_ref(p)
}
}

Expand Down
26 changes: 13 additions & 13 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4409,8 +4409,8 @@ mod tests {
IntervalUnit::YearMonth,
IntervalYearMonthArray,
vec![
Some("1 years 1 mons 0 days 0 hours 0 mins 0.00 secs"),
Some("2 years 7 mons 0 days 0 hours 0 mins 0.00 secs"),
Some("1 years 1 mons"),
Some("2 years 7 mons"),
None,
None,
None,
Expand All @@ -4433,9 +4433,9 @@ mod tests {
IntervalUnit::DayTime,
IntervalDayTimeArray,
vec![
Some("0 years 0 mons 390 days 0 hours 0 mins 0.000 secs"),
Some("0 years 0 mons 930 days 0 hours 0 mins 0.000 secs"),
Some("0 years 0 mons 30 days 0 hours 0 mins 0.000 secs"),
Some("390 days"),
Some("930 days"),
Some("30 days"),
None,
None,
]
Expand All @@ -4461,16 +4461,16 @@ mod tests {
IntervalUnit::MonthDayNano,
IntervalMonthDayNanoArray,
vec![
Some("0 years 13 mons 1 days 0 hours 0 mins 0.000000000 secs"),
Some("13 mons 1 days"),
None,
Some("0 years 31 mons 35 days 0 hours 0 mins 0.001400000 secs"),
Some("0 years 0 mons 3 days 0 hours 0 mins 0.000000000 secs"),
Some("0 years 0 mons 0 days 0 hours 0 mins 8.000000000 secs"),
Some("31 mons 35 days 0.001400000 secs"),
Some("3 days"),
Some("8.000000000 secs"),
None,
Some("0 years 0 mons 1 days 0 hours 0 mins 29.800000000 secs"),
Some("0 years 3 mons 0 days 0 hours 0 mins 1.000000000 secs"),
Some("0 years 0 mons 0 days 0 hours 8 mins 0.000000000 secs"),
Some("0 years 63 mons 9 days 19 hours 9 mins 2.222000000 secs"),
Some("1 days 29.800000000 secs"),
Some("3 mons 1.000000000 secs"),
Some("8 mins"),
Some("63 mons 9 days 19 hours 9 mins 2.222000000 secs"),
None,
]
);
Expand Down
155 changes: 115 additions & 40 deletions arrow-cast/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,73 +654,148 @@ impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalYearMonthType> {
let years = (interval / 12_f64).floor();
let month = interval - (years * 12_f64);

write!(
f,
"{years} years {month} mons 0 days 0 hours 0 mins 0.00 secs",
)?;
write!(f, "{years} years {month} mons",)?;
Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalDayTimeType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
let mut prefix = "";

let secs = value.milliseconds / 1_000;
if value.days != 0 {
write!(f, "{prefix}{} days", value.days)?;
prefix = " ";
}

if value.milliseconds != 0 {
let millis_fmt = MillisecondsFormatter {
milliseconds: value.milliseconds,
prefix,
};

f.write_fmt(format_args!("{millis_fmt}"))?;
}

Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalMonthDayNanoType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
let mut prefix = "";

if value.months != 0 {
write!(f, "{prefix}{} mons", value.months)?;
prefix = " ";
}

if value.days != 0 {
write!(f, "{prefix}{} days", value.days)?;
prefix = " ";
}

if value.nanoseconds != 0 {
let nano_fmt = NanosecondsFormatter {
nanoseconds: value.nanoseconds,
prefix,
};
f.write_fmt(format_args!("{nano_fmt}"))?;
}

Ok(())
}
}

struct NanosecondsFormatter<'a> {
nanoseconds: i64,
prefix: &'a str,
}

impl<'a> Display for NanosecondsFormatter<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut prefix = self.prefix;

let secs = self.nanoseconds / 1_000_000_000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

let milliseconds = value.milliseconds % 1_000;
let nanoseconds = self.nanoseconds % 1_000_000_000;

let secs_sign = if secs < 0 || milliseconds < 0 {
"-"
} else {
""
};
if hours != 0 {
write!(f, "{prefix}{} hours", hours)?;
prefix = " ";
}

if mins != 0 {
write!(f, "{prefix}{} mins", mins)?;
prefix = " ";
}

if secs != 0 || nanoseconds != 0 {
let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };
write!(
f,
"{prefix}{}{}.{:09} secs",
secs_sign,
secs.abs(),
nanoseconds.abs()
)?;
}

write!(
f,
"0 years 0 mons {} days {} hours {} mins {}{}.{:03} secs",
value.days,
hours,
mins,
secs_sign,
secs.abs(),
milliseconds.abs(),
)?;
Ok(())
}
}

impl<'a> DisplayIndex for &'a PrimitiveArray<IntervalMonthDayNanoType> {
fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult {
let value = self.value(idx);
struct MillisecondsFormatter<'a> {
milliseconds: i32,
prefix: &'a str,
}

impl<'a> Display for MillisecondsFormatter<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut prefix = self.prefix;

let secs = value.nanoseconds / 1_000_000_000;
let secs = self.milliseconds / 1_000;
let mins = secs / 60;
let hours = mins / 60;

let secs = secs - (mins * 60);
let mins = mins - (hours * 60);

let nanoseconds = value.nanoseconds % 1_000_000_000;

let secs_sign = if secs < 0 || nanoseconds < 0 { "-" } else { "" };

write!(
f,
"0 years {} mons {} days {} hours {} mins {}{}.{:09} secs",
value.months,
value.days,
hours,
mins,
secs_sign,
secs.abs(),
nanoseconds.abs(),
)?;
let milliseconds = self.milliseconds % 1_000;

if hours != 0 {
write!(f, "{prefix}{} hours", hours,)?;
prefix = " ";
}

if mins != 0 {
write!(f, "{prefix}{} mins", mins,)?;
prefix = " ";
}

if secs != 0 || milliseconds != 0 {
let secs_sign = if secs < 0 || milliseconds < 0 {
"-"
} else {
""
};

write!(
f,
"{prefix}{}{}.{:03} secs",
secs_sign,
secs.abs(),
milliseconds.abs()
)?;
}

Ok(())
}
}
Expand Down
54 changes: 27 additions & 27 deletions arrow-cast/src/pretty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,16 +986,16 @@ mod tests {
let table = pretty_format_batches(&[batch]).unwrap().to_string();

let expected = vec![
"+----------------------------------------------------+",
"| IntervalDayTime |",
"+----------------------------------------------------+",
"| 0 years 0 mons -1 days 0 hours -10 mins 0.000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -1.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -0.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.010 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.100 secs |",
"+----------------------------------------------------+",
"+------------------+",
"| IntervalDayTime |",
"+------------------+",
"| -1 days -10 mins |",
"| -1.001 secs |",
"| -0.001 secs |",
"| 0.001 secs |",
"| 0.010 secs |",
"| 0.100 secs |",
"+------------------+",
];

let actual: Vec<&str> = table.lines().collect();
Expand Down Expand Up @@ -1032,23 +1032,23 @@ mod tests {
let table = pretty_format_batches(&[batch]).unwrap().to_string();

let expected = vec![
"+-----------------------------------------------------------+",
"| IntervalMonthDayNano |",
"+-----------------------------------------------------------+",
"| 0 years -1 mons -1 days 0 hours -10 mins 0.000000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -1.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins -0.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000001 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000010 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000000100 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000001000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000010000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.000100000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.001000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.010000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 0.100000000 secs |",
"| 0 years 0 mons 0 days 0 hours 0 mins 1.000000000 secs |",
"+-----------------------------------------------------------+",
"+--------------------------+",
"| IntervalMonthDayNano |",
"+--------------------------+",
"| -1 mons -1 days -10 mins |",
"| -1.000000001 secs |",
"| -0.000000001 secs |",
"| 0.000000001 secs |",
"| 0.000000010 secs |",
"| 0.000000100 secs |",
"| 0.000001000 secs |",
"| 0.000010000 secs |",
"| 0.000100000 secs |",
"| 0.001000000 secs |",
"| 0.010000000 secs |",
"| 0.100000000 secs |",
"| 1.000000000 secs |",
"+--------------------------+",
];

let actual: Vec<&str> = table.lines().collect();
Expand Down
11 changes: 6 additions & 5 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
once_cell = { version = "1", optional = true }
paste = { version = "1.0" }
prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] }
prost = { version = "0.13.1", default-features = false, features = ["prost-derive"] }
# For Timestamp type
prost-types = { version = "0.12.3", default-features = false }
prost-types = { version = "0.13.1", default-features = false }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] }
tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] }
tonic = { version = "0.12.0", default-features = false, features = ["transport", "codegen", "prost"] }

# CLI-related dependencies
anyhow = { version = "1.0", optional = true }
Expand All @@ -70,8 +70,9 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subsc
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
assert_cmd = "2.0.8"
http = "0.2.9"
http-body = "0.4.5"
http = "1.1.0"
http-body = "1.0.0"
hyper-util = "0.1"
pin-project-lite = "0.2"
tempfile = "3.3"
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
6 changes: 4 additions & 2 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ impl ProstMessageExt for FetchResults {
#[cfg(test)]
mod tests {
use super::*;
use futures::TryStreamExt;
use futures::{TryFutureExt, TryStreamExt};
use hyper_util::rt::TokioIo;
use std::fs;
use std::future::Future;
use std::net::SocketAddr;
Expand Down Expand Up @@ -843,7 +844,8 @@ mod tests {
.serve_with_incoming(stream);

let request_future = async {
let connector = service_fn(move |_| UnixStream::connect(path.clone()));
let connector =
service_fn(move |_| UnixStream::connect(path.clone()).map_ok(TokioIo::new));
let channel = Endpoint::try_from("http://example.com")
.unwrap()
.connect_with_connector(connector)
Expand Down
Loading
Loading