Skip to content

Commit

Permalink
fix(query): fix parse string to JSON value (databendlabs#14177)
Browse files Browse the repository at this point in the history
* fix(query): fix parse string to JSON value

* fix
  • Loading branch information
b41sh authored Dec 28, 2023
1 parent 063ef2c commit fd372c4
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::type_check::check_function;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
Expand Down Expand Up @@ -57,11 +59,19 @@ where Self: Transform
display_name: from.name().clone(),
};
if from != to {
Expr::Cast {
span: None,
is_try: false,
expr: Box::new(expr),
dest_type: to.data_type().clone(),
if from.data_type().remove_nullable() == DataType::String
&& to.data_type().remove_nullable() == DataType::Variant
{
// parse string to JSON value, avoid cast string to JSON string
check_function(None, "parse_json", &[], &[expr], &BUILTIN_FUNCTIONS)
.unwrap()
} else {
Expr::Cast {
span: None,
is_try: false,
expr: Box::new(expr),
dest_type: to.data_type().clone(),
}
}
} else {
expr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::type_check::check_function;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
Expand Down Expand Up @@ -72,11 +74,19 @@ impl Transform for TransformRuntimeCastSchema {
display_name: to.name().clone(),
};
if &from.data_type != to.data_type() {
Expr::Cast {
span: None,
is_try: false,
expr: Box::new(expr),
dest_type: to.data_type().clone(),
if from.data_type.remove_nullable() == DataType::String
&& to.data_type().remove_nullable() == DataType::Variant
{
// parse string to JSON value, avoid cast string to JSON string
check_function(None, "parse_json", &[], &[expr], &BUILTIN_FUNCTIONS)
.unwrap()
} else {
Expr::Cast {
span: None,
is_try: false,
expr: Box::new(expr),
dest_type: to.data_type().clone(),
}
}
} else {
expr
Expand Down
86 changes: 61 additions & 25 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_ast::ast::TypeName;
use databend_common_ast::parser::parser_values_with_placeholder;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::Visitor;
use databend_common_catalog::plan::ParquetReadOptions;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::table_context::StageAttachment;
use databend_common_catalog::table_context::TableContext;
Expand All @@ -44,12 +45,15 @@ use databend_common_expression::DataSchema;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Evaluator;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::principal::FileFormatOptionsAst;
use databend_common_meta_app::principal::FileFormatParams;
use databend_common_meta_app::principal::NullAs;
use databend_common_meta_app::principal::StageInfo;
use databend_common_storage::StageFilesInfo;
use databend_common_storages_parquet::Parquet2Table;
use databend_common_storages_parquet::ParquetRSTable;
use databend_common_users::UserApiProvider;
use indexmap::IndexMap;
use log::debug;
Expand Down Expand Up @@ -174,34 +178,66 @@ impl<'a> Binder {
bind_ctx: &BindContext,
plan: CopyIntoTablePlan,
) -> Result<Plan> {
if let FileFormatParams::Parquet(fmt) = &plan.stage_table_info.stage_info.file_format_params && fmt.missing_field_as == NullAs::Error {
let select_list = plan
.required_source_schema
.fields()
.iter()
.map(|f| {
let column = Expr::ColumnRef {
span: None,
database: None,
table: None,
column: AstColumnID::Name(Identifier::from_name(f.name().to_string())),
};
let expr = if f.data_type().remove_nullable() == DataType::Variant {
Expr::Cast {
span: None,
expr: Box::new(column),
target_type: TypeName::Variant,
pg_style: false,
if let FileFormatParams::Parquet(fmt) = &plan.stage_table_info.stage_info.file_format_params && fmt.missing_field_as == NullAs::Error {
let table_ctx = self.ctx.clone();
let use_parquet2 = table_ctx.get_settings().get_use_parquet2()?;
let stage_info = plan.stage_table_info.stage_info.clone();
let files_info = plan.stage_table_info.files_info.clone();
let read_options = ParquetReadOptions::default();
let table = if use_parquet2 {
Parquet2Table::create(table_ctx, stage_info, files_info, read_options, None).await?
} else {
ParquetRSTable::create(table_ctx, stage_info, files_info, read_options, None)
.await?
};
let table_info = table.get_table_info();
let table_schema = table_info.schema();

let mut select_list = Vec::with_capacity(plan.required_source_schema.num_fields());
for dest_field in plan.required_source_schema.fields().iter() {
let column = Expr::ColumnRef {
span: None,
database: None,
table: None,
column: AstColumnID::Name(Identifier::from_name(
dest_field.name().to_string(),
)),
};
let expr = match table_schema.field_with_name(dest_field.name()) {
Ok(src_field) => {
// parse string to JSON value, avoid cast string to JSON string
if dest_field.data_type().remove_nullable() == DataType::Variant {
if src_field.data_type().remove_nullable() == TableDataType::String {
Expr::FunctionCall {
span: None,
distinct: false,
name: Identifier::from_name("parse_json".to_string()),
args: vec![column],
params: vec![],
window: None,
lambda: None,
}
} else {
Expr::Cast {
span: None,
expr: Box::new(column),
target_type: TypeName::Variant,
pg_style: false,
}
}
} else {
column
}
} else {
}
Err(_) => {
column
};
SelectTarget::AliasedExpr {
expr: Box::new(expr),
alias: None,
}
})
.collect::<Vec<_>>();
};
select_list.push(SelectTarget::AliasedExpr {
expr: Box::new(expr),
alias: None,
});
}

self.bind_copy_from_query_into_table(bind_ctx, plan, &select_list, &None)
.await
Expand Down
Binary file added tests/data/parquet/json_string.parquet
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -195,5 +195,25 @@ select * from t4 where id >= 1 and 'vv' = m:b:k
----
3 {"a":"v","b":{"k":"vv"},"c":[]}

# auto-cast string values to JSON when inserting into a variant column
statement ok
INSERT INTO t4 SELECT 4, parse_json('{"a":"x2"}')

statement ok
UPDATE t4 SET m = '{"a":"x1"}' WHERE id = 3

statement ok
REPLACE INTO t4 ON CONFLICT(id) values(5,'{"a":"x3"}'), (6,'{"a":"x4"}')

query ITT
select id, m, m['a'] from t4 order by id asc
----
1 {"a":2,"b":{"k":2},"c":[10,11,12]} 2
2 {"a":true,"b":{"k":false},"c":[1,2,3]} true
3 {"a":"x1"} "x1"
4 {"a":"x2"} "x2"
5 {"a":"x3"} "x3"
6 {"a":"x4"} "x4"

statement ok
DROP DATABASE db1
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,22 @@ select * from t1 order by id
1 {"a":1,"b":"a"}
2 {"a":3,"b":"b"}
3 {"a":3,"b":"c"}

statement ok
drop table if exists t2

statement ok
create table t2(a string null, b variant null)

query
copy into t2 from @data/parquet/json_string.parquet
----
parquet/json_string.parquet 4 0 NULL NULL

query
select * from t2 order by a
----
1 {"k1":"v1","k2":"v2"}
2 {"k":{"k1":123,"k2":456}}
3 [1,2,3]
4 ["a","b"]

0 comments on commit fd372c4

Please sign in to comment.