diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java index 63c73b50edcd07..e76934cf847597 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/BackendWorker.java @@ -45,7 +45,7 @@ public String address() { @Override public String brpcAddress() { - return backend.getHost() + brpcPort(); + return backend.getHost() + ":" + brpcPort(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java index f9ab8e83f07e99..fca5461fe1cb72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java @@ -65,7 +65,7 @@ public boolean isMergeRuntimeFilterInstance(AssignedJob instance) { return mergeInstance == instance; } - public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParams) { + public void populateRuntimeFilterParams(TRuntimeFilterParams runtimeFilterParams) { for (RuntimeFilter rf : runtimeFilters) { List targets = ridToTargets.get(rf.getFilterId()); if (targets == null) { @@ -89,8 +89,7 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam } runtimeFilterParams.putToRidToTargetParamv2( - rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values()) - ); + rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values())); } } for (Map.Entry entry : ridToBuilderNum.entrySet()) { @@ -122,15 +121,14 @@ public static RuntimeFiltersThriftBuilder compute( PlanFragment fragment = plan.getFragmentJob().getFragment(); // Transform to for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) { - List targetFragments = - ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>()); + List targetFragments = ridToTargetParam.computeIfAbsent(rid, + k -> new ArrayList<>()); for (AssignedJob instanceJob : plan.getInstanceJobs()) { BackendWorker backendWorker = (BackendWorker) instanceJob.getAssignedWorker(); Backend backend = backendWorker.getBackend(); targetFragments.add(new RuntimeFilterTarget( fragment.getFragmentId().asInt(), - new TNetworkAddress(backend.getHost(), backend.getBrpcPort()) - )); + new TNetworkAddress(backend.getHost(), backend.getBrpcPort()))); } } @@ -146,8 +144,7 @@ public static RuntimeFiltersThriftBuilder compute( } return new RuntimeFiltersThriftBuilder( mergeAddress, runtimeFilters, mergeInstance, - broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum - ); + broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum); } public static class RuntimeFilterTarget { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 5fcd14fcb79689..394e0cd5b1c6c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -125,7 +125,7 @@ public static Map plansToThr // so we can merge and send multiple fragment to a backend use one rpc for (Entry kv : workerToCurrentFragment.entrySet()) { TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent( - kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder, + kv.getKey(), w -> beToThrift(kv.getKey(), runtimeFiltersThriftBuilder, topNFilterThriftSupplier)); fragments.addToParamsList(kv.getValue()); } @@ -298,18 +298,23 @@ private static TPlanFragmentDestination instanceToDestination(AssignedJob instan } private static TPipelineFragmentParamsList beToThrift( + DistributedPlanWorker worker, RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder, Supplier> topNFilterThriftSupplier) { TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList(); TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo(); runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get()); - // set for runtime filter TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams(); runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress); + if (worker.host().equals(runtimeFiltersThriftBuilder.mergeAddress.getHostname()) + && worker.brpcPort() == runtimeFiltersThriftBuilder.mergeAddress.getPort()) { + // only set following information for merge BE node + runtimeFiltersThriftBuilder.populateRuntimeFilterParams(runtimeFilterParams); + } runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams); - runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams); beParam.setRuntimeFilterInfo(runtimeFilterInfo); + return beParam; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index c5982ed4b0fa22..8f587f38bc3dff 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -442,6 +442,7 @@ struct TRuntimeFilterParams { // Runtime filter merge instance address. Used if this filter has a remote target 1: optional Types.TNetworkAddress runtime_filter_merge_addr + // keep 2/3/4/5 unset if BE is not used for merge // deprecated 2: optional map> rid_to_target_param