test: Introduce integration test framework. #581

Draft: wants to merge 10 commits into base: main
Changes from 1 commit
Make clippy happy
liurenjie1024 committed Sep 13, 2024

Verified: this commit was created on GitHub.com and signed with GitHub's verified signature (the key has expired).
commit 0c92b0c66b7d7eaaf585ba1e80157b22e4c635e2
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -23,7 +23,7 @@ members = [
"crates/iceberg",
"crates/integrations/*",
"crates/test_utils",
"crates/sqllogictest",
"crates/sqllogictests",
]
exclude = ["bindings/python"]

@@ -40,6 +40,7 @@ rust-version = "1.77.1"
anyhow = "1.0.72"
apache-avro = "0.17"
array-init = "2"
arrow = { version = "52" }
arrow-arith = { version = "52" }
arrow-array = { version = "52" }
arrow-ord = { version = "52" }
3 changes: 1 addition & 2 deletions crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -42,10 +42,9 @@ impl TrackWriter {
impl FileWrite for TrackWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let size = bs.len();
self.inner.write(bs).await.map(|v| {
self.inner.write(bs).await.inspect(|_| {
self.written_size
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
v
})
}

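For context on the clippy fix above (likely the `manual_inspect` lint): `Result::inspect`, stable since Rust 1.76 and therefore available under the workspace's `rust-version = "1.77.1"`, runs a side effect on the `Ok` value and passes the `Result` through unchanged, which is what the old `map` closure did by hand by returning `v`. A standalone sketch of the equivalence (not part of the PR):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

fn main() {
    let written_size = AtomicI64::new(0);

    // Old shape: side effect inside `map`, manually returning the value.
    let a: Result<usize, ()> = Ok(4).map(|v| {
        written_size.fetch_add(4, Ordering::Relaxed);
        v
    });

    // New shape: `inspect` borrows the Ok value and forwards the Result.
    let b: Result<usize, ()> = Ok(4).inspect(|_| {
        written_size.fetch_add(4, Ordering::Relaxed);
    });

    assert_eq!(a, b);
    assert_eq!(written_size.load(Ordering::Relaxed), 8);
}
```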
4 changes: 0 additions & 4 deletions crates/sqllogictest/src/error.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "sqllogictest"
name = "sqllogictests"
version.workspace = true
edition.workspace = true
homepage.workspace = true
@@ -8,10 +8,11 @@ license.workspace = true
rust-version.workspace = true

[dependencies]
arrow-schema = { workspace = true }
arrow-array= { workspace = true }
arrow = { workspace = true }
# For spark-connect-rs
arrow_51 = { version = "51", package = "arrow"}
async-trait = { workspace = true }
sqllogictest = "0.21.0"
sqllogictest = "0.22"
datafusion = { workspace = true, default-features = true}
datafusion-common = { workspace = true, default-features = true}
thiserror = "1.0.63"
82 changes: 82 additions & 0 deletions crates/sqllogictests/src/display/conversion.rs
@@ -0,0 +1,82 @@
use arrow::array::types::{Decimal128Type, Decimal256Type, DecimalType};
use arrow::datatypes::i256;
use bigdecimal::BigDecimal;
use half::f16;
use rust_decimal::prelude::*;

/// Represents a constant for NULL string in your database.
pub const NULL_STR: &str = "NULL";

pub(crate) fn bool_to_str(value: bool) -> String {
if value {
"true".to_string()
} else {
"false".to_string()
}
}

pub(crate) fn varchar_to_str(value: &str) -> String {
if value.is_empty() {
"(empty)".to_string()
} else {
value.trim_end_matches('\n').to_string()
}
}

pub(crate) fn f16_to_str(value: f16) -> String {
if value.is_nan() {
// The sign of NaN can be different depending on platform.
// So the string representation of NaN ignores the sign.
"NaN".to_string()
} else if value == f16::INFINITY {
"Infinity".to_string()
} else if value == f16::NEG_INFINITY {
"-Infinity".to_string()
} else {
big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
}
}

pub(crate) fn f32_to_str(value: f32) -> String {
if value.is_nan() {
// The sign of NaN can be different depending on platform.
// So the string representation of NaN ignores the sign.
"NaN".to_string()
} else if value == f32::INFINITY {
"Infinity".to_string()
} else if value == f32::NEG_INFINITY {
"-Infinity".to_string()
} else {
big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
}
}

pub(crate) fn f64_to_str(value: f64) -> String {
if value.is_nan() {
// The sign of NaN can be different depending on platform.
// So the string representation of NaN ignores the sign.
"NaN".to_string()
} else if value == f64::INFINITY {
"Infinity".to_string()
} else if value == f64::NEG_INFINITY {
"-Infinity".to_string()
} else {
big_decimal_to_str(BigDecimal::from_str(&value.to_string()).unwrap())
}
}

pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String {
big_decimal_to_str(
BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)).unwrap(),
)
}

pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String {
big_decimal_to_str(
BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)).unwrap(),
)
}

pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String {
value.round(12).normalized().to_string()
}
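The function above is the stability anchor for float output: every float and decimal is funneled through `BigDecimal`, rounded to 12 digits, and normalized, so engines that differ only in low-order float noise still compare equal. A small sketch of that behavior, assuming only the `bigdecimal` crate already imported above:

```rust
use std::str::FromStr;

use bigdecimal::BigDecimal;

// Mirrors `big_decimal_to_str` above.
fn to_str(value: BigDecimal) -> String {
    value.round(12).normalized().to_string()
}

fn main() {
    // `normalized()` strips trailing zeros...
    assert_eq!(to_str(BigDecimal::from_str("1.2300").unwrap()), "1.23");
    // ...and `round(12)` discards anything past 12 fractional digits.
    assert_eq!(
        to_str(BigDecimal::from_str("0.1234567890123999").unwrap()),
        "0.123456789012"
    );
}
```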
Original file line number Diff line number Diff line change
@@ -1,26 +1,10 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType};
use arrow_51::array::types::{Decimal128Type, Decimal256Type, DecimalType};
use arrow_51::datatypes::i256;
use bigdecimal::BigDecimal;
use datafusion_common::arrow::datatypes::i256;
use half::f16;
use rust_decimal::prelude::*;


/// Represents a constant for NULL string in your database.
pub const NULL_STR: &str = "NULL";

@@ -84,18 +68,16 @@ pub(crate) fn f64_to_str(value: f64) -> String {

pub(crate) fn i128_to_str(value: i128, precision: &u8, scale: &i8) -> String {
big_decimal_to_str(
BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale))
.unwrap(),
BigDecimal::from_str(&Decimal128Type::format_decimal(value, *precision, *scale)).unwrap(),
)
}

pub(crate) fn i256_to_str(value: i256, precision: &u8, scale: &i8) -> String {
big_decimal_to_str(
BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale))
.unwrap(),
BigDecimal::from_str(&Decimal256Type::format_decimal(value, *precision, *scale)).unwrap(),
)
}

pub(crate) fn big_decimal_to_str(value: BigDecimal) -> String {
value.round(12).normalized().to_string()
}
}
4 changes: 4 additions & 0 deletions crates/sqllogictests/src/display/mod.rs
@@ -0,0 +1,4 @@
pub mod conversion;
pub mod conversion_51;
pub mod normalize;
pub mod normalize_51;
Original file line number Diff line number Diff line change
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use crate::engine::output::DFColumnType;
use anyhow::anyhow;
use arrow_array::{ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray};
use arrow_schema::{DataType, Fields};
use datafusion::arrow::util::display::ArrayFormatter;
use arrow::array::{
ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array,
Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray,
};
use arrow::datatypes::{DataType, Fields};
use arrow::util::display::ArrayFormatter;
use datafusion_common::format::DEFAULT_FORMAT_OPTIONS;

use crate::engine::conversion::*;
use crate::display::conversion::*;
use crate::engine::output::DFColumnType;

/// Converts `batches` to a result as expected by sqllogictest.
pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> anyhow::Result<Vec<Vec<String>>> {
@@ -35,16 +37,13 @@ pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> anyhow::Result<Vec<V
// Verify schema
if !schema.contains(&batch.schema()) {
return Err(anyhow!(
"Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
&schema,
batch.schema()
),
);
"Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
&schema,
batch.schema()
));
}

let new_rows = convert_batch(batch)?
.into_iter()
.flat_map(expand_row);
let new_rows = convert_batch(batch)?.into_iter().flat_map(expand_row);
rows.extend(new_rows);
}
Ok(rows)
@@ -73,10 +72,11 @@ pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> anyhow::Result<Vec<V
/// "|-- Projection: d.b, MAX(d.a) AS max_a",
/// ]
/// ```
fn expand_row(mut row: Vec<String>) -> impl Iterator<Item=Vec<String>> {
use itertools::Either;
fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> {
use std::iter::once;

use itertools::Either;

// check last cell
if let Some(cell) = row.pop() {
let lines: Vec<_> = cell.split('\n').collect();
@@ -93,7 +93,7 @@ fn expand_row(mut row: Vec<String>) -> impl Iterator<Item=Vec<String>> {
.enumerate()
.map(|(idx, l)| {
// replace any leading spaces with '-' as
// `sqllogictest` ignores whitespace differences
// `sqllogictests` ignores whitespace differences
//
// See https://github.com/apache/datafusion/issues/6328
let content = l.trim_start();
@@ -141,26 +141,17 @@ macro_rules! get_row_value {
/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings
///
/// Floating numbers are rounded to have a consistent representation with the Postgres runner.
///
pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result<String> {
if !col.is_valid(row) {
// represent any null value with the string "NULL"
Ok(NULL_STR.to_string())
} else {
match col.data_type() {
DataType::Null => Ok(NULL_STR.to_string()),
DataType::Boolean => {
Ok(bool_to_str(get_row_value!(BooleanArray, col, row)))
}
DataType::Float16 => {
Ok(f16_to_str(get_row_value!(Float16Array, col, row)))
}
DataType::Float32 => {
Ok(f32_to_str(get_row_value!(Float32Array, col, row)))
}
DataType::Float64 => {
Ok(f64_to_str(get_row_value!(Float64Array, col, row)))
}
DataType::Boolean => Ok(bool_to_str(get_row_value!(BooleanArray, col, row))),
DataType::Float16 => Ok(f16_to_str(get_row_value!(Float16Array, col, row))),
DataType::Float32 => Ok(f32_to_str(get_row_value!(Float32Array, col, row))),
DataType::Float64 => Ok(f64_to_str(get_row_value!(Float64Array, col, row))),
DataType::Decimal128(precision, scale) => {
let value = get_row_value!(Decimal128Array, col, row);
Ok(i128_to_str(value, precision, scale))
@@ -169,19 +160,9 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result<String> {
let value = get_row_value!(Decimal256Array, col, row);
Ok(i256_to_str(value, precision, scale))
}
DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(
LargeStringArray,
col,
row
))),
DataType::Utf8 => {
Ok(varchar_to_str(get_row_value!(StringArray, col, row)))
}
DataType::Utf8View => Ok(varchar_to_str(get_row_value!(
StringViewArray,
col,
row
))),
DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(LargeStringArray, col, row))),
DataType::Utf8 => Ok(varchar_to_str(get_row_value!(StringArray, col, row))),
DataType::Utf8View => Ok(varchar_to_str(get_row_value!(StringViewArray, col, row))),
_ => {
let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS);
Ok(f.unwrap().value(row).to_string())
@@ -210,15 +191,12 @@ pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec<DFColumnType> {
| DataType::Float64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _) => DFColumnType::Float,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
DFColumnType::Text
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => DFColumnType::Text,
DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) => {
DFColumnType::DateTime
}
DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_) => DFColumnType::DateTime,
DataType::Timestamp(_, _) => DFColumnType::Timestamp,
_ => DFColumnType::Another,
})
.collect()
}
}
205 changes: 205 additions & 0 deletions crates/sqllogictests/src/display/normalize_51.rs
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use arrow_51::util::display::{DurationFormat, FormatOptions};
use arrow_51::array::{
ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float16Array, Float32Array,
Float64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray,
};
use arrow_51::datatypes::{DataType, Fields};
use arrow_51::util::display::ArrayFormatter;
use crate::display::conversion_51::*;
use crate::engine::output::DFColumnType;

const DEFAULT_FORMAT_OPTIONS: FormatOptions<'static> =
FormatOptions::new().with_duration_format(DurationFormat::Pretty);

/// Converts `batches` to a result as expected by sqllogictest.
pub(crate) fn convert_batches(batches: Vec<RecordBatch>) -> anyhow::Result<Vec<Vec<String>>> {
if batches.is_empty() {
Ok(vec![])
} else {
let schema = batches[0].schema();
let mut rows = vec![];
for batch in batches {
// Verify schema
if !schema.contains(&batch.schema()) {
return Err(anyhow!(
"Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
&schema,
batch.schema()
));
}

let new_rows = convert_batch(batch)?.into_iter().flat_map(expand_row);
rows.extend(new_rows);
}
Ok(rows)
}
}

/// special case rows that have newlines in them (like explain plans)
///
/// Transform inputs like:
/// ```text
/// [
/// "logical_plan",
/// "Sort: d.b ASC NULLS LAST\n Projection: d.b, MAX(d.a) AS max_a",
/// ]
/// ```
///
/// Into one cell per line, adding lines if necessary
/// ```text
/// [
/// "logical_plan",
/// ]
/// [
/// "Sort: d.b ASC NULLS LAST",
/// ]
/// [ <--- newly added row
/// "|-- Projection: d.b, MAX(d.a) AS max_a",
/// ]
/// ```
fn expand_row(mut row: Vec<String>) -> impl Iterator<Item = Vec<String>> {
use std::iter::once;

use itertools::Either;

// check last cell
if let Some(cell) = row.pop() {
let lines: Vec<_> = cell.split('\n').collect();

// no newlines in last cell
if lines.len() < 2 {
row.push(cell);
return Either::Left(once(row));
}

// form new rows with each additional line
let new_lines: Vec<_> = lines
.into_iter()
.enumerate()
.map(|(idx, l)| {
// replace any leading spaces with '-' as
// `sqllogictests` ignores whitespace differences
//
// See https://github.com/apache/datafusion/issues/6328
let content = l.trim_start();
let new_prefix = "-".repeat(l.len() - content.len());
// maintain for each line a number, so
// reviewing explain result changes is easier
let line_num = idx + 1;
vec![format!("{line_num:02}){new_prefix}{content}")]
})
.collect();

Either::Right(once(row).chain(new_lines))
} else {
Either::Left(once(row))
}
}

/// Convert a single batch to a `Vec<Vec<String>>` for comparison
fn convert_batch(batch: RecordBatch) -> anyhow::Result<Vec<Vec<String>>> {
(0..batch.num_rows())
.map(|row| {
batch
.columns()
.iter()
.map(|col| cell_to_string(col, row))
.collect::<anyhow::Result<Vec<String>>>()
})
.collect()
}

macro_rules! get_row_value {
($array_type:ty, $column: ident, $row: ident) => {{
let array = $column.as_any().downcast_ref::<$array_type>().unwrap();

array.value($row)
}};
}

/// Normalizes the content of a single cell in RecordBatch prior to printing.
///
/// This is to make the output comparable to the semi-standard .slt format
///
/// Normalizations applied to [NULL Values and empty strings]
///
/// [NULL Values and empty strings]: https://duckdb.org/dev/sqllogictest/result_verification#null-values-and-empty-strings
///
/// Floating numbers are rounded to have a consistent representation with the Postgres runner.
pub fn cell_to_string(col: &ArrayRef, row: usize) -> anyhow::Result<String> {
if !col.is_valid(row) {
// represent any null value with the string "NULL"
Ok(NULL_STR.to_string())
} else {
match col.data_type() {
DataType::Null => Ok(NULL_STR.to_string()),
DataType::Boolean => Ok(bool_to_str(get_row_value!(BooleanArray, col, row))),
DataType::Float16 => Ok(f16_to_str(get_row_value!(Float16Array, col, row))),
DataType::Float32 => Ok(f32_to_str(get_row_value!(Float32Array, col, row))),
DataType::Float64 => Ok(f64_to_str(get_row_value!(Float64Array, col, row))),
DataType::Decimal128(precision, scale) => {
let value = get_row_value!(Decimal128Array, col, row);
Ok(i128_to_str(value, precision, scale))
}
DataType::Decimal256(precision, scale) => {
let value = get_row_value!(Decimal256Array, col, row);
Ok(i256_to_str(value, precision, scale))
}
DataType::LargeUtf8 => Ok(varchar_to_str(get_row_value!(LargeStringArray, col, row))),
DataType::Utf8 => Ok(varchar_to_str(get_row_value!(StringArray, col, row))),
DataType::Utf8View => Ok(varchar_to_str(get_row_value!(StringViewArray, col, row))),
_ => {
let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS);
Ok(f.unwrap().value(row).to_string())
}
}
}
}

/// Converts columns to a result as expected by sqllogictest.
pub(crate) fn convert_schema_to_types(columns: &Fields) -> Vec<DFColumnType> {
columns
.iter()
.map(|f| f.data_type())
.map(|data_type| match data_type {
DataType::Boolean => DFColumnType::Boolean,
DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => DFColumnType::Integer,
DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _) => DFColumnType::Float,
DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => DFColumnType::Text,
DataType::Date32 | DataType::Date64 | DataType::Time32(_) | DataType::Time64(_) => {
DFColumnType::DateTime
}
DataType::Timestamp(_, _) => DFColumnType::Timestamp,
_ => DFColumnType::Another,
})
.collect()
}
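To make the explain-plan normalization concrete, here is a self-contained sketch (plan text invented) of the rewrite `expand_row` applies to a multi-line cell: each line becomes its own numbered row, and leading spaces become `-` since the runner ignores whitespace differences:

```rust
fn main() {
    let cell = "Sort: d.b ASC NULLS LAST\n  Projection: d.b, MAX(d.a) AS max_a";

    let rows: Vec<String> = cell
        .split('\n')
        .enumerate()
        .map(|(idx, line)| {
            // Same rewrite as `expand_row`: leading spaces -> '-',
            // plus a two-digit line number for easier diff review.
            let content = line.trim_start();
            let prefix = "-".repeat(line.len() - content.len());
            format!("{:02}){}{}", idx + 1, prefix, content)
        })
        .collect();

    assert_eq!(rows, [
        "01)Sort: d.b ASC NULLS LAST",
        "02)--Projection: d.b, MAX(d.a) AS max_a",
    ]);
}
```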
Original file line number Diff line number Diff line change
@@ -15,37 +15,36 @@
// specific language governing permissions and limitations
// under the License.

use arrow_array::RecordBatch;
use async_trait::async_trait;
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::{SessionConfig, SessionContext};
use sqllogictest::{AsyncDB, DBOutput};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use arrow::array::RecordBatch;
use async_trait::async_trait;
use datafusion::catalog::CatalogProvider;
use toml::Table;
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::{SessionConfig, SessionContext};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::IcebergCatalogProvider;
use crate::engine::normalize;
use sqllogictest::{AsyncDB, DBOutput};
use toml::Table;

use crate::display::normalize;
use crate::engine::output::{DFColumnType, DFOutput};
use crate::error::{Result, Error};
use crate::error::{Error, Result};

pub struct DataFusionEngine {
ctx: SessionContext,
}

impl Default for DataFusionEngine {
fn default() -> Self {
let config = SessionConfig::new()
.with_target_partitions(4);
let config = SessionConfig::new().with_target_partitions(4);

let ctx = SessionContext::new_with_config(config);

Self {
ctx
}
Self { ctx }
}
}

@@ -55,7 +54,7 @@ impl AsyncDB for DataFusionEngine {
type ColumnType = DFColumnType;

async fn run(&mut self, sql: &str) -> Result<DFOutput> {
run_query(&self.ctx, sql).await.map_err(Box::new)
Ok(run_query(&self.ctx, sql).await?)
}

/// Engine name of current database.
@@ -73,7 +72,7 @@ impl AsyncDB for DataFusionEngine {
}
}

async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOutput> {
async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> anyhow::Result<DFOutput> {
let df = ctx.sql(sql.into().as_str()).await?;
let task_ctx = Arc::new(df.task_ctx());
let plan = df.create_physical_plan().await?;
@@ -92,19 +91,17 @@ async fn run_query(ctx: &SessionContext, sql: impl Into<String>) -> Result<DFOut

impl DataFusionEngine {
pub async fn new(configs: &Table) -> Result<Self> {
let config = SessionConfig::new()
.with_target_partitions(4);
let config = SessionConfig::new().with_target_partitions(4);

let ctx = SessionContext::new_with_config(config);
ctx.register_catalog("demo", Self::create_catalog(configs).await?);

Ok(Self {
ctx
})
Ok(Self { ctx })
}

async fn create_catalog(configs: &Table) -> anyhow::Result<Arc<dyn CatalogProvider>> {
let rest_catalog_url = configs.get("url")
let rest_catalog_url = configs
.get("url")
.ok_or_else(|| anyhow!("url not found datafusion engine!"))?
.as_str()
.ok_or_else(|| anyhow!("url is not str"))?;
@@ -115,6 +112,8 @@ impl DataFusionEngine {

let rest_catalog = RestCatalog::new(rest_catalog_config);

Ok(Arc::new(IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?))
Ok(Arc::new(
IcebergCatalogProvider::try_new(Arc::new(rest_catalog)).await?,
))
}
}
Original file line number Diff line number Diff line change
@@ -15,23 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use anyhow::{anyhow, bail};
use std::sync::Arc;

use anyhow::anyhow;
pub use datafusion::*;
use sqllogictest::{strict_column_validator, AsyncDB, MakeConnection, Runner};
use std::sync::Arc;
use toml::Table;

mod conversion;
mod output;
mod normalize;
pub mod output;

mod spark;
pub use spark::*;

mod datafusion;
pub use datafusion::*;
use crate::error::Result;

use crate::error::Result;

#[derive(Clone)]
pub enum Engine {
@@ -43,42 +41,38 @@ impl Engine {
pub async fn new(typ: &str, configs: &Table) -> Result<Self> {
let configs = Arc::new(configs.clone());
match typ {
"spark" => {
Ok(Engine::SparkSQL(configs))
}
"datafusion" => {
Ok(Engine::DataFusion(configs))
}
other => Err(anyhow!("Unknown engine type: {other}").into())
"spark" => Ok(Engine::SparkSQL(configs)),
"datafusion" => Ok(Engine::DataFusion(configs)),
other => Err(anyhow!("Unknown engine type: {other}").into()),
}
}

pub async fn run_slt_file(self, slt_file: impl Into<String>) -> anyhow::Result<()> {
let absolute_file = format!("{}/testdata/slts/{}", env!("CARGO_MANIFEST_DIR"), slt_file.into());
let absolute_file = format!(
"{}/testdata/slts/{}",
env!("CARGO_MANIFEST_DIR"),
slt_file.into()
);

match self {
Engine::DataFusion(configs) => {
let configs = configs.clone();
let runner = Runner::new(|| async {
DataFusionEngine::new(&*configs).await
});
let runner = Runner::new(|| async { DataFusionEngine::new(&configs).await });
Self::run_with_runner(runner, absolute_file).await
}
Engine::SparkSQL(configs) => {
let configs = configs.clone();
let runner = Runner::new(|| async {
SparkSqlEngine::new(&*configs).await
});
let runner = Runner::new(|| async { SparkSqlEngine::new(&configs).await });
Self::run_with_runner(runner, absolute_file).await
}
}
}

async fn run_with_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(mut runner: Runner<D, M>,
slt_file: String) -> anyhow::Result<()> {
async fn run_with_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(
mut runner: Runner<D, M>,
slt_file: String,
) -> anyhow::Result<()> {
runner.with_column_validator(strict_column_validator);
Ok(runner
.run_file_async(slt_file)
.await?)
Ok(runner.run_file_async(slt_file).await?)
}
}
}
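A hedged usage sketch of the `Engine` entry point (crate-internal; the `url` key matches what the engines above read, but the endpoint and file name are invented): build an engine from a TOML table and point it at a test file, which `run_slt_file` resolves under `testdata/slts/`:

```rust
use toml::Table;

// Inside the sqllogictests crate; `Engine` is the enum defined above.
async fn demo() -> anyhow::Result<()> {
    let configs: Table = r#"
        url = "http://localhost:8181"
    "#
    .parse()?;

    let engine = Engine::new("datafusion", &configs).await?;
    // Resolves to $CARGO_MANIFEST_DIR/testdata/slts/basic.slt.
    engine.run_slt_file("basic.slt").await?;
    Ok(())
}
```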
Original file line number Diff line number Diff line change
@@ -54,4 +54,4 @@ impl ColumnType for DFColumnType {
}
}

pub(crate) type DFOutput = DBOutput<DFColumnType>;
pub(crate) type DFOutput = DBOutput<DFColumnType>;
Original file line number Diff line number Diff line change
@@ -15,18 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use crate::engine::output::DFColumnType;
use crate::engine::{normalize, DataFusionEngine};
use std::time::Duration;

use anyhow::anyhow;
use itertools::Itertools;
use async_trait::async_trait;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use sqllogictest::{AsyncDB, DBOutput};
use std::time::Duration;
use async_trait::async_trait;
use toml::Table;
use crate::error::*;

/// SparkSql engine implementation for sqllogictest.
use crate::engine::output::DFColumnType;
use crate::display::normalize_51;
use crate::error::{Error, Result};

/// SparkSql engine implementation for sqllogictests.
pub struct SparkSqlEngine {
session: SparkSession,
}
@@ -37,15 +38,16 @@ impl AsyncDB for SparkSqlEngine {
type ColumnType = DFColumnType;

async fn run(&mut self, sql: &str) -> Result<DBOutput<DFColumnType>> {
let results = self.session
let results = self
.session
.sql(sql)
.await
.map_err(Box::new)?
.map_err(|e| anyhow!(e))?
.collect()
.await
.map_err(Box::new)?;
let types = normalize::convert_schema_to_types(results.schema().fields());
let rows = normalize::convert_batches(results)?;
.map_err(|e| anyhow!(e))?;
let types = normalize_51::convert_schema_to_types(results.schema().fields());
let rows = normalize_51::convert_batches(vec![results])?;

if rows.is_empty() && types.is_empty() {
Ok(DBOutput::StatementComplete(0))
@@ -71,15 +73,17 @@ impl AsyncDB for SparkSqlEngine {

impl SparkSqlEngine {
pub async fn new(configs: &Table) -> Result<Self> {
let url = configs.get("url")
let url = configs
.get("url")
.ok_or_else(|| anyhow!("url property doesn't exist for spark engine"))?
.as_str()
.ok_or_else(|| anyhow!("url property is not a string for spark engine"))?;

let session = SparkSessionBuilder::remote(url)
.app_name("SparkConnect")
.build()
.await?;
.await
.map_err(|e| anyhow!(e))?;

Ok(Self { session })
}
28 changes: 28 additions & 0 deletions crates/sqllogictests/src/error.rs
@@ -0,0 +1,28 @@
use std::fmt::{Debug, Display, Formatter};

pub struct Error(pub anyhow::Error);
pub type Result<T> = std::result::Result<T, Error>;

impl Debug for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for Error {
fn from(value: anyhow::Error) -> Self {
Self(value)
}
}
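This newtype exists, as the engine code above suggests, because `sqllogictest::AsyncDB` needs an associated `Error` type that implements `std::error::Error`, which `anyhow::Error` deliberately does not; the wrapper bridges the two, and the `From` impl keeps `?` ergonomic. A small in-crate sketch (`parse_port` is hypothetical):

```rust
// ParseIntError -> anyhow::Error -> crate Error, the last hop via the
// `From<anyhow::Error>` impl above, so `?` works on anyhow results.
fn parse_port(input: &str) -> Result<u16> {
    let port: u16 = input.parse().map_err(anyhow::Error::from)?;
    Ok(port)
}

fn main() {
    assert_eq!(parse_port("8181").unwrap(), 8181);
    assert!(parse_port("not-a-port").is_err());
}
```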
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
// This lib contains codes copied from
// [Apache Datafusion](https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest)
mod engine;
pub mod schedule;
mod error;
pub mod schedule;
mod display;

pub use error::*;
Original file line number Diff line number Diff line change
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use crate::engine::Engine;
use anyhow::anyhow;
use itertools::Itertools;
use std::collections::HashMap;
use std::fs::read_to_string;
use std::path::Path;

use anyhow::anyhow;
use itertools::Itertools;
use toml::{Table, Value};

use crate::engine::Engine;

/// Schedule of engines to run tests.
pub struct Schedule {
/// Map of engine names to engine instances.
@@ -41,35 +43,37 @@ pub struct Step {
impl Schedule {
pub async fn parse<P: AsRef<Path>>(schedule_def_file: P) -> anyhow::Result<Self> {
let content = read_to_string(schedule_def_file)?;
let toml_value = content.parse::<Value>()?.as_table()
let toml_value = content.parse::<Value>()?;
let toml_table = toml_value
.as_table()
.ok_or_else(|| anyhow::anyhow!("Schedule file must be a TOML table"))?;

let engines = Schedule::parse_engines(toml_value).await?;
let steps = Schedule::parse_steps(toml_value).await?;
let engines = Schedule::parse_engines(toml_table).await?;
let steps = Schedule::parse_steps(toml_table).await?;

Ok(Self {
engines,
steps
})
Ok(Self { engines, steps })
}

async fn parse_engines(table: &Table) -> anyhow::Result<HashMap<String, Engine>> {
let engines = table.get("engines")
let engines = table
.get("engines")
.ok_or_else(|| anyhow::anyhow!("Schedule file must have an 'engines' table"))?
.as_table()
.ok_or_else(|| anyhow::anyhow!("'engines' must be a table"))?;

let mut result = HashMap::new();
for (name, engine_config) in engines {
let engine_configs = engine_config.as_table()
let engine_configs = engine_config
.as_table()
.ok_or_else(|| anyhow::anyhow!("Config of engine {name} is not a table"))?;

let typ = engine_configs.get("type")
let typ = engine_configs
.get("type")
.ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))?
.as_str()
.ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?;

let engine = Engine::build(typ, engine_configs).await?;
let engine = Engine::new(typ, engine_configs).await?;

result.insert(name.clone(), engine);
}
@@ -78,39 +82,38 @@ impl Schedule {
}

async fn parse_steps(table: &Table) -> anyhow::Result<Vec<Step>> {
let steps = table.get("steps")
let steps = table
.get("steps")
.ok_or_else(|| anyhow!("steps not found"))?
.as_array()
.ok_or_else(|| anyhow!("steps is not array"))?;

steps.iter().map(Schedule::parse_step)
.try_collect()
steps.iter().map(Schedule::parse_step).try_collect()
}

fn parse_step(value: &Value) -> anyhow::Result<Step> {
let t = value
.as_table()
.ok_or_else(|| anyhow!("Step must be a table!"))?;

let engine_name = t.get("engine")
let engine_name = t
.get("engine")
.ok_or_else(|| anyhow!("Property engine is missing in step"))?
.as_str()
.ok_or_else(|| anyhow!("Property engine is not a string in step"))?
.to_string();

let sql = t.get("sql")
let sql = t
.get("sql")
.ok_or_else(|| anyhow!("Property sql is missing in step"))?
.as_str()
.ok_or_else(|| anyhow!("Property sql is not a string in step"))?
.to_string();

Ok(Step {
engine_name,
sql,
})
Ok(Step { engine_name, sql })
}

pub async fn run(mut self) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<()> {
for step_idx in 0..self.steps.len() {
self.run_step(step_idx).await?;
}
@@ -121,7 +124,9 @@ impl Schedule {
async fn run_step(&self, step_index: usize) -> anyhow::Result<()> {
let step = &self.steps[step_index];

let engine = self.engines.get(&step.engine_name)
let engine = self
.engines
.get(&step.engine_name)
.ok_or_else(|| anyhow!("Engine {} not found!", step.engine_name))?
.clone();

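For orientation, a hedged example of a schedule document this parser accepts; the `engines`/`steps` tables and the `type`, `engine`, and `sql` keys come from the code above, while the engine name, URL, and step value are invented:

```rust
async fn parse_example() -> anyhow::Result<()> {
    let doc = r#"
        [engines.df]
        type = "datafusion"
        url = "http://localhost:8181"

        [[steps]]
        engine = "df"
        sql = "create_table.slt"
    "#;

    // `Schedule::parse` takes a path, so stage the document in a temp file.
    let path = std::env::temp_dir().join("example-schedule.toml");
    std::fs::write(&path, doc)?;

    let _schedule = Schedule::parse(&path).await?;
    // `_schedule.run().await?` would execute the steps, but it needs the
    // docker-compose services from the test harness to be running.
    Ok(())
}
```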
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::fs;
use std::path::PathBuf;

use iceberg_test_utils::docker::DockerCompose;
use libtest_mimic::{Arguments, Trial};
use sqllogictests::schedule::Schedule;
use tokio::runtime::Handle;
use iceberg_test_utils::docker::DockerCompose;
use sqllogictest::schedule::Schedule;

fn main() {
env_logger::init();
@@ -24,7 +25,7 @@
let tests = collect_trials(rt.handle().clone()).unwrap();

log::info!("Starting tests...");
// Run all tests and exit the application appropriately.
let result = libtest_mimic::run(&args, tests);

log::info!("Shutting down tokio runtime...");
@@ -36,23 +37,34 @@
}

fn start_docker() -> anyhow::Result<DockerCompose> {
let docker = DockerCompose::new("sqllogictests",
format!("{}/testdata/docker", env!("CARGO_MANIFEST_DIR")));
let docker = DockerCompose::new(
"sqllogictests",
format!("{}/testdata/docker", env!("CARGO_MANIFEST_DIR")),
);
docker.run();
Ok(docker)
}

fn collect_trials(handle: Handle) -> anyhow::Result<Vec<Trial>> {
let schedule_files = collect_schedule_files()?;
log::debug!("Found {} schedule files: {}", schedule_files.len(), &schedule_files);
log::debug!(
"Found {} schedule files: {:?}",
schedule_files.len(),
&schedule_files
);
let mut trials = Vec::with_capacity(schedule_files.len());
for schedule_file in schedule_files {
let h = handle.clone();
let trial_name = format!("Test schedule {}",
schedule_file.file_name()
.expect("Schedule file should have a name")
.to_string_lossy());
let trial = Trial::new(trial_name, move || h.block_on(run_schedule(schedule_file.clone())));
let trial_name = format!(
"Test schedule {}",
schedule_file
.file_name()
.expect("Schedule file should have a name")
.to_string_lossy()
);
let trial = Trial::test(trial_name, move || {
Ok(h.block_on(run_schedule(schedule_file.clone()))?)
});
trials.push(trial);
}
Ok(trials)
@@ -75,4 +87,4 @@
let schedule = Schedule::parse(schedule_file).await?;
schedule.run().await?;
Ok(())
}
}