Skip to content

Commit

Permalink
Add http server port in the worker net address
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

This PR registers the worker's HTTP server port in etcd, so that the Python client can discover the worker's RESTful APIs.

### Why are the changes needed?

The Alluxio Python client (e.g. in ML use cases) needs to connect to the worker's REST APIs. However, because the HTTP server port is not included in the worker information stored in etcd, the client cannot find the API endpoint.

### Does this PR introduce any user facing changes?

No.

			pr-link: #18499
			change-id: cid-1cf7e0bdc7cc0c9702949bc313de5583d9cc2fb8
  • Loading branch information
ChunxuTang authored Jan 31, 2024
1 parent 055b375 commit 262b2b3
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,19 @@ public void join(WorkerInfo workerInfo) throws IOException {
"Existing WorkerServiceEntity for path:%s corrupted",
pathOnRing));
}
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
if (existingEntity.get().equalsIgnoringOptionalFields(entity)) {
// Same entity but potentially with new optional fields,
// update the original etcd-stored worker information
mAlluxioEtcdClient.createForPath(pathOnRing, Optional.of(serializedEntity));
} else {
throw new AlreadyExistsException(
String.format("Some other member with same id registered on the ring, bail."
+ "Conflicting worker addr:%s, worker identity:%s."
+ "Different workers can't assume same worker identity in non-k8s env,"
+ "clean local worker identity settings to continue.",
existingEntity.get().getWorkerNetAddress().toString(),
existingEntity.get().getIdentity()));
}
}
}
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ private static List<WorkerInfo> parseWorkerAddresses(
.setContainerHost(Configuration.global()
.getOrDefault(PropertyKey.WORKER_CONTAINER_HOSTNAME, ""))
.setRpcPort(conf.getInt(PropertyKey.WORKER_RPC_PORT))
.setWebPort(conf.getInt(PropertyKey.WORKER_WEB_PORT));
.setWebPort(conf.getInt(PropertyKey.WORKER_WEB_PORT))
.setHttpServerPort(conf.getInt(PropertyKey.WORKER_HTTP_SERVER_PORT));
//data port, these are initialized from configuration for client to deduce the
//workeraddr related info, on worker side, it will be corrected by join().
InetSocketAddress inetAddr;
Expand Down Expand Up @@ -126,7 +127,8 @@ public void join(WorkerInfo worker) throws IOException {
.setDomainSocketPath(addr.getDomainSocketPath())
.setNettyDataPort(addr.getNettyDataPort())
.setWebPort(addr.getWebPort())
.setSecureRpcPort(addr.getSecureRpcPort()));
.setSecureRpcPort(addr.getSecureRpcPort())
.setHttpServerPort(addr.getHttpServerPort()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,35 @@ public void deserialize(byte[] buf) {
.create();
gson.fromJson(new InputStreamReader(new ByteArrayInputStream(buf)), WorkerServiceEntity.class);
}

/**
 * Compares this entity with another object while disregarding optional address fields
 * (currently the http server port of the WorkerNetAddress).
 *
 * @param o the object to compare against this WorkerServiceEntity
 * @return true if {@code o} is an instance of the same class whose identity, service entity
 *         name and non-optional address fields all match this entity; false otherwise
 */
public boolean equalsIgnoringOptionalFields(Object o) {
  // Same reference: trivially equal.
  if (this == o) {
    return true;
  }
  if (o == null) {
    return false;
  }
  if (getClass() != o.getClass()) {
    return false;
  }
  WorkerServiceEntity that = (WorkerServiceEntity) o;
  if (!mIdentity.equals(that.mIdentity)) {
    return false;
  }
  if (!getServiceEntityName().equals(that.getServiceEntityName())) {
    return false;
  }
  // The address is compared last, without its http server port.
  return equalsIgnoringHttpServerPort(that.getWorkerNetAddress());
}

/**
 * Checks whether this entity's address matches the given address on host, container host,
 * secure rpc port, rpc port, data port, web port and domain socket path. The http server port
 * is not part of the comparison.
 *
 * @param other the address to compare with this entity's address
 * @return true if all compared fields match; false otherwise
 */
private boolean equalsIgnoringHttpServerPort(WorkerNetAddress other) {
  if (!mAddress.getHost().equals(other.getHost())) {
    return false;
  }
  if (!mAddress.getContainerHost().equals(other.getContainerHost())) {
    return false;
  }
  if (mAddress.getSecureRpcPort() != other.getSecureRpcPort()) {
    return false;
  }
  if (mAddress.getRpcPort() != other.getRpcPort()) {
    return false;
  }
  if (mAddress.getDataPort() != other.getDataPort()) {
    return false;
  }
  if (mAddress.getWebPort() != other.getWebPort()) {
    return false;
  }
  return mAddress.getDomainSocketPath().equals(other.getDomainSocketPath());
}
}
29 changes: 27 additions & 2 deletions dora/core/common/src/main/java/alluxio/wire/WorkerNetAddress.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public final class WorkerNetAddress implements Serializable {
@Expose
@com.google.gson.annotations.SerializedName("DomainSocketPath")
private String mDomainSocketPath = "";
@Expose
@com.google.gson.annotations.SerializedName("HttpServerPort")
// Optional field - skipped in the customized equality comparison
private int mHttpServerPort;

/**
* Creates a new instance of {@link WorkerNetAddress}.
Expand All @@ -77,6 +81,7 @@ public WorkerNetAddress(WorkerNetAddress copyFrom) {
mNettyDataPort = copyFrom.mNettyDataPort;
mWebPort = copyFrom.mWebPort;
mDomainSocketPath = copyFrom.mDomainSocketPath;
mHttpServerPort = copyFrom.mHttpServerPort;
}

/**
Expand Down Expand Up @@ -143,6 +148,14 @@ public String getDomainSocketPath() {
return mDomainSocketPath;
}

/**
 * Gets the http server port stored in this worker net address.
 *
 * @return the http server port
 */
@ApiModelProperty(value = "Port of the worker's http server for rest apis")
public int getHttpServerPort() {
  return this.mHttpServerPort;
}

/**
* @param host the host to use
* @return the worker net address
Expand Down Expand Up @@ -217,6 +230,15 @@ public WorkerNetAddress setDomainSocketPath(String domainSocketPath) {
return this;
}

/**
 * Stores the given http server port in this worker net address.
 *
 * @param httpServerPort the http server port to use
 * @return this worker net address, to allow chained setter calls
 */
public WorkerNetAddress setHttpServerPort(int httpServerPort) {
  this.mHttpServerPort = httpServerPort;
  return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -232,13 +254,14 @@ public boolean equals(Object o) {
&& mRpcPort == that.mRpcPort
&& mDataPort == that.mDataPort
&& mWebPort == that.mWebPort
&& mDomainSocketPath.equals(that.mDomainSocketPath);
&& mDomainSocketPath.equals(that.mDomainSocketPath)
&& mHttpServerPort == that.mHttpServerPort;
}

@Override
public int hashCode() {
return Objects.hashCode(mSecureRpcPort, mHost, mContainerHost, mDataPort, mRpcPort, mWebPort,
mDomainSocketPath);
mDomainSocketPath, mHttpServerPort);
}

/**
Expand All @@ -254,6 +277,7 @@ public String dumpMainInfo() {
.add("dataPort", mDataPort)
.add("webPort", mWebPort)
.add("domainSocketPath", mDomainSocketPath)
.add("httpServerPort", mHttpServerPort)
.toString();
}

Expand All @@ -267,6 +291,7 @@ public String toString() {
.add("webPort", mWebPort)
.add("domainSocketPath", mDomainSocketPath)
.add("secureRpcPort", mSecureRpcPort)
.add("httpServerPort", mHttpServerPort)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,32 @@ public void testSerializationWorkerServiceEntity() throws Exception {
deserialized.deserialize(jsonBytes);
Assert.assertEquals(deserialized, entity);
}

/**
 * Verifies that equalsIgnoringOptionalFields treats entities that differ only in http server
 * port as equal, and entities that differ in host as not equal.
 */
@Test
public void testEqualsIgnoringOptionalFields() throws Exception {
  // All three entities share one identity so that only the address decides the outcome.
  final WorkerIdentity sharedIdentity = WorkerIdentity.fromProto(
      alluxio.grpc.WorkerIdentity.newBuilder()
          .setIdentifier(ByteString.copyFrom(Longs.toByteArray(1L)))
          .setVersion(0)
          .build());

  // Baseline address.
  final WorkerNetAddress baseline = new WorkerNetAddress()
      .setHost("worker1")
      .setContainerHost("containerhostname1")
      .setRpcPort(1000)
      .setDataPort(1001)
      .setWebPort(1011)
      .setDomainSocketPath("/var/lib/domain.sock")
      .setHttpServerPort(1021);
  // Same as the baseline except for the http server port.
  final WorkerNetAddress otherHttpPort = new WorkerNetAddress()
      .setHost("worker1")
      .setContainerHost("containerhostname1")
      .setRpcPort(1000)
      .setDataPort(1001)
      .setWebPort(1011)
      .setDomainSocketPath("/var/lib/domain.sock")
      .setHttpServerPort(2021);
  // Same as the baseline except for the host.
  final WorkerNetAddress otherHost = new WorkerNetAddress()
      .setHost("worker3")
      .setContainerHost("containerhostname1")
      .setRpcPort(1000)
      .setDataPort(1001)
      .setWebPort(1011)
      .setDomainSocketPath("/var/lib/domain.sock")
      .setHttpServerPort(1021);

  WorkerServiceEntity baselineEntity = new WorkerServiceEntity(sharedIdentity, baseline);
  WorkerServiceEntity otherHttpPortEntity =
      new WorkerServiceEntity(sharedIdentity, otherHttpPort);
  WorkerServiceEntity otherHostEntity = new WorkerServiceEntity(sharedIdentity, otherHost);

  Assert.assertTrue(baselineEntity.equalsIgnoringOptionalFields(otherHttpPortEntity));
  Assert.assertFalse(otherHttpPortEntity.equalsIgnoringOptionalFields(otherHostEntity));
  Assert.assertFalse(otherHostEntity.equalsIgnoringOptionalFields(baselineEntity));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void copyConstructor() throws IllegalAccessException {
.setNettyDataPort(1)
.setSecureRpcPort(1)
.setWebPort(1)
.setDomainSocketPath("path");
.setDomainSocketPath("path")
.setHttpServerPort(1);
WorkerNetAddress copied = new WorkerNetAddress(original);
// copied instance should contain exactly the same content
checkEquality(original, copied);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ public WorkerNetAddress getAddress() {
if (mNettyDataTransmissionEnable) {
workerNetAddress.setNettyDataPort(getNettyDataLocalPort());
}
if (Configuration.getBoolean(PropertyKey.WORKER_HTTP_SERVER_ENABLED)) {
workerNetAddress.setHttpServerPort(Configuration.getInt(PropertyKey.WORKER_HTTP_SERVER_PORT));
}
return workerNetAddress;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,19 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
WorkerInfo wkr3 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker3").setContainerHost("containerhostname3")
.setRpcPort(3000).setDataPort(3001).setWebPort(3011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(3031));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
membershipManager.join(wkr3);
Expand Down Expand Up @@ -330,13 +330,13 @@ public void testFlakyNetwork() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker-1").setContainerHost("containerhostname1")
.setRpcPort(29999).setDataPort(29997).setWebPort(30000)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(30001));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker-2").setContainerHost("containerhostname2")
.setRpcPort(29999).setDataPort(29997).setWebPort(30000)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(30001));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
CommonUtils.waitFor("Workers joined",
Expand Down Expand Up @@ -396,19 +396,19 @@ public void testStaticMembership() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
WorkerInfo wkr3 = new WorkerInfo()
.setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
.setAddress(new WorkerNetAddress()
.setHost("worker3").setContainerHost("containerhostname3")
.setRpcPort(3000).setDataPort(3001).setWebPort(3011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(3021));
membershipManager.join(wkr1);
membershipManager.join(wkr2);
membershipManager.join(wkr3);
Expand Down Expand Up @@ -437,13 +437,13 @@ public void testSameWorkerIdentityConflict() throws Exception {
.setAddress(new WorkerNetAddress()
.setHost("worker1").setContainerHost("containerhostname1")
.setRpcPort(1000).setDataPort(1001).setWebPort(1011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(1021));
WorkerInfo wkr2 = new WorkerInfo()
.setIdentity(workerIdentity1)
.setAddress(new WorkerNetAddress()
.setHost("worker2").setContainerHost("containerhostname2")
.setRpcPort(2000).setDataPort(2001).setWebPort(2011)
.setDomainSocketPath("/var/lib/domain.sock"));
.setDomainSocketPath("/var/lib/domain.sock").setHttpServerPort(2021));
membershipManager.join(wkr1);
// bring wrk1 down and join wrk2 with a same worker identity.
membershipManager.stopHeartBeat(wkr1);
Expand Down Expand Up @@ -471,4 +471,44 @@ public void testSameWorkerIdentityConflict() throws Exception {
Assert.assertTrue(curWorkerInfo.isPresent());
Assert.assertEquals(wkr2.getAddress(), curWorkerInfo.get().getAddress());
}

/**
 * Verifies that a worker which first joined without an http server port can rejoin under the
 * same identity once the port is set, and that the address reported for the live member then
 * carries the new port.
 */
@Test
public void testOptionalHttpPortChangeInWorkerAddress() throws Exception {
  final MembershipManager manager = getHealthyEtcdMemberMgr();
  Assert.assertTrue(manager instanceof EtcdMembershipManager);

  // First join: the address carries no explicit http server port.
  final WorkerIdentity id = WorkerIdentityTestUtils.randomUuidBasedId();
  final WorkerNetAddress address = new WorkerNetAddress()
      .setHost("worker1")
      .setContainerHost("containerhostname1")
      .setRpcPort(1000)
      .setDataPort(1001)
      .setWebPort(1011)
      .setDomainSocketPath("/var/lib/domain.sock");
  final WorkerInfo worker = new WorkerInfo().setIdentity(id).setAddress(address);
  manager.join(worker);
  Assert.assertTrue(manager.getLiveMembers().getWorkerById(id).isPresent());

  // Stop heartbeating and wait until the worker shows up among the failed members.
  manager.stopHeartBeat(worker);
  CommonUtils.waitFor("wkr is not alive.", () -> {
    try {
      return manager.getFailedMembers().getWorkerById(id).isPresent();
    } catch (IOException ignored) {
      // A failed lookup is reported as "condition not met yet".
      return false;
    }
  }, WaitForOptions.defaults().setTimeoutMs(5000));

  // Set the http server port on the very same address object and rejoin.
  address.setHttpServerPort(1021);
  manager.join(worker);

  // The worker must be registered exactly once and expose the updated address.
  Assert.assertEquals(1, manager.getAllMembers().size());
  final Optional<WorkerInfo> rejoined = manager.getLiveMembers().getWorkerById(id);
  Assert.assertTrue(rejoined.isPresent());
  Assert.assertEquals(worker.getAddress(), rejoined.get().getAddress());
  Assert.assertEquals(worker.getAddress().getHttpServerPort(),
      rejoined.get().getAddress().getHttpServerPort());
}
}

0 comments on commit 262b2b3

Please sign in to comment.