Skip to content

Commit

Permalink
feat(query): copy into allow disable_variant_check for parquet file (
Browse files Browse the repository at this point in the history
…databendlabs#14444)

* feat(query): copy into allow `disable_variant_check` for parquet file

* fix

* fix

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
b41sh and BohuTANG authored Jan 24, 2024
1 parent 19e2c9a commit ea5eaf0
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/query/expression/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub enum FunctionEval {
pub struct FunctionContext {
pub tz: TzLUT,
pub rounding_mode: bool,
pub disable_variant_check: bool,

pub openai_api_chat_base_url: String,
pub openai_api_embedding_base_url: String,
Expand Down
12 changes: 10 additions & 2 deletions src/query/functions/src/scalars/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ pub fn register(registry: &mut FunctionRegistry) {
value.write_to_vec(&mut output.data);
}
Err(err) => {
ctx.set_error(output.len(), err.to_string());
if ctx.func_ctx.disable_variant_check {
output.put_str(&val);
} else {
ctx.set_error(output.len(), err.to_string());
}
}
}
output.commit_row();
Expand All @@ -142,7 +146,11 @@ pub fn register(registry: &mut FunctionRegistry) {
value.write_to_vec(&mut output.data);
}
Err(err) => {
ctx.set_error(output.len(), err.to_string());
if ctx.func_ctx.disable_variant_check {
output.put_str(s);
} else {
ctx.set_error(output.len(), err.to_string());
}
}
}
output.commit_row();
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,12 +601,14 @@ impl TableContext for QueryContext {
let tz = TzFactory::instance().get_by_name(&tz)?;
let numeric_cast_option = self.get_settings().get_numeric_cast_option()?;
let rounding_mode = numeric_cast_option.as_str() == "rounding";
let disable_variant_check = self.get_settings().get_disable_variant_check()?;

let query_config = &GlobalConfig::instance().query;

Ok(FunctionContext {
tz,
rounding_mode,
disable_variant_check,

openai_api_key: query_config.openai_api_key.clone(),
openai_api_version: query_config.openai_api_version.clone(),
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("disable_variant_check", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Disable variant check to allow insert invalid JSON values",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("cost_factor_hash_table_per_row", DefaultSettingValue {
value: UserSettingValue::UInt64(COST_FACTOR_HASH_TABLE_PER_ROW),
desc: "Cost factor of building hash table for a data row",
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ impl Settings {
)
}

/// Reports whether the `disable_variant_check` setting is switched on.
///
/// The setting is stored as a `u64`; any non-zero value counts as enabled.
/// Lookup failures from `try_get_u64` are propagated to the caller.
pub fn get_disable_variant_check(&self) -> Result<bool> {
    let raw = self.try_get_u64("disable_variant_check")?;
    Ok(raw != 0)
}

/// Stores the `disable_variant_check` setting.
///
/// The boolean is persisted as a `u64`: `true` becomes `1`, `false` becomes `0`.
pub fn set_disable_variant_check(&self, val: bool) -> Result<()> {
    let flag = u64::from(val);
    self.try_set_u64("disable_variant_check", flag)
}

/// Reads the `cost_factor_hash_table_per_row` setting as a `u64`.
///
/// Any error from the underlying `try_get_u64` lookup is returned unchanged.
pub fn get_cost_factor_hash_table_per_row(&self) -> Result<u64> {
    let factor = self.try_get_u64("cost_factor_hash_table_per_row")?;
    Ok(factor)
}
Expand Down
33 changes: 33 additions & 0 deletions src/query/sql/src/planner/binder/copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use databend_common_ast::ast::CopyIntoTableSource;
use databend_common_ast::ast::CopyIntoTableStmt;
use databend_common_ast::ast::Expr;
use databend_common_ast::ast::FileLocation;
use databend_common_ast::ast::Hint;
use databend_common_ast::ast::HintItem;
use databend_common_ast::ast::Identifier;
use databend_common_ast::ast::Literal;
use databend_common_ast::ast::Query;
use databend_common_ast::ast::SelectTarget;
use databend_common_ast::ast::SetExpr;
Expand Down Expand Up @@ -57,6 +60,7 @@ use databend_common_storages_parquet::ParquetRSTable;
use databend_common_users::UserApiProvider;
use indexmap::IndexMap;
use log::debug;
use log::warn;
use parking_lot::RwLock;

use crate::binder::location::parse_uri_location;
Expand Down Expand Up @@ -185,6 +189,7 @@ impl<'a> Binder {
let use_parquet2 = table_ctx.get_settings().get_use_parquet2()?;
let stage_info = plan.stage_table_info.stage_info.clone();
let files_info = plan.stage_table_info.files_info.clone();

let read_options = ParquetReadOptions::default();
let table = if use_parquet2 {
Parquet2Table::create(table_ctx, stage_info, files_info, read_options, None).await?
Expand Down Expand Up @@ -395,6 +400,34 @@ impl<'a> Binder {
output_context.parent = from_context.parent;
output_context.columns = from_context.columns;

// Disable the variant check so that invalid JSON values can be copied into tables.
let disable_variant_check = plan
.stage_table_info
.stage_info
.copy_options
.disable_variant_check;
if disable_variant_check {
let hints = Hint {
hints_list: vec![HintItem {
name: Identifier::from_name("disable_variant_check"),
expr: Expr::Literal {
span: None,
lit: Literal::UInt64(1),
},
}],
};
if let Some(e) = self
.opt_hints_set_var(&mut output_context, &hints)
.await
.err()
{
warn!(
"In COPY resolve optimize hints {:?} failed, err: {:?}",
hints, e
);
}
}

plan.query = Some(Box::new(Plan::Query {
s_expr: Box::new(s_expr),
metadata: self.metadata.clone(),
Expand Down
Binary file added tests/data/parquet/invalid_json_string.parquet
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,33 @@ drop table if exists t2
statement ok
create table t2(a string null, b variant null)

query
query TIITI
copy into t2 from (select a, parse_json(b) from @data/parquet/json_string.parquet)
----
parquet/json_string.parquet 4 0 NULL NULL

query
query TT
select * from t2 order by a
----
1 {"k1":"v1","k2":"v2"}
2 {"k":{"k1":123,"k2":456}}
3 [1,2,3]
4 ["a","b"]

statement error 1006
copy into t2 from @data/parquet/invalid_json_string.parquet FILE_FORMAT = (type = PARQUET) disable_variant_check = false ON_ERROR = CONTINUE

query TIITI
copy into t2 from @data/parquet/invalid_json_string.parquet FILE_FORMAT = (type = PARQUET) disable_variant_check = true ON_ERROR = CONTINUE
----
parquet/invalid_json_string.parquet 2 0 NULL NULL

query TT
select * from t2 order by a
----
1 {"k1":"v1","k2":"v2"}
2 {"k":{"k1":123,"k2":456}}
3 [1,2,3]
4 ["a","b"]
5 {"k":"v"}
6 [1,

0 comments on commit ea5eaf0

Please sign in to comment.