Skip to content

Commit

Permalink
[Feature](avro) Support Apache Avro file format (apache#19990)
Browse files Browse the repository at this point in the history
Support reading Avro files via hdfs() or s3().
```sql
select * from s3(
         "uri" = "http://127.0.0.1:9312/test2/person.avro",
         "ACCESS_KEY" = "ak",
         "SECRET_KEY" = "sk",
         "FORMAT" = "avro");
+--------+--------------+-------------+-----------------+
| name   | boolean_type | double_type | long_type       |
+--------+--------------+-------------+-----------------+
| Alyssa |            1 |     10.0012 | 100000000221133 |
| Ben    |            0 |    5555.999 |      4009990000 |
| lisi   |            0 | 5992225.999 |      9099933330 |
+--------+--------------+-------------+-----------------+

select * from hdfs(
                "uri" = "hdfs://127.0.0.1:9000/input/person2.avro",
                "fs.defaultFS" = "hdfs://127.0.0.1:9000",
                "hadoop.username" = "doris",
                "format" = "avro");
+--------+--------------+-------------+-----------+
| name   | boolean_type | double_type | long_type |
+--------+--------------+-------------+-----------+
| Alyssa |            1 |  8888.99999 |  89898989 |
+--------+--------------+-------------+-----------+
```

The current Avro reader only supports common data types; complex data types will be supported later.
  • Loading branch information
DongLiang-0 authored and morningman committed Jul 3, 2023
1 parent 12a9052 commit 2c5b617
Show file tree
Hide file tree
Showing 31 changed files with 1,286 additions and 30 deletions.
9 changes: 9 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/scan/avro_jni_reader.h"
#include "vec/jsonb/serialize.h"
#include "vec/runtime/vdata_stream_mgr.h"

Expand Down Expand Up @@ -603,6 +604,14 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
file_slots, &io_ctx);
break;
}
case TFileFormatType::FORMAT_AVRO: {
// file_slots is no use
std::vector<SlotDescriptor*> file_slots;
reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range,
file_slots);
((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader();
break;
}
default:
st = Status::InternalError("Not supported file format in fetch table schema: {}",
params.format_type);
Expand Down
21 changes: 20 additions & 1 deletion be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
// cannot put the env into fields, because frames in an env object is limited
// to avoid limited frames in a thread, we should get local env in a method instead of in whole object.
JNIEnv* env = nullptr;
int batch_size = 0;
if (!_is_table_schema) {
batch_size = _state->batch_size();
}
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
SCOPED_TIMER(_open_scanner_time);
RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size()));
RETURN_IF_ERROR(_init_jni_scanner(env, batch_size));
// Call org.apache.doris.common.jni.JniScanner#open
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
RETURN_ERROR_IF_EXC(env);
Expand Down Expand Up @@ -129,6 +133,18 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}

/**
 * Fetch the table schema string from the Java scanner.
 *
 * Calls the Java method bound to _jni_scanner_get_table_schema
 * ("getTableSchema", "()Ljava/lang/String;") on _jni_scanner_obj and copies the
 * returned Java string into table_schema_str.
 *
 * @param table_schema_str output; overwritten with the schema string on success
 * @return OK on success; an error if the JNI env is unavailable, the Java call
 *         raised an exception, or the Java side returned null
 */
Status JniConnector::get_table_schema(std::string& table_schema_str) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    if (env == nullptr) {
        return Status::InternalError("Failed to get/create JVM");
    }
    // Call org.apache.doris.jni.JniScanner#getTableSchema
    // return the TableSchema information
    jstring jstr = static_cast<jstring>(
            env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_table_schema));
    RETURN_ERROR_IF_EXC(env);
    if (jstr == nullptr) {
        // GetStringUTFChars must not be called with a null jstring.
        return Status::InternalError("JniScanner#getTableSchema returned null");
    }

    const char* schema_chars = env->GetStringUTFChars(jstr, nullptr);
    if (schema_chars == nullptr) {
        // A null result means the JVM could not allocate the buffer; an exception is
        // normally pending in that case, so surface it first.
        env->DeleteLocalRef(jstr);
        RETURN_ERROR_IF_EXC(env);
        return Status::InternalError("Failed to read the table schema string from JVM");
    }

    // Copy into the caller's string, then hand the JVM buffer back: every successful
    // GetStringUTFChars must be paired with ReleaseStringUTFChars.
    table_schema_str = schema_chars;
    env->ReleaseStringUTFChars(jstr, schema_chars);
    env->DeleteLocalRef(jstr);
    return Status::OK();
}

std::map<std::string, std::string> JniConnector::get_statistics(JNIEnv* env) {
jobject metrics = env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_statistics);
std::map<std::string, std::string> result = JniUtil::convert_to_cpp_map(env, metrics);
Expand Down Expand Up @@ -197,6 +213,9 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {

_jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V");
_jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, "getNextBatchMeta", "()J");
_jni_scanner_get_table_schema =
env->GetMethodID(_jni_scanner_cls, "getTableSchema", "()Ljava/lang/String;");
RETURN_ERROR_IF_EXC(env);
_jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V");
_jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseColumn", "(I)V");
_jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V");
Expand Down
24 changes: 24 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ class JniConnector {
_connector_name = split(_connector_class, "/").back();
}

/**
 * Schema-only constructor: builds a connector that is flagged as being used
 * to fetch the table schema (_is_table_schema is set to true).
 * @param connector_class Java scanner class
 * @param scanner_params Provided configuration map
 */
JniConnector(std::string connector_class, std::map<std::string, std::string> scanner_params)
        : _connector_class(std::move(connector_class)),
          _scanner_params(std::move(scanner_params)),
          _is_table_schema(true) {}

/// Should release jni resources if other functions are failed.
~JniConnector();

Expand Down Expand Up @@ -205,6 +216,17 @@ class JniConnector {
*/
std::map<std::string, std::string> get_statistics(JNIEnv* env);

/**
 * Call the Java-side function JniScanner.getTableSchema.
 *
 * The schema information is returned as a single string.
 * Documented format: '#' separates the column names from the column types,
 *
 * like: col_name1,col_name2,col_name3#col_type1,col_type2.col_type3
 *
 * NOTE(review): the '.' before col_type3 looks like a typo for ','. Also,
 * AvroJNIReader::get_parsed_schema parses the returned string as a JSON array,
 * so the actual format may depend on the scanner -- confirm against the Java side.
 *
 * @param table_schema_str output parameter that receives the schema string
 */
Status get_table_schema(std::string& table_schema_str);

/**
* Close scanner and release jni resources.
*/
Expand All @@ -222,6 +244,7 @@ class JniConnector {
std::string _connector_class;
std::map<std::string, std::string> _scanner_params;
std::vector<std::string> _column_names;
bool _is_table_schema = false;

RuntimeState* _state;
RuntimeProfile* _profile;
Expand All @@ -237,6 +260,7 @@ class JniConnector {
jobject _jni_scanner_obj;
jmethodID _jni_scanner_open;
jmethodID _jni_scanner_get_next_batch;
jmethodID _jni_scanner_get_table_schema;
jmethodID _jni_scanner_close;
jmethodID _jni_scanner_release_column;
jmethodID _jni_scanner_release_table;
Expand Down
165 changes: 165 additions & 0 deletions be/src/vec/exec/scan/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "avro_jni_reader.h"

#include <map>
#include <ostream>

#include "runtime/descriptors.h"
#include "runtime/types.h"

namespace doris::vectorized {

// Constructor for the data-scan path.
// Stores a reference to file_slot_descs (the caller must keep the vector alive for
// the lifetime of this reader) and copies params. _range is default-constructed here.
// _colname_to_value_range is explicitly nulled so it never holds an indeterminate
// value before init_fetch_table_reader() assigns it.
AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile,
                             const TFileScanRangeParams& params,
                             const std::vector<SlotDescriptor*>& file_slot_descs)
        : _file_slot_descs(file_slot_descs),
          _state(state),
          _profile(profile),
          _params(params),
          _colname_to_value_range(nullptr) {}

// Constructor for the schema-fetch path.
// No RuntimeState is supplied on this path, so _state is explicitly set to nullptr
// instead of being left indeterminate; _colname_to_value_range is nulled as well.
// Stores a reference to file_slot_descs (the caller must keep the vector alive for
// the lifetime of this reader) and copies params and range.
AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                             const TFileRangeDesc& range,
                             const std::vector<SlotDescriptor*>& file_slot_descs)
        : _file_slot_descs(file_slot_descs),
          _state(nullptr),
          _profile(profile),
          _params(params),
          _range(range),
          _colname_to_value_range(nullptr) {}

// Defaulted destructor: the only owning member is the std::unique_ptr<JniConnector>,
// which destroys the connector automatically.
AvroJNIReader::~AvroJNIReader() = default;

// Read the next block through the JNI connector.
// When the connector reports end-of-file, the connector is closed and the result of
// that close is returned to the caller.
Status AvroJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
    if (!*eof) {
        return Status::OK();
    }
    // Reached the end of the data: release the scanner right away.
    return _jni_connector->close();
}

// Report every file slot as a (column name -> type) entry in name_to_type.
// missing_cols is not touched by this reader.
Status AvroJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                                  std::unordered_set<std::string>* missing_cols) {
    for (SlotDescriptor* slot : _file_slot_descs) {
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}

// Prepare and open the JNI connector used to scan table data.
//
// Builds the scanner parameter map from the file slots:
//   - "required_fields": column names joined with ","
//   - "columns_types":   hive type names joined with "#"
// plus the file type, the "is_get_table_schema" = "false" flag and the hive serde
// class, then merges in _params.properties (existing keys are not overwritten).
//
// @param colname_to_value_range column value ranges forwarded to JniConnector::init
// @return an error for an unsupported file type or a missing HDFS conf entry,
//         otherwise the result of initializing and opening the connector
Status AvroJNIReader::init_fetch_table_reader(
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
    _colname_to_value_range = colname_to_value_range;

    std::ostringstream required_fields;
    std::ostringstream columns_types;
    std::vector<std::string> column_names;
    column_names.reserve(_file_slot_descs.size());
    bool is_first = true;
    for (auto& desc : _file_slot_descs) {
        const std::string& field = desc->col_name();
        column_names.emplace_back(field);
        const std::string type = JniConnector::get_hive_type(desc->type());
        if (!is_first) {
            // Field names are comma-separated, types are '#'-separated.
            required_fields << ",";
            columns_types << "#";
        }
        required_fields << field;
        columns_types << type;
        is_first = false;
    }

    const TFileType::type type = _params.file_type;
    std::map<String, String> required_param = {
            {"required_fields", required_fields.str()},
            {"columns_types", columns_types.str()},
            {"file_type", std::to_string(type)},
            {"is_get_table_schema", "false"},
            {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}};
    switch (type) {
    case TFileType::FILE_HDFS:
        // The value of the first hdfs_conf entry is used as the uri.
        // NOTE(review): confirm that entry is always the file URI.
        if (_params.hdfs_params.hdfs_conf.empty()) {
            return Status::InternalError("hdfs_conf is empty, can not get the uri of avro file");
        }
        required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.front().value));
        break;
    case TFileType::FILE_S3:
        // Nothing extra: the properties are merged for every file type below.
        break;
    default:
        // The error must be returned; constructing it without returning is a no-op.
        return Status::InternalError("unsupported file reader type: {}", std::to_string(type));
    }
    // map::insert keeps existing keys, so an HDFS "uri" set above takes precedence.
    required_param.insert(_params.properties.begin(), _params.properties.end());
    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
                                                    required_param, column_names);
    RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
    return _jni_connector->open(_state, _profile);
}

// Create and open a schema-only JNI connector for the file at _range.path.
// The connector is opened without a RuntimeState (nullptr is passed).
Status AvroJNIReader::init_fetch_table_schema_reader() {
    std::map<String, String> schema_params;
    schema_params.emplace("uri", _range.path);
    schema_params.emplace("file_type", std::to_string(_params.file_type));
    schema_params.emplace("is_get_table_schema", "true");
    // insert() does not overwrite, so the three keys above win over user properties.
    schema_params.insert(_params.properties.begin(), _params.properties.end());

    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
                                                    schema_params);
    return _jni_connector->open(nullptr, _profile);
}

// Fetch the table schema from the Java scanner and convert it to Doris columns.
//
// The schema string is parsed as a JSON array; each element must be an object with
// a string "name" and an integer "type" (converted by convert_to_doris_type).
// The connector is closed whether or not parsing succeeds.
//
// @param col_names output; parsed column names are appended on success
// @param col_types output; parsed column types are appended on success
// @return an error if the schema cannot be fetched, is not a valid JSON array of
//         column objects, or the connector fails to close; OK otherwise.
//         On error the output vectors are left untouched.
Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names,
                                        std::vector<TypeDescriptor>* col_types) {
    std::string table_schema_str;
    RETURN_IF_ERROR(_jni_connector->get_table_schema(table_schema_str));

    rapidjson::Document document;
    document.Parse(table_schema_str.c_str());

    // Collect into temporaries so a failure midway does not leave partial output.
    std::vector<std::string> parsed_names;
    std::vector<TypeDescriptor> parsed_types;
    Status parse_status = Status::OK();
    if (document.HasParseError() || !document.IsArray()) {
        parse_status = Status::InternalError("Failed to parse avro table schema: {}",
                                             table_schema_str);
    } else {
        for (rapidjson::SizeType i = 0; i < document.Size(); ++i) {
            const rapidjson::Value& column_schema = document[i];
            const bool is_valid_column =
                    column_schema.IsObject() && column_schema.HasMember("name") &&
                    column_schema["name"].IsString() && column_schema.HasMember("type") &&
                    column_schema["type"].IsInt();
            if (!is_valid_column) {
                parse_status = Status::InternalError("Invalid column in avro table schema: {}",
                                                     table_schema_str);
                break;
            }
            parsed_names.emplace_back(column_schema["name"].GetString());
            parsed_types.push_back(convert_to_doris_type(column_schema));
        }
    }

    // Always release the scanner, but report the parse failure first if there is one.
    Status close_status = _jni_connector->close();
    RETURN_IF_ERROR(parse_status);
    RETURN_IF_ERROR(close_status);

    col_names->insert(col_names->end(), parsed_names.begin(), parsed_names.end());
    col_types->insert(col_types->end(), parsed_types.begin(), parsed_types.end());
    return Status::OK();
}

// Convert one column entry of the schema JSON into a Doris TypeDescriptor.
//
// column_schema is expected to be an object whose integer "type" is a
// TPrimitiveType value. ARRAY and MAP additionally need an object "childColumn"
// with its own integer "type"; MAP keys are always STRING.
//
// @return the converted type, or TypeDescriptor(INVALID_TYPE) when the type is not
//         handled here or the JSON entry is missing a required member
TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) {
    if (!column_schema.IsObject() || !column_schema.HasMember("type") ||
        !column_schema["type"].IsInt()) {
        return TypeDescriptor(PrimitiveType::INVALID_TYPE);
    }
    // Guards the nested lookups below: the child must be an object with an int "type".
    const auto has_valid_child = [&column_schema]() {
        if (!column_schema.HasMember("childColumn") || !column_schema["childColumn"].IsObject()) {
            return false;
        }
        const rapidjson::Value& child = column_schema["childColumn"];
        return child.HasMember("type") && child["type"].IsInt();
    };

    ::doris::TPrimitiveType::type schema_type =
            static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
    switch (schema_type) {
    case TPrimitiveType::INT:
    case TPrimitiveType::STRING:
    case TPrimitiveType::BIGINT:
    case TPrimitiveType::BOOLEAN:
    case TPrimitiveType::DOUBLE:
    case TPrimitiveType::FLOAT:
        return TypeDescriptor(thrift_to_type(schema_type));
    case TPrimitiveType::ARRAY: {
        if (!has_valid_child()) {
            return TypeDescriptor(PrimitiveType::INVALID_TYPE);
        }
        TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY);
        list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
        return list_type;
    }
    case TPrimitiveType::MAP: {
        if (!has_valid_child()) {
            return TypeDescriptor(PrimitiveType::INVALID_TYPE);
        }
        TypeDescriptor map_type(PrimitiveType::TYPE_MAP);

        // The default type of AVRO MAP structure key is STRING
        map_type.add_sub_type(PrimitiveType::TYPE_STRING);
        map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
        return map_type;
    }
    default:
        return TypeDescriptor(PrimitiveType::INVALID_TYPE);
    }
}

// Convert the element/value schema of a complex column: reads the integer "type"
// member of child_schema and maps it to the matching Doris type.
TypeDescriptor AvroJNIReader::convert_complex_type(
        const rapidjson::Document::ConstObject child_schema) {
    const int raw_type = child_schema["type"].GetInt();
    const auto thrift_type = static_cast< ::doris::TPrimitiveType::type>(raw_type);
    return TypeDescriptor(thrift_to_type(thrift_type));
}

} // namespace doris::vectorized
96 changes: 96 additions & 0 deletions be/src/vec/exec/scan/avro_jni_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <rapidjson/document.h>
#include <stddef.h>

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/jni_connector.h"

namespace doris {
class RuntimeProfile;

class RuntimeState;

class SlotDescriptor;
namespace vectorized {
class Block;
} // namespace vectorized
struct TypeDescriptor;
} // namespace doris

namespace doris::vectorized {

/**
 * Read avro-format file.
 *
 * The work is delegated to a Java scanner through JniConnector. The reader is used
 * in one of two modes, chosen by the constructor:
 *   - data scan:    construct with a RuntimeState, then init_fetch_table_reader()
 *   - schema fetch: construct with a TFileRangeDesc, then
 *                   init_fetch_table_schema_reader() and get_parsed_schema()
 */
class AvroJNIReader : public GenericReader {
    ENABLE_FACTORY_CREATOR(AvroJNIReader);

public:
    /**
     * Call java side by jni to get table data.
     * file_slot_descs is stored by reference and must outlive this reader.
     */
    AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
                  const std::vector<SlotDescriptor*>& file_slot_descs);

    /**
     * Call java side by jni to get table schema.
     * file_slot_descs is stored by reference and must outlive this reader.
     */
    AvroJNIReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs);

    ~AvroJNIReader() override;

    /// Read the next block via the connector; the connector is closed once eof is reported.
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

    /// Fill name_to_type with the name and type of every file slot.
    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                       std::unordered_set<std::string>* missing_cols) override;

    /// Build the scanner parameters from the file slots, then init and open the connector.
    Status init_fetch_table_reader(
            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

    /// Create and open a schema-only connector for the file described by the range.
    Status init_fetch_table_schema_reader();

    /// Fetch the schema string from the Java side, parse it as JSON and close the connector.
    Status get_parsed_schema(std::vector<std::string>* col_names,
                             std::vector<TypeDescriptor>* col_types) override;

    /// Map one column entry of the schema JSON to a Doris type.
    TypeDescriptor convert_to_doris_type(const rapidjson::Value& column_schema);

    /// Map the "childColumn" object of an ARRAY/MAP column to a Doris type.
    TypeDescriptor convert_complex_type(const rapidjson::Document::ConstObject child_schema);

private:
    // Not owned: refers to the vector passed to the constructor.
    const std::vector<SlotDescriptor*>& _file_slot_descs;
    // Raw pointers default to nullptr so that neither constructor leaves one
    // indeterminate (the schema-fetch constructor does not receive a RuntimeState).
    RuntimeState* _state = nullptr;
    RuntimeProfile* _profile = nullptr;
    const TFileScanRangeParams _params;
    const TFileRangeDesc _range;
    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
    std::unique_ptr<JniConnector> _jni_connector;
};

} // namespace doris::vectorized
Loading

0 comments on commit 2c5b617

Please sign in to comment.