diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 9614ed2e5688..113935007416 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -92,19 +92,31 @@ export ARROW_TEST_DATA=$(cd ../testing/data; pwd)
 
 From here on, this is a pure Rust project and `cargo` can be used to run tests, benchmarks, docs and examples as usual.
 
-### Running the tests
+## Running the tests
 
 Run tests using the Rust standard `cargo test` command:
 
 ```bash
-# run all tests.
+# run all unit and integration tests
 cargo test
-
-# run only tests for the arrow crate
+# run tests for the arrow crate
 cargo test -p arrow
 ```
 
+For some changes, you may want to run additional tests. You can find up-to-date information on the current CI tests in [.github/workflows](https://github.com/apache/arrow-rs/tree/master/.github/workflows). Here are some examples of additional tests you may want to run:
+
+```bash
+# run tests for the parquet crate
+cargo test -p parquet
+
+# run arrow tests with all features enabled
+cargo test -p arrow --all-features
+
+# run the doc tests
+cargo test --doc
+```
+
 ## Code Formatting
 
 Our CI uses `rustfmt` to check code formatting. Before submitting a
@@ -118,10 +130,18 @@ cargo +stable fmt --all -- --check
 We recommend using `clippy` for checking lints during development. While we do not yet
 enforce `clippy` checks, we recommend not introducing new `clippy` errors or warnings.
 
-Run the following to check for clippy lints.
+Run the following to check for `clippy` lints:
 
 ```bash
+# run clippy with default settings
 cargo clippy
+```
+
+More comprehensive `clippy` checks can be run by adding flags:
+
+```bash
+# run clippy on the arrow crate with all features enabled, targeting all tests, examples, and benchmarks
+cargo clippy -p arrow --all-features --all-targets
 ```
 
 If you use Visual Studio Code with the `rust-analyzer` plugin, you can enable `clippy` to run each time you save a file. See https://users.rust-lang.org/t/how-to-use-clippy-in-vs-code-with-rust-analyzer/41881.
@@ -134,6 +154,42 @@ Search for `allow(clippy::` in the codebase to identify lints that are ignored/a
 - If you have several lints on a function or module, you may disable the lint on the function or module.
 - If a lint is pervasive across multiple modules, you may disable it at the crate level.
 
+## Running Benchmarks
+
+Running benchmarks is a good way to test the performance of a change. As benchmarks usually take a long time to run, we recommend running targeted benchmarks instead of the full suite.
+
+```bash
+# run all benchmarks
+cargo bench
+
+# run arrow benchmarks
+cargo bench -p arrow
+
+# run benchmark for the parse_time function within the arrow-cast crate
+cargo bench -p arrow-cast --bench parse_time
+```
+
+To set a baseline for your benchmarks, use the `--save-baseline` flag, then compare your change against it with `--baseline`:
+
+```bash
+git checkout master
+
+cargo bench --bench parse_time -- --save-baseline master
+
+git checkout feature
+
+cargo bench --bench parse_time -- --baseline master
+```
+
 ## Git Pre-Commit Hook
 
 We can use [git pre-commit hook](https://git-scm.com/book/en/v2/Customizing-Git-Git-Hooks) to automate various kinds of git pre-commit checking/formatting.
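+
+For example, a minimal hook (an illustrative sketch, not a script shipped with the repository) could mirror the `rustfmt` check that CI runs:
+
+```bash
+#!/bin/bash
+# Save as .git/hooks/pre-commit and mark executable with chmod +x.
+# The commit is aborted if formatting differs from rustfmt's output.
+cargo +stable fmt --all -- --check
+```

The file diffs that follow apply one mechanical recipe across the codebase: interpret a `NaiveDateTime` as UTC via `and_utc()` before extracting epoch values (the bare accessors are deprecated in recent chrono releases), replace the panicking `Duration` constructors with the fallible `Duration::try_*` variants, and borrow a batch's schema with the new `RecordBatch::schema_ref()` instead of cloning the `Arc`. A self-contained sketch of the pattern (illustrative only, not part of the patch; assumes a `chrono` release that includes the `try_*` constructors and an `arrow-array` build that includes `schema_ref`):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use chrono::{DateTime, Duration};

fn main() {
    // DateTime::from_timestamp replaces the deprecated
    // NaiveDateTime::from_timestamp_opt and yields Option<DateTime<Utc>>;
    // naive_utc() recovers a NaiveDateTime where one is still required.
    let dt = DateTime::from_timestamp(1_550_902_545, 147_000_000).unwrap();
    let naive = dt.naive_utc();

    // Epoch extraction now goes through and_utc(), making the UTC
    // interpretation of the naive value explicit.
    assert_eq!(naive.and_utc().timestamp_millis(), 1_550_902_545_147);

    // The try_* constructors return Option, surfacing the overflow case
    // that the old panicking constructors hid.
    let delta = Duration::try_milliseconds(1_000).unwrap();
    assert_eq!(delta.num_seconds(), 1);

    // schema_ref() borrows the SchemaRef rather than cloning the Arc.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();
    assert_eq!(batch.schema_ref().fields().len(), 1);
}
```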
diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs
index a800aa6bf924..03a48609fbab 100644
--- a/arrow-array/src/array/primitive_array.rs
+++ b/arrow-array/src/array/primitive_array.rs
@@ -1557,7 +1557,10 @@ mod tests {
             // roundtrip to and from datetime
             assert_eq!(
                 1550902545147,
-                arr.value_as_datetime(i).unwrap().timestamp_millis()
+                arr.value_as_datetime(i)
+                    .unwrap()
+                    .and_utc()
+                    .timestamp_millis()
             );
         } else {
             assert!(arr.is_null(i));
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index d89020a65681..314445bba617 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -236,6 +236,11 @@ impl RecordBatch {
         self.schema.clone()
     }
 
+    /// Returns a reference to the [`SchemaRef`] of the record batch.
+    pub fn schema_ref(&self) -> &SchemaRef {
+        &self.schema
+    }
+
     /// Projects the schema onto the specified columns
     pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
         let projected_schema = self.schema.project(indices)?;
diff --git a/arrow-array/src/temporal_conversions.rs b/arrow-array/src/temporal_conversions.rs
index e0edcc9bc182..8d238b3a196c 100644
--- a/arrow-array/src/temporal_conversions.rs
+++ b/arrow-array/src/temporal_conversions.rs
@@ -43,7 +43,7 @@ pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;
 /// converts a `i32` representing a `date32` to [`NaiveDateTime`]
 #[inline]
 pub fn date32_to_datetime(v: i32) -> Option<NaiveDateTime> {
-    NaiveDateTime::from_timestamp_opt(v as i64 * SECONDS_IN_DAY, 0)
+    Some(DateTime::from_timestamp(v as i64 * SECONDS_IN_DAY, 0)?.naive_utc())
 }
 
 /// converts a `i64` representing a `date64` to [`NaiveDateTime`]
@@ -51,12 +51,13 @@ pub fn date32_to_datetime(v: i32) -> Option<NaiveDateTime> {
 pub fn date64_to_datetime(v: i64) -> Option<NaiveDateTime> {
     let (sec, milli_sec) = split_second(v, MILLISECONDS);
 
-    NaiveDateTime::from_timestamp_opt(
+    let datetime = DateTime::from_timestamp(
         // extract seconds from milliseconds
         sec,
         // discard extracted seconds and convert milliseconds to nanoseconds
         milli_sec * MICROSECONDS as u32,
-    )
+    )?;
+    Some(datetime.naive_utc())
 }
 
 /// converts a `i32` representing a `time32(s)` to [`NaiveDateTime`]
@@ -130,7 +131,7 @@ pub fn time_to_time64ns(v: NaiveTime) -> i64 {
 /// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`]
 #[inline]
 pub fn timestamp_s_to_datetime(v: i64) -> Option<NaiveDateTime> {
-    NaiveDateTime::from_timestamp_opt(v, 0)
+    Some(DateTime::from_timestamp(v, 0)?.naive_utc())
 }
 
 /// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`]
@@ -138,12 +139,13 @@ pub fn timestamp_s_to_datetime(v: i64) -> Option<NaiveDateTime> {
 pub fn timestamp_ms_to_datetime(v: i64) -> Option<NaiveDateTime> {
     let (sec, milli_sec) = split_second(v, MILLISECONDS);
 
-    NaiveDateTime::from_timestamp_opt(
+    let datetime = DateTime::from_timestamp(
         // extract seconds from milliseconds
         sec,
         // discard extracted seconds and convert milliseconds to nanoseconds
         milli_sec * MICROSECONDS as u32,
-    )
+    )?;
+    Some(datetime.naive_utc())
 }
 
 /// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`]
@@ -151,12 +153,13 @@ pub fn timestamp_ms_to_datetime(v: i64) -> Option<NaiveDateTime> {
 pub fn timestamp_us_to_datetime(v: i64) -> Option<NaiveDateTime> {
     let (sec, micro_sec) = split_second(v, MICROSECONDS);
 
-    NaiveDateTime::from_timestamp_opt(
+    let datetime = DateTime::from_timestamp(
         // extract seconds from microseconds
         sec,
         // discard extracted seconds and convert microseconds to nanoseconds
         micro_sec * MILLISECONDS as u32,
-    )
+    )?;
+    Some(datetime.naive_utc())
 }
 
 /// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`]
@@ -164,11 +167,12 @@ pub fn timestamp_us_to_datetime(v: i64) -> Option<NaiveDateTime> {
 pub fn timestamp_ns_to_datetime(v: i64) -> Option<NaiveDateTime> {
     let (sec, nano_sec) = split_second(v, NANOSECONDS);
 
-    NaiveDateTime::from_timestamp_opt(
+    let datetime = DateTime::from_timestamp(
         // extract seconds from nanoseconds
         sec,
         // discard extracted seconds
         nano_sec,
-    )
+    )?;
+    Some(datetime.naive_utc())
 }
 
 #[inline]
@@ -179,13 +183,13 @@ pub(crate) fn split_second(v: i64, base: i64) -> (i64, u32) {
 /// converts a `i64` representing a `duration(s)` to [`Duration`]
 #[inline]
 pub fn duration_s_to_duration(v: i64) -> Duration {
-    Duration::seconds(v)
+    Duration::try_seconds(v).unwrap()
 }
 
 /// converts a `i64` representing a `duration(ms)` to [`Duration`]
 #[inline]
 pub fn duration_ms_to_duration(v: i64) -> Duration {
-    Duration::milliseconds(v)
+    Duration::try_milliseconds(v).unwrap()
 }
 
 /// converts a `i64` representing a `duration(us)` to [`Duration`]
@@ -272,18 +276,18 @@ mod tests {
     use super::{
         date64_to_datetime, split_second, timestamp_ms_to_datetime, timestamp_ns_to_datetime,
        timestamp_us_to_datetime, NANOSECONDS,
     };
-    use chrono::NaiveDateTime;
+    use chrono::DateTime;
 
     #[test]
     fn negative_input_timestamp_ns_to_datetime() {
         assert_eq!(
             timestamp_ns_to_datetime(-1),
-            NaiveDateTime::from_timestamp_opt(-1, 999_999_999)
+            DateTime::from_timestamp(-1, 999_999_999).map(|x| x.naive_utc())
         );
 
         assert_eq!(
             timestamp_ns_to_datetime(-1_000_000_001),
-            NaiveDateTime::from_timestamp_opt(-2, 999_999_999)
+            DateTime::from_timestamp(-2, 999_999_999).map(|x| x.naive_utc())
         );
     }
 
@@ -291,12 +295,12 @@ mod tests {
     fn negative_input_timestamp_us_to_datetime() {
         assert_eq!(
             timestamp_us_to_datetime(-1),
-            NaiveDateTime::from_timestamp_opt(-1, 999_999_000)
+            DateTime::from_timestamp(-1, 999_999_000).map(|x| x.naive_utc())
         );
 
         assert_eq!(
             timestamp_us_to_datetime(-1_000_001),
-            NaiveDateTime::from_timestamp_opt(-2, 999_999_000)
+            DateTime::from_timestamp(-2, 999_999_000).map(|x| x.naive_utc())
         );
     }
 
@@ -304,12 +308,12 @@ mod tests {
     fn negative_input_timestamp_ms_to_datetime() {
         assert_eq!(
             timestamp_ms_to_datetime(-1),
-            NaiveDateTime::from_timestamp_opt(-1, 999_000_000)
+            DateTime::from_timestamp(-1, 999_000_000).map(|x| x.naive_utc())
         );
 
         assert_eq!(
             timestamp_ms_to_datetime(-1_001),
-            NaiveDateTime::from_timestamp_opt(-2, 999_000_000)
+            DateTime::from_timestamp(-2, 999_000_000).map(|x| x.naive_utc())
         );
     }
 
@@ -317,12 +321,12 @@ mod tests {
     fn negative_input_date64_to_datetime() {
         assert_eq!(
             date64_to_datetime(-1),
-            NaiveDateTime::from_timestamp_opt(-1, 999_000_000)
+            DateTime::from_timestamp(-1, 999_000_000).map(|x| x.naive_utc())
         );
 
         assert_eq!(
             date64_to_datetime(-1_001),
-            NaiveDateTime::from_timestamp_opt(-2, 999_000_000)
+            DateTime::from_timestamp(-2, 999_000_000).map(|x| x.naive_utc())
         );
     }
 
diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs
index 6e177838c4f5..83a229c1da0d 100644
--- a/arrow-array/src/types.rs
+++ b/arrow-array/src/types.rs
@@ -390,31 +390,34 @@ impl ArrowTimestampType for TimestampSecondType {
     const UNIT: TimeUnit = TimeUnit::Second;
 
     fn make_value(naive: NaiveDateTime) -> Option<i64> {
-        Some(naive.timestamp())
+        Some(naive.and_utc().timestamp())
     }
 }
 
 impl ArrowTimestampType for TimestampMillisecondType {
     const UNIT: TimeUnit = TimeUnit::Millisecond;
 
     fn make_value(naive: NaiveDateTime) -> Option<i64> {
-        let millis = naive.timestamp().checked_mul(1_000)?;
-        millis.checked_add(naive.timestamp_subsec_millis() as i64)
+        let utc = naive.and_utc();
+        let millis = utc.timestamp().checked_mul(1_000)?;
+        millis.checked_add(utc.timestamp_subsec_millis() as i64)
     }
 }
 
 impl ArrowTimestampType for TimestampMicrosecondType {
     const UNIT: TimeUnit = TimeUnit::Microsecond;
 
     fn make_value(naive: NaiveDateTime) -> Option<i64> {
-        let micros = naive.timestamp().checked_mul(1_000_000)?;
-        micros.checked_add(naive.timestamp_subsec_micros() as i64)
+        let utc = naive.and_utc();
+        let micros = utc.timestamp().checked_mul(1_000_000)?;
+        micros.checked_add(utc.timestamp_subsec_micros() as i64)
    }
 }
 
 impl ArrowTimestampType for TimestampNanosecondType {
     const UNIT: TimeUnit = TimeUnit::Nanosecond;
 
     fn make_value(naive: NaiveDateTime) -> Option<i64> {
-        let nanos = naive.timestamp().checked_mul(1_000_000_000)?;
-        nanos.checked_add(naive.timestamp_subsec_nanos() as i64)
+        let utc = naive.and_utc();
+        let nanos = utc.timestamp().checked_mul(1_000_000_000)?;
+        nanos.checked_add(utc.timestamp_subsec_nanos() as i64)
     }
 }
 
@@ -438,7 +441,7 @@ fn add_day_time(
     let (days, ms) = IntervalDayTimeType::to_parts(delta);
     let res = as_datetime_with_timezone::<T>(timestamp, tz)?;
     let res = add_days_datetime(res, days)?;
-    let res = res.checked_add_signed(Duration::milliseconds(ms as i64))?;
+    let res = res.checked_add_signed(Duration::try_milliseconds(ms as i64)?)?;
     let res = res.naive_utc();
     T::make_value(res)
 }
@@ -477,7 +480,7 @@ fn subtract_day_time(
     let (days, ms) = IntervalDayTimeType::to_parts(delta);
     let res = as_datetime_with_timezone::<T>(timestamp, tz)?;
     let res = sub_days_datetime(res, days)?;
-    let res = res.checked_sub_signed(Duration::milliseconds(ms as i64))?;
+    let res = res.checked_sub_signed(Duration::try_milliseconds(ms as i64)?)?;
     let res = res.naive_utc();
     T::make_value(res)
 }
@@ -1001,7 +1004,7 @@ impl Date32Type {
     /// * `i` - The Date32Type to convert
     pub fn to_naive_date(i: <Date32Type as ArrowPrimitiveType>::Native) -> NaiveDate {
         let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
-        epoch.add(Duration::days(i as i64))
+        epoch.add(Duration::try_days(i as i64).unwrap())
     }
 
     /// Converts a chrono::NaiveDate into an arrow Date32Type
@@ -1042,8 +1045,8 @@ impl Date32Type {
     ) -> <Date32Type as ArrowPrimitiveType>::Native {
         let (days, ms) = IntervalDayTimeType::to_parts(delta);
         let res = Date32Type::to_naive_date(date);
-        let res = res.add(Duration::days(days as i64));
-        let res = res.add(Duration::milliseconds(ms as i64));
+        let res = res.add(Duration::try_days(days as i64).unwrap());
+        let res = res.add(Duration::try_milliseconds(ms as i64).unwrap());
         Date32Type::from_naive_date(res)
     }
 
@@ -1060,7 +1063,7 @@ impl Date32Type {
         let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(delta);
         let res = Date32Type::to_naive_date(date);
         let res = shift_months(res, months);
-        let res = res.add(Duration::days(days as i64));
+        let res = res.add(Duration::try_days(days as i64).unwrap());
         let res = res.add(Duration::nanoseconds(nanos));
         Date32Type::from_naive_date(res)
     }
@@ -1093,8 +1096,8 @@ impl Date32Type {
     ) -> <Date32Type as ArrowPrimitiveType>::Native {
         let (days, ms) = IntervalDayTimeType::to_parts(delta);
         let res = Date32Type::to_naive_date(date);
-        let res = res.sub(Duration::days(days as i64));
-        let res = res.sub(Duration::milliseconds(ms as i64));
+        let res = res.sub(Duration::try_days(days as i64).unwrap());
+        let res = res.sub(Duration::try_milliseconds(ms as i64).unwrap());
         Date32Type::from_naive_date(res)
     }
 
@@ -1111,7 +1114,7 @@ impl Date32Type {
         let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(delta);
         let res = Date32Type::to_naive_date(date);
         let res = shift_months(res, -months);
-        let res = res.sub(Duration::days(days as i64));
+        let res = res.sub(Duration::try_days(days as i64).unwrap());
         let res = res.sub(Duration::nanoseconds(nanos));
         Date32Type::from_naive_date(res)
     }
@@ -1125,7 +1128,7 @@ impl Date64Type {
     /// * `i` - The Date64Type to convert
     pub fn to_naive_date(i: <Date64Type as ArrowPrimitiveType>::Native) -> NaiveDate {
         let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
-        epoch.add(Duration::milliseconds(i))
+        epoch.add(Duration::try_milliseconds(i).unwrap())
     }
 
     /// Converts a chrono::NaiveDate into an arrow Date64Type
@@ -1166,8 +1169,8 @@ impl Date64Type {
     ) -> <Date64Type as ArrowPrimitiveType>::Native {
         let (days, ms) = IntervalDayTimeType::to_parts(delta);
         let res = Date64Type::to_naive_date(date);
-        let res = res.add(Duration::days(days as i64));
-        let res = res.add(Duration::milliseconds(ms as i64));
+        let res = res.add(Duration::try_days(days as i64).unwrap());
+        let res = res.add(Duration::try_milliseconds(ms as i64).unwrap());
         Date64Type::from_naive_date(res)
     }
 
@@ -1184,7 +1187,7 @@ impl Date64Type {
         let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(delta);
         let res = Date64Type::to_naive_date(date);
         let res = shift_months(res, months);
-        let res = res.add(Duration::days(days as i64));
+        let res = res.add(Duration::try_days(days as i64).unwrap());
         let res = res.add(Duration::nanoseconds(nanos));
         Date64Type::from_naive_date(res)
     }
@@ -1217,8 +1220,8 @@ impl Date64Type {
     ) -> <Date64Type as ArrowPrimitiveType>::Native {
         let (days, ms) = IntervalDayTimeType::to_parts(delta);
         let res = Date64Type::to_naive_date(date);
-        let res = res.sub(Duration::days(days as i64));
-        let res = res.sub(Duration::milliseconds(ms as i64));
+        let res = res.sub(Duration::try_days(days as i64).unwrap());
+        let res = res.sub(Duration::try_milliseconds(ms as i64).unwrap());
         Date64Type::from_naive_date(res)
     }
 
@@ -1235,7 +1238,7 @@ impl Date64Type {
         let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(delta);
         let res = Date64Type::to_naive_date(date);
         let res = shift_months(res, -months);
-        let res = res.sub(Duration::days(days as i64));
+        let res = res.sub(Duration::try_days(days as i64).unwrap());
         let res = res.sub(Duration::nanoseconds(nanos));
         Date64Type::from_naive_date(res)
     }
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index ca3ee6260ba7..10b62a2ce473 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -423,25 +423,8 @@ impl Buffer {
 
 impl<T: ArrowNativeType> FromIterator<T> for Buffer {
     fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
-        let mut iterator = iter.into_iter();
-        let size = std::mem::size_of::<T>();
-
-        // first iteration, which will likely reserve sufficient space for the buffer.
-        let mut buffer = match iterator.next() {
-            None => MutableBuffer::new(0),
-            Some(element) => {
-                let (lower, _) = iterator.size_hint();
-                let mut buffer = MutableBuffer::new(lower.saturating_add(1).saturating_mul(size));
-                unsafe {
-                    std::ptr::write(buffer.as_mut_ptr() as *mut T, element);
-                    buffer.set_len(size);
-                }
-                buffer
-            }
-        };
-
-        buffer.extend_from_iter(iterator);
-        buffer.into()
+        let vec = Vec::from_iter(iter);
+        Buffer::from_vec(vec)
     }
 }
 
@@ -822,7 +805,7 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "failed to round to next highest power of 2")]
+    #[should_panic(expected = "capacity overflow")]
     fn test_from_iter_overflow() {
         let iter_len = usize::MAX / std::mem::size_of::<u64>() + 1;
         let _ = Buffer::from_iter(std::iter::repeat(0_u64).take(iter_len));
diff --git a/arrow-cast/Cargo.toml b/arrow-cast/Cargo.toml
index 2d8a57aba5f5..791b129b2d1f 100644
--- a/arrow-cast/Cargo.toml
+++ b/arrow-cast/Cargo.toml
@@ -51,7 +51,7 @@ num = { version = "0.4", default-features = false, features = ["std"] }
 lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
 atoi = "2.0.0"
 comfy-table = { version = "7.0", optional = true, default-features = false }
-base64 = "0.21"
+base64 = "0.22"
 ryu = "1.0.16"
 
 [dev-dependencies]
diff --git a/arrow-cast/src/parse.rs b/arrow-cast/src/parse.rs
index 72942af8394a..6214e6d97371 100644
--- a/arrow-cast/src/parse.rs
+++ b/arrow-cast/src/parse.rs
@@ -273,7 +273,8 @@ pub fn string_to_timestamp_nanos(s: &str) -> Result<i64, ArrowError> {
 /// Fallible conversion of [`NaiveDateTime`] to `i64` nanoseconds
 #[inline]
 fn to_timestamp_nanos(dt: NaiveDateTime) -> Result<i64, ArrowError> {
-    dt.timestamp_nanos_opt()
+    dt.and_utc()
+        .timestamp_nanos_opt()
         .ok_or_else(|| ArrowError::ParseError(ERR_NANOSECONDS_NOT_SUPPORTED.to_string()))
 }
 
@@ -632,8 +633,8 @@ impl Parser for Date32Type {
 impl Parser for Date64Type {
     fn parse(string: &str) -> Option<i64> {
         if string.len() <= 10 {
-            let date = parse_date(string)?;
-            Some(NaiveDateTime::new(date, NaiveTime::default()).timestamp_millis())
+            let datetime = NaiveDateTime::new(parse_date(string)?, NaiveTime::default());
+            Some(datetime.and_utc().timestamp_millis())
         } else {
             let date_time = string_to_datetime(&Utc, string).ok()?;
             Some(date_time.timestamp_millis())
@@ -662,7 +663,7 @@ impl Parser for Date64Type {
             Some(date_time.timestamp_millis())
         } else {
             let date_time = NaiveDateTime::parse_from_str(string, format).ok()?;
-            Some(date_time.timestamp_millis())
+            Some(date_time.and_utc().timestamp_millis())
         }
     }
 }
@@ -1286,43 +1287,45 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29.190855").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29.190855").unwrap()
         );
 
         // Also ensure that parsing timestamps with no fractional
         // second part works as well
-        let naive_datetime_whole_secs = NaiveDateTime::new(
+        let datetime_whole_secs = NaiveDateTime::new(
             NaiveDate::from_ymd_opt(2020, 9, 8).unwrap(),
             NaiveTime::from_hms_opt(13, 42, 29).unwrap(),
-        );
+        )
+        .and_utc();
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime_whole_secs.timestamp_nanos_opt().unwrap(),
+            datetime_whole_secs.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29").unwrap()
         );
 
         assert_eq!(
-            naive_datetime_whole_secs.timestamp_nanos_opt().unwrap(),
+            datetime_whole_secs.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29").unwrap()
         );
 
         // ensure without time work
         // no time, should be the nano second at
         // 2020-09-08 0:0:0
-        let naive_datetime_no_time = NaiveDateTime::new(
+        let datetime_no_time = NaiveDateTime::new(
             NaiveDate::from_ymd_opt(2020, 9, 8).unwrap(),
             NaiveTime::from_hms_opt(0, 0, 0).unwrap(),
-        );
+        )
+        .and_utc();
 
         assert_eq!(
-            naive_datetime_no_time.timestamp_nanos_opt().unwrap(),
+            datetime_no_time.timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08").unwrap()
         )
     }
@@ -1434,12 +1437,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29.190855").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29.190855").unwrap()
         );
 
@@ -1450,12 +1453,12 @@ mod tests {
 
         // Ensure both T and ' ' variants work
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08T13:42:29").unwrap()
         );
 
         assert_eq!(
-            naive_datetime.timestamp_nanos_opt().unwrap(),
+            naive_datetime.and_utc().timestamp_nanos_opt().unwrap(),
             parse_timestamp("2020-09-08 13:42:29").unwrap()
         );
 
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 10c53c549e2b..2ddc2d845b01 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -109,6 +109,10 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             buffer.push(0i64);
             [buffer, MutableBuffer::new(capacity * mem::size_of::<u8>())]
         }
+        DataType::BinaryView | DataType::Utf8View => [
+            MutableBuffer::new(capacity * mem::size_of::<u128>()),
+            empty_buffer,
+        ],
         DataType::List(_) | DataType::Map(_, _) => {
             // offset buffer always starts with a zero
             let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i32>());
@@ -1541,6 +1545,9 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
         DataType::LargeBinary => DataTypeLayout::new_binary::<i64>(),
         DataType::Utf8 => DataTypeLayout::new_binary::<i32>(),
         DataType::LargeUtf8 => DataTypeLayout::new_binary::<i64>(),
+        DataType::BinaryView | DataType::Utf8View => {
+            unimplemented!("BinaryView/Utf8View not implemented")
+        }
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
         DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
         DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index b279546474a0..1255ff39e097 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -96,6 +96,9 @@ fn equal_values(
             variable_sized_equal::<i64>(lhs, rhs, lhs_start, rhs_start, len)
         }
         DataType::FixedSizeBinary(_) => fixed_binary_equal(lhs, rhs, lhs_start, rhs_start, len),
+        DataType::BinaryView | DataType::Utf8View => {
+            unimplemented!("BinaryView/Utf8View not yet implemented")
+        }
         DataType::List(_) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::LargeList(_) => list_equal::<i64>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::FixedSizeList(_, _) => fixed_list_equal(lhs, rhs, lhs_start, rhs_start, len),
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index 268cf10f2326..ef53efac2373 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -224,6 +224,9 @@ fn build_extend(array: &ArrayData) -> Extend {
         DataType::Decimal256(_, _) => primitive::build_extend::<i256>(array),
         DataType::Utf8 | DataType::Binary => variable_size::build_extend::<i32>(array),
         DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::<i64>(array),
+        DataType::BinaryView | DataType::Utf8View => {
+            unimplemented!("BinaryView/Utf8View not implemented")
+        }
         DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
         DataType::LargeList(_) => list::build_extend::<i64>(array),
         DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
@@ -266,6 +269,9 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
         DataType::Decimal256(_, _) => primitive::extend_nulls::<i256>,
         DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::<i32>,
         DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::<i64>,
+        DataType::BinaryView | DataType::Utf8View => {
+            unimplemented!("BinaryView/Utf8View not implemented")
+        }
         DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::<i32>,
         DataType::LargeList(_) => list::extend_nulls::<i64>,
         DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() {
@@ -419,6 +425,9 @@ impl<'a> MutableArrayData<'a> {
             | DataType::LargeBinary
             | DataType::Interval(_)
             | DataType::FixedSizeBinary(_) => vec![],
+            DataType::BinaryView | DataType::Utf8View => {
+                unimplemented!("BinaryView/Utf8View not implemented")
+            }
             DataType::Map(_, _) | DataType::List(_) | DataType::LargeList(_) => {
                 let children = arrays
                     .iter()
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 1e7255baa748..60ba3ae827de 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -39,7 +39,7 @@ arrow-row = { workspace = true, optional = true }
 arrow-select = { workspace = true, optional = true }
 arrow-schema = { workspace = true }
 arrow-string = { workspace = true, optional = true }
-base64 = { version = "0.21", default-features = false, features = ["std"] }
+base64 = { version = "0.22", default-features = false, features = ["std"] }
 bytes = { version = "1", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
 once_cell = { version = "1", optional = true }
diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs
index 85f5c7499346..efd8b6dec90f 100644
--- a/arrow-flight/examples/flight_sql_server.rs
+++ b/arrow-flight/examples/flight_sql_server.rs
@@ -193,9 +193,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
     ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
         self.check_token(&request)?;
         let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?;
-        let schema = batch.schema();
-        let batches = vec![batch];
-        let flight_data = batches_to_flight_data(schema.as_ref(), batches)
+        let schema = batch.schema_ref();
+        let batches = vec![batch.clone()];
+        let flight_data = batches_to_flight_data(schema, batches)
             .map_err(|e| status!("Could not convert batches", e))?
             .into_iter()
             .map(Ok);
@@ -641,10 +641,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
         request: Request<Action>,
     ) -> Result<ActionCreatePreparedStatementResult, Status> {
         self.check_token(&request)?;
-        let schema = Self::fake_result()
-            .map_err(|e| status!("Error getting result schema", e))?
-            .schema();
-        let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
+        let record_batch =
+            Self::fake_result().map_err(|e| status!("Error getting result schema", e))?;
+        let schema = record_batch.schema_ref();
+        let message = SchemaAsIpc::new(schema, &IpcWriteOptions::default())
             .try_into()
             .map_err(|e| status!("Unable to serialize schema", e))?;
         let IpcMessage(schema_bytes) = message;
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index e6ef9994d487..bb0436816209 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -320,7 +320,7 @@ impl FlightDataEncoder {
         let schema = match &self.schema {
             Some(schema) => schema.clone(),
             // encode the schema if this is the first time we have seen it
-            None => self.encode_schema(&batch.schema()),
+            None => self.encode_schema(batch.schema_ref()),
         };
 
         // encode the batch
@@ -565,12 +565,12 @@ mod tests {
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
             .expect("cannot create record batch");
-        let schema = batch.schema();
+        let schema = batch.schema_ref();
 
         let (_, baseline_flight_batch) = make_flight_data(&batch, &options);
 
         let big_batch = batch.slice(0, batch.num_rows() - 1);
-        let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
+        let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(schema), false)
             .expect("failed to optimize");
         let (_, optimized_big_flight_batch) = make_flight_data(&optimized_big_batch, &options);
 
@@ -581,7 +581,7 @@ mod tests {
         let small_batch = batch.slice(0, 1);
         let optimized_small_batch =
-            prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
+            prepare_batch_for_flight(&small_batch, Arc::clone(schema), false)
                 .expect("failed to optimize");
         let (_, optimized_small_flight_batch) = make_flight_data(&optimized_small_batch, &options);
diff --git a/arrow-flight/tests/encode_decode.rs b/arrow-flight/tests/encode_decode.rs
index 789233b918d0..224b12500a08 100644
--- a/arrow-flight/tests/encode_decode.rs
+++ b/arrow-flight/tests/encode_decode.rs
@@ -465,7 +465,7 @@ async fn roundtrip(input: Vec<RecordBatch>) {
 /// When is resolved,
 /// it should be possible to use `roundtrip`
 async fn roundtrip_dictionary(input: Vec<RecordBatch>) {
-    let schema = Arc::new(prepare_schema_for_flight(&input[0].schema()));
+    let schema = Arc::new(prepare_schema_for_flight(input[0].schema_ref()));
     let expected_output: Vec<_> = input
         .iter()
         .map(|batch| prepare_batch_for_flight(batch, schema.clone()).unwrap())
diff --git a/arrow-flight/tests/flight_sql_client_cli.rs b/arrow-flight/tests/flight_sql_client_cli.rs
index a28080450bc2..cc270eeb6186 100644
--- a/arrow-flight/tests/flight_sql_client_cli.rs
+++ b/arrow-flight/tests/flight_sql_client_cli.rs
@@ -189,7 +189,7 @@ impl FlightSqlServiceImpl {
         let batch = Self::fake_result()?;
 
         Ok(FlightInfo::new()
-            .try_with_schema(&batch.schema())
+            .try_with_schema(batch.schema_ref())
             .expect("encoding schema")
             .with_endpoint(
                 FlightEndpoint::new().with_ticket(Ticket::new(
@@ -245,9 +245,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
             "part_2" => batch.slice(2, 1),
             ticket => panic!("Invalid ticket: {ticket:?}"),
         };
-        let schema = batch.schema();
-        let batches = vec![batch];
-        let flight_data = batches_to_flight_data(schema.as_ref(), batches)
+        let schema = batch.schema_ref();
+        let batches = vec![batch.clone()];
+        let flight_data = batches_to_flight_data(schema, batches)
             .unwrap()
             .into_iter()
             .map(Ok);
diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs
index 42ac71fbbd7e..a04db1cf3538 100644
--- a/arrow-integration-test/src/datatype.rs
+++ b/arrow-integration-test/src/datatype.rs
@@ -271,6 +271,9 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value {
         DataType::LargeUtf8 => json!({"name": "largeutf8"}),
         DataType::Binary => json!({"name": "binary"}),
         DataType::LargeBinary => json!({"name": "largebinary"}),
+        DataType::BinaryView | DataType::Utf8View => {
+            unimplemented!("BinaryView/Utf8View not implemented")
+        }
         DataType::FixedSizeBinary(byte_width) => {
             json!({"name": "fixedsizebinary", "byteWidth": byte_width})
         }
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 505541f7d50f..a2ffd4380203 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -543,6 +543,7 @@ pub(crate) fn get_fb_field_type<'a>(
                 .as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
+        BinaryView | Utf8View => unimplemented!("BinaryView/Utf8View not implemented"),
         Utf8 => FBFieldType {
             type_type: crate::Type::Utf8,
             type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 81b8b530734a..361c4f7f67dd 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -1429,7 +1429,7 @@ mod tests {
 
     fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
         let mut buf = Vec::new();
-        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
         writer.write(rb).unwrap();
         writer.finish().unwrap();
         drop(writer);
@@ -1440,7 +1440,7 @@ mod tests {
 
     fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
         let mut buf = Vec::new();
-        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
+        let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
         writer.write(rb).unwrap();
         writer.finish().unwrap();
         drop(writer);
@@ -1815,7 +1815,7 @@ mod tests {
         let batch = RecordBatch::new_empty(schema);
 
         let mut buf = Vec::new();
-        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
         writer.write(&batch).unwrap();
         writer.finish().unwrap();
         drop(writer);
@@ -1842,7 +1842,7 @@ mod tests {
         let batch = RecordBatch::new_empty(schema);
 
         let mut buf = Vec::new();
-        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
         writer.write(&batch).unwrap();
         writer.finish().unwrap();
         drop(writer);
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 1f6bf5f6fa85..99e52e2a7076 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -1436,7 +1436,7 @@ mod tests {
     use super::*;
 
     fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
-        let mut writer = FileWriter::try_new(vec![], &rb.schema()).unwrap();
+        let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
         writer.write(rb).unwrap();
         writer.finish().unwrap();
         writer.into_inner().unwrap()
@@ -1448,7 +1448,7 @@ mod tests {
     }
 
     fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
-        let mut stream_writer = StreamWriter::try_new(vec![], &record.schema()).unwrap();
+        let mut stream_writer = StreamWriter::try_new(vec![], record.schema_ref()).unwrap();
         stream_writer.write(record).unwrap();
         stream_writer.finish().unwrap();
        stream_writer.into_inner().unwrap()
@@ -1982,7 +1982,7 @@ mod tests {
         )
         .expect("new batch");
 
-        let mut writer = StreamWriter::try_new(vec![], &batch.schema()).expect("new writer");
+        let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
         writer.write(&batch).expect("write");
         let outbuf = writer.into_inner().expect("inner");
diff --git a/arrow-json/src/writer.rs b/arrow-json/src/writer.rs
index bb494b595064..d8045c330481 100644
--- a/arrow-json/src/writer.rs
+++ b/arrow-json/src/writer.rs
@@ -1046,6 +1046,7 @@ mod tests {
         let ts_nanos = ts_string
             .parse::<NaiveDateTime>()
             .unwrap()
+            .and_utc()
             .timestamp_nanos_opt()
             .unwrap();
         let ts_micros = ts_nanos / 1000;
@@ -1099,6 +1100,7 @@ mod tests {
         let ts_nanos = ts_string
             .parse::<NaiveDateTime>()
             .unwrap()
+            .and_utc()
             .timestamp_nanos_opt()
             .unwrap();
         let ts_micros = ts_nanos / 1000;
@@ -1159,6 +1161,7 @@ mod tests {
         let ts_millis = ts_string
             .parse::<NaiveDateTime>()
             .unwrap()
+            .and_utc()
             .timestamp_millis();
 
         let arr_date32 = Date32Array::from(vec![
diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index 079b855ce990..b3d89b011e66 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -196,6 +196,15 @@ pub enum DataType {
     /// A single LargeBinary array can store up to [`i64::MAX`] bytes
     /// of binary data in total.
     LargeBinary,
+    /// (NOT YET FULLY SUPPORTED) Opaque binary data of variable length.
+    ///
+    /// Note this data type is not yet fully supported. Using it with arrow APIs may result in `panic`s.
+    ///
+    /// Logically the same as [`Self::Binary`], but the internal representation uses a view
+    /// struct that contains the value's length and either the value's entire data
+    /// inline (for small values) or an inlined prefix, an index of another buffer,
+    /// and an offset pointing to a slice in that buffer (for non-small values).
+    BinaryView,
     /// A variable-length string in Unicode with UTF-8 encoding.
     ///
     /// A single Utf8 array can store up to [`i32::MAX`] bytes
@@ -206,6 +215,15 @@ pub enum DataType {
     /// A single LargeUtf8 array can store up to [`i64::MAX`] bytes
     /// of string data in total.
     LargeUtf8,
+    /// (NOT YET FULLY SUPPORTED) A variable-length string in Unicode with UTF-8 encoding.
+    ///
+    /// Note this data type is not yet fully supported. Using it with arrow APIs may result in `panic`s.
+    ///
+    /// Logically the same as [`Self::Utf8`], but the internal representation uses a view
+    /// struct that contains the string length and either the string's entire data
+    /// inline (for small strings) or an inlined prefix, an index of another buffer,
+    /// and an offset pointing to a slice in that buffer (for non-small strings).
+    Utf8View,
     /// A list of some logical data type with variable length.
     ///
     /// A single List array can store up to [`i32::MAX`] elements in total.
@@ -515,8 +533,8 @@ impl DataType {
         DataType::Interval(IntervalUnit::MonthDayNano) => Some(16),
         DataType::Decimal128(_, _) => Some(16),
         DataType::Decimal256(_, _) => Some(32),
-        DataType::Utf8 | DataType::LargeUtf8 => None,
-        DataType::Binary | DataType::LargeBinary => None,
+        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => None,
+        DataType::Binary | DataType::LargeBinary | DataType::BinaryView => None,
         DataType::FixedSizeBinary(_) => None,
         DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _) => None,
         DataType::FixedSizeList(_, _) => None,
@@ -555,8 +573,10 @@ impl DataType {
             | DataType::Binary
             | DataType::FixedSizeBinary(_)
             | DataType::LargeBinary
+            | DataType::BinaryView
             | DataType::Utf8
             | DataType::LargeUtf8
+            | DataType::Utf8View
             | DataType::Decimal128(_, _)
             | DataType::Decimal256(_, _) => 0,
             DataType::Timestamp(_, s) => s.as_ref().map(|s| s.len()).unwrap_or_default(),
diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs
index 5cb2399f383c..70a3e2b21a3c 100644
--- a/arrow-schema/src/field.rs
+++ b/arrow-schema/src/field.rs
@@ -129,7 +129,7 @@ impl Field {
         }
     }
 
-    /// Creates a new `Field`` suitable for [`DataType::List`] and
+    /// Creates a new `Field` suitable for [`DataType::List`] and
     /// [`DataType::LargeList`]
     ///
     /// While not required, this method follows the convention of naming the
@@ -507,6 +507,7 @@ impl Field {
             | DataType::Duration(_)
             | DataType::Binary
             | DataType::LargeBinary
+            | DataType::BinaryView
             | DataType::Interval(_)
             | DataType::LargeList(_)
             | DataType::List(_)
@@ -517,6 +518,7 @@ impl Field {
             | DataType::FixedSizeBinary(_)
             | DataType::Utf8
             | DataType::LargeUtf8
+            | DataType::Utf8View
             | DataType::Decimal128(_, _)
             | DataType::Decimal256(_, _) => {
                 if from.data_type == DataType::Null {
diff --git a/arrow/benches/cast_kernels.rs b/arrow/benches/cast_kernels.rs
index 6632dbc57c56..1877ed0e7899 100644
--- a/arrow/benches/cast_kernels.rs
+++ b/arrow/benches/cast_kernels.rs
@@ -21,6 +21,7 @@ use criterion::Criterion;
 use rand::distributions::{Distribution, Standard, Uniform};
 use rand::Rng;
 
+use chrono::DateTime;
 use std::sync::Arc;
 
 extern crate arrow;
@@ -63,8 +64,6 @@ fn build_utf8_date_array(size: usize, with_nulls: bool) -> ArrayRef {
 }
 
 fn build_utf8_date_time_array(size: usize, with_nulls: bool) -> ArrayRef {
-    use chrono::NaiveDateTime;
-
     // use random numbers to avoid spurious compiler optimizations wrt to branching
     let mut rng = seedable_rng();
     let mut builder = StringBuilder::new();
@@ -74,7 +73,7 @@ fn build_utf8_date_time_array(size: usize, with_nulls: bool) -> ArrayRef {
         if with_nulls && rng.gen::<f32>() > 0.8 {
             builder.append_null();
         } else {
-            let string = NaiveDateTime::from_timestamp_opt(rng.sample(range), 0)
+            let string = DateTime::from_timestamp(rng.sample(range), 0)
                 .unwrap()
                 .format("%Y-%m-%dT%H:%M:%S")
                 .to_string();
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index 4cc75595df09..a1e80ce51ded 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -44,7 +44,7 @@ url = "2.2"
 walkdir = "2"
 
 # Cloud storage support
-base64 = { version = "0.21", default-features = false, features = ["std"], optional = true }
+base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
 hyper = { version = "0.14", default-features = false, optional = true }
 quick-xml = { version = "0.31.0", features = ["serialize", "overlapped-lists"], optional = true }
 serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs
index dc504da05723..34cd6eeb6ea4 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -393,7 +393,11 @@ pub enum ApplicationDefaultCredentials {
 }
 
 impl ApplicationDefaultCredentials {
-    const CREDENTIALS_PATH: &'static str = ".config/gcloud/application_default_credentials.json";
+    const CREDENTIALS_PATH: &'static str = if cfg!(windows) {
+        "gcloud/application_default_credentials.json"
+    } else {
+        ".config/gcloud/application_default_credentials.json"
+    };
 
     // Create a new application default credential in the following situations:
     //  1. a file is passed in and the type matches.
@@ -402,7 +406,9 @@ impl ApplicationDefaultCredentials {
         if let Some(path) = path {
             return read_credentials_file::<Self>(path).map(Some);
         }
-        if let Some(home) = env::var_os("HOME") {
+
+        let home_var = if cfg!(windows) { "APPDATA" } else { "HOME" };
+        if let Some(home) = env::var_os(home_var) {
             let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
 
             // It's expected for this file to not exist unless it has been explicitly configured by the user.
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index af5676ef5003..8132002b6e01 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1693,7 +1693,9 @@ mod tests {
         }
 
         let options = GetOptions {
-            if_unmodified_since: Some(meta.last_modified + chrono::Duration::hours(10)),
+            if_unmodified_since: Some(
+                meta.last_modified + chrono::Duration::try_hours(10).unwrap(),
+            ),
             ..GetOptions::default()
         };
         match storage.get_opts(&path, options).await {
@@ -1702,7 +1704,9 @@ mod tests {
         }
 
         let options = GetOptions {
-            if_unmodified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+            if_unmodified_since: Some(
+                meta.last_modified - chrono::Duration::try_hours(10).unwrap(),
+            ),
             ..GetOptions::default()
         };
         match storage.get_opts(&path, options).await {
@@ -1720,7 +1724,7 @@ mod tests {
         }
 
         let options = GetOptions {
-            if_modified_since: Some(meta.last_modified - chrono::Duration::hours(10)),
+            if_modified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
             ..GetOptions::default()
         };
         match storage.get_opts(&path, options).await {
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 60160a8b3fc1..e6d612e0cc62 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -56,7 +56,7 @@ zstd = { version = "0.13.0", optional = true, default-features = false }
 chrono = { workspace = true }
 num = { version = "0.4", default-features = false }
 num-bigint = { version = "0.4", default-features = false }
-base64 = { version = "0.21", default-features = false, features = ["std", ], optional = true }
+base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true }
 clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
 serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
 serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
@@ -69,7 +69,7 @@ paste = { version = "1.0" }
 half = { version = "2.1", default-features = false, features = ["num-traits"] }
 
 [dev-dependencies]
-base64 = { version = "0.21", default-features = false, features = ["std"] }
+base64 = { version = "0.22", default-features = false, features = ["std"] }
 criterion = { version = "0.5", default-features = false }
 snap = { version = "1.0", default-features = false }
 tempfile = { version = "3.0", default-features = false }
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 6b6146042051..7aeb3d127ac3 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -3082,7 +3082,7 @@ mod tests {
             .unwrap();
 
         let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
-        let actual = concat_batches(&batch.schema(), &batches).unwrap();
+        let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
         assert_eq!(actual.num_rows(), selection.row_count());
 
         let mut batch_offset = 0;
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 01c86db8b7d2..c72a8af6ce69 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -80,6 +80,32 @@ mod levels;
 ///
 /// assert_eq!(to_write, read);
 /// ```
+///
+/// ## Memory Limiting
+///
+/// The nature of parquet forces buffering of an entire row group before it can be flushed
+/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage,
+/// but when writing rows containing large strings or deeply nested data, this may still result in
+/// non-trivial memory usage.
+///
+/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group,
+/// and potentially trigger an early flush of a row group based on a memory threshold and/or
+/// global memory pressure. However, users should be aware that smaller row groups will result
+/// in higher metadata overheads, and may worsen compression ratios and query performance.
+///
+/// ```no_run
+/// # use std::io::Write;
+/// # use arrow_array::RecordBatch;
+/// # use parquet::arrow::ArrowWriter;
+/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
+/// # let batch: RecordBatch = todo!();
+/// writer.write(&batch).unwrap();
+/// // Trigger an early flush if buffered size exceeds 1_000_000
+/// if writer.in_progress_size() > 1_000_000 {
+///     writer.flush().unwrap();
+/// }
+/// ```
+///
 pub struct ArrowWriter<W: Write + Send> {
     /// Underlying Parquet writer
     writer: SerializedFileWriter<W>,
diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs
index cdabbcf11237..784959fbd9fb 100644
--- a/parquet/src/arrow/async_writer/mod.rs
+++ b/parquet/src/arrow/async_writer/mod.rs
@@ -67,6 +67,29 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 /// It is implemented based on the sync writer [`ArrowWriter`] with an inner buffer.
 /// The buffered data will be flushed to the writer provided by caller when the
 /// buffer's threshold is exceeded.
+///
+/// ## Memory Limiting
+///
+/// The nature of parquet forces buffering of an entire row group before it can be flushed
+/// to the underlying writer. This buffering may exceed the configured buffer size
+/// of [`AsyncArrowWriter`]. Memory usage can be limited by prematurely flushing the row group,
+/// although this will have implications for file size and query performance. See [`ArrowWriter`]
+/// for more information.
+///
+/// ```no_run
+/// # use tokio::fs::File;
+/// # use arrow_array::RecordBatch;
+/// # use parquet::arrow::AsyncArrowWriter;
+/// # async fn test() {
+/// let mut writer: AsyncArrowWriter<File> = todo!();
+/// let batch: RecordBatch = todo!();
+/// writer.write(&batch).await.unwrap();
+/// // Trigger an early flush if buffered size exceeds 1_000_000
+/// if writer.in_progress_size() > 1_000_000 {
+///     writer.flush().await.unwrap()
+/// }
+/// # }
+/// ```
 pub struct AsyncArrowWriter<W> {
     /// Underlying sync writer
     sync_writer: ArrowWriter<SharedBuffer>,
@@ -82,12 +105,9 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     /// Try to create a new Async Arrow Writer.
     ///
     /// `buffer_size` determines the minimum number of bytes to buffer before flushing
-    /// to the underlying [`AsyncWrite`]
-    ///
-    /// The intermediate buffer will automatically be resized if necessary
-    ///
-    /// [`Self::write`] will flush this intermediate buffer if it is at least
-    /// half full
+    /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
+    /// force buffering of data in excess of this within the underlying [`ArrowWriter`].
+    /// See the documentation on [`ArrowWriter`] for more details.
     pub fn try_new(
         writer: W,
         arrow_schema: SchemaRef,
@@ -101,12 +121,9 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
     /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
     ///
     /// `buffer_size` determines the minimum number of bytes to buffer before flushing
-    /// to the underlying [`AsyncWrite`]
-    ///
-    /// The intermediate buffer will automatically be resized if necessary
-    ///
-    /// [`Self::write`] will flush this intermediate buffer if it is at least
-    /// half full
+    /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
+    /// force buffering of data in excess of this within the underlying [`ArrowWriter`].
+    /// See the documentation on [`ArrowWriter`] for more details.
     pub fn try_new_with_options(
         writer: W,
         arrow_schema: SchemaRef,
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 4c350c4b1d8c..a8bef98d9e8c 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -481,6 +481,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_length(*length)
                 .build()
         }
+        DataType::BinaryView | DataType::Utf8View => unimplemented!("BinaryView/Utf8View not implemented"),
         DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
             // Decimal precision determines the Parquet physical type to use.
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index a8b631ecc024..25734813a8d8 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -68,6 +68,7 @@ struct APartiallyCompleteRecord {
 mod tests {
     use super::*;
 
+    use chrono::SubsecRound;
     use std::{env, fs, io::Write, sync::Arc};
 
     use parquet::{
@@ -202,9 +203,8 @@ mod tests {
         out.read_from_row_group(&mut *row_group, 1).unwrap();
 
         // correct for rounding error when writing milliseconds
-        drs[0].now =
-            chrono::naive::NaiveDateTime::from_timestamp_millis(drs[0].now.timestamp_millis())
-                .unwrap();
+
+        drs[0].now = drs[0].now.trunc_subsecs(3);
 
         assert!(out[0].double.is_nan()); // these three lines are necessary because NAN != NAN
         out[0].double = 0.;