From ff94a201d9114dfafaba464ccacca9c5f079aaa1 Mon Sep 17 00:00:00 2001 From: Chunxu Tang Date: Tue, 30 Jan 2024 20:20:35 -0800 Subject: [PATCH] Use customized equality to compare etcd worker identifiers --- .../membership/EtcdMembershipManager.java | 5 ++- .../membership/WorkerServiceEntity.java | 23 +++++++++++- .../java/alluxio/wire/WorkerNetAddress.java | 33 ++++++++++++++--- .../alluxio/wire/WorkerNetAddressTest.java | 37 +++++++++++++++++++ 4 files changed, 89 insertions(+), 9 deletions(-) diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 8c2ebf7e4b47..9766a8bebc9f 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -138,8 +138,9 @@ public void join(WorkerInfo workerInfo) throws IOException { "Existing WorkerServiceEntity for path:%s corrupted", pathOnRing)); } - if (existingEntity.get().equals(entity)) { - // Same entity, update the original etcd-stored worker information + if (existingEntity.get().customizedEquals(entity)) { + // Same entity but potentially with new optional fields, + // update the original etcd-stored worker information mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity)); } else { throw new AlreadyExistsException( diff --git a/dora/core/common/src/main/java/alluxio/membership/WorkerServiceEntity.java b/dora/core/common/src/main/java/alluxio/membership/WorkerServiceEntity.java index d2933ac7e879..0f01a540d02f 100644 --- a/dora/core/common/src/main/java/alluxio/membership/WorkerServiceEntity.java +++ b/dora/core/common/src/main/java/alluxio/membership/WorkerServiceEntity.java @@ -108,8 +108,7 @@ public boolean equals(Object o) { // only the service name and the identity are the factors that define // what a WorkerServiceEntity is. Any other is either ephemeral or supplementary data. return mIdentity.equals(anotherO.mIdentity) - && getServiceEntityName().equals(anotherO.getServiceEntityName()) - && mAddress.equals(anotherO.getWorkerNetAddress()); + && getServiceEntityName().equals(anotherO.getServiceEntityName()); } @Override @@ -127,4 +126,24 @@ public void deserialize(byte[] buf) { .create(); gson.fromJson(new InputStreamReader(new ByteArrayInputStream(buf)), WorkerServiceEntity.class); } + + /** + * A customized equality comparison which uses the customized comparison of WorkerNetAddress + * objects. + * + * @param o The object to be compared with this WorkerServiceEntity for customized equality + * @return true if the specified object is equal to this WorkerServiceEntity; false otherwise + */ + public boolean customizedEquals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerServiceEntity anotherO = (WorkerServiceEntity) o; + return mIdentity.equals(anotherO.mIdentity) + && getServiceEntityName().equals(anotherO.getServiceEntityName()) + && mAddress.customizedEquals(anotherO.getWorkerNetAddress()); + } } diff --git a/dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java b/dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java index d4920fedc0fe..9e3bbfbe47b0 100644 --- a/dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java +++ b/dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java @@ -58,7 +58,7 @@ public final class WorkerNetAddress implements Serializable { private String mDomainSocketPath = ""; @Expose @com.google.gson.annotations.SerializedName("HttpServerPort") - // Optional field - skipped in the equality comparison + // Optional field - skipped in the customized equality comparison private int mHttpServerPort; /** @@ -254,15 +254,38 @@ public boolean equals(Object o) { && mRpcPort == that.mRpcPort && mDataPort == that.mDataPort && mWebPort == that.mWebPort - && mDomainSocketPath.equals(that.mDomainSocketPath); - // Skip the comparison of mHttpServerPort for backward compatibility + && mDomainSocketPath.equals(that.mDomainSocketPath) + && mHttpServerPort == that.mHttpServerPort; } @Override public int hashCode() { - // Skip the mHttpServerPort for backward compatibility return Objects.hashCode(mSecureRpcPort, mHost, mContainerHost, mDataPort, mRpcPort, mWebPort, - mDomainSocketPath); + mDomainSocketPath, mHttpServerPort); + } + + /** + * A customized equality comparison which skips the comparison of optional fields, including + * mHttpServerPort, for backward compatibility. + * + * @param o The object to be compared with this WorkerNetAddress for customized equality + * @return true if the specified object is equal to this WorkerNetAddress; false otherwise + */ + public boolean customizedEquals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof WorkerNetAddress)) { + return false; + } + WorkerNetAddress that = (WorkerNetAddress) o; + return mHost.equals(that.mHost) + && mContainerHost.equals(that.mContainerHost) + && mSecureRpcPort == that.mSecureRpcPort + && mRpcPort == that.mRpcPort + && mDataPort == that.mDataPort + && mWebPort == that.mWebPort + && mDomainSocketPath.equals(that.mDomainSocketPath); } /** diff --git a/dora/core/common/src/test/java/alluxio/wire/WorkerNetAddressTest.java b/dora/core/common/src/test/java/alluxio/wire/WorkerNetAddressTest.java index 99314a6b44c3..bb25fb239d0a 100644 --- a/dora/core/common/src/test/java/alluxio/wire/WorkerNetAddressTest.java +++ b/dora/core/common/src/test/java/alluxio/wire/WorkerNetAddressTest.java @@ -72,6 +72,43 @@ public void copyConstructor() throws IllegalAccessException { } } + @Test + public void testCustomizedEquals() { + WorkerNetAddress workerNetAddress1 = new WorkerNetAddress() + .setHost("host") + .setContainerHost("container") + .setRpcPort(1) + .setDataPort(1) + .setNettyDataPort(1) + .setSecureRpcPort(1) + .setWebPort(1) + .setDomainSocketPath("path") + .setHttpServerPort(1); + WorkerNetAddress workerNetAddress2 = new WorkerNetAddress() + .setHost("host") + .setContainerHost("container") + .setRpcPort(1) + .setDataPort(1) + .setNettyDataPort(1) + .setSecureRpcPort(1) + .setWebPort(1) + .setDomainSocketPath("path") + .setHttpServerPort(2); + WorkerNetAddress workerNetAddress3 = new WorkerNetAddress() + .setHost("host2") + .setContainerHost("container") + .setRpcPort(1) + .setDataPort(1) + .setNettyDataPort(1) + .setSecureRpcPort(1) + .setWebPort(1) + .setDomainSocketPath("path") + .setHttpServerPort(1); + Assert.assertTrue(workerNetAddress1.customizedEquals(workerNetAddress2)); + Assert.assertFalse(workerNetAddress1.customizedEquals(workerNetAddress3)); + Assert.assertFalse(workerNetAddress2.customizedEquals(workerNetAddress3)); + } + public void checkEquality(WorkerNetAddress a, WorkerNetAddress b) { Assert.assertEquals(a.getHost(), b.getHost()); Assert.assertEquals(a.getRpcPort(), b.getRpcPort());