diff --git a/.gitignore b/.gitignore
index 119d7fe7..8dd454b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,5 @@ target/
build/
*.xml
*.jar
+.idea/
+dist/
diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
index 92241e6b..03e28411 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java
@@ -23,6 +23,8 @@
import java.io.IOException;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieIdToAddressMapping;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.zookeeper.KeeperException;
@@ -56,7 +58,7 @@ static class LatencyCallback implements WriteCallback {
boolean complete;
@Override
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
+ BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
@@ -77,7 +79,7 @@ static class ThroughputCallback implements WriteCallback {
int count;
int waitingCount = Integer.MAX_VALUE;
synchronized public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
+ BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
}
@@ -147,7 +149,12 @@ public static void main(String[] args)
OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
ClientConfiguration conf = new ClientConfiguration();
- BookieClient bc = new BookieClient(conf, channelFactory, executor);
+ BookieClient bc = new BookieClient(conf, channelFactory, executor, new BookieIdToAddressMapping() {
+ @Override
+ public BookieSocketAddress getBookieAddress(BookieId bookieId) {
+ return new BookieSocketAddress(bookieId.getHostName(), bookieId.getPort(), bookieId.getPort());
+ }
+ });
LatencyCallback lc = new LatencyCallback();
ThroughputCallback tc = new ThroughputCallback();
@@ -161,7 +168,7 @@ public static void main(String[] args)
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
- bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
LOG.info("Waiting for warmup");
@@ -179,7 +186,7 @@ public static void main(String[] args)
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
lc.resetComplete();
- bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, lc, null, BookieProtocol.FLAG_NONE);
lc.waitForComplete();
}
@@ -199,7 +206,7 @@ public static void main(String[] args)
toSend.writeLong(ledger);
toSend.writeLong(entry);
toSend.writerIndex(toSend.capacity());
- bc.addEntry(new BookieSocketAddress(addr, port), ledger, new byte[20],
+ bc.addEntry(new BookieId(addr, port), ledger, new byte[20],
entry, toSend, tc, null, BookieProtocol.FLAG_NONE);
}
tc.waitFor(entryCount);
diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
index d7ce2172..a92eb5f7 100644
--- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
+++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
@@ -25,7 +25,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ public void testThroughputLatency() throws Exception {
@Test
public void testBookie() throws Exception {
- BookieSocketAddress bookie = getBookie(0);
+ BookieId bookie = getBookie(0);
BenchBookie.main(new String[] {
"--host", bookie.getHostName(),
"--port", String.valueOf(bookie.getPort()),
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index ccea948c..297375ca 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -307,6 +307,14 @@
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 1.8
+
+
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 3cefafb7..f5ff66ba 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -54,9 +54,11 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.ActiveLedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -182,7 +184,7 @@ public long getEntry() {
static class NopWriteCallback implements WriteCallback {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
+ BookieId addr, Object ctx) {
if (LOG.isDebugEnabled()) {
LOG.debug("Finished writing entry {} @ ledger {} for {} : {}",
new Object[] { entryId, ledgerId, addr, rc });
@@ -319,12 +321,25 @@ public static BookieSocketAddress getBookieAddress(ServerConfiguration conf)
if (conf.getUseHostNameAsBookieID()) {
hostAddress = inetAddr.getAddress().getCanonicalHostName();
}
+ int readPort;
+ if (conf.isAlternateBookiePortEnabled()) {
+ readPort = conf.getAlternateBookiePort();
+ } else {
+ readPort = conf.getBookiePort();
+ }
BookieSocketAddress addr =
- new BookieSocketAddress(hostAddress, conf.getBookiePort());
- if (addr.getSocketAddress().getAddress().isLoopbackAddress()
+ new BookieSocketAddress(hostAddress, conf.getBookiePort(), readPort);
+ if ((addr.getReadAddress().getAddress().isLoopbackAddress()
+ || addr.getWriteAddress().getAddress().isLoopbackAddress())
&& !conf.getAllowLoopback()) {
+ InetSocketAddress loopedAddr;
+ if (addr.getReadAddress().getAddress().isLoopbackAddress()) {
+ loopedAddr = addr.getReadAddress();
+ } else {
+ loopedAddr = addr.getWriteAddress();
+ }
throw new UnknownHostException("Trying to listen on loopback address, "
- + addr + " but this is forbidden by default "
+ + loopedAddr + " but this is forbidden by default "
+ "(see ServerConfiguration#getAllowLoopback())");
}
return addr;
@@ -431,7 +446,7 @@ public int getExitCode() {
}
private String getMyId() throws UnknownHostException {
- return Bookie.getBookieAddress(conf).toString();
+ return Bookie.getBookieAddress(conf).getBookieId().toString();
}
void readJournal() throws IOException, BookieException {
@@ -765,7 +780,15 @@ private void doRegisterBookie(final String regPath) throws IOException {
try{
if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
- zk.create(regPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+
+ DataFormats.BookieFormat.Builder builder = DataFormats.BookieFormat.newBuilder()
+ .setWritePort(conf.getBookiePort());
+ if (conf.isAlternateBookiePortEnabled()) {
+ builder.setReadPort(conf.getAlternateBookiePort());
+ } else {
+ builder.setReadPort(conf.getBookiePort());
+ }
+ zk.create(regPath, builder.build().toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LOG.info("Registered myself in ZooKeeper at {}.", regPath);
}
@@ -1240,7 +1263,7 @@ static class CounterCallback implements WriteCallback {
int count;
@Override
- synchronized public void writeComplete(int rc, long l, long e, BookieSocketAddress addr, Object ctx) {
+ synchronized public void writeComplete(int rc, long l, long e, BookieId addr, Object ctx) {
count--;
if (count == 0) {
notifyAll();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index bbb611b5..be32cf0b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -41,6 +41,7 @@
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.EntryFormatter;
@@ -218,7 +219,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
ZooKeeperClient.createConnectedZooKeeperClient(conf.getZkServers(),
conf.getZkTimeout());
try {
- Cookie.removeCookieForBookie(conf, zkc, address);
+ Cookie.removeCookieForBookie(conf, zkc, address.getBookieId());
} catch (KeeperException.NoNodeException nne) {
// ignore no node exception
LOG.warn("No cookie to remove for {} : ", address, nne);
@@ -285,7 +286,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
// Get bookies list
final String[] bookieStrs = args[0].split(",");
- final Set bookieAddrs = new HashSet();
+ final Set bookieAddrs = new HashSet();
for (String bookieStr : bookieStrs) {
final String bookieStrParts[] = bookieStr.split(":");
if (bookieStrParts.length != 2) {
@@ -293,7 +294,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
+ bookieStr);
return -1;
}
- bookieAddrs.add(new BookieSocketAddress(bookieStrParts[0],
+ bookieAddrs.add(new BookieId(bookieStrParts[0],
Integer.parseInt(bookieStrParts[1])));
}
@@ -320,7 +321,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
}
}
- private int bkQuery(BookKeeperAdmin bkAdmin, Set bookieAddrs)
+ private int bkQuery(BookKeeperAdmin bkAdmin, Set bookieAddrs)
throws InterruptedException, BKException {
SortedMap ledgersContainBookies =
bkAdmin.getLedgersContainBookies(bookieAddrs);
@@ -340,13 +341,13 @@ private int bkQuery(BookKeeperAdmin bkAdmin, Set bookieAddr
return 0;
}
- private Map inspectLedger(LedgerMetadata metadata, Set bookiesToInspect) {
+ private Map inspectLedger(LedgerMetadata metadata, Set bookiesToInspect) {
Map numBookiesToReplacePerEnsemble = new TreeMap();
- for (Map.Entry> ensemble : metadata.getEnsembles().entrySet()) {
- ArrayList bookieList = ensemble.getValue();
+ for (Map.Entry> ensemble : metadata.getEnsembles().entrySet()) {
+ ArrayList bookieList = ensemble.getValue();
System.out.print(ensemble.getKey() + ":\t");
int numBookiesToReplace = 0;
- for (BookieSocketAddress bookie: bookieList) {
+ for (BookieId bookie: bookieList) {
System.out.print(bookie);
if (bookiesToInspect.contains(bookie)) {
System.out.print("*");
@@ -362,24 +363,24 @@ private Map inspectLedger(LedgerMetadata metadata, Set bookieAddrs,
+ private int bkRecovery(BookKeeperAdmin bkAdmin, long lid, Set bookieAddrs,
boolean dryrun, boolean skipOpenLedgers, boolean removeCookies)
throws InterruptedException, BKException, KeeperException {
bkAdmin.recoverBookieData(lid, bookieAddrs, dryrun, skipOpenLedgers);
if (removeCookies) {
- for (BookieSocketAddress addr : bookieAddrs) {
+ for (BookieId addr : bookieAddrs) {
Cookie.removeCookieForBookie(bkConf, bkAdmin.getZooKeeper(), addr);
}
}
return 0;
}
- private int bkRecovery(BookKeeperAdmin bkAdmin, Set bookieAddrs,
+ private int bkRecovery(BookKeeperAdmin bkAdmin, Set bookieAddrs,
boolean dryrun, boolean skipOpenLedgers, boolean removeCookies)
throws InterruptedException, BKException, KeeperException {
bkAdmin.recoverBookieData(bookieAddrs, dryrun, skipOpenLedgers);
if (removeCookies) {
- for (BookieSocketAddress addr : bookieAddrs) {
+ for (BookieId addr : bookieAddrs) {
Cookie.removeCookieForBookie(bkConf, bkAdmin.getZooKeeper(), addr);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index 12f0aaa0..d1320a11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -34,7 +34,7 @@
import java.io.StringReader;
import java.net.UnknownHostException;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
@@ -208,7 +208,7 @@ static Cookie generateCookie(ServerConfiguration conf)
throws UnknownHostException {
Cookie c = new Cookie();
c.layoutVersion = CURRENT_COOKIE_LAYOUT_VERSION;
- c.bookieHost = Bookie.getBookieAddress(conf).toString();
+ c.bookieHost = Bookie.getBookieAddress(conf).getBookieId().toString();
c.journalDir = conf.getJournalDirName();
StringBuilder b = new StringBuilder();
String[] dirs = conf.getLedgerDirNames();
@@ -253,16 +253,16 @@ public void setInstanceId(String instanceId) {
private static String getZkPath(ServerConfiguration conf)
throws UnknownHostException {
- return getZkPath(conf, Bookie.getBookieAddress(conf));
+ return getZkPath(conf, Bookie.getBookieAddress(conf).getBookieId());
}
- private static String getZkPath(ServerConfiguration conf, BookieSocketAddress address) {
+ private static String getZkPath(ServerConfiguration conf, BookieId address) {
String bookieCookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
return bookieCookiePath + "/" + address;
}
public static void removeCookieForBookie(ServerConfiguration conf, ZooKeeper zk,
- BookieSocketAddress address)
+ BookieId address)
throws KeeperException, InterruptedException {
String zkPath = getZkPath(conf, address);
zk.delete(zkPath, -1);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c7847454..99aa8e55 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -375,10 +375,9 @@ private BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFa
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.build();
-
- bookieClient = new BookieClient(conf, this.channelFactory, mainWorkerPool, statsLogger, requestTimer, Optional.fromNullable(dnsResolver));
bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
bookieWatcher.readBookiesBlocking();
+ bookieClient = new BookieClient(conf, this.channelFactory, mainWorkerPool, statsLogger, requestTimer, Optional.fromNullable(dnsResolver), bookieWatcher);
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
ledgerManager = TimedLedgerManager.of(new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager()),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index b13ae98b..fbd2938a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -43,7 +43,7 @@
import org.apache.bookkeeper.client.BookKeeper.SyncOpenCallback;
import org.apache.bookkeeper.client.LedgerFragmentReplicator.SingleFragmentCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -178,16 +178,16 @@ public void close() throws InterruptedException, BKException {
*
* @return the registered bookie list.
*/
- public Collection getRegisteredBookies()
+ public Collection getRegisteredBookies()
throws BKException {
String cookiePath = bkc.getConf().getZkLedgersRootPath() + "/"
+ BookKeeperConstants.COOKIE_NODE;
try {
List children = zk.getChildren(cookiePath, false);
- List bookies = new ArrayList(children.size());
+ List bookies = new ArrayList(children.size());
for (String child : children) {
try {
- bookies.add(new BookieSocketAddress(child));
+ bookies.add(new BookieId(child));
} catch (IOException ioe) {
LOG.error("Error parsing bookie address {} : ", child, ioe);
throw new BKException.ZKException();
@@ -311,7 +311,7 @@ public SyncObject() {
}
}
- public SortedMap getLedgersContainBookies(Set bookies)
+ public SortedMap getLedgersContainBookies(Set bookies)
throws InterruptedException, BKException {
final SyncObject sync = new SyncObject();
final AtomicReference> resultHolder =
@@ -339,7 +339,7 @@ public void operationComplete(int rc, SortedMap result) {
return resultHolder.get();
}
- public void asyncGetLedgersContainBookies(final Set bookies,
+ public void asyncGetLedgersContainBookies(final Set bookies,
final GenericCallback> callback) {
final SortedMap ledgers = new ConcurrentSkipListMap();
bkc.getLedgerManager().asyncProcessLedgers(new Processor() {
@@ -356,8 +356,8 @@ public void operationComplete(int rc, LedgerMetadata metadata) {
cb.processResult(rc, null, null);
return;
}
- Set bookiesInLedger = metadata.getBookiesInThisLedger();
- Sets.SetView intersection =
+ Set bookiesInLedger = metadata.getBookiesInThisLedger();
+ Sets.SetView intersection =
Sets.intersection(bookiesInLedger, bookies);
if (!intersection.isEmpty()) {
ledgers.put(lid, metadata);
@@ -392,19 +392,19 @@ public void processResult(int rc, String path, Object ctx) {
* Optional destination bookie that if passed, we will copy all
* of the ledger fragments from the source bookie over to it.
*/
- public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
+ public void recoverBookieData(final BookieId bookieSrc, final BookieId bookieDest)
throws InterruptedException, BKException {
- Set bookiesSrc = new HashSet();
+ Set bookiesSrc = new HashSet();
bookiesSrc.add(bookieSrc);
recoverBookieData(bookiesSrc);
}
- public void recoverBookieData(final Set bookiesSrc)
+ public void recoverBookieData(final Set bookiesSrc)
throws InterruptedException, BKException {
recoverBookieData(bookiesSrc, false, false);
}
- public void recoverBookieData(final Set bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
+ public void recoverBookieData(final Set bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
@@ -432,7 +432,7 @@ public void recoverComplete(int rc, Object ctx) {
}
}
- public void recoverBookieData(final long lid, final Set bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
+ public void recoverBookieData(final long lid, final Set bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
@@ -483,19 +483,19 @@ public void recoverComplete(int rc, Object ctx) {
* @param context
* Context for the RecoverCallback to call.
*/
- public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
+ public void asyncRecoverBookieData(final BookieId bookieSrc, final BookieId bookieDest,
final RecoverCallback cb, final Object context) {
- Set bookiesSrc = new HashSet();
+ Set bookiesSrc = new HashSet();
bookiesSrc.add(bookieSrc);
asyncRecoverBookieData(bookiesSrc, cb, context);
}
- public void asyncRecoverBookieData(final Set bookieSrc,
+ public void asyncRecoverBookieData(final Set bookieSrc,
final RecoverCallback cb, final Object context) {
asyncRecoverBookieData(bookieSrc, false, false, cb, context);
}
- public void asyncRecoverBookieData(final Set bookieSrc, boolean dryrun,
+ public void asyncRecoverBookieData(final Set bookieSrc, boolean dryrun,
final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
getActiveLedgers(bookieSrc, dryrun, skipOpenLedgers, cb, context);
}
@@ -517,7 +517,7 @@ public void asyncRecoverBookieData(final Set bookieSrc, boo
* @param context
* Context for the RecoverCallback to call.
*/
- public void asyncRecoverBookieData(long lid, final Set bookieSrc, boolean dryrun,
+ public void asyncRecoverBookieData(long lid, final Set bookieSrc, boolean dryrun,
boolean skipOpenLedgers, final RecoverCallback callback, final Object context) {
AsyncCallback.VoidCallback callbackWrapper = new AsyncCallback.VoidCallback() {
@Override
@@ -547,7 +547,7 @@ public void processResult(int rc, String path, Object ctx) {
* @param context
* Context for the RecoverCallback to call.
*/
- private void getActiveLedgers(final Set bookiesSrc, final boolean dryrun,
+ private void getActiveLedgers(final Set bookiesSrc, final boolean dryrun,
final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
// Wrapper class around the RecoverCallback so it can be used
// as the final VoidCallback to process ledgers
@@ -592,7 +592,7 @@ ledgerProcessor, new RecoverCallbackWrapper(cb),
* IterationCallback to invoke once we've recovered the current
* ledger.
*/
- private void recoverLedger(final Set bookiesSrc, final long lId, final boolean dryrun,
+ private void recoverLedger(final Set bookiesSrc, final long lId, final boolean dryrun,
final boolean skipOpenLedgers, final AsyncCallback.VoidCallback finalLedgerIterCb) {
LOG.debug("Recovering ledger : {}", lId);
@@ -623,7 +623,7 @@ public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
if (!lm.isClosed() &&
lm.getEnsembles().size() > 0) {
Long lastKey = lm.getEnsembles().lastKey();
- ArrayList lastEnsemble = lm.getEnsembles().get(lastKey);
+ ArrayList lastEnsemble = lm.getEnsembles().get(lastKey);
// the original write has not removed faulty bookie from
// current ledger ensemble. to avoid data loss issue in
// the case of concurrent updates to the ensemble composition,
@@ -692,7 +692,7 @@ public void processResult(int rc, String path, Object ctx) {
*/
Map ledgerFragmentsRange = new HashMap();
Long curEntryId = null;
- for (Map.Entry> entry : lh.getLedgerMetadata().getEnsembles()
+ for (Map.Entry> entry : lh.getLedgerMetadata().getEnsembles()
.entrySet()) {
if (curEntryId != null)
ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
@@ -738,9 +738,9 @@ public void processResult(int rc, String path, Object ctx) {
*/
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
- ArrayList ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+ ArrayList ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
// Get bookies to replace
- Map targetBookieAddresses;
+ Map targetBookieAddresses;
try {
targetBookieAddresses = getReplacedBookies(lh, ensemble, bookiesSrc);
} catch (BKException.BKNotEnoughBookiesException e) {
@@ -754,7 +754,7 @@ public void processResult(int rc, String path, Object ctx) {
}
if (dryrun) {
- ArrayList newEnsemble =
+ ArrayList newEnsemble =
replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
System.out.println(" Fragment [" + startEntryId + " - " + endEntryId + " ] : ");
System.out.println(" old ensemble : " + formatEnsemble(ensemble, bookiesSrc, '*'));
@@ -785,7 +785,7 @@ public void processResult(int rc, String path, Object ctx) {
}, null);
}
- static String formatEnsemble(ArrayList ensemble, Set bookiesSrc, char marker) {
+ static String formatEnsemble(ArrayList ensemble, Set bookiesSrc, char marker) {
StringBuilder sb = new StringBuilder();
sb.append("[");
for (int i = 0; i < ensemble.size(); i++) {
@@ -822,18 +822,18 @@ static String formatEnsemble(ArrayList ensemble, Set newBookies) throws InterruptedException {
+ final Set newBookies) throws InterruptedException {
lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
}
- private Map getReplacedBookies(
+ private Map getReplacedBookies(
LedgerHandle lh,
- List ensemble,
- Set bookiesToRereplicate)
+ List ensemble,
+ Set bookiesToRereplicate)
throws BKException.BKNotEnoughBookiesException {
Set bookieIndexesToRereplicate = Sets.newHashSet();
for (int bookieIndex = 0; bookieIndex < ensemble.size(); bookieIndex++) {
- BookieSocketAddress bookieInEnsemble = ensemble.get(bookieIndex);
+ BookieId bookieInEnsemble = ensemble.get(bookieIndex);
if (bookiesToRereplicate.contains(bookieInEnsemble)) {
bookieIndexesToRereplicate.add(bookieIndex);
}
@@ -842,31 +842,31 @@ private Map getReplacedBookies(
lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
}
- private Map getReplacedBookiesByIndexes(
+ private Map getReplacedBookiesByIndexes(
LedgerHandle lh,
- List ensemble,
+ List ensemble,
Set bookieIndexesToRereplicate,
- Optional> excludedBookies)
+ Optional> excludedBookies)
throws BKException.BKNotEnoughBookiesException {
// target bookies to replicate
- Map targetBookieAddresses =
+ Map targetBookieAddresses =
Maps.newHashMapWithExpectedSize(bookieIndexesToRereplicate.size());
// bookies to exclude for ensemble allocation
- Set bookiesToExclude = Sets.newHashSet();
+ Set bookiesToExclude = Sets.newHashSet();
if (excludedBookies.isPresent()) {
bookiesToExclude.addAll(excludedBookies.get());
}
// excluding bookies that need to be replicated
for (Integer bookieIndex : bookieIndexesToRereplicate) {
- BookieSocketAddress bookie = ensemble.get(bookieIndex);
+ BookieId bookie = ensemble.get(bookieIndex);
bookiesToExclude.add(bookie);
}
// allocate bookies
for (Integer bookieIndex : bookieIndexesToRereplicate) {
- BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
- BookieSocketAddress newBookie =
+ BookieId oldBookie = ensemble.get(bookieIndex);
+ BookieId newBookie =
bkc.getPlacementPolicy().replaceBookie(
lh.getLedgerMetadata().getEnsembleSize(),
lh.getLedgerMetadata().getWriteQuorumSize(),
@@ -881,11 +881,11 @@ private Map getReplacedBookiesByIndexes(
return targetBookieAddresses;
}
- private ArrayList replaceBookiesInEnsemble(
- List ensemble,
- Map replacedBookies) {
- ArrayList newEnsemble = Lists.newArrayList(ensemble);
- for (Map.Entry entry : replacedBookies.entrySet()) {
+ private ArrayList replaceBookiesInEnsemble(
+ List ensemble,
+ Map replacedBookies) {
+ ArrayList newEnsemble = Lists.newArrayList(ensemble);
+ for (Map.Entry entry : replacedBookies.entrySet()) {
newEnsemble.set(entry.getKey(), entry.getValue());
}
return newEnsemble;
@@ -902,8 +902,8 @@ private ArrayList replaceBookiesInEnsemble(
public void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment)
throws InterruptedException, BKException {
- Optional> excludedBookies = Optional.absent();
- Map targetBookieAddresses =
+ Optional> excludedBookies = Optional.absent();
+ Map targetBookieAddresses =
getReplacedBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
ledgerFragment.getBookiesIndexes(), excludedBookies);
replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
@@ -911,14 +911,14 @@ public void replicateLedgerFragment(LedgerHandle lh,
private void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
- final Map targetBookieAddresses)
+ final Map targetBookieAddresses)
throws InterruptedException, BKException {
SyncCounter syncCounter = new SyncCounter();
ResultCallBack resultCallBack = new ResultCallBack(syncCounter);
SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
lh, ledgerFragment.getFirstEntryId(), getReplacedBookiesMap(ledgerFragment, targetBookieAddresses));
syncCounter.inc();
- Set targetBookieSet = new HashSet();
+ Set targetBookieSet = new HashSet();
targetBookieSet.addAll(targetBookieAddresses.values());
asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet);
syncCounter.block(0);
@@ -927,34 +927,34 @@ private void replicateLedgerFragment(LedgerHandle lh,
}
}
- private static Map getReplacedBookiesMap(
- ArrayList ensemble,
- Map targetBookieAddresses) {
- Map bookiesMap =
- new HashMap();
- for (Map.Entry entry : targetBookieAddresses.entrySet()) {
- BookieSocketAddress oldBookie = ensemble.get(entry.getKey());
- BookieSocketAddress newBookie = entry.getValue();
+ private static Map getReplacedBookiesMap(
+ ArrayList ensemble,
+ Map targetBookieAddresses) {
+ Map bookiesMap =
+ new HashMap();
+ for (Map.Entry entry : targetBookieAddresses.entrySet()) {
+ BookieId oldBookie = ensemble.get(entry.getKey());
+ BookieId newBookie = entry.getValue();
bookiesMap.put(oldBookie, newBookie);
}
return bookiesMap;
}
- private static Map getReplacedBookiesMap(
+ private static Map getReplacedBookiesMap(
LedgerFragment ledgerFragment,
- Map targetBookieAddresses) {
- Map bookiesMap =
- new HashMap();
+ Map targetBookieAddresses) {
+ Map bookiesMap =
+ new HashMap();
for (Integer bookieIndex : ledgerFragment.getBookiesIndexes()) {
- BookieSocketAddress oldBookie = ledgerFragment.getAddress(bookieIndex);
- BookieSocketAddress newBookie = targetBookieAddresses.get(bookieIndex);
+ BookieId oldBookie = ledgerFragment.getAddress(bookieIndex);
+ BookieId newBookie = targetBookieAddresses.get(bookieIndex);
bookiesMap.put(oldBookie, newBookie);
}
return bookiesMap;
}
- private static boolean containBookies(ArrayList ensemble, Set bookies) {
- for (BookieSocketAddress bookie : ensemble) {
+ private static boolean containBookies(ArrayList ensemble, Set bookies) {
+ for (BookieId bookie : ensemble) {
if (bookies.contains(bookie)) {
return true;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieClusterManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieClusterManager.java
index 9c45841c..5679f9cb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieClusterManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieClusterManager.java
@@ -21,7 +21,7 @@
package org.apache.bookkeeper.client;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -74,15 +74,15 @@ long getLastTimestamp() {
protected final ServerConfiguration conf;
protected final BookKeeper bkc;
protected final long staleBookieIntervalInMs;
- protected final Map bookieStatuses =
- new HashMap();
+ protected final Map bookieStatuses =
+ new HashMap();
protected AtomicBoolean isStarted = new AtomicBoolean();
- protected Set registeredBookies = new HashSet();
- protected Set availableBookies = new HashSet();
- protected Set readOnlyBookies = new HashSet();
- protected Set staleBookies = new HashSet();
- protected Set activeBookies = new HashSet();
- protected Set lostBookies = new HashSet();
+ protected Set registeredBookies = new HashSet();
+ protected Set availableBookies = new HashSet();
+ protected Set readOnlyBookies = new HashSet();
+ protected Set staleBookies = new HashSet();
+ protected Set activeBookies = new HashSet();
+ protected Set lostBookies = new HashSet();
// stats for bookie cluster
private StatsLogger statsLogger;
@@ -119,17 +119,17 @@ public void start() throws BKException {
* So if you keep calling this function, ideally there should be fewer stale bookies fetched each time.
* @throws BKException
*/
- public Set fetchStaleBookies() throws BKException {
+ public Set fetchStaleBookies() throws BKException {
updateBookiesStatuses(this.availableBookies);
updateBookiesStatuses(this.readOnlyBookies);
- Set staleBookies = new HashSet<>();
- Set activeBookies = new HashSet<>();
+ Set staleBookies = new HashSet<>();
+ Set activeBookies = new HashSet<>();
long now = MathUtils.now();
synchronized (this) {
- Iterator> iter =
+ Iterator> iter =
bookieStatuses.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry entry = iter.next();
+ Map.Entry entry = iter.next();
long millisSinceLastSeen = now - entry.getValue().getLastTimestamp();
if (millisSinceLastSeen > staleBookieIntervalInMs) {
logger.info("Bookie {} (seen @ {}) become stale for {} ms, remove it.",
@@ -158,7 +158,7 @@ public void close() {
* @throws BKException
*/
private void fetchRegisteredBookies() throws BKException {
- Collection registeredBookies = this.bkc.bookieWatcher.getRegisteredBookies();
+ Collection registeredBookies = this.bkc.bookieWatcher.getRegisteredBookies();
logger.info("Fetch all registered bookies : {}", registeredBookies);
updateBookiesStatuses(registeredBookies);
updateBookies(this.registeredBookies, registeredBookies);
@@ -169,7 +169,7 @@ private void fetchRegisteredBookies() throws BKException {
* @throws BKException
*/
private void fetchAvailableBookies() throws BKException {
- Collection availableBookies = this.bkc.bookieWatcher.getAvailableBookies();
+ Collection availableBookies = this.bkc.bookieWatcher.getAvailableBookies();
logger.info("Fetch all available bookies: {}", availableBookies);
updateBookiesStatuses(availableBookies);
updateBookies(this.availableBookies, availableBookies);
@@ -180,7 +180,7 @@ private void fetchAvailableBookies() throws BKException {
* @throws BKException
*/
private void fetchReadOnlyBookies() throws BKException {
- Collection readOnlyBookies = this.bkc.bookieWatcher.getReadOnlyBookies();
+ Collection readOnlyBookies = this.bkc.bookieWatcher.getReadOnlyBookies();
logger.info("Fetch all readonly bookies: {}", readOnlyBookies);
updateBookiesStatuses(readOnlyBookies);
updateBookies(this.readOnlyBookies, readOnlyBookies);
@@ -192,8 +192,8 @@ private void fetchReadOnlyBookies() throws BKException {
*
* @param bookies bookies to update the lastUpdateTime
*/
- private synchronized void updateBookiesStatuses(Collection bookies) {
- for (BookieSocketAddress bookie : bookies) {
+ private synchronized void updateBookiesStatuses(Collection bookies) {
+ for (BookieId bookie : bookies) {
UpdateStatus bs = bookieStatuses.get(bookie);
if (null == bs) {
bs = new UpdateStatus();
@@ -210,38 +210,38 @@ private synchronized void updateBookiesStatuses(Collection
* @param oldBookies old bookies to be updated
* @param newBookies new bookies
*/
- private synchronized void updateBookies(Collection oldBookies,
- Collection newBookies) {
+ private synchronized void updateBookies(Collection oldBookies,
+ Collection newBookies) {
oldBookies.clear();
oldBookies.addAll(newBookies);
}
@Override
- public void availableBookiesChanged(Set bookies) {
+ public void availableBookiesChanged(Set bookies) {
updateBookiesStatuses(bookies);
updateBookies(this.availableBookies, bookies);
}
@Override
- public void readOnlyBookiesChanged(Set bookies) {
+ public void readOnlyBookiesChanged(Set bookies) {
updateBookiesStatuses(bookies);
updateBookies(this.readOnlyBookies, bookies);
}
- public void lostBookiesChanged(Set bookies) {
+ public void lostBookiesChanged(Set bookies) {
updateBookies(this.lostBookies, bookies);
}
- public Set getAvailableBookies() {
+ public Set getAvailableBookies() {
return availableBookies;
}
- public Set getReadOnlyBookies() {
+ public Set getReadOnlyBookies() {
return readOnlyBookies;
}
- public Set getActiveBookies() {
+ public Set getActiveBookies() {
return activeBookies;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 445b25e7..4325be0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -18,21 +18,14 @@
* limitations under the License.
*/
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.net.BookieIdToAddressMapping;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
@@ -44,10 +37,24 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.util.BookKeeperConstants.*;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
/**
* This class is responsible for maintaining a consistent view of what bookies
@@ -56,11 +63,11 @@
* replacement
*
*/
-class BookieWatcher implements Watcher, ChildrenCallback {
+public class BookieWatcher implements Watcher, ChildrenCallback, BookieIdToAddressMapping{
static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
public static int ZK_CONNECT_BACKOFF_SEC = 1;
- private static final Set EMPTY_SET = new HashSet();
+ private static final Set EMPTY_SET = new HashSet();
// Bookie registration path in ZK
private final String bookieRegistrationPath;
@@ -78,6 +85,8 @@ public void safeRun() {
}
};
private final ReadOnlyBookieWatcher readOnlyBookieWatcher;
+ private final ConcurrentHashMap addressMapping =
+ new ConcurrentHashMap();
BookieWatcher(ClientConfiguration conf,
ScheduledExecutorService scheduler,
@@ -92,6 +101,15 @@ public void safeRun() {
readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
}
+ @Override
+ public BookieSocketAddress getBookieAddress(BookieId bookieId) {
+ if (addressMapping.containsKey(bookieId)) {
+ return addressMapping.get(bookieId);
+ } else {
+ return bookieId.asBookieSocketAddress();
+ }
+ }
+
void registerBookiesListener(final BookiesListener listener) {
listeners.add(listener);
readOnlyBookieWatcher.registerBookiesListener(listener);
@@ -107,15 +125,15 @@ void unregisterBookiesListener(final BookiesListener listener) {
*
* @return the registered bookie list.
*/
- Collection getRegisteredBookies() throws BKException {
+ Collection getRegisteredBookies() throws BKException {
String cookiePath = bk.getConf().getZkLedgersRootPath() + "/"
+ BookKeeperConstants.COOKIE_NODE;
try {
List children = bk.getZkHandle().getChildren(cookiePath, false);
- List bookies = new ArrayList(children.size());
+ List bookies = new ArrayList(children.size());
for (String child : children) {
try {
- bookies.add(new BookieSocketAddress(child));
+ bookies.add(new BookieId(child));
} catch (IOException ioe) {
logger.error("Error parsing bookie address {} : ", child, ioe);
throw new BKException.ZKException();
@@ -132,11 +150,11 @@ Collection getRegisteredBookies() throws BKException {
}
}
- Collection getAvailableBookies() throws BKException {
+ Collection getAvailableBookies() throws BKException {
try {
List children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
children.remove(BookKeeperConstants.READONLY);
- return convertToBookieAddresses(children);
+ return convertToBookieIds(children);
} catch (KeeperException ke) {
if (ZkUtils.isRecoverableException(ke)) {
logger.info("Encountered recoverable zookeeper exception on getting bookie list : code = {}", ke.code());
@@ -152,7 +170,7 @@ Collection getAvailableBookies() throws BKException {
}
}
- Collection getReadOnlyBookies() throws BKException {
+ Collection getReadOnlyBookies() throws BKException {
try {
readOnlyBookieWatcher.readROBookiesBlocking();
} catch (KeeperException ke) {
@@ -169,7 +187,7 @@ Collection getReadOnlyBookies() throws BKException {
logger.error("Interrupted reading bookie list : ", ie);
throw new BKException.BKInterruptedException();
}
- return new HashSet(readOnlyBookieWatcher.getReadOnlyBookies());
+ return new HashSet(readOnlyBookieWatcher.getReadOnlyBookies());
}
private void readBookies() {
@@ -208,10 +226,34 @@ public void processResult(int rc, String path, Object ctx, List children
// available nodes list.
children.remove(READONLY);
- HashSet newBookieAddrs = convertToBookieAddresses(children);
+ HashSet newBookieAddrs = convertToBookieIds(children);
+ for (final BookieId bookieId: newBookieAddrs) {
+ if (!addressMapping.containsKey(bookieId)) {
+ bk.getZkHandle().getData(bookieRegistrationPath + "/" + bookieId.toString(),
+ false, new DataCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ if (data != null && data.length > 0) {
+ try {
+ DataFormats.BookieFormat bookieFormat = DataFormats.BookieFormat.parseFrom(data);
+ BookieSocketAddress socketAddress = new BookieSocketAddress(
+ bookieId.getHostName(), bookieFormat.getWritePort(),
+ bookieFormat.getReadPort());
+ addressMapping.put(bookieId, socketAddress);
+ } catch (InvalidProtocolBufferException e) {
+ logger.warn("Invalid bookie format from zk node");
+ addressMapping.put(bookieId, bookieId.asBookieSocketAddress());
+ }
+ } else {
+ addressMapping.put(bookieId, bookieId.asBookieSocketAddress());
+ }
+ }
+ }, this);
+ }
+ }
synchronized (this) {
- Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+ Set readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
}
@@ -235,21 +277,19 @@ public void processResult(int rc, String path, Object ctx, List children
// }
}
- private static HashSet convertToBookieAddresses(List children) {
+ private static HashSet convertToBookieIds(List children) {
// Read the bookie addresses into a set for efficient lookup
- HashSet newBookieAddrs = new HashSet();
+ HashSet