Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
split bookkeeper ports
Browse files Browse the repository at this point in the history
RB_ID=832040
  • Loading branch information
jordanbull committed Jan 27, 2017
1 parent ab16e73 commit e430e6e
Show file tree
Hide file tree
Showing 91 changed files with 2,037 additions and 1,296 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ target/
build/
*.xml
*.jar
.idea/
dist/
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import java.io.IOException;

import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieIdToAddressMapping;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.zookeeper.KeeperException;

Expand Down Expand Up @@ -56,7 +58,7 @@ static class LatencyCallback implements WriteCallback {
boolean complete;
@Override
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
Expand All @@ -77,7 +79,7 @@ static class ThroughputCallback implements WriteCallback {
int count;
int waitingCount = Integer.MAX_VALUE;
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
Expand Down Expand Up @@ -147,7 +149,12 @@ public static void main(String[] args)
OrderedSafeExecutor executor = new OrderedSafeExecutor(1);

ClientConfiguration conf = new ClientConfiguration();
BookieClient bc = new BookieClient(conf, channelFactory, executor);
BookieClient bc = new BookieClient(conf, channelFactory, executor, new BookieIdToAddressMapping() {
@Override
public BookieSocketAddress getBookieAddress(BookieId bookieId) {
return new BookieSocketAddress(bookieId.getHostName(), bookieId.getPort(), bookieId.getPort());
}
});
LatencyCallback lc = new LatencyCallback();

ThroughputCallback tc = new ThroughputCallback();
Expand All @@ -161,7 +168,7 @@ public static void main(String[] args)
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
LOG.info("Waiting for warmup");
Expand All @@ -179,7 +186,7 @@ public static void main(String[] args)
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
lc.resetComplete();
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
lc.waitForComplete();
}
Expand All @@ -199,7 +206,7 @@ public static void main(String[] args)
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
tc.waitFor(entryCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -56,7 +56,7 @@ public void testThroughputLatency() throws Exception {

@Test
public void testBookie() throws Exception {
BookieSocketAddress bookie = getBookie(0);
BookieId bookie = getBookie(0);
BenchBookie.main(new String[] {
"--host", bookie.getHostName(),
"--port", String.valueOf(bookie.getPort()),
Expand Down
8 changes: 8 additions & 0 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
Expand Down Expand Up @@ -182,7 +184,7 @@ public long getEntry() {
static class NopWriteCallback implements WriteCallback {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
BookieSocketAddress addr, Object ctx) {
BookieId addr, Object ctx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
new Object[] { entryId, ledgerId, addr, rc });
Expand Down Expand Up @@ -319,12 +321,25 @@ public static BookieSocketAddress getBookieAddress(ServerConfiguration conf)
if (conf.getUseHostNameAsBookieID()) {
hostAddress = inetAddr.getAddress().getCanonicalHostName();
}
int readPort;
if (conf.isAlternateBookiePortEnabled()) {
readPort = conf.getAlternateBookiePort();
} else {
readPort = conf.getBookiePort();
}
BookieSocketAddress addr =
new BookieSocketAddress(hostAddress, conf.getBookiePort());
if (addr.getSocketAddress().getAddress().isLoopbackAddress()
new BookieSocketAddress(hostAddress, conf.getBookiePort(), readPort);
if ((addr.getReadAddress().getAddress().isLoopbackAddress()
|| addr.getWriteAddress().getAddress().isLoopbackAddress())
&& !conf.getAllowLoopback()) {
InetSocketAddress loopedAddr;
if (addr.getReadAddress().getAddress().isLoopbackAddress()) {
loopedAddr = addr.getReadAddress();
} else {
loopedAddr = addr.getWriteAddress();
}
throw new UnknownHostException("Trying to listen on loopback address, "
+ addr + " but this is forbidden by default "
+ loopedAddr + " but this is forbidden by default "
+ "(see ServerConfiguration#getAllowLoopback())");
}
return addr;
Expand Down Expand Up @@ -431,7 +446,7 @@ public int getExitCode() {
}

private String getMyId() throws UnknownHostException {
return Bookie.getBookieAddress(conf).toString();
return Bookie.getBookieAddress(conf).getBookieId().toString();
}

void readJournal() throws IOException, BookieException {
Expand Down Expand Up @@ -765,7 +780,15 @@ private void doRegisterBookie(final String regPath) throws IOException {
try{
if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
zk.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE,

DataFormats.BookieFormat.Builder builder = DataFormats.BookieFormat.newBuilder()
.setWritePort(conf.getBookiePort());
if (conf.isAlternateBookiePortEnabled()) {
builder.setReadPort(conf.getAlternateBookiePort());
} else {
builder.setReadPort(conf.getBookiePort());
}
zk.create(regPath, builder.build().toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LOG.info("Registered myself in ZooKeeper at {}.", regPath);
}
Expand Down Expand Up @@ -1240,7 +1263,7 @@ static class CounterCallback implements WriteCallback {
int count;

@Override
synchronized public void writeComplete(int rc, long l, long e, BookieSocketAddress addr, Object ctx) {
synchronized public void writeComplete(int rc, long l, long e, BookieId addr, Object ctx) {
count--;
if (count == 0) {
notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.EntryFormatter;
Expand Down Expand Up @@ -218,7 +219,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeperClient.createConnectedZooKeeperClient(conf.getZkServers(),
conf.getZkTimeout());
try {
Cookie.removeCookieForBookie(conf, zkc, address);
Cookie.removeCookieForBookie(conf, zkc, address.getBookieId());
} catch (KeeperException.NoNodeException nne) {
// ignore no node exception
LOG.warn("No cookie to remove for {} : ", address, nne);
Expand Down Expand Up @@ -285,15 +286,15 @@ int runCmd(CommandLine cmdLine) throws Exception {

// Get bookies list
final String[] bookieStrs = args[0].split(",");
final Set<BookieSocketAddress> bookieAddrs = new HashSet<BookieSocketAddress>();
final Set<BookieId> bookieAddrs = new HashSet<BookieId>();
for (String bookieStr : bookieStrs) {
final String bookieStrParts[] = bookieStr.split(":");
if (bookieStrParts.length != 2) {
System.err.println("BookieSrcs has invalid bookie address format (host:port expected) : "
+ bookieStr);
return -1;
}
bookieAddrs.add(new BookieSocketAddress(bookieStrParts[0],
bookieAddrs.add(new BookieId(bookieStrParts[0],
Integer.parseInt(bookieStrParts[1])));
}

Expand All @@ -320,7 +321,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}

private int bkQuery(BookKeeperAdmin bkAdmin, Set<BookieSocketAddress> bookieAddrs)
private int bkQuery(BookKeeperAdmin bkAdmin, Set<BookieId> bookieAddrs)
throws InterruptedException, BKException {
SortedMap<Long, LedgerMetadata> ledgersContainBookies =
bkAdmin.getLedgersContainBookies(bookieAddrs);
Expand All @@ -340,13 +341,13 @@ private int bkQuery(BookKeeperAdmin bkAdmin, Set<BookieSocketAddress> bookieAddr
return 0;
}

private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, Set<BookieSocketAddress> bookiesToInspect) {
private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, Set<BookieId> bookiesToInspect) {
Map<Long, Integer> numBookiesToReplacePerEnsemble = new TreeMap<Long, Integer>();
for (Map.Entry<Long, ArrayList<BookieSocketAddress>> ensemble : metadata.getEnsembles().entrySet()) {
ArrayList<BookieSocketAddress> bookieList = ensemble.getValue();
for (Map.Entry<Long, ArrayList<BookieId>> ensemble : metadata.getEnsembles().entrySet()) {
ArrayList<BookieId> bookieList = ensemble.getValue();
System.out.print(ensemble.getKey() + ":\t");
int numBookiesToReplace = 0;
for (BookieSocketAddress bookie: bookieList) {
for (BookieId bookie: bookieList) {
System.out.print(bookie);
if (bookiesToInspect.contains(bookie)) {
System.out.print("*");
Expand All @@ -362,24 +363,24 @@ private Map<Long, Integer> inspectLedger(LedgerMetadata metadata, Set<BookieSock
return numBookiesToReplacePerEnsemble;
}

private int bkRecovery(BookKeeperAdmin bkAdmin, long lid, Set<BookieSocketAddress> bookieAddrs,
private int bkRecovery(BookKeeperAdmin bkAdmin, long lid, Set<BookieId> bookieAddrs,
boolean dryrun, boolean skipOpenLedgers, boolean removeCookies)
throws InterruptedException, BKException, KeeperException {
bkAdmin.recoverBookieData(lid, bookieAddrs, dryrun, skipOpenLedgers);
if (removeCookies) {
for (BookieSocketAddress addr : bookieAddrs) {
for (BookieId addr : bookieAddrs) {
Cookie.removeCookieForBookie(bkConf, bkAdmin.getZooKeeper(), addr);
}
}
return 0;
}

private int bkRecovery(BookKeeperAdmin bkAdmin, Set<BookieSocketAddress> bookieAddrs,
private int bkRecovery(BookKeeperAdmin bkAdmin, Set<BookieId> bookieAddrs,
boolean dryrun, boolean skipOpenLedgers, boolean removeCookies)
throws InterruptedException, BKException, KeeperException {
bkAdmin.recoverBookieData(bookieAddrs, dryrun, skipOpenLedgers);
if (removeCookies) {
for (BookieSocketAddress addr : bookieAddrs) {
for (BookieId addr : bookieAddrs) {
Cookie.removeCookieForBookie(bkConf, bkAdmin.getZooKeeper(), addr);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.io.StringReader;
import java.net.UnknownHostException;

import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.BookieId;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -208,7 +208,7 @@ static Cookie generateCookie(ServerConfiguration conf)
throws UnknownHostException {
Cookie c = new Cookie();
c.layoutVersion = CURRENT_COOKIE_LAYOUT_VERSION;
c.bookieHost = Bookie.getBookieAddress(conf).toString();
c.bookieHost = Bookie.getBookieAddress(conf).getBookieId().toString();
c.journalDir = conf.getJournalDirName();
StringBuilder b = new StringBuilder();
String[] dirs = conf.getLedgerDirNames();
Expand Down Expand Up @@ -253,16 +253,16 @@ public void setInstanceId(String instanceId) {

private static String getZkPath(ServerConfiguration conf)
throws UnknownHostException {
return getZkPath(conf, Bookie.getBookieAddress(conf));
return getZkPath(conf, Bookie.getBookieAddress(conf).getBookieId());
}

private static String getZkPath(ServerConfiguration conf, BookieSocketAddress address) {
private static String getZkPath(ServerConfiguration conf, BookieId address) {
String bookieCookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
return bookieCookiePath + "/" + address;
}

public static void removeCookieForBookie(ServerConfiguration conf, ZooKeeper zk,
BookieSocketAddress address)
BookieId address)
throws KeeperException, InterruptedException {
String zkPath = getZkPath(conf, address);
zk.delete(zkPath, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,9 @@ private BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFa
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.build();


bookieClient = new BookieClient(conf, this.channelFactory, mainWorkerPool, statsLogger, requestTimer, Optional.fromNullable(dnsResolver));
bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
bookieWatcher.readBookiesBlocking();
bookieClient = new BookieClient(conf, this.channelFactory, mainWorkerPool, statsLogger, requestTimer, Optional.fromNullable(dnsResolver), bookieWatcher);

ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
ledgerManager = TimedLedgerManager.of(new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()),
Expand Down
Loading

0 comments on commit e430e6e

Please sign in to comment.