Skip to content

Commit

Permalink
[feature](jdbc) Support jdbc catalog to read json types (apache#21341)
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk authored Jul 10, 2023
1 parent 1a08c81 commit 0be349e
Show file tree
Hide file tree
Showing 15 changed files with 1,242 additions and 1,062 deletions.
2 changes: 2 additions & 0 deletions be/src/runtime/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ struct TypeDescriptor {

bool is_variant_type() const { return type == TYPE_VARIANT; }

// Returns true when this descriptor's `type` field is TYPE_JSONB.
bool is_json_type() const { return type == TYPE_JSONB; }

static inline int get_decimal_byte_size(int precision) {
DCHECK_GT(precision, 0);
if (precision <= MAX_DECIMAL4_PRECISION) {
Expand Down
76 changes: 76 additions & 0 deletions be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
->create_column());
break;
}
case TYPE_JSONB: {
// A JSONB slot is only accepted when the Java side reports a plain String or a
// PostgreSQL PGobject for this column; anything else is a type mismatch.
if (type_str != "java.lang.String" && type_str != "org.postgresql.util.PGobject") {
return Status::InternalError(error_msg);
}

// Remember which entry of the JSON staging vectors belongs to this column: the
// entry about to be appended sits at the current size of the type vector.
_map_column_idx_to_cast_idx_json[column_index] = _input_json_string_types.size();
// The staging data type is a string type that mirrors the slot's nullability.
if (slot_desc->is_nullable()) {
_input_json_string_types.push_back(make_nullable(std::make_shared<DataTypeString>()));
} else {
_input_json_string_types.push_back(std::make_shared<DataTypeString>());
}
// Create the staging column from the data type that was just registered.
str_json_cols.push_back(
_input_json_string_types[_map_column_idx_to_cast_idx_json[column_index]]
->create_column());
break;
}
case TYPE_HLL: {
if (type_str != "java.lang.String") {
return Status::InternalError(error_msg);
Expand Down Expand Up @@ -447,6 +463,8 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
_cast_string_to_array(slot_desc, block, column_index, num_rows);
} else if (slot_desc->type().is_hll_type()) {
_cast_string_to_hll(slot_desc, block, column_index, num_rows);
} else if (slot_desc->type().is_json_type()) {
_cast_string_to_json(slot_desc, block, column_index, num_rows);
}
materialized_column_index++;
}
Expand Down Expand Up @@ -627,6 +645,26 @@ Status JdbcConnector::_convert_batch_result_set(JNIEnv* env, jobject jcolumn_dat
address[1], chars_addres);
break;
}
case TYPE_JSONB: {
// JSON values are copied into the string staging column registered for this
// column index rather than into the destination column directly.
str_json_cols[_map_column_idx_to_cast_idx_json[column_index]]->resize(num_rows);
if (column_is_nullable) {
// Nullable staging column: zero the null map, pass its address to Java in
// address[0], and write the string data into the nested column.
auto* nullbale_column = reinterpret_cast<vectorized::ColumnNullable*>(
str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get());
auto& null_map = nullbale_column->get_null_map_data();
memset(null_map.data(), 0, num_rows);
address[0] = reinterpret_cast<int64_t>(null_map.data());
col_ptr = &nullbale_column->get_nested_column();
} else {
col_ptr = str_json_cols[_map_column_idx_to_cast_idx_json[column_index]].get();
}
// The staging column is treated as a ColumnString; address[1] carries the address
// of its offsets buffer.
auto column_string = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
address[1] = reinterpret_cast<int64_t>(column_string->get_offsets().data());
// NOTE: this is the address of the chars container object itself, not of its data().
auto chars_addres = reinterpret_cast<int64_t>(&column_string->get_chars());
// Invoke the Java method registered as _executor_get_json_result with the raw
// addresses so it can fill the column.
env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_get_json_result,
jcolumn_data, column_is_nullable, num_rows, address[0],
address[1], chars_addres);
break;
}
case TYPE_HLL: {
str_hll_cols[_map_column_idx_to_cast_idx_hll[column_index]]->resize(num_rows);
if (column_is_nullable) {
Expand Down Expand Up @@ -704,6 +742,8 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
"(Ljava/lang/Object;ZIJJJ)V", _executor_get_array_result));
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchHllResult", "(Ljava/lang/Object;ZIJJJ)V",
_executor_get_hll_result));
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchJsonResult",
"(Ljava/lang/Object;ZIJJJ)V", _executor_get_json_result));
RETURN_IF_ERROR(register_id(_executor_clazz, "copyBatchCharResult",
"(Ljava/lang/Object;ZIJJJZ)V", _executor_get_char_result));

