diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml
index db3e6838f6a5..f6336cd18af8 100644
--- a/datafusion/functions/Cargo.toml
+++ b/datafusion/functions/Cargo.toml
@@ -35,6 +35,7 @@ workspace = true
 # enable core functions
 core_expressions = []
 crypto_expressions = ["md-5", "sha2", "blake2", "blake3"]
+hash_expressions = ["twox-hash"]
 # enable datetime functions
 datetime_expressions = []
 # Enable encoding by default so the doctests work. In general don't automatically enable all packages.
@@ -46,6 +47,7 @@ default = [
     "regex_expressions",
     "string_expressions",
     "unicode_expressions",
+    "hash_expressions",
 ]
 # enable encode/decode functions
 encoding_expressions = ["base64", "hex"]
@@ -85,6 +87,7 @@ md-5 = { version = "^0.10.0", optional = true }
 rand = { workspace = true }
 regex = { workspace = true, optional = true }
 sha2 = { version = "^0.10.1", optional = true }
+twox-hash = { version = "1.6.3", optional = true }
 unicode-segmentation = { version = "^1.7.1", optional = true }
 uuid = { version = "1.7", features = ["v4"], optional = true }
diff --git a/datafusion/functions/src/hash/mod.rs b/datafusion/functions/src/hash/mod.rs
new file mode 100644
index 000000000000..5f204ccd5c95
--- /dev/null
+++ b/datafusion/functions/src/hash/mod.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! "xxhash" DataFusion functions
+
+use datafusion_expr::ScalarUDF;
+use std::sync::Arc;
+
+pub mod xxhash;
+make_udf_function!(xxhash::XxHash32Func, xxhash32);
+make_udf_function!(xxhash::XxHash64Func, xxhash64);
+
+pub mod expr_fn {
+    export_functions!((
+        xxhash32,
+        "Computes the XXHash32 hash of a binary string.",
+        input
+    ),(
+        xxhash64,
+        "Computes the XXHash64 hash of a binary string.",
+        input
+    ));
+}
+
+/// Returns all DataFusion functions defined in this package
+pub fn functions() -> Vec<Arc<ScalarUDF>> {
+    vec![xxhash32(), xxhash64()]
+}
diff --git a/datafusion/functions/src/hash/xxhash.rs b/datafusion/functions/src/hash/xxhash.rs
new file mode 100644
index 000000000000..ed0a8e37e014
--- /dev/null
+++ b/datafusion/functions/src/hash/xxhash.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, StringArray, Int32Array, Int64Array, UInt32Array, UInt64Array};
+use arrow::datatypes::DataType;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::{
+    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
+};
+use twox_hash::{XxHash64, XxHash32};
+use datafusion_macros::user_doc;
+use std::any::Any;
+use std::hash::Hasher;
+use datafusion_common::DataFusionError;
+use std::sync::Arc;
+
+#[user_doc(
+    doc_section(label = "Hashing Functions"),
+    description = "Computes the XXHash64 hash of a binary string.",
+    syntax_example = "xxhash64(expression)",
+    sql_example = r#"```sql
+> select xxhash64('foo');
++-------------------------------------------+
+| xxhash64(Utf8("foo"))                     |
++-------------------------------------------+
+|                                           |
++-------------------------------------------+
+```"#,
+    standard_argument(name = "expression", prefix = "String")
+)]
+#[derive(Debug)]
+pub struct XxHash64Func {
+    signature: Signature,
+}
+
+impl Default for XxHash64Func {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl XxHash64Func {
+    pub fn new() -> Self {
+        use DataType::*;
+        Self {
+            signature: Signature::uniform(
+                1,
+                vec![Utf8View, Utf8, LargeUtf8, Binary, LargeBinary],
+                Volatility::Immutable,
+            ),
+        }
+    }
+
+    pub fn hash_scalar(&self, value: &ColumnarValue) -> Result<String> {
+        let value_str = to_string_from_scalar(value)?;
+        hash_value(&value_str, XxHash64::default(), HashType::U64)
+    }
+}
+
+impl ScalarUDFImpl for XxHash64Func {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "xxhash64"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_batch(
+        &self,
+        args: &[ColumnarValue],
+        _number_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let input_data = &args[0];
+
+        let result = match input_data {
+            ColumnarValue::Array(array) => {
+                let hash_results = process_array(array, XxHash64::default(), HashType::U64)?;
+                let hash_array = StringArray::from(hash_results);
+                Arc::new(hash_array) as Arc<dyn Array>
+            },
+            ColumnarValue::Scalar(scalar) => {
+                let hash_result = self.hash_scalar(&ColumnarValue::Scalar(scalar.clone()))?;
+                let hash_array = StringArray::from(vec![hash_result]);
+                Arc::new(hash_array) as Arc<dyn Array>
+            }
+        };
+
+        Ok(ColumnarValue::Array(result))
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
+    }
+}
+
+#[user_doc(
+    doc_section(label = "Hashing Functions"),
+    description = "Computes the XXHash32 hash of a binary string.",
+    syntax_example = "xxhash32(expression)",
+    sql_example = r#"```sql
+> select xxhash32('foo');
++-------------------------------------------+
+| xxhash32(Utf8("foo"))                     |
++-------------------------------------------+
+|                                           |
++-------------------------------------------+
+```"#,
+    standard_argument(name = "expression", prefix = "String")
+)]
+#[derive(Debug)]
+pub struct XxHash32Func {
+    signature: Signature,
+}
+
+impl Default for XxHash32Func {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl XxHash32Func {
+    pub fn new() -> Self {
+        use DataType::*;
+        Self {
+            signature: Signature::uniform(
+                1,
+                vec![Utf8View, Utf8, LargeUtf8, Binary, LargeBinary],
+                Volatility::Immutable,
+            ),
+        }
+    }
+
+    pub fn hash_scalar(&self, value: &ColumnarValue) -> Result<String> {
+        let value_str = to_string_from_scalar(value)?;
+        hash_value(&value_str, XxHash32::default(), HashType::U32)
+    }
+}
+
+impl ScalarUDFImpl for XxHash32Func {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "xxhash32"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn invoke_batch(
+        &self,
+        args: &[ColumnarValue],
+        _number_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let input_data = &args[0];
+
+        let result = match input_data {
+            ColumnarValue::Array(array) => {
+                let hash_results = process_array(array, XxHash32::default(), HashType::U32)?;
+                let hash_array = StringArray::from(hash_results);
+                Arc::new(hash_array) as Arc<dyn Array>
+            },
+            ColumnarValue::Scalar(scalar) => {
+                let hash_result = self.hash_scalar(&ColumnarValue::Scalar(scalar.clone()))?;
+                let hash_array = StringArray::from(vec![hash_result]);
+                Arc::new(hash_array) as Arc<dyn Array>
+            }
+        };
+
+        Ok(ColumnarValue::Array(result))
+    }
+
+    fn documentation(&self) -> Option<&Documentation> {
+        self.doc()
+    }
+}
+
+// Helper functions
+
+fn to_string_from_scalar(value: &ColumnarValue) -> Result<String> {
+    match value {
+        ColumnarValue::Scalar(scalar) => match scalar {
+            ScalarValue::Utf8(Some(v)) => Ok(v.clone()),
+            ScalarValue::Int32(Some(v)) => Ok(v.to_string()),
+            ScalarValue::Int64(Some(v)) => Ok(v.to_string()),
+            ScalarValue::UInt32(Some(v)) => Ok(v.to_string()),
+            ScalarValue::UInt64(Some(v)) => Ok(v.to_string()),
+            _ => Err(DataFusionError::Internal("Unsupported scalar type".to_string())),
+        },
+        _ => Err(DataFusionError::Internal("Expected a scalar value".to_string())),
+    }
+}
+
+#[derive(Clone)]
+pub enum HashType {
+    U32,
+    U64,
+}
+
+fn hash_value<T: Hasher>(value_str: &str, mut hasher: T, hash_type: HashType) -> Result<String> {
+    hasher.write(value_str.as_bytes());
+    let hash = hasher.finish();
+    match hash_type {
+        HashType::U32 => {
+            let hash_u32 = hash as u32;
+            Ok(hex::encode(hash_u32.to_be_bytes()))
+        },
+        HashType::U64 => {
+            let hash_u64 = hash as u64;
+            Ok(hex::encode(hash_u64.to_be_bytes()))
+        },
+    }
+}
+
+fn process_array<T: Hasher>(array: &dyn Array, mut hasher: T, hash_type: HashType) -> Result<Vec<String>> {
+    let mut hash_results: Vec<String> = Vec::with_capacity(array.len());
+    for i in 0..array.len() {
+        if array.is_null(i) {
+            hash_results.push(String::from("00000000")); // Handle null values
+            continue;
+        }
+
+        let value_str = match array.data_type() {
+            DataType::Utf8 => {
+                let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+                string_array.value(i).to_string()
+            }
+            DataType::Int32 => {
+                let int_array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+                int_array.value(i).to_string()
+            }
+            DataType::Int64 => {
+                let int_array = array.as_any().downcast_ref::<Int64Array>().unwrap();
+                int_array.value(i).to_string()
+            }
+            DataType::UInt32 => {
+                let uint_array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
+                uint_array.value(i).to_string()
+            }
+            DataType::UInt64 => {
+                let uint_array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
+                uint_array.value(i).to_string()
+            }
+            _ => return Err(DataFusionError::Internal("Unsupported array type".to_string())),
+        };
+
+        hash_results.push(hash_value(&value_str, &mut hasher, hash_type.clone())?);
+    }
+    Ok(hash_results)
+}
\ No newline at end of file
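A note for reviewers (not part of the diff): the sketch below reproduces the hex formatting that `hash_value` applies, outside of DataFusion, so expected outputs can be sanity-checked. It hashes a sample string with the hashers' default (zero) seed and hex-encodes the big-endian bytes, truncating to 32 bits for `XxHash32` as `HashType::U32` does. It assumes the `twox-hash` and `hex` crates are available to the snippet (both are used by the code above); the `main` wrapper and the sample input are only for illustration.

```rust
// Minimal standalone sketch of the hex formatting used by `hash_value`.
// Assumes `twox-hash` and `hex` are dependencies of whatever scratch crate runs this.
use std::hash::Hasher;
use twox_hash::{XxHash32, XxHash64};

fn main() {
    let input = b"foo"; // arbitrary sample value

    // xxhash64: default (zero) seed, big-endian bytes, hex-encoded.
    let mut h64 = XxHash64::default();
    h64.write(input);
    println!("xxhash64 = {}", hex::encode(h64.finish().to_be_bytes()));

    // xxhash32: `Hasher::finish` still returns a u64, so the value is cast to
    // u32 before encoding, mirroring the `HashType::U32` branch above.
    let mut h32 = XxHash32::default();
    h32.write(input);
    println!("xxhash32 = {}", hex::encode((h32.finish() as u32).to_be_bytes()));
}
```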
diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs
index 7278fe3ec536..0d2806255144 100644
--- a/datafusion/functions/src/lib.rs
+++ b/datafusion/functions/src/lib.rs
@@ -133,6 +133,10 @@ make_stub_package!(crypto, "crypto_expressions");
 pub mod unicode;
 make_stub_package!(unicode, "unicode_expressions");
 
+#[cfg(feature = "hash_expressions")]
+pub mod hash;
+make_stub_package!(hash, "hash_expressions");
+
 #[cfg(any(feature = "datetime_expressions", feature = "unicode_expressions"))]
 pub mod planner;
 
@@ -158,6 +162,8 @@ pub mod expr_fn {
     pub use super::string::expr_fn::*;
     #[cfg(feature = "unicode_expressions")]
     pub use super::unicode::expr_fn::*;
+    #[cfg(feature = "hash_expressions")]
+    pub use super::hash::expr_fn::*;
 }
 
 /// Return all default functions
@@ -171,6 +177,7 @@ pub fn all_default_functions() -> Vec<Arc<ScalarUDF>> {
         .chain(crypto::functions())
         .chain(unicode::functions())
         .chain(string::functions())
+        .chain(hash::functions())
         .collect::<Vec<_>>()
 }
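Since `hash::functions()` is chained into `all_default_functions`, the new UDFs should be reachable from SQL without explicit registration. A minimal sketch of how that could be exercised, assuming a scratch binary with `datafusion` and `tokio` as dependencies and the `hash_expressions` feature enabled; the concrete result values are deliberately left to `show()` rather than hard-coded here:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // A default SessionContext picks up the default scalar UDFs,
    // which now include xxhash32 and xxhash64.
    let ctx = SessionContext::new();

    let df = ctx
        .sql("SELECT xxhash32('foo') AS h32, xxhash64('foo') AS h64")
        .await?;
    df.show().await?;
    Ok(())
}
```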