Skip to content

Commit

Permalink
feat: list/array/timezone support for postgres output (#4727)
Browse files Browse the repository at this point in the history
* feat: list/array support for postgres output

* fix: implement time zone support for postgresql

* feat: add a geohash function that returns array

* fix: typo

* fix: lint warnings

* test: add sqlness test

* refactor: check resolution range before convert value

* fix: test result for sqlness

* feat: upgrade pgwire apis
  • Loading branch information
sunng87 authored Sep 22, 2024
1 parent 163cea8 commit 0f99218
Show file tree
Hide file tree
Showing 13 changed files with 742 additions and 113 deletions.
42 changes: 37 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/common/function/src/scalars/geo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
mod geohash;
mod h3;

use geohash::GeohashFunction;
use geohash::{GeohashFunction, GeohashNeighboursFunction};

use crate::function_registry::FunctionRegistry;

Expand All @@ -26,6 +26,7 @@ impl GeoFunctions {
pub fn register(registry: &FunctionRegistry) {
// geohash
registry.register(Arc::new(GeohashFunction));
registry.register(Arc::new(GeohashNeighboursFunction));
// h3 family
registry.register(Arc::new(h3::H3LatLngToCell));
registry.register(Arc::new(h3::H3LatLngToCellString));
Expand Down
198 changes: 181 additions & 17 deletions src/common/function/src/scalars/geo/geohash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,69 @@ use common_query::error::{self, InvalidFuncArgsSnafu, Result};
use common_query::prelude::{Signature, TypeSignature};
use datafusion::logical_expr::Volatility;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::value::Value;
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
use datatypes::scalars::{Scalar, ScalarVectorBuilder};
use datatypes::value::{ListValue, Value};
use datatypes::vectors::{ListVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
use geohash::Coord;
use snafu::{ensure, ResultExt};

use crate::function::{Function, FunctionContext};

/// Range-checks a geohash resolution held in an integer variable.
///
/// Expands to an expression that evaluates to `Ok` with the value cast to
/// `usize` when it lies in `[1, 12]`, and to an execution error naming the
/// rejected value otherwise.
macro_rules! ensure_resolution_usize {
    ($v: ident) => {
        if (1..=12).contains(&$v) {
            Ok($v as usize)
        } else {
            Err(BoxedError::new(PlainError::new(
                format!("Invalid geohash resolution {}, expect value: [1, 12]", $v),
                StatusCode::EngineExecuteQuery,
            )))
            .context(error::ExecuteSnafu)
        }
    };
}

fn try_into_resolution(v: Value) -> Result<usize> {
match v {
Value::Int8(v) => {
ensure_resolution_usize!(v)
}
Value::Int16(v) => {
ensure_resolution_usize!(v)
}
Value::Int32(v) => {
ensure_resolution_usize!(v)
}
Value::Int64(v) => {
ensure_resolution_usize!(v)
}
Value::UInt8(v) => {
ensure_resolution_usize!(v)
}
Value::UInt16(v) => {
ensure_resolution_usize!(v)
}
Value::UInt32(v) => {
ensure_resolution_usize!(v)
}
Value::UInt64(v) => {
ensure_resolution_usize!(v)
}
_ => unreachable!(),
}
}

/// Function that returns the geohash string for a given geospatial coordinate.
#[derive(Clone, Debug, Default)]
pub struct GeohashFunction;

const NAME: &str = "geohash";
impl GeohashFunction {
const NAME: &'static str = "geohash";
}

impl Function for GeohashFunction {
fn name(&self) -> &str {
NAME
Self::NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Expand Down Expand Up @@ -93,17 +139,7 @@ impl Function for GeohashFunction {
for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = match resolution_vec.get(i) {
Value::Int8(v) => v as usize,
Value::Int16(v) => v as usize,
Value::Int32(v) => v as usize,
Value::Int64(v) => v as usize,
Value::UInt8(v) => v as usize,
Value::UInt16(v) => v as usize,
Value::UInt32(v) => v as usize,
Value::UInt64(v) => v as usize,
_ => unreachable!(),
};
let r = try_into_resolution(resolution_vec.get(i))?;

let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
Expand All @@ -130,6 +166,134 @@ impl Function for GeohashFunction {

impl fmt::Display for GeohashFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME)
write!(f, "{}", Self::NAME)
}
}

/// Function that returns the geohash strings of the eight cells neighbouring
/// the cell that contains a given geospatial coordinate.
///
/// The neighbours are listed in the order n, nw, w, sw, s, se, e, ne.
#[derive(Clone, Debug, Default)]
pub struct GeohashNeighboursFunction;

impl GeohashNeighboursFunction {
/// Name returned by `Function::name` and written by the `Display` impl.
const NAME: &'static str = "geohash_neighbours";
}

impl Function for GeohashNeighboursFunction {
fn name(&self) -> &str {
GeohashNeighboursFunction::NAME
}

fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::list_datatype(
ConcreteDataType::string_datatype(),
))
}

fn signature(&self) -> Signature {
let mut signatures = Vec::new();
for coord_type in &[
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
] {
for resolution_type in &[
ConcreteDataType::int8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::uint64_datatype(),
] {
signatures.push(TypeSignature::Exact(vec![
// latitude
coord_type.clone(),
// longitude
coord_type.clone(),
// resolution
resolution_type.clone(),
]));
}
}
Signature::one_of(signatures, Volatility::Stable)
}

fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
ensure!(
columns.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, provided : {}",
columns.len()
),
}
);

let lat_vec = &columns[0];
let lon_vec = &columns[1];
let resolution_vec = &columns[2];

let size = lat_vec.len();
let mut results =
ListVectorBuilder::with_type_capacity(ConcreteDataType::string_datatype(), size);

for i in 0..size {
let lat = lat_vec.get(i).as_f64_lossy();
let lon = lon_vec.get(i).as_f64_lossy();
let r = try_into_resolution(resolution_vec.get(i))?;

let result = match (lat, lon) {
(Some(lat), Some(lon)) => {
let coord = Coord { x: lon, y: lat };
let encoded = geohash::encode(coord, r)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Geohash error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
let neighbours = geohash::neighbors(&encoded)
.map_err(|e| {
BoxedError::new(PlainError::new(
format!("Geohash error: {}", e),
StatusCode::EngineExecuteQuery,
))
})
.context(error::ExecuteSnafu)?;
Some(ListValue::new(
vec![
neighbours.n,
neighbours.nw,
neighbours.w,
neighbours.sw,
neighbours.s,
neighbours.se,
neighbours.e,
neighbours.ne,
]
.into_iter()
.map(Value::from)
.collect(),
ConcreteDataType::string_datatype(),
))
}
_ => None,
};

if let Some(list_value) = result {
results.push(Some(list_value.as_scalar_ref()));
} else {
results.push(None);
}
}

Ok(results.to_vector())
}
}

impl fmt::Display for GeohashNeighboursFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", GeohashNeighboursFunction::NAME)
}
}
2 changes: 1 addition & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" }
opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = { version = "0.22", default-features = false, features = ["server-api-ring"] }
pgwire = { version = "0.24.2", default-features = false, features = ["server-api-ring"] }
pin-project = "1.0"
pipeline.workspace = true
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
Expand Down
Loading

0 comments on commit 0f99218

Please sign in to comment.