From a20fac16180e28a05e164cd740f2047550382390 Mon Sep 17 00:00:00 2001 From: Jianjian Date: Tue, 23 Jan 2024 15:26:19 -0800 Subject: [PATCH] add etcd membership manager only include active workers --- .../alluxio/membership/MembershipManager.java | 2 + .../alluxio/membership/MembershipType.java | 3 +- .../ServiceRegistryMembershipManager.java | 153 ++++++++++++++++++ .../membership/MembershipManagerTest.java | 65 ++++++++ 4 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 dora/core/common/src/main/java/alluxio/membership/ServiceRegistryMembershipManager.java diff --git a/dora/core/common/src/main/java/alluxio/membership/MembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/MembershipManager.java index 82192960cfc8..05075215afd5 100644 --- a/dora/core/common/src/main/java/alluxio/membership/MembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/MembershipManager.java @@ -125,6 +125,8 @@ public static MembershipManager create(AlluxioConfiguration conf) { return EtcdMembershipManager.create(conf); case MASTER: return MasterMembershipManager.create(); + case SERVICE_REGISTRY: + return ServiceRegistryMembershipManager.create(conf); default: throw new IllegalStateException("Unrecognized Membership Type"); } diff --git a/dora/core/common/src/main/java/alluxio/membership/MembershipType.java b/dora/core/common/src/main/java/alluxio/membership/MembershipType.java index abeddedc1f65..3a7fc49f1ec3 100644 --- a/dora/core/common/src/main/java/alluxio/membership/MembershipType.java +++ b/dora/core/common/src/main/java/alluxio/membership/MembershipType.java @@ -17,5 +17,6 @@ public enum MembershipType { STATIC, // Use a static file to configure a static member list for MembershipManager ETCD, // Use etcd for MembershipManager - MASTER // For regression purpose, still leverage Master for worker registration + MASTER, // For regression purpose, still leverage Master for worker registration + SERVICE_REGISTRY // Use SERVICE REGISTRY MembershipManager, which only use active workers } diff --git a/dora/core/common/src/main/java/alluxio/membership/ServiceRegistryMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/ServiceRegistryMembershipManager.java new file mode 100644 index 000000000000..8a7090343fc3 --- /dev/null +++ b/dora/core/common/src/main/java/alluxio/membership/ServiceRegistryMembershipManager.java @@ -0,0 +1,153 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.membership; + +import alluxio.conf.AlluxioConfiguration; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.JsonParseException; +import io.etcd.jetcd.KeyValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +/** + * MembershipManager backed by configured etcd cluster. Only use active users + */ +public class ServiceRegistryMembershipManager implements MembershipManager { + private static final Logger LOG = LoggerFactory.getLogger(ServiceRegistryMembershipManager.class); + + private AlluxioEtcdClient mAlluxioEtcdClient; + + /** + * @param conf + * @return ServiceRegistryMembershipManager + */ + public static ServiceRegistryMembershipManager create(AlluxioConfiguration conf) { + return new ServiceRegistryMembershipManager(conf); + } + + /** + * CTOR for ServiceRegistryMembershipManager. + * @param conf + */ + public ServiceRegistryMembershipManager(AlluxioConfiguration conf) { + this(conf, AlluxioEtcdClient.getInstance(conf)); + } + + /** + * Default ServiceRegistryMembershipManager with given AlluxioEtcdClient client. + * Only contains live workers. + * + * @param conf Alluxio configuration + * @param alluxioEtcdClient etcd client + */ + public ServiceRegistryMembershipManager(AlluxioConfiguration conf, + AlluxioEtcdClient alluxioEtcdClient) { + mAlluxioEtcdClient = alluxioEtcdClient; + } + + @Override + public void join(WorkerInfo workerInfo) throws IOException { + LOG.info("Try joining Service Registry for worker:{} ", workerInfo); + WorkerServiceEntity entity = + new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress()); + mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(entity); + LOG.info("register to service registry for worker:{} ", workerInfo); + } + + @Override + public WorkerClusterView getAllMembers() throws IOException { + return getLiveMembers(); + } + + @Override + public WorkerClusterView getLiveMembers() throws IOException { + Iterable workerInfoIterable = parseWorkersFromEtcdKvPairs( + mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()) + .map(w -> new WorkerInfo() + .setIdentity(w.getIdentity()) + .setAddress(w.getWorkerNetAddress()) + .setState(WorkerState.LIVE)) + ::iterator; + return new WorkerClusterView(workerInfoIterable); + } + + @Override + public WorkerClusterView getFailedMembers() { + return new WorkerClusterView(Collections.emptyList()); + } + + private Stream parseWorkersFromEtcdKvPairs(List workerKvs) { + return workerKvs + .stream() + .map(this::parseWorkerServiceEntity) + .filter(Optional::isPresent) + .map(Optional::get); + } + + private Optional parseWorkerServiceEntity(KeyValue etcdKvPair) { + try { + WorkerServiceEntity entity = new WorkerServiceEntity(); + entity.deserialize(etcdKvPair.getValue().getBytes()); + return Optional.of(entity); + } catch (JsonParseException ex) { + return Optional.empty(); + } + } + + @Override + @VisibleForTesting + public String showAllMembers() { + try { + WorkerClusterView registeredWorkers = getAllMembers(); + String printFormat = "%s\t%s\t%s%n"; + StringBuilder sb = new StringBuilder( + String.format(printFormat, "WorkerId", "Address", "Status")); + for (WorkerInfo entity : registeredWorkers) { + String entryLine = String.format(printFormat, + entity.getIdentity(), + entity.getAddress().getHost() + ":" + + entity.getAddress().getRpcPort(), "ONLINE"); + sb.append(entryLine); + } + return sb.toString(); + } catch (IOException ex) { + return String.format("Exception happened:%s", ex.getMessage()); + } + } + + @Override + @VisibleForTesting + public void stopHeartBeat(WorkerInfo worker) throws IOException { + WorkerServiceEntity entity = new WorkerServiceEntity(worker.getIdentity(), worker.getAddress()); + mAlluxioEtcdClient.mServiceDiscovery.unregisterService(entity.getServiceEntityName()); + } + + @Override + public void decommission(WorkerInfo worker) throws IOException { + // NOOP since we only have active workers + } + + @Override + public void close() throws Exception { + // NOTHING TO CLOSE + // The EtcdClient is a singleton so its life cycle is managed by the class itself + } +} diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java index 038bc17cb10b..08c680647f88 100644 --- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java +++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java @@ -249,6 +249,71 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep Assert.assertEquals(expectedLiveMembers, actualLiveMembers); } + @Test + public void testServiceRegistryMembershipManager() throws Exception { + Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.SERVICE_REGISTRY); + Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints()); + ServiceRegistryMembershipManager membershipManager = + new ServiceRegistryMembershipManager(Configuration.global(), getHealthyAlluxioEtcdClient()); + WorkerInfo wkr1 = new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId()) + .setAddress(new WorkerNetAddress() + .setHost("worker1").setContainerHost("containerhostname1") + .setRpcPort(1000).setDataPort(1001).setWebPort(1011) + .setDomainSocketPath("/var/lib/domain.sock")); + WorkerInfo wkr2 = new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId()) + .setAddress(new WorkerNetAddress() + .setHost("worker2").setContainerHost("containerhostname2") + .setRpcPort(2000).setDataPort(2001).setWebPort(2011) + .setDomainSocketPath("/var/lib/domain.sock")); + WorkerInfo wkr3 = new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId()) + .setAddress(new WorkerNetAddress() + .setHost("worker3").setContainerHost("containerhostname3") + .setRpcPort(3000).setDataPort(3001).setWebPort(3011) + .setDomainSocketPath("/var/lib/domain.sock")); + membershipManager.join(wkr1); + membershipManager.join(wkr2); + membershipManager.join(wkr3); + List wkrs = new ArrayList<>(); + wkrs.add(new WorkerInfo(wkr1).setState(WorkerState.LIVE)); + wkrs.add(new WorkerInfo(wkr2).setState(WorkerState.LIVE)); + wkrs.add(new WorkerInfo(wkr3).setState(WorkerState.LIVE)); + List allMembers = membershipManager.getAllMembers().stream() + .sorted(Comparator.comparing(w -> w.getAddress().getHost())) + .collect(Collectors.toList()); + Assert.assertEquals(wkrs, allMembers); + List strs = getHealthyAlluxioEtcdClient().getChildren("/") + .stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + Assert.assertEquals(3, strs.size()); + for (String str : strs) { + Assert.assertTrue(str.contains("/ServiceDiscovery/DefaultAlluxioCluster/worker")); + } + membershipManager.stopHeartBeat(wkr2); + Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints()); + CommonUtils.waitFor("Service's lease close and service key got deleted.", () -> { + try { + return membershipManager.getLiveMembers().size() == 2; + } catch (IOException e) { + throw new RuntimeException( + String.format("Unexpected error while getting failed members: %s", e)); + } + }, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10))); + Assert.assertTrue(Lists.newArrayList(membershipManager.getFailedMembers()).isEmpty()); + List actualLiveMembers = membershipManager.getLiveMembers().stream() + .sorted(Comparator.comparing(w -> w.getAddress().getHost())) + .collect(Collectors.toList()); + List expectedLiveMembers = new ArrayList<>(); + expectedLiveMembers.add(new WorkerInfo(wkr1).setState(WorkerState.LIVE)); + expectedLiveMembers.add(new WorkerInfo(wkr3).setState(WorkerState.LIVE)); + Assert.assertEquals(expectedLiveMembers, actualLiveMembers); + Assert.assertEquals(membershipManager.getAllMembers().stream() + .sorted(Comparator.comparing(w -> w.getAddress().getHost())) + .collect(Collectors.toList()), actualLiveMembers); + } + @Test public void testFlakyNetwork() throws Exception { Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);