Skip to content

Commit

Permalink
Update the CONCAT scalar function to support Utf8View (#12224)
Browse files Browse the repository at this point in the history
* wip

* feat: Update the CONCAT scalar function to support Utf8View

* fmt

* fmt and add default return type for concat

* fix clippy lint

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* add more tests for sqllogic

Signed-off-by: Devan <[email protected]>

* make sure no casting with LargeUtf8

* fixing utf8large

* fix large utf8

Signed-off-by: Devan <[email protected]>

* fix large utf8

Signed-off-by: Devan <[email protected]>

* add test

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* make it so Utf8View just returns Utf8

Signed-off-by: Devan <[email protected]>

* wip -- trying to build a stringview with columnar refs

Signed-off-by: Devan <[email protected]>

* built stringview builder but it does allocate a new String each iter :(

Signed-off-by: Devan <[email protected]>

* add some testing

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

---------

Signed-off-by: Devan <[email protected]>
  • Loading branch information
devanbenz authored Sep 3, 2024
1 parent bf6c82f commit e4a9424
Show file tree
Hide file tree
Showing 3 changed files with 416 additions and 34 deletions.
195 changes: 190 additions & 5 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use std::sync::Arc;

use arrow::array::{
new_null_array, Array, ArrayAccessor, ArrayDataBuilder, ArrayIter, ArrayRef,
GenericStringArray, GenericStringBuilder, OffsetSizeTrait, StringArray,
StringBuilder, StringViewArray,
GenericStringArray, GenericStringBuilder, LargeStringArray, OffsetSizeTrait,
StringArray, StringBuilder, StringViewArray, StringViewBuilder,
};
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer};
use arrow::datatypes::DataType;

use datafusion_common::cast::{as_generic_string_array, as_string_view_array};
use datafusion_common::Result;
use datafusion_common::{exec_err, ScalarValue};
Expand Down Expand Up @@ -249,26 +248,41 @@ where
}
}

#[derive(Debug)]
pub(crate) enum ColumnarValueRef<'a> {
Scalar(&'a [u8]),
NullableArray(&'a StringArray),
NonNullableArray(&'a StringArray),
NullableLargeStringArray(&'a LargeStringArray),
NonNullableLargeStringArray(&'a LargeStringArray),
NullableStringViewArray(&'a StringViewArray),
NonNullableStringViewArray(&'a StringViewArray),
}

impl<'a> ColumnarValueRef<'a> {
#[inline]
pub fn is_valid(&self, i: usize) -> bool {
match &self {
Self::Scalar(_) | Self::NonNullableArray(_) => true,
Self::Scalar(_)
| Self::NonNullableArray(_)
| Self::NonNullableLargeStringArray(_)
| Self::NonNullableStringViewArray(_) => true,
Self::NullableArray(array) => array.is_valid(i),
Self::NullableStringViewArray(array) => array.is_valid(i),
Self::NullableLargeStringArray(array) => array.is_valid(i),
}
}

#[inline]
pub fn nulls(&self) -> Option<NullBuffer> {
match &self {
Self::Scalar(_) | Self::NonNullableArray(_) => None,
Self::Scalar(_)
| Self::NonNullableArray(_)
| Self::NonNullableStringViewArray(_)
| Self::NonNullableLargeStringArray(_) => None,
Self::NullableArray(array) => array.nulls().cloned(),
Self::NullableStringViewArray(array) => array.nulls().cloned(),
Self::NullableLargeStringArray(array) => array.nulls().cloned(),
}
}
}
Expand Down Expand Up @@ -387,10 +401,30 @@ impl StringArrayBuilder {
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NullableLargeStringArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NullableStringViewArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NonNullableArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
ColumnarValueRef::NonNullableLargeStringArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
ColumnarValueRef::NonNullableStringViewArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
}

Expand All @@ -416,6 +450,157 @@ impl StringArrayBuilder {
}
}

pub(crate) struct StringViewArrayBuilder {
builder: StringViewBuilder,
block: String,
}

impl StringViewArrayBuilder {
pub fn with_capacity(_item_capacity: usize, data_capacity: usize) -> Self {
let builder = StringViewBuilder::with_capacity(data_capacity);
Self {
builder,
block: String::new(),
}
}

pub fn write<const CHECK_VALID: bool>(
&mut self,
column: &ColumnarValueRef,
i: usize,
) {
match column {
ColumnarValueRef::Scalar(s) => {
self.block.push_str(std::str::from_utf8(s).unwrap());
}
ColumnarValueRef::NullableArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.block.push_str(
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
);
}
}
ColumnarValueRef::NullableLargeStringArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.block.push_str(
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
);
}
}
ColumnarValueRef::NullableStringViewArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.block.push_str(
std::str::from_utf8(array.value(i).as_bytes()).unwrap(),
);
}
}
ColumnarValueRef::NonNullableArray(array) => {
self.block
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
}
ColumnarValueRef::NonNullableLargeStringArray(array) => {
self.block
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
}
ColumnarValueRef::NonNullableStringViewArray(array) => {
self.block
.push_str(std::str::from_utf8(array.value(i).as_bytes()).unwrap());
}
}
}

pub fn append_offset(&mut self) {
self.builder.append_value(&self.block);
self.block = String::new();
}

pub fn finish(mut self) -> StringViewArray {
self.builder.finish()
}
}

pub(crate) struct LargeStringArrayBuilder {
offsets_buffer: MutableBuffer,
value_buffer: MutableBuffer,
}

impl LargeStringArrayBuilder {
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
let mut offsets_buffer = MutableBuffer::with_capacity(
(item_capacity + 1) * std::mem::size_of::<i64>(),
);
// SAFETY: the first offset value is definitely not going to exceed the bounds.
unsafe { offsets_buffer.push_unchecked(0_i64) };
Self {
offsets_buffer,
value_buffer: MutableBuffer::with_capacity(data_capacity),
}
}

pub fn write<const CHECK_VALID: bool>(
&mut self,
column: &ColumnarValueRef,
i: usize,
) {
match column {
ColumnarValueRef::Scalar(s) => {
self.value_buffer.extend_from_slice(s);
}
ColumnarValueRef::NullableArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NullableLargeStringArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NullableStringViewArray(array) => {
if !CHECK_VALID || array.is_valid(i) {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
ColumnarValueRef::NonNullableArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
ColumnarValueRef::NonNullableLargeStringArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
ColumnarValueRef::NonNullableStringViewArray(array) => {
self.value_buffer
.extend_from_slice(array.value(i).as_bytes());
}
}
}

pub fn append_offset(&mut self) {
let next_offset: i64 = self
.value_buffer
.len()
.try_into()
.expect("byte array offset overflow");
unsafe { self.offsets_buffer.push_unchecked(next_offset) };
}

pub fn finish(self, null_buffer: Option<NullBuffer>) -> LargeStringArray {
let array_builder = ArrayDataBuilder::new(DataType::LargeUtf8)
.len(self.offsets_buffer.len() / std::mem::size_of::<i64>() - 1)
.add_buffer(self.offsets_buffer.into())
.add_buffer(self.value_buffer.into())
.nulls(null_buffer);
// SAFETY: all data that was appended was valid Large UTF8 and the values
// and offsets were created correctly
let array_data = unsafe { array_builder.build_unchecked() };
LargeStringArray::from(array_data)
}
}

fn case_conversion_array<'a, O, F>(array: &'a ArrayRef, op: F) -> Result<ArrayRef>
where
O: OffsetSizeTrait,
Expand Down
Loading

0 comments on commit e4a9424

Please sign in to comment.