Skip to content

Commit

Permalink
[refactor](point query) decouple PointQueryExec from the Coordinator (a…
Browse files Browse the repository at this point in the history
…pache#24509)

In order to decouple PointQueryExec from the Coordinator, both PointQueryExec and Coordinator inherit from CoordInterface, and are collectively scheduled through StmtExecutor.
  • Loading branch information
eldenmoon authored Sep 18, 2023
1 parent 23a75d0 commit 932b639
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 97 deletions.
31 changes: 31 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.proto.Types;

public interface CoordInterface {
public void exec() throws Exception;

public RowBatch getNext() throws Exception;

public int getInstanceTotalNum();

public void cancel(Types.PPlanFragmentCancelReason cancelReason);
}

63 changes: 10 additions & 53 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.PrepareStmt;
import org.apache.doris.analysis.PrepareStmt.PreparedType;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
Expand Down Expand Up @@ -150,7 +148,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

public class Coordinator {
public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);

private static final String localIP = FrontendOptions.getLocalHostAddress();
Expand Down Expand Up @@ -260,7 +258,6 @@ public class Coordinator {
public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
private ConnectContext context;

private boolean isPointQuery = false;
private PointQueryExec pointExec = null;

private StatsErrorEstimator statsErrorEstimator;
Expand Down Expand Up @@ -294,32 +291,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();

if (this.scanNodes.size() == 1 && this.scanNodes.get(0) instanceof OlapScanNode) {
OlapScanNode olapScanNode = (OlapScanNode) (this.scanNodes.get(0));
isPointQuery = olapScanNode.isPointQuery();
if (isPointQuery) {
PlanFragment fragment = fragments.get(0);
LOG.debug("execPointGet fragment {}", fragment);
OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
Preconditions.checkNotNull(planRoot);
pointExec = new PointQueryExec(planRoot.getPointQueryEqualPredicates(),
planRoot.getDescTable(), fragment.getOutputExprs());
}
}
PrepareStmt prepareStmt = analyzer == null ? null : analyzer.getPrepareStmt();
if (prepareStmt != null && prepareStmt.getPreparedType() == PreparedType.FULL_PREPARED) {
// Used cached or better performance
this.descTable = prepareStmt.getDescTable();
if (pointExec != null) {
pointExec.setCacheID(prepareStmt.getID());
pointExec.setSerializedDescTable(prepareStmt.getSerializedDescTable());
pointExec.setSerializedOutputExpr(prepareStmt.getSerializedOutputExprs());
pointExec.setBinaryProtocol(prepareStmt.isBinaryProtocol());
}
} else {
this.descTable = planner.getDescTable().toThrift();
}
this.descTable = planner.getDescTable().toThrift();

this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
Expand Down Expand Up @@ -506,6 +478,7 @@ public Map<String, Integer> getBeToInstancesNum() {
return result;
}

@Override
public int getInstanceTotalNum() {
return instanceTotalNum;
}
Expand Down Expand Up @@ -598,6 +571,7 @@ public TExecPlanFragmentParams getStreamLoadPlan() throws Exception {
// 'Request' must contain at least a coordinator plan fragment (ie, can't
// be for a query like 'SELECT 1').
// A call to Exec() must precede all other member function calls.
@Override
public void exec() throws Exception {
if (LOG.isDebugEnabled() && !scanNodes.isEmpty()) {
LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}",
Expand Down Expand Up @@ -649,17 +623,10 @@ public void exec() throws Exception {
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
}
executionProfile.markInstances(instanceIds);
if (!isPointQuery) {
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
sendFragment();
}
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
OlapScanNode planRoot = (OlapScanNode) fragments.get(0).getPlanRoot();
Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
pointExec.setCandidateBackends(planRoot.getScanBackendIds());
pointExec.setTabletId(planRoot.getScanTabletIds().get(0));
sendFragment();
}
}

Expand Down Expand Up @@ -1187,19 +1154,15 @@ private void updateStatus(Status status, TUniqueId instanceId) {
}
}

@Override
public RowBatch getNext() throws Exception {
if (receiver == null) {
throw new UserException("There is no receiver.");
}

RowBatch resultBatch;
Status status = new Status();

if (!isPointQuery) {
resultBatch = receiver.getNext(status);
} else {
resultBatch = pointExec.getNext(status);
}
resultBatch = receiver.getNext(status);
if (!status.ok()) {
LOG.warn("get next fail, need cancel. query id: {}", DebugUtil.printId(queryId));
}
Expand Down Expand Up @@ -1325,6 +1288,7 @@ public void cancel() {
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
}

@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
lock();
try {
Expand Down Expand Up @@ -2095,13 +2059,6 @@ private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
// Populates scan_range_assignment_.
// <fragment, <server, nodeId>>
private void computeScanRangeAssignment() throws Exception {
if (isPointQuery) {
// Fast path for evaluate Backend for point query
List<TScanRangeLocations> locations = ((OlapScanNode) scanNodes.get(0)).lazyEvaluateRangeLocations();
Preconditions.checkNotNull(locations);
return;
}

Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable();
Collections.shuffle(scanNodes);
Expand Down
122 changes: 92 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,33 @@

package org.apache.doris.qe;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.PrepareStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.KeyTuple;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExpr;
import org.apache.doris.thrift.TExprList;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -43,7 +53,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -53,7 +62,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PointQueryExec {
public class PointQueryExec implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(PointQueryExec.class);
// SlotRef sorted by column id
private Map<SlotRef, Expr> equalPredicats;
Expand All @@ -69,55 +78,69 @@ public class PointQueryExec {
private boolean isBinaryProtocol = false;

private List<Backend> candidateBackends;
Planner planner;

// For parepared statement cached structure,
// there are some pre caculated structure in Backend TabletFetch service
// using this ID to find for this prepared statement
private UUID cacheID;

public PointQueryExec(Map<SlotRef, Expr> equalPredicats, DescriptorTable descTable,
ArrayList<Expr> outputExprs) {
this.equalPredicats = equalPredicats;
this.descriptorTable = descTable;
this.outputExprs = outputExprs;
private OlapScanNode getPlanRoot() {
List<PlanFragment> fragments = planner.getFragments();
PlanFragment fragment = fragments.get(0);
LOG.debug("execPointGet fragment {}", fragment);
OlapScanNode planRoot = (OlapScanNode) fragment.getPlanRoot();
Preconditions.checkNotNull(planRoot);
return planRoot;
}

void setCandidateBackends(HashSet<Long> backendsIds) {
public PointQueryExec(Planner planner, Analyzer analyzer) {
// init from planner
this.planner = planner;
List<PlanFragment> fragments = planner.getFragments();
PlanFragment fragment = fragments.get(0);
OlapScanNode planRoot = getPlanRoot();
this.equalPredicats = planRoot.getPointQueryEqualPredicates();
this.descriptorTable = planRoot.getDescTable();
this.outputExprs = fragment.getOutputExprs();

PrepareStmt prepareStmt = analyzer == null ? null : analyzer.getPrepareStmt();
if (prepareStmt != null && prepareStmt.getPreparedType() == PrepareStmt.PreparedType.FULL_PREPARED) {
// Used cached or better performance
this.cacheID = prepareStmt.getID();
this.serializedDescTable = prepareStmt.getSerializedDescTable();
this.serializedOutputExpr = prepareStmt.getSerializedOutputExprs();
this.isBinaryProtocol = prepareStmt.isBinaryProtocol();
} else {
// TODO
// planner.getDescTable().toThrift();
}
}

void setScanRangeLocations() throws Exception {
OlapScanNode planRoot = getPlanRoot();
// compute scan range
List<TScanRangeLocations> locations = planRoot.lazyEvaluateRangeLocations();
Preconditions.checkState(planRoot.getScanTabletIds().size() == 1);
this.tabletID = planRoot.getScanTabletIds().get(0);

Preconditions.checkNotNull(locations);
candidateBackends = new ArrayList<>();
for (Long backendID : backendsIds) {
for (Long backendID : planRoot.getScanBackendIds()) {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendID);
if (SimpleScheduler.isAvailable(backend)) {
candidateBackends.add(backend);
}
}
// Random read replicas
Collections.shuffle(this.candidateBackends);
}

public void setSerializedDescTable(ByteString serializedDescTable) {
this.serializedDescTable = serializedDescTable;
}

public void setSerializedOutputExpr(ByteString serializedOutputExpr) {
this.serializedOutputExpr = serializedOutputExpr;
}

public void setCacheID(UUID cacheID) {
this.cacheID = cacheID;
}

public void setTabletId(long tabletID) {
this.tabletID = tabletID;
LOG.debug("set scan locations, backend ids {}, tablet id {}", candidateBackends, tabletID);
}

public void setTimeout(long timeoutMs) {
this.timeoutMs = timeoutMs;
}

public void setBinaryProtocol(boolean isBinaryProtocol) {
this.isBinaryProtocol = isBinaryProtocol;
}

void addKeyTuples(
InternalService.PTabletKeyLookupRequest.Builder requestBuilder) {
// TODO handle IN predicates
Expand All @@ -129,11 +152,26 @@ void addKeyTuples(
requestBuilder.addKeyTuples(kBuilder);
}

public RowBatch getNext(Status status) throws TException {
@Override
public int getInstanceTotalNum() {
// TODO
return 1;
}

@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
// Do nothing
}


@Override
public RowBatch getNext() throws Exception {
setScanRangeLocations();
Iterator<Backend> backendIter = candidateBackends.iterator();
RowBatch rowBatch = null;
int tryCount = 0;
int maxTry = Math.min(Config.max_point_query_retry_time, candidateBackends.size());
Status status = new Status();
do {
Backend backend = backendIter.next();
rowBatch = getNextInternal(status, backend);
Expand All @@ -146,9 +184,33 @@ public RowBatch getNext(Status status) throws TException {
}
status.setStatus(Status.OK);
} while (true);
// handle status code
if (!status.ok()) {
if (Strings.isNullOrEmpty(status.getErrorMsg())) {
status.rewriteErrorMsg();
}
if (status.isRpcError()) {
throw new RpcException(null, status.getErrorMsg());
} else {
String errMsg = status.getErrorMsg();
LOG.warn("query failed: {}", errMsg);

// hide host info
int hostIndex = errMsg.indexOf("host");
if (hostIndex != -1) {
errMsg = errMsg.substring(0, hostIndex);
}
throw new UserException(errMsg);
}
}
return rowBatch;
}

@Override
public void exec() throws Exception {
// Do nothing
}

private RowBatch getNextInternal(Status status, Backend backend) throws TException {
long timeoutTs = System.currentTimeMillis() + timeoutMs;
RowBatch rowBatch = new RowBatch();
Expand Down
Loading

0 comments on commit 932b639

Please sign in to comment.