Skip to content

Commit

Permalink
[feature-wip](arrow-flight)(step5) Support JDBC and PreparedStatement…
Browse files Browse the repository at this point in the history
… and Fix Bug (apache#27661)
  • Loading branch information
xinyiZzz authored Nov 29, 2023
1 parent 19ecb3a commit d96e2df
Show file tree
Hide file tree
Showing 18 changed files with 372 additions and 126 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
DEFINE_Int32(brpc_num_threads, "256");
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP.
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
DECLARE_Int32(brpc_num_threads);
Expand Down
3 changes: 3 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,9 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
auto st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
if (config::public_access_ip != "") {
result->set_be_arrow_flight_ip(config::public_access_ip);
}
}
st.to_protobuf(result->mutable_status());
});
Expand Down
9 changes: 5 additions & 4 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs,
std::shared_ptr<arrow::Schema>* result) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto expr_ctx : output_vexpr_ctxs) {
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> arrow_type;
auto root_expr = expr_ctx->root();
auto root_expr = output_vexpr_ctxs.at(i)->root();
RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type));
auto field_name = root_expr->is_slot_ref() ? root_expr->expr_name()
: root_expr->data_type()->get_name();
auto field_name = root_expr->is_slot_ref() && !root_expr->expr_name().empty()
? root_expr->expr_name()
: fmt::format("{}_{}", root_expr->data_type()->get_name(), i);
fields.push_back(
std::make_shared<arrow::Field>(field_name, arrow_type, root_expr->is_nullable()));
}
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,10 @@ under the License.
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql-jdbc-driver</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ private PlanFragment createMergeFragment(PlanFragment inputFragment)
mergePlan.init(ctx.getRootAnalyzer());
Preconditions.checkState(mergePlan.hasValidStats());
PlanFragment fragment = new PlanFragment(ctx.getNextFragmentId(), mergePlan, DataPartition.UNPARTITIONED);
fragment.setResultSinkType(ctx.getRootAnalyzer().getContext().getResultSinkType());
inputFragment.setDestination(mergePlan);
return fragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue
LOG.debug("substitute result Exprs {}", resExprs);
rootFragment.setOutputExprs(resExprs);
}
rootFragment.setResultSinkType(ConnectContext.get().getResultSinkType());
LOG.debug("finalize plan fragments");
for (PlanFragment fragment : fragments) {
fragment.finalize(queryStmt);
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public enum ConnectType {
protected volatile long loginTime;
// for arrow flight
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
Expand Down Expand Up @@ -611,6 +612,18 @@ public void resetLoginTime() {
this.loginTime = System.currentTimeMillis();
}

public void addPreparedQuery(String preparedStatementId, String preparedQuery) {
preparedQuerys.put(preparedStatementId, preparedQuery);
}

public String getPreparedQuery(String preparedStatementId) {
return preparedQuerys.get(preparedStatementId);
}

public void removePreparedQuery(String preparedStatementId) {
preparedQuerys.remove(preparedStatementId);
}

public void setRunningQuery(String runningQuery) {
this.runningQuery = runningQuery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1687,7 +1687,7 @@ private TNetworkAddress toArrowFlightHost(TNetworkAddress host) throws Exception
if (backend.getArrowFlightSqlPort() < 0) {
return null;
}
return new TNetworkAddress(backend.getHost(), backend.getArrowFlightSqlPort());
return backend.getArrowFlightAddress();
}

// estimate if this fragment contains UnionNode
Expand Down
25 changes: 15 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,6 @@ public void finalizeQuery() {
}

private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
// queue query here
syncJournalIfNeeded();
QueueOfferToken offerRet = null;
Expand Down Expand Up @@ -642,6 +639,9 @@ private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
}
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
handleQueryStmt();
break;
} catch (RpcException e) {
Expand Down Expand Up @@ -2305,18 +2305,23 @@ private void handleLockTablesStmt() {
}

public void handleExplainStmt(String result, boolean isNereids) throws IOException {
// TODO support arrow flight sql
ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
.addColumn(new Column("Explain String" + (isNereids ? "(Nereids Planner)" : "(Old Planner)"),
ScalarType.createVarchar(20)))
.build();
sendMetaData(metaData);
if (context.getConnectType() == ConnectType.MYSQL) {
sendMetaData(metaData);

// Send result set.
for (String item : result.split("\n")) {
serializer.reset();
serializer.writeLenEncodedString(item);
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
// Send result set.
for (String item : result.split("\n")) {
serializer.reset();
serializer.writeLenEncodedString(item);
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
} else if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.getFlightSqlChannel()
.addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), metaData, result);
context.setReturnResultFromLocal(true);
}
context.getState().setEof();
}
Expand Down
Loading

0 comments on commit d96e2df

Please sign in to comment.