Skip to content

Commit c58fba7

Browse files
horizonzyhangc0276
authored andcommitted
Correct RackawareEnsemblePlacementPolicyImpl defaultRack when the bookie is not available. (#4439)
When the bookie is not available, the RackawareEnsemblePlacementPolicyImpl default rack will be `/default-region/default-rack`, it should be `/default-rack` for RackawareEnsemblePlacementPolicyImpl. There are some logs. ``` 2024-06-17T05:22:46,591+0000 [ReplicationWorker] ERROR org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy - Cannot resolve bookieId `test-bk-3:3181` to a network address, resolving as /default-region/default-rack org.apache.bookkeeper.proto.BookieAddressResolver$BookieIdNotResolvedException: Cannot resolve bookieId test-bk-3:3181, bookie does not exist or it is not running at org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:66) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.resolveNetworkLocation(TopologyAwareEnsemblePlacementPolicy.java:821) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.createBookieNode(TopologyAwareEnsemblePlacementPolicy.java:811) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.convertBookieToNode(TopologyAwareEnsemblePlacementPolicy.java:845) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.convertBookiesToNodes(TopologyAwareEnsemblePlacementPolicy.java:837) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.replaceBookie(RackawareEnsemblePlacementPolicyImpl.java:474) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.replaceBookie(RackawareEnsemblePlacementPolicy.java:119) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.BookKeeperAdmin.getReplacementBookiesByIndexes(BookKeeperAdmin.java:993) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.BookKeeperAdmin.replicateLedgerFragment(BookKeeperAdmin.java:1025) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.replication.ReplicationWorker.rereplicate(ReplicationWorker.java:473) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.replication.ReplicationWorker.rereplicate(ReplicationWorker.java:301) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.replication.ReplicationWorker.run(ReplicationWorker.java:249) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final] at java.lang.Thread.run(Thread.java:840) ~[?:?] Caused by: org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: Bookie handle is not available at org.apache.bookkeeper.discover.ZKRegistrationClient.getBookieServiceInfo(ZKRegistrationClient.java:226) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] at org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:45) ~[io.streamnative-bookkeeper-server-4.16.5.2.jar:4.16.5.2] ... 13 more ``` (cherry picked from commit fb71383)
1 parent 4245f85 commit c58fba7

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public RackawareEnsemblePlacementPolicyImpl withDefaultRack(String rack) {
281281
return this;
282282
}
283283

284+
@Override
284285
public String getDefaultRack() {
285286
return defaultRack;
286287
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -824,12 +824,17 @@ protected String resolveNetworkLocation(BookieId addr) {
824824
if (null != historyBookie) {
825825
return historyBookie.getNetworkLocation();
826826
}
827+
String defaultRack = getDefaultRack();
827828
LOG.error("Cannot resolve bookieId {} to a network address, resolving as {}. {}", addr,
828-
NetworkTopology.DEFAULT_REGION_AND_RACK, err.getMessage());
829-
return NetworkTopology.DEFAULT_REGION_AND_RACK;
829+
defaultRack, err.getMessage());
830+
return defaultRack;
830831
}
831832
}
832833

834+
protected String getDefaultRack() {
835+
return NetworkTopology.DEFAULT_REGION_AND_RACK;
836+
}
837+
833838
protected Set<Node> convertBookiesToNodes(Collection<BookieId> bookies) {
834839
Set<Node> nodes = new HashSet<Node>();
835840
for (BookieId addr : bookies) {

bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java

+64
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3030
import io.netty.util.HashedWheelTimer;
3131
import java.net.InetAddress;
32+
import java.net.UnknownHostException;
3233
import java.util.ArrayList;
3334
import java.util.Arrays;
3435
import java.util.Collection;
@@ -40,6 +41,7 @@
4041
import java.util.Optional;
4142
import java.util.Set;
4243
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.atomic.AtomicInteger;
4345
import java.util.function.Consumer;
4446
import junit.framework.TestCase;
4547
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
@@ -2354,6 +2356,68 @@ public void testNodeWithFailures() throws Exception {
23542356
assertEquals(ensemble.get(reoderSet.get(3)), addr2.toBookieId());
23552357
assertEquals(ensemble.get(reoderSet.get(0)), addr3.toBookieId());
23562358
assertEquals(ensemble.get(reoderSet.get(1)), addr4.toBookieId());
2359+
StaticDNSResolver.reset();
2360+
}
2361+
2362+
@Test
2363+
public void testReplaceNotAvailableBookieWithDefaultRack() throws Exception {
2364+
repp.uninitalize();
2365+
repp.withDefaultRack(NetworkTopology.DEFAULT_RACK);
2366+
AtomicInteger counter = new AtomicInteger();
2367+
BookieAddressResolver mockResolver = new BookieAddressResolver() {
2368+
@Override
2369+
public BookieSocketAddress resolve(BookieId bookieId) throws BookieIdNotResolvedException {
2370+
if (bookieId.equals(addr1.toBookieId()) && counter.getAndIncrement() >= 1) {
2371+
throw new BookieIdNotResolvedException(bookieId,
2372+
new RuntimeException(addr1.toBookieId() + " shutdown"));
2373+
}
2374+
try {
2375+
return new BookieSocketAddress(bookieId.toString());
2376+
} catch (UnknownHostException err) {
2377+
throw new BookieIdNotResolvedException(bookieId, err);
2378+
}
2379+
}
2380+
};
2381+
2382+
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE,
2383+
mockResolver);
2384+
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
2385+
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
2386+
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
2387+
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
2388+
// update dns mapping
2389+
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_RACK);
2390+
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/r1");
2391+
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/r1");
2392+
StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_RACK);
2393+
2394+
// Update cluster
2395+
Set<BookieId> addrs = new HashSet<BookieId>();
2396+
addrs.add(addr1.toBookieId());
2397+
addrs.add(addr2.toBookieId());
2398+
addrs.add(addr3.toBookieId());
2399+
addrs.add(addr4.toBookieId());
2400+
repp.onClusterChanged(addrs, new HashSet<BookieId>());
2401+
2402+
// replace node under r1
2403+
EnsemblePlacementPolicy.PlacementResult<BookieId> replaceBookieResponse =
2404+
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr1.toBookieId(), new HashSet<>());
2405+
BookieId replacedBookie = replaceBookieResponse.getResult();
2406+
assertEquals(addr4.toBookieId(), replacedBookie);
2407+
2408+
//clear history bookies and make addr1 shutdown.
2409+
repp = new RackawareEnsemblePlacementPolicy();
2410+
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE,
2411+
mockResolver);
2412+
2413+
addrs.remove(addr1.toBookieId());
2414+
repp.onClusterChanged(addrs, new HashSet<BookieId>());
2415+
2416+
// replace node under r1 again
2417+
replaceBookieResponse =
2418+
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr1.toBookieId(), new HashSet<>());
2419+
replacedBookie = replaceBookieResponse.getResult();
2420+
assertEquals(addr4.toBookieId(), replacedBookie);
23572421
}
23582422

23592423
@Test

0 commit comments

Comments
 (0)