Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1648] Refine AppUniqueId with UUID suffix #2810

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

chenkovsky
Copy link

What changes were proposed in this pull request?

We can add randomUUID as an suffix to solve it

Why are the changes needed?

currently, we cannot guarantee application id is really unique. this may lead to data issue.

Does this PR introduce any user-facing change?

No

How was this patch tested?

test locally

@@ -1305,4 +1305,8 @@ object Utils extends Logging {
(key, value)
}.asInstanceOf[Seq[(K, V)]]
}

def appUniqueIdWithUUID(appUiqueId: String): String = {
appUiqueId + "-" + UUID.randomUUID()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add config to allow user to add UUID suffix? IMO, not all cases should add UUID suffix. Right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to move appUniqueIdWithUUID(appUiqueId: String) to CelebornConf. Additionally, we can simplify the UUID by removing the dashes to create a shorter version, then we can use celebornConf.appUniqueIdWithUUIDSuffix

Suggested change
appUiqueId + "-" + UUID.randomUUID()
def appUniqueIdWithUUIDSuffix(appId: String): String = {
if (clientApplicationUUIDSuffixEnabled) {
appId + "-" + UUID.randomUUID().toString().replaceAll("-", "")
} else {
appId
}
}

@@ -76,7 +77,9 @@ public void registerJob(JobShuffleContext context) {
if (lifecycleManager == null) {
synchronized (RemoteShuffleMaster.class) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
celebornAppId =
Utils.appUniqueIdWithUUID(
Copy link
Member

@SteNicholas SteNicholas Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does flink app id need to add UUID suffix?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you think there's no application id colision problem for flink?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. cc @reswqa, @RexXiong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. cc @reswqa, @RexXiong.

Currently Flink Appid is already random and concat with current timestamp, but I think it's fine if we add flag for UUID suffix

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't expect that flink app id(JobId + timestamp) might conflict. Did you actually suffer from this problem.

@@ -5055,6 +5056,14 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 (inclusive)")
.createWithDefault(0.4)

val CLIENT_APPLICATION_UUID: ConfigEntry[Boolean] =
buildConf("celeborn.client.application.uuid.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfer celeborn.client.application.uuidSuffix.enabled

@@ -905,6 +905,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientExcludeReplicaOnFailureEnabled: Boolean =
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
def clientApplicationUUID: Boolean = get(CLIENT_APPLICATION_UUID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clientApplicationUUID -> clientApplicationUUIDSuffixEnabled

@@ -1305,4 +1305,8 @@ object Utils extends Logging {
(key, value)
}.asInstanceOf[Seq[(K, V)]]
}

def appUniqueIdWithUUID(appUiqueId: String): String = {
appUiqueId + "-" + UUID.randomUUID()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to move appUniqueIdWithUUID(appUiqueId: String) to CelebornConf. Additionally, we can simplify the UUID by removing the dashes to create a shorter version, then we can use celebornConf.appUniqueIdWithUUIDSuffix

Suggested change
appUiqueId + "-" + UUID.randomUUID()
def appUniqueIdWithUUIDSuffix(appId: String): String = {
if (clientApplicationUUIDSuffixEnabled) {
appId + "-" + UUID.randomUUID().toString().replaceAll("-", "")
} else {
appId
}
}

buildConf("celeborn.client.application.uuid.enabled")
.categories("client")
.version("0.6.0")
.doc("When `true`, add uuid suffix to application id")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.doc("When `true`, add uuid suffix to application id")
.doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id.")

@@ -76,12 +77,17 @@ public void registerJob(JobShuffleContext context) {
if (lifecycleManager == null) {
synchronized (RemoteShuffleMaster.class) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
String applicationId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change. Meanwhile, remove line 87~89.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change. Meanwhile, remove line 87~89.

ok, let's revert it.
should we describe that the flag is only for spark and mr in doc ?

@@ -85,7 +86,12 @@ public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
if (lifecycleManager == null) {
synchronized (CelebornTierMasterAgent.class) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
String applicationId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants