Skip to content

Commit

Permalink
[improve][broker] PIP-192 Supports AntiAffinityGroupPolicy (#19708)
Browse files Browse the repository at this point in the history
Master Issue: #16691

### Motivation

Raising a PR to implement #16691.

We need to support AntiAffinityGroupPolicy in Load Manager Extension as well.

### Modifications
This PR 
- added AntiAffinityGroupPolicyFilter used in the broker assignment logic.
- added AntiAffinityGroupPolicyHelper that is passed to TransferShedder and AntiAffinityGroupPolicyFilter
- modified LoadManagerShared.filterAntiAffinityGroupOwnedBrokers and LoadManagerShared.getAntiAffinityNamespaceOwnedBrokers to accept the bundle ownership data from the Load Manager Extension.
- moved ModularLoadManagerImpl.refreshBrokerToFailureDomainMap(..) to LoadManager.refreshBrokerToFailureDomainMap(..) to reuse it in ExtensibleLoadManagerImpl.
- modified AntiAffinityNamespaceGroupTest to reuse the test cases in AntiAffinityNamespaceGroupExtensionTest
  • Loading branch information
heesung-sn authored Mar 14, 2023
1 parent 9a85dea commit 2ddfbfe
Show file tree
Hide file tree
Showing 11 changed files with 841 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.AntiAffinityGroupPolicyFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
Expand All @@ -52,6 +53,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
Expand Down Expand Up @@ -91,6 +93,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {

private ServiceUnitStateChannel serviceUnitStateChannel;

private AntiAffinityGroupPolicyFilter antiAffinityGroupPolicyFilter;

private AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;

private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;

Expand Down Expand Up @@ -137,6 +143,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
.build();


/**
* Life cycle: Constructor -> initialize -> start -> close.
*/
Expand Down Expand Up @@ -173,6 +180,11 @@ public void start() throws PulsarServerException {
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.serviceUnitStateChannel.start();
this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);

try {
this.brokerLoadDataStore = LoadDataStoreFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
Expand Down Expand Up @@ -164,4 +166,10 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
void listen(StateChangeListener listener);

/**
* Returns service unit ownership entry set.
* @return a set of service unit ownership entries
*/
Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet();

}
Original file line number Diff line number Diff line change
Expand Up @@ -1317,5 +1317,11 @@ public List<Metrics> getMetrics() {
@Override
public void listen(StateChangeListener listener) {
this.stateChangeListeners.addListener(listener);

}

@Override
public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() {
return tableview.entrySet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;

import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.common.naming.ServiceUnitId;

/**
* Filter by anti-affinity-group-policy.
*/
public class AntiAffinityGroupPolicyFilter implements BrokerFilter {

public static final String FILTER_NAME = "broker_anti_affinity_group_filter";

private final AntiAffinityGroupPolicyHelper helper;

public AntiAffinityGroupPolicyFilter(AntiAffinityGroupPolicyHelper helper) {
this.helper = helper;
}

@Override
public Map<String, BrokerLookupData> filter(
Map<String, BrokerLookupData> brokers, ServiceUnitId serviceUnitId, LoadManagerContext context) {
helper.filter(brokers, serviceUnitId.toString());
return brokers;
}


@Override
public String name() {
return FILTER_NAME;
}

@Override
public void initialize(PulsarService pulsar) {
return;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.policies;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.metadata.api.MetadataStoreException;

@Slf4j
public class AntiAffinityGroupPolicyHelper {
PulsarService pulsar;
Map<String, String> brokerToFailureDomainMap;
ServiceUnitStateChannel channel;

public AntiAffinityGroupPolicyHelper(PulsarService pulsar,
ServiceUnitStateChannel channel){

this.pulsar = pulsar;
this.brokerToFailureDomainMap = new HashMap<>();
this.channel = channel;
}

public void filter(
Map<String, BrokerLookupData> brokers, String bundle) {
LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, bundle,
brokers.keySet(),
channel.getOwnershipEntrySet(), brokerToFailureDomainMap);
}

public boolean canUnload(
Map<String, BrokerLookupData> brokers,
String bundle,
String srcBroker,
Optional<String> dstBroker) {



try {
var antiAffinityGroupOptional = LoadManagerShared.getNamespaceAntiAffinityGroup(
pulsar, LoadManagerShared.getNamespaceNameFromBundleName(bundle));
if (antiAffinityGroupOptional.isPresent()) {

// copy to retain the input brokers
Map<String, BrokerLookupData> candidates = new HashMap<>(brokers);

filter(candidates, bundle);

candidates.remove(srcBroker);

// unload case
if (dstBroker.isEmpty()) {
return !candidates.isEmpty();
}

// transfer case
return candidates.containsKey(dstBroker.get());
}
} catch (MetadataStoreException e) {
log.error("Failed to check unload candidates. Assumes that bundle:{} cannot unload ", bundle, e);
return false;
}

return true;
}

public void listenFailureDomainUpdate() {
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
pulsar.getLoadManagerExecutor().execute(() ->
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
Expand All @@ -41,13 +40,12 @@
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.policies.AntiAffinityGroupPolicyHelper;
import org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,6 +78,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
private final PulsarService pulsar;
private final SimpleResourceAllocationPolicies allocationPolicies;
private final IsolationPoliciesHelper isolationPoliciesHelper;
private final AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper;

private final UnloadDecision decision = new UnloadDecision();

Expand All @@ -88,12 +87,14 @@ public TransferShedder(){
this.pulsar = null;
this.allocationPolicies = null;
this.isolationPoliciesHelper = null;
this.antiAffinityGroupPolicyHelper = null;
}

public TransferShedder(PulsarService pulsar){
public TransferShedder(PulsarService pulsar, AntiAffinityGroupPolicyHelper antiAffinityGroupPolicyHelper) {
this.pulsar = pulsar;
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
this.isolationPoliciesHelper = new IsolationPoliciesHelper(allocationPolicies);
this.antiAffinityGroupPolicyHelper = antiAffinityGroupPolicyHelper;
}


Expand Down Expand Up @@ -438,32 +439,25 @@ private boolean hasMsgThroughput(LoadManagerContext context, String broker) {
private boolean isTransferable(LoadManagerContext context,
Map<String, BrokerLookupData> availableBrokers,
String bundle,
String maxBroker,
Optional<String> broker) {
String srcBroker,
Optional<String> dstBroker) {
if (pulsar == null || allocationPolicies == null) {
return true;
}
String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
NamespaceName namespaceName = NamespaceName.get(namespace);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, bundleRange);

if (!canTransferWithIsolationPoliciesToBroker(context, availableBrokers, namespaceBundle, maxBroker, broker)) {
if (!canTransferWithIsolationPoliciesToBroker(
context, availableBrokers, namespaceBundle, srcBroker, dstBroker)) {
return false;
}

try {
var localPoliciesOptional = pulsar
.getPulsarResources().getLocalPolicies().getLocalPolicies(namespaceName);
if (localPoliciesOptional.isPresent() && StringUtils.isNotBlank(
localPoliciesOptional.get().namespaceAntiAffinityGroup)) {
return false;
}
} catch (MetadataStoreException e) {
log.error("Failed to get localPolicies. Assumes that bundle:{} is not transferable.", bundle, e);
if (!antiAffinityGroupPolicyHelper.canUnload(availableBrokers, bundle, srcBroker, dstBroker)) {
return false;
}

return true;
}

Expand Down
Loading

0 comments on commit 2ddfbfe

Please sign in to comment.