Expand Down Expand Up @@ -820,6 +860,42 @@ Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Blo
return Status::OK();
}

// Casts the string staging column of a JSON slot to the slot's own data type and
// stores the result in `block` at `column_index`.
//
// @param slot_desc     slot whose data type is the cast target.
// @param block         block whose column at `column_index` is replaced by the cast result.
// @param column_index  index of the column in `block`; also the key used to find the
//                      staging column in `str_json_cols`.
// @param rows          number of rows passed to the CAST function.
// @return Status::OK() on success, otherwise the error reported by the CAST function.
Status JdbcConnector::_cast_string_to_json(const SlotDescriptor* slot_desc, Block* block,
                                           int column_index, int rows) {
    // Resolve the staging slot once instead of repeating the map lookup on every access.
    const int cast_idx = _map_column_idx_to_cast_idx_json[column_index];

    const DataTypePtr target_type = slot_desc->get_data_type_ptr();
    const DataTypePtr nullable_target_type = make_nullable(target_type);
    const std::string target_type_name = target_type->get_name();
    // Second CAST argument: a constant column of the target type, named after that type.
    ColumnPtr cast_param = target_type->create_column_const_with_default_value(1);

    ColumnsWithTypeAndName argument_template;
    argument_template.reserve(2);
    // The staging column is moved into the argument list; it is re-created below.
    argument_template.emplace_back(std::move(str_json_cols[cast_idx]),
                                   _input_json_string_types[cast_idx], "java.sql.String");
    argument_template.emplace_back(cast_param, target_type, target_type_name);
    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
            "CAST", argument_template, nullable_target_type);

    Block cast_block(argument_template);
    int result_idx = cast_block.columns();
    cast_block.insert({nullptr, nullable_target_type, "cast_result"});
    const Status cast_status = func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows);

    // Re-create the staging column before any early return so that the next batch never
    // sees the moved-from (empty) pointer, even when the cast failed.
    str_json_cols[cast_idx] = _input_json_string_types[cast_idx]->create_column();

    // Propagate a failed cast instead of reading a result column that was never produced.
    RETURN_IF_ERROR(cast_status);

    auto res_col = cast_block.get_by_position(result_idx).column;
    if (target_type->is_nullable()) {
        block->replace_by_position(column_index, res_col);
    } else {
        // The cast always yields a nullable column; a non-nullable slot takes the nested one.
        auto nested_ptr = reinterpret_cast<const vectorized::ColumnNullable*>(res_col.get())
                                  ->get_nested_column_ptr();
        block->replace_by_position(column_index, nested_ptr);
    }
    return Status::OK();
}

Status JdbcConnector::exec_stmt_write(Block* block, const VExprContextSPtrs& output_vexpr_ctxs,
uint32_t* num_rows_sent) {
SCOPED_TIMER(_result_send_timer);
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class JdbcConnector : public TableConnector {
int rows);
Status _cast_string_to_hll(const SlotDescriptor* slot_desc, Block* block, int column_index,
int rows);
Status _cast_string_to_json(const SlotDescriptor* slot_desc, Block* block, int column_index,
int rows);
Status _convert_batch_result_set(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
vectorized::IColumn* column_ptr, int num_rows,
int column_index);
Expand Down Expand Up @@ -142,6 +144,7 @@ class JdbcConnector : public TableConnector {
jmethodID _executor_get_decimal64_result;
jmethodID _executor_get_decimal128_result;
jmethodID _executor_get_array_result;
jmethodID _executor_get_json_result;
jmethodID _executor_get_hll_result;
jmethodID _executor_get_types_id;
jmethodID _executor_close_id;
Expand All @@ -160,6 +163,10 @@ class JdbcConnector : public TableConnector {
std::vector<DataTypePtr> _input_hll_string_types;
std::vector<MutableColumnPtr> str_hll_cols; // for hll type to save data like string

// Maps a result column index to the position of its entry in
// _input_json_string_types / str_json_cols.
std::map<int, int> _map_column_idx_to_cast_idx_json;
// String data types (nullable when the slot is nullable) used to stage JSON columns.
std::vector<DataTypePtr> _input_json_string_types;
std::vector<MutableColumnPtr> str_json_cols; // for json type to save data like string

JdbcStatistic _jdbc_statistic;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,13 @@ CREATE TABLE doris_test.arr
`arr27` Array(Datetime64)
)
ENGINE = MergeTree
ORDER BY id
ORDER BY id;

-- Test table with a JSON column for the JDBC catalog JSON read path.
-- The experimental object type setting is switched on before the table is created
-- (presumably required by the JSON column type; confirm against the ClickHouse version in use).
set allow_experimental_object_type = 1;
CREATE TABLE doris_test.json
(
`id` String,
`o` JSON
)
ENGINE = MergeTree
ORDER BY id;
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ VALUES
INSERT INTO doris_test.student values (1, 'doris', 18), (2, 'alice', 19), (3, 'bob', 20);

INSERT INTO doris_test.arr values
('1',[true],['2022-01-01'],['2022-01-01'],[1.1],[1.1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[2.2],[1],['116.253.40.133'],['2a02:aa08:e000:3100::2'],['61f0c404-5cb3-11e7-907b-a6006ad3dba0'],[1],['string'],['string'],['2022-01-01 00:00:00'],['2022-01-01 00:00:00'])
('1',[true],['2022-01-01'],['2022-01-01'],[1.1],[1.1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[1],[2.2],[1],['116.253.40.133'],['2a02:aa08:e000:3100::2'],['61f0c404-5cb3-11e7-907b-a6006ad3dba0'],[1],['string'],['string'],['2022-01-01 00:00:00'],['2022-01-01 00:00:00']);

INSERT INTO doris_test.json VALUES ('1','{"a": 1, "b": { "c": 2, "d": [1, 2, 3] }}');
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,15 @@ CREATE TABLE catalog_pg_test.dt_test (
ts_field TIMESTAMP(3),
tzt_field TIMESTAMPTZ(3)
);

-- Test table holding PostgreSQL `json` values; `type` labels each row.
CREATE TABLE catalog_pg_test.json_test (
id serial PRIMARY KEY,
type varchar(10),
value json
);

-- Test table holding PostgreSQL `jsonb` values; `type` labels each row.
CREATE TABLE catalog_pg_test.jsonb_test (
id serial PRIMARY KEY,
type varchar(10),
value jsonb
);
40 changes: 40 additions & 0 deletions docker/thirdparties/docker-compose/postgresql/init/04-insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2664,3 +2664,43 @@ VALUES
'2023-06-16 12:34:56.123',
'2023-06-16 12:34:56.123+08'
);

-- Seed json_test with two rows: one JSON document containing string, integer, float,
-- boolean, null, array and nested-object members, and one row whose value is NULL.
INSERT INTO catalog_pg_test.json_test (type,value) VALUES
(
'json',
'{
"stringKey": "stringValue",
"integerKey": 12345,
"floatKey": 123.45,
"booleanKey": true,
"nullKey": null,
"arrayKey": ["element1", 2, false, null, {"nestedKey": "nestedValue"}],
"objectKey": {
"nestedStringKey": "nestedStringValue",
"nestedIntegerKey": 67890
}
}'),
(
'json2',
NULL
);

-- Seed jsonb_test with the same two cases as json_test: one document containing string,
-- integer, float, boolean, null, array and nested-object members, and one NULL value.
INSERT INTO catalog_pg_test.jsonb_test (type,value) VALUES
(
'jsonb',
'{
"stringKey": "stringValue",
"integerKey": 12345,
"floatKey": 123.45,
"booleanKey": true,
"nullKey": null,
"arrayKey": ["element1", 2, false, null, {"nestedKey": "nestedValue"}],
"objectKey": {
"nestedStringKey": "nestedStringValue",
"nestedIntegerKey": 67890
}
}'),
(
'jsonb2',
NULL
);
5 changes: 3 additions & 2 deletions docs/en/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ CREATE CATALOG jdbc_mysql PROPERTIES (
| TIME | STRING | |
| CHAR | CHAR | |
| VARCHAR | VARCHAR | |
| JSON | STRING | |
| JSON | JSON | |
| SET | STRING | |
| BIT | BOOLEAN/STRING | BIT(1) will be mapped to BOOLEAN, and other BITs will be mapped to STRING |
| TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXT | STRING | |
Expand Down Expand Up @@ -236,12 +236,13 @@ Doris obtains all schemas that PG user can access through the SQL statement: `se
| varchar/text | STRING | |
| timestamp | DATETIME | |
| date | DATE | |
| json/jsonb | JSON | |
| time | STRING | |
| interval | STRING | |
| point/line/lseg/box/path/polygon/circle | STRING | |
| cidr/inet/macaddr | STRING | |
| bit | BOOLEAN/STRING | bit(1) will be mapped to BOOLEAN, and other bits will be mapped to STRING |
| uuid/josnb | STRING | |
| uuid | STRING | |
| Other | UNSUPPORTED | |

### Oracle
Expand Down
5 changes: 3 additions & 2 deletions docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ CREATE CATALOG jdbc_mysql PROPERTIES (
| TIME | STRING | |
| CHAR | CHAR | |
| VARCHAR | VARCHAR | |
| JSON | STRING | |
| JSON | JSON | |
| SET | STRING | |
| BIT | BOOLEAN/STRING | BIT(1) 会映射为 BOOLEAN,其他 BIT 映射为 STRING |
| TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXT | STRING | |
Expand Down Expand Up @@ -236,12 +236,13 @@ Doris 通过sql 语句 `select nspname from pg_namespace where has_schema_privil
| varchar/text | STRING | |
| timestamp | DATETIME | |
| date | DATE | |
| json/jsonb | JSON | |
| time | STRING | |
| interval | STRING | |
| point/line/lseg/box/path/polygon/circle | STRING | |
| cidr/inet/macaddr | STRING | |
| bit | BOOLEAN/STRING | bit(1)会映射为 BOOLEAN,其他 bit 映射为 STRING |
| uuid/josnb | STRING | |
| uuid | STRING | |
| Other | UNSUPPORTED | |

### Oracle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,23 @@ public void copyBatchArrayResult(Object columnObj, boolean isNullable, int numRo
}
}

/**
 * Copies a batch of JSON values into the native string column described by the given addresses.
 *
 * <p>One element of the batch is probed to choose the copy routine: index 0 for a
 * non-nullable column, otherwise the index returned by {@code getFirstNotNullObject}.
 * Nothing is copied when that index equals {@code numRows}.
 *
 * @param columnObj   the batch, expected to be an {@code Object[]}
 * @param isNullable  whether the destination column is nullable
 * @param numRows     number of rows in the batch
 * @param nullMapAddr native address of the null map
 * @param offsetsAddr native address of the string offsets
 * @param charsAddr   native address of the string chars
 */
public void copyBatchJsonResult(Object columnObj, boolean isNullable, int numRows, long nullMapAddr,
        long offsetsAddr, long charsAddr) {
    final Object[] values = (Object[]) columnObj;
    final int probeIndex = isNullable ? getFirstNotNullObject(values, numRows, nullMapAddr) : 0;
    if (probeIndex == numRows) {
        return;
    }
    // Elements that are not already Strings go through the generic object-to-string copy.
    if (!(values[probeIndex] instanceof String)) {
        objectPutToString(values, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
        return;
    }
    stringPutToString(values, isNullable, numRows, nullMapAddr, offsetsAddr, charsAddr);
}

private int getFirstNotNullObject(Object[] column, int numRows, long nullMapAddr) {
int i = 0;
for (; i < numRows; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
} else {
return ScalarType.createStringType();
}
case "JSON":
return ScalarType.createJsonbType();
case "TIME":
case "TINYTEXT":
case "TEXT":
Expand All @@ -326,7 +328,6 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
case "STRING":
case "MEDIUMSTRING":
case "LONGSTRING":
case "JSON":
case "SET":
case "BINARY":
case "VARBINARY":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
case "inet":
case "macaddr":
case "varbit":
case "jsonb":
case "uuid":
case "bytea":
return ScalarType.createStringType();
case "json":
case "jsonb":
return ScalarType.createJsonbType();
default:
return Type.UNSUPPORTED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ sys

-- !mysql_all_types --
\N 302 \N 502 602 4.14159 \N 6.14159 \N -124 -302 2013 -402 -502 -602 \N 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 \N -7.1400 row2 \N 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 \N \N 0x2f \N 0x88656c6c9f Value3
201 301 401 501 601 3.14159 4.1415926 5.14159 true -123 -301 2012 -401 -501 -601 2012-10-30 2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145 -5.1400000001 -6.1400 row1 line1 09:09:09.567 text1 0x48656c6c6f20576f726c64 {"age": 30, "city": "London", "name": "Alice"} Option1,Option3 0x2a 0x48656c6c6f00000000000000 0x48656c6c6f Value2
202 302 402 502 602 4.14159 5.1415926 6.14159 false -124 -302 2013 -402 -502 -602 2012-11-01 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 -6.1400000001 -7.1400 row2 line2 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 {"age": 18, "city": "ChongQing", "name": "Gaoxin"} Option1,Option2 0x2f 0x58676c6c6f00000000000000 0x88656c6c9f Value3
203 303 403 503 603 7.14159 8.1415926 9.14159 false \N -402 2017 -602 -902 -1102 2012-11-02 \N 2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3 line3 09:11:09.567 text3 0xe86f6c6c6f20576f726c67 {"age": 24, "city": "ChongQing", "name": "ChenQi"} Option2 0x2f 0x58676c6c6f00000000000000 \N Value1
201 301 401 501 601 3.14159 4.1415926 5.14159 true -123 -301 2012 -401 -501 -601 2012-10-30 2012-10-25T12:05:36.345700 2012-10-25T08:08:08 -4.14145 -5.1400000001 -6.1400 row1 line1 09:09:09.567 text1 0x48656c6c6f20576f726c64 {"age":30,"city":"London","name":"Alice"} Option1,Option3 0x2a 0x48656c6c6f00000000000000 0x48656c6c6f Value2
202 302 402 502 602 4.14159 5.1415926 6.14159 false -124 -302 2013 -402 -502 -602 2012-11-01 2012-10-26T02:08:39.345700 2013-10-26T08:09:18 -5.14145 -6.1400000001 -7.1400 row2 line2 09:11:09.567 text2 0xe86f6c6c6f20576f726c67 {"age":18,"city":"ChongQing","name":"Gaoxin"} Option1,Option2 0x2f 0x58676c6c6f00000000000000 0x88656c6c9f Value3
203 303 403 503 603 7.14159 8.1415926 9.14159 false \N -402 2017 -602 -902 -1102 2012-11-02 \N 2013-10-27T08:11:18 -5.14145 -6.1400000000001 -7.1400 row3 line3 09:11:09.567 text3 0xe86f6c6c6f20576f726c67 {"age":24,"city":"ChongQing","name":"ChenQi"} Option2 0x2f 0x58676c6c6f00000000000000 \N Value1

2,110 changes: 1,059 additions & 1,051 deletions regression-test/data/jdbc_catalog_p0/test_pg_jdbc_catalog.out

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ suite("test_pg_jdbc_catalog", "p0") {
order_qt_test14 """ select * from test12 order by id; """
order_qt_wkb_test """ select * from wkb_test order by id; """
order_qt_dt_test """ select * from dt_test order by 1; """
order_qt_json_test """ select * from json_test order by 1; """
order_qt_jsonb_test """ select * from jsonb_test order by 1; """

// test insert
String uuid1 = UUID.randomUUID().toString();
Expand Down

0 comments on commit 0be349e

Please sign in to comment.