Skip to content
Closed

test #57583

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String address() {

@Override
public String brpcAddress() {
return backend.getHost() + brpcPort();
return backend.getHost() + ":" + brpcPort();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public boolean isMergeRuntimeFilterInstance(AssignedJob instance) {
return mergeInstance == instance;
}

public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParams) {
public void populateRuntimeFilterParams(TRuntimeFilterParams runtimeFilterParams) {
for (RuntimeFilter rf : runtimeFilters) {
List<RuntimeFilterTarget> targets = ridToTargets.get(rf.getFilterId());
if (targets == null) {
Expand All @@ -89,8 +89,7 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam
}

runtimeFilterParams.putToRidToTargetParamv2(
rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values())
);
rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values()));
}
}
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
Expand Down Expand Up @@ -122,15 +121,14 @@ public static RuntimeFiltersThriftBuilder compute(
PlanFragment fragment = plan.getFragmentJob().getFragment();
// Transform <fragment, runtimeFilterId> to <runtimeFilterId, fragment>
for (RuntimeFilterId rid : fragment.getTargetRuntimeFilterIds()) {
List<RuntimeFilterTarget> targetFragments =
ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>());
List<RuntimeFilterTarget> targetFragments = ridToTargetParam.computeIfAbsent(rid,
k -> new ArrayList<>());
for (AssignedJob instanceJob : plan.getInstanceJobs()) {
BackendWorker backendWorker = (BackendWorker) instanceJob.getAssignedWorker();
Backend backend = backendWorker.getBackend();
targetFragments.add(new RuntimeFilterTarget(
fragment.getFragmentId().asInt(),
new TNetworkAddress(backend.getHost(), backend.getBrpcPort())
));
new TNetworkAddress(backend.getHost(), backend.getBrpcPort())));
}
}

Expand All @@ -146,8 +144,7 @@ public static RuntimeFiltersThriftBuilder compute(
}
return new RuntimeFiltersThriftBuilder(
mergeAddress, runtimeFilters, mergeInstance,
broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum
);
broadcastRuntimeFilterIds, ridToTargetParam, ridToBuilderNum);
}

public static class RuntimeFilterTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public static Map<DistributedPlanWorker, TPipelineFragmentParamsList> plansToThr
// so we can merge and send multiple fragment to a backend use one rpc
for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv : workerToCurrentFragment.entrySet()) {
TPipelineFragmentParamsList fragments = fragmentsGroupByWorker.computeIfAbsent(
kv.getKey(), w -> beToThrift(runtimeFiltersThriftBuilder,
kv.getKey(), w -> beToThrift(kv.getKey(), runtimeFiltersThriftBuilder,
topNFilterThriftSupplier));
fragments.addToParamsList(kv.getValue());
}
Expand Down Expand Up @@ -298,18 +298,23 @@ private static TPlanFragmentDestination instanceToDestination(AssignedJob instan
}

private static TPipelineFragmentParamsList beToThrift(
DistributedPlanWorker worker,
RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
TPipelineFragmentParamsList beParam = new TPipelineFragmentParamsList();
TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());

// set for runtime filter
TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
if (worker.host().equals(runtimeFiltersThriftBuilder.mergeAddress.getHostname())
&& worker.brpcPort() == runtimeFiltersThriftBuilder.mergeAddress.getPort()) {
// only set following information for merge BE node
runtimeFiltersThriftBuilder.populateRuntimeFilterParams(runtimeFilterParams);
}
runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
beParam.setRuntimeFilterInfo(runtimeFilterInfo);

return beParam;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ struct TRuntimeFilterParams {
// Runtime filter merge instance address. Used if this filter has a remote target
1: optional Types.TNetworkAddress runtime_filter_merge_addr

// keep 2/3/4/5 unset if BE is not used for merge
// deprecated
2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param

Expand Down
Loading