Skip to content

Commit

Permalink
add etcd membership manager only include active workers
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Jan 25, 2024
1 parent 97f325a commit 849f3a6
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public static MembershipManager create(AlluxioConfiguration conf) {
return EtcdMembershipManager.create(conf);
case MASTER:
return MasterMembershipManager.create();
case SERVICE_REGISTRY:
return ServiceRegistryMembershipManager.create(conf);
default:
throw new IllegalStateException("Unrecognized Membership Type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
public enum MembershipType {
STATIC, // Use a static file to configure a static member list for MembershipManager
ETCD, // Use etcd for MembershipManager
MASTER // For regression purpose, still leverage Master for worker registration
MASTER, // For regression purposes, still leverage Master for worker registration
SERVICE_REGISTRY // Use the service-registry MembershipManager, which only tracks active workers
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.JsonParseException;
import io.etcd.jetcd.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * MembershipManager backed by the service registry of the configured etcd cluster.
 *
 * <p>Only active (live) workers are tracked: {@link #getAllMembers()} returns the same view as
 * {@link #getLiveMembers()}, {@link #getFailedMembers()} is always empty and
 * {@link #decommission(WorkerInfo)} is a no-op.
 */
public class ServiceRegistryMembershipManager implements MembershipManager {
  private static final Logger LOG = LoggerFactory.getLogger(ServiceRegistryMembershipManager.class);

  /** Etcd client whose service discovery is used to register and list workers. */
  private final AlluxioEtcdClient mAlluxioEtcdClient;

  /**
   * Creates a manager using the etcd client obtained from the given configuration.
   *
   * @param conf Alluxio configuration used to obtain the etcd client
   * @return a new ServiceRegistryMembershipManager
   */
  public static ServiceRegistryMembershipManager create(AlluxioConfiguration conf) {
    return new ServiceRegistryMembershipManager(conf);
  }

  /**
   * CTOR for ServiceRegistryMembershipManager, using the etcd client returned by
   * {@code AlluxioEtcdClient.getInstance(conf)}.
   *
   * @param conf Alluxio configuration used to obtain the etcd client
   */
  public ServiceRegistryMembershipManager(AlluxioConfiguration conf) {
    this(conf, AlluxioEtcdClient.getInstance(conf));
  }

  /**
   * Default ServiceRegistryMembershipManager with given AlluxioEtcdClient client.
   * Only contains live workers.
   *
   * @param conf Alluxio configuration (currently not read by this constructor)
   * @param alluxioEtcdClient etcd client
   */
  public ServiceRegistryMembershipManager(AlluxioConfiguration conf,
      AlluxioEtcdClient alluxioEtcdClient) {
    mAlluxioEtcdClient = alluxioEtcdClient;
  }

  /**
   * Registers the worker with the etcd service registry and starts syncing its registration.
   *
   * @param workerInfo the worker to register; its identity and address form the registry entry
   * @throws IOException propagated from the service registry registration
   */
  @Override
  public void join(WorkerInfo workerInfo) throws IOException {
    LOG.info("Try joining Service Registry for worker:{} ", workerInfo);
    WorkerServiceEntity entity =
        new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
    mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity);
    LOG.info("register to service registry for worker:{} ", workerInfo);
  }

  /**
   * Returns all known members. This manager only knows live workers, so the result is
   * the same as {@link #getLiveMembers()}.
   *
   * @return a view of the live workers
   * @throws IOException propagated from {@link #getLiveMembers()}
   */
  @Override
  public WorkerClusterView getAllMembers() throws IOException {
    return getLiveMembers();
  }

  /**
   * Lists the services currently registered in the service registry and converts every entry
   * that can be parsed into a {@link WorkerInfo} in state {@link WorkerState#LIVE}.
   * Entries that fail to parse are skipped.
   *
   * @return a view of the live workers
   * @throws IOException propagated from the service registry lookup
   */
  @Override
  public WorkerClusterView getLiveMembers() throws IOException {
    // NOTE(review): this Iterable is backed by a one-shot stream, so it can be traversed only
    // once; presumably WorkerClusterView consumes it eagerly in its constructor -- confirm.
    Iterable<WorkerInfo> workerInfoIterable = parseWorkersFromEtcdKvPairs(
        mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
        .map(w -> new WorkerInfo()
            .setIdentity(w.getIdentity())
            .setAddress(w.getWorkerNetAddress())
            .setState(WorkerState.LIVE))
        ::iterator;
    return new WorkerClusterView(workerInfoIterable);
  }

  /**
   * This manager does not track failed workers.
   *
   * @return an empty view
   */
  @Override
  public WorkerClusterView getFailedMembers() {
    return new WorkerClusterView(Collections.emptyList());
  }

  /**
   * Converts etcd key-value pairs into worker service entities, dropping unparsable entries.
   *
   * @param workerKvs key-value pairs read from etcd
   * @return a stream of the successfully parsed entities
   */
  private Stream<WorkerServiceEntity> parseWorkersFromEtcdKvPairs(List<KeyValue> workerKvs) {
    return workerKvs
        .stream()
        .map(this::parseWorkerServiceEntity)
        .filter(Optional::isPresent)
        .map(Optional::get);
  }

  /**
   * Deserializes the value of a single etcd key-value pair into a worker service entity.
   *
   * @param etcdKvPair the key-value pair whose value holds the serialized entity
   * @return the parsed entity, or empty if the value is not valid JSON for an entity
   */
  private Optional<WorkerServiceEntity> parseWorkerServiceEntity(KeyValue etcdKvPair) {
    try {
      WorkerServiceEntity entity = new WorkerServiceEntity();
      entity.deserialize(etcdKvPair.getValue().getBytes());
      return Optional.of(entity);
    } catch (JsonParseException ex) {
      // A malformed entry must not break the whole membership listing, so it is skipped,
      // but leave a trace instead of dropping it silently.
      LOG.warn("Skipping etcd entry that cannot be parsed as a worker service entity", ex);
      return Optional.empty();
    }
  }

  /**
   * Renders all members as a tab-separated table with a header line. Every worker is
   * reported as "ONLINE" because this manager only knows live workers.
   *
   * @return the rendered table, or an error description if the members cannot be fetched
   */
  @Override
  @VisibleForTesting
  public String showAllMembers() {
    try {
      WorkerClusterView registeredWorkers = getAllMembers();
      String printFormat = "%s\t%s\t%s%n";
      StringBuilder sb = new StringBuilder(
          String.format(printFormat, "WorkerId", "Address", "Status"));
      for (WorkerInfo entity : registeredWorkers) {
        String entryLine = String.format(printFormat,
            entity.getIdentity(),
            entity.getAddress().getHost() + ":"
                + entity.getAddress().getRpcPort(), "ONLINE");
        sb.append(entryLine);
      }
      return sb.toString();
    } catch (IOException ex) {
      return String.format("Exception happened:%s", ex.getMessage());
    }
  }

  /**
   * Unregisters the worker's service entry from the service registry.
   *
   * @param worker the worker whose registration should be removed
   * @throws IOException propagated from the service registry
   */
  @Override
  @VisibleForTesting
  public void stopHeartBeat(WorkerInfo worker) throws IOException {
    WorkerServiceEntity entity =
        new WorkerServiceEntity(worker.getIdentity(), worker.getAddress());
    mAlluxioEtcdClient.mServiceDiscovery.unregisterService(entity.getServiceEntityName());
  }

  /**
   * Does nothing.
   *
   * @param worker ignored
   */
  @Override
  public void decommission(WorkerInfo worker) throws IOException {
    // NOOP since we only have active workers
  }

  @Override
  public void close() throws Exception {
    // NOTHING TO CLOSE
    // The EtcdClient is a singleton so its life cycle is managed by the class itself
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -249,6 +250,75 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep
Assert.assertEquals(expectedLiveMembers, actualLiveMembers);
}

/**
 * Registers three workers through the service registry manager, checks that all of them are
 * listed as live, then unregisters one and checks that only the remaining two are reported
 * by both the live and the all-members views while the failed view stays empty.
 */
@Test
public void testServiceRegistryMembershipManager() throws Exception {
  Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.SERVICE_REGISTRY);
  Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
  AlluxioEtcdClient client = getHealthyAlluxioEtcdClient();
  ServiceRegistryMembershipManager membershipManager =
      new ServiceRegistryMembershipManager(Configuration.global(), client);
  WorkerInfo wkr1 = new WorkerInfo()
      .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
      .setAddress(new WorkerNetAddress()
          .setHost("worker1").setContainerHost("containerhostname1")
          .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  WorkerInfo wkr2 = new WorkerInfo()
      .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
      .setAddress(new WorkerNetAddress()
          .setHost("worker2").setContainerHost("containerhostname2")
          .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  WorkerInfo wkr3 = new WorkerInfo()
      .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
      .setAddress(new WorkerNetAddress()
          .setHost("worker3").setContainerHost("containerhostname3")
          .setRpcPort(3000).setDataPort(3001).setWebPort(3011)
          .setDomainSocketPath("/var/lib/domain.sock"));
  membershipManager.join(wkr1);
  membershipManager.join(wkr2);
  membershipManager.join(wkr3);

  // All three registered workers must be reported, each in LIVE state.
  List<WorkerInfo> wkrs = new ArrayList<>();
  wkrs.add(new WorkerInfo(wkr1).setState(WorkerState.LIVE));
  wkrs.add(new WorkerInfo(wkr2).setState(WorkerState.LIVE));
  wkrs.add(new WorkerInfo(wkr3).setState(WorkerState.LIVE));
  // Sort by host so the comparison does not depend on the order returned by the registry.
  List<WorkerInfo> allMembers = membershipManager.getAllMembers().stream()
      .sorted(Comparator.comparing(w -> w.getAddress().getHost()))
      .collect(Collectors.toList());
  Assert.assertEquals(wkrs, allMembers);

  // Exactly one key per worker is expected under the service discovery prefix.
  List<String> strs =
      client.getChildren("/").stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
          .collect(Collectors.toList());
  Assert.assertEquals(3, strs.size());
  for (String str : strs) {
    Assert.assertTrue(str.contains("/ServiceDiscovery/DefaultAlluxioCluster/worker"));
  }

  membershipManager.stopHeartBeat(wkr2);
  // NOTE(review): re-sets the same endpoints as at the top of the test; looks redundant --
  // confirm before removing.
  Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
  CommonUtils.waitFor("Service's lease close and service key got deleted.", () -> {
    try {
      return membershipManager.getLiveMembers().size() == 2;
    } catch (IOException e) {
      // Keep the original exception as the cause so its stack trace is not lost.
      throw new RuntimeException("Unexpected error while getting live members", e);
    }
  }, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));

  // The unregistered worker disappears entirely; it is not reported as failed.
  Assert.assertTrue(Lists.newArrayList(membershipManager.getFailedMembers()).isEmpty());
  List<WorkerInfo> actualLiveMembers = membershipManager.getLiveMembers().stream()
      .sorted(Comparator.comparing(w -> w.getAddress().getHost()))
      .collect(Collectors.toList());
  List<WorkerInfo> expectedLiveMembers = new ArrayList<>();
  expectedLiveMembers.add(new WorkerInfo(wkr1).setState(WorkerState.LIVE));
  expectedLiveMembers.add(new WorkerInfo(wkr3).setState(WorkerState.LIVE));
  Assert.assertEquals(expectedLiveMembers, actualLiveMembers);
  // For this manager the all-members view must match the live view.
  Assert.assertEquals(membershipManager.getAllMembers().stream()
      .sorted(Comparator.comparing(w -> w.getAddress().getHost()))
      .collect(Collectors.toList()), actualLiveMembers);
}

// Ignored due to a flaky "address already exists" exception. This test only passes when it is
// the first test to run.
@Ignore
@Test
public void testFlakyNetwork() throws Exception {
Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
Expand Down

0 comments on commit 849f3a6

Please sign in to comment.