Skip to content

Commit

Permalink
[CELEBORN-1577][Phase2] QuotaManager should support interrupt shuffle.
Browse files Browse the repository at this point in the history
  • Loading branch information
leixm committed Dec 3, 2024
1 parent 7102174 commit 940609e
Show file tree
Hide file tree
Showing 15 changed files with 962 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Interval (ms) between refreshes of the estimated partition size used for slot allocation.
def estimatedPartitionSizeForEstimationUpdateInterval: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
// Window length (ms) over which the Master aggregates user resource consumption.
def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
// Per-user disk usage (bytes) above which the Master starts interrupting shuffles;
// Long.MaxValue disables the check (see MASTER_USER_DISK_USAGE_THRESHOLD).
def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD)
// Cluster-wide disk usage (bytes) above which the Master starts interrupting shuffles;
// Long.MaxValue disables the check (see MASTER_CLUSTER_DISK_USAGE_THRESHOLD).
def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD)
// Logical name of this Celeborn cluster, shared by master and worker.
def clusterName: String = get(CLUSTER_NAME)

// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -2928,6 +2930,26 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

// Per-user disk usage ceiling, in bytes (accepts byte-unit suffixes such as "1t").
// The default of Long.MaxValue effectively disables the per-user check.
val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.user.threshold")
.categories("master")
.doc("When user resource consumption exceeds quota, Master will " +
"interrupt some apps until user resource consumption is less " +
"than this value. Default value is Long.MaxValue which means disable check.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Cluster-wide disk usage ceiling, in bytes (accepts byte-unit suffixes such as "1t").
// The default of Long.MaxValue effectively disables the cluster-level check.
val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.master.userResourceConsumption.cluster.threshold")
.categories("master")
.doc("When cluster resource consumption exceeds quota, Master will " +
"interrupt some apps until cluster resource consumption is less " +
"than this value. Default value is Long.MaxValue which means disable check.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val CLUSTER_NAME: ConfigEntry[String] =
buildConf("celeborn.cluster.name")
.categories("master", "worker")
Expand Down Expand Up @@ -5380,7 +5402,7 @@ object CelebornConf extends Logging {
.dynamic
.doc("Quota dynamic configuration for written disk bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
Expand All @@ -5389,7 +5411,7 @@ object CelebornConf extends Logging {
.dynamic
.doc("Quota dynamic configuration for written disk file count.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
Expand All @@ -5398,7 +5420,7 @@ object CelebornConf extends Logging {
.dynamic
.doc("Quota dynamic configuration for written hdfs bytes.")
.version("0.5.0")
.longConf
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
Expand Down Expand Up @@ -6014,4 +6036,40 @@ object CelebornConf extends Logging {
.doubleConf
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
.createWithDefault(1)

// Dynamic (hot-reloadable) quota on total disk bytes written cluster-wide.
// Byte-valued, so byte-unit suffixes are accepted; Long.MaxValue means unlimited.
val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written disk bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Dynamic quota on the number of disk files written cluster-wide.
// A plain count, hence longConf (not bytesConf); Long.MaxValue means unlimited.
val QUOTA_CLUSTER_DISK_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written disk file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)

// Dynamic quota on total HDFS bytes written cluster-wide.
// Byte-valued, so byte-unit suffixes are accepted; Long.MaxValue means unlimited.
val QUOTA_CLUSTER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsBytesWritten")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written hdfs bytes.")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Long.MaxValue)

// Dynamic quota on the number of HDFS files written cluster-wide.
// A plain count, hence longConf; Long.MaxValue means unlimited.
val QUOTA_CLUSTER_HDFS_FILE_COUNT: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.hdfsFileCount")
.categories("quota")
.dynamic
.doc("Quota dynamic configuration for cluster written hdfs file count.")
.version("0.6.0")
.longConf
.createWithDefault(Long.MaxValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ case class ResourceConsumption(
hdfsFileCount: Long,
var subResourceConsumptions: util.Map[String, ResourceConsumption] = null) {

/**
 * Fluent setter: attaches the per-application sub-consumption map to this
 * instance and returns it so calls can be chained.
 */
def withSubResourceConsumptions(
    resourceConsumptions: util.Map[String, ResourceConsumption]): ResourceConsumption = {
  this.subResourceConsumptions = resourceConsumptions
  this
}

def add(other: ResourceConsumption): ResourceConsumption = {
ResourceConsumption(
diskBytesWritten + other.diskBytesWritten,
Expand All @@ -38,6 +44,14 @@ case class ResourceConsumption(
hdfsFileCount + other.hdfsFileCount)
}

/**
 * Returns a new consumption holding the field-wise difference of this
 * consumption and `other`. Sub-consumptions are not carried over.
 */
def subtract(other: ResourceConsumption): ResourceConsumption =
  ResourceConsumption(
    diskBytesWritten = diskBytesWritten - other.diskBytesWritten,
    diskFileCount = diskFileCount - other.diskFileCount,
    hdfsBytesWritten = hdfsBytesWritten - other.hdfsBytesWritten,
    hdfsFileCount = hdfsFileCount - other.hdfsFileCount)

def addSubResourceConsumptions(otherSubResourceConsumptions: Map[
String,
ResourceConsumption]): Map[String, ResourceConsumption] = {
Expand Down Expand Up @@ -77,4 +91,11 @@ case class ResourceConsumption(
s" hdfsFileCount: $hdfsFileCount," +
s" subResourceConsumptions: $subResourceConsumptionString)"
}

/**
 * Compact one-line rendering of this consumption with byte fields humanized,
 * omitting the sub-consumption map (unlike toString).
 */
def simpleString: String = {
  val diskBytes = Utils.bytesToString(diskBytesWritten)
  val hdfsBytes = Utils.bytesToString(hdfsBytesWritten)
  s"ResourceConsumption(diskBytesWritten: $diskBytes," +
    s" diskFileCount: $diskFileCount," +
    s" hdfsBytesWritten: $hdfsBytes," +
    s" hdfsFileCount: $hdfsFileCount)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.celeborn.common.quota
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.Utils

case class Quota(
case class StorageQuota(
diskBytesWritten: Long,
diskFileCount: Long,
hdfsBytesWritten: Long,
Expand All @@ -34,3 +34,7 @@ case class Quota(
s"]"
}
}

object StorageQuota {
  // Quota with every dimension set to Long.MaxValue, i.e. all checks effectively disabled.
  val DEFAULT_QUOTA: StorageQuota = StorageQuota(
    diskBytesWritten = Long.MaxValue,
    diskFileCount = Long.MaxValue,
    hdfsBytesWritten = Long.MaxValue,
    hdfsFileCount = Long.MaxValue)
}
2 changes: 2 additions & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ license: |
| celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for load-aware slot allocation algorithm. This value is control how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups |
| celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes` | 0.3.0 | celeborn.slots.assign.policy |
| celeborn.master.userResourceConsumption.cluster.threshold | 9223372036854775807b | false | When cluster resource consumption exceeds quota, Master will interrupt some apps until cluster resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
| celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | |
| celeborn.master.userResourceConsumption.user.threshold | 9223372036854775807b | false | When user resource consumption exceeds quota, Master will interrupt some apps until user resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |
Expand Down
10 changes: 7 additions & 3 deletions docs/configuration/quota.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ license: |
<!--begin-include-->
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.quota.cluster.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written disk bytes. | 0.6.0 | |
| celeborn.quota.cluster.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written disk file count. | 0.6.0 | |
| celeborn.quota.cluster.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for cluster written hdfs bytes. | 0.6.0 | |
| celeborn.quota.cluster.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for cluster written hdfs file count. | 0.6.0 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service when Master side checks that there is no enough quota for current user. | 0.2.0 | |
| celeborn.quota.identity.provider | org.apache.celeborn.common.identity.DefaultIdentityProvider | false | IdentityProvider class name. Default class is `org.apache.celeborn.common.identity.DefaultIdentityProvider`. Optional values: org.apache.celeborn.common.identity.HadoopBasedIdentityProvider user name will be obtained by UserGroupInformation.getUserName; org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.2.0 | |
| celeborn.quota.identity.user-specific.tenant | default | false | Tenant id if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable interrupt shuffle when quota exceeds. | 0.6.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807 | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written disk bytes. | 0.5.0 | |
| celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
| celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
| celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ private[celeborn] class Master(
private val hasHDFSStorage = conf.hasHDFSStorage
private val hasS3Storage = conf.hasS3Storage

private val quotaManager = new QuotaManager(conf, configService)
private val quotaManager = new QuotaManager(
statusSystem,
masterSource,
resourceConsumptionSource,
conf,
configService)
private val tagsManager = new TagsManager(Option(configService))
private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
private val userResourceConsumptions =
Expand Down Expand Up @@ -1135,7 +1140,7 @@ private[celeborn] class Master(
new util.ArrayList[WorkerInfo](
(statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava),
new util.ArrayList(appRelatedShuffles),
CheckQuotaResponse(isAvailable = true, "")))
quotaManager.checkApplicationQuotaStatus(appId)))
} else {
context.reply(OneWayMessageResponse)
}
Expand All @@ -1151,78 +1156,11 @@ private[celeborn] class Master(
}
}

/**
 * Computes the user's current aggregate resource consumption, registers the
 * per-user gauges as a side effect, and returns the computed snapshot.
 * Note: the snapshot is computed before gauge registration, matching the
 * original evaluation order.
 */
private def handleResourceConsumption(userIdentifier: UserIdentifier): ResourceConsumption = {
  val consumption = computeUserResourceConsumption(userIdentifier)
  gaugeResourceConsumption(userIdentifier)
  consumption
}

/**
 * Registers resource-consumption gauges for a user (and optionally a single
 * application, when `applicationId` is non-null). Each gauge re-computes the
 * consumption lazily at scrape time via computeResourceConsumption.
 * HDFS gauges are registered only when HDFS storage is enabled.
 */
private def gaugeResourceConsumption(
    userIdentifier: UserIdentifier,
    applicationId: String = null): Unit = {
  // Label set: user identity, plus the application label when app-scoped.
  val gaugeLabels =
    if (applicationId == null) userIdentifier.toMap
    else userIdentifier.toMap + (resourceConsumptionSource.applicationLabel -> applicationId)
  // Local helper: register one gauge that extracts a single field from the
  // freshly computed consumption on every read.
  def register(metric: String)(extract: ResourceConsumption => Long): Unit =
    resourceConsumptionSource.addGauge(metric, gaugeLabels) { () =>
      extract(computeResourceConsumption(userIdentifier, applicationId))
    }
  register(ResourceConsumptionSource.DISK_FILE_COUNT)(_.diskFileCount)
  register(ResourceConsumptionSource.DISK_BYTES_WRITTEN)(_.diskBytesWritten)
  if (hasHDFSStorage) {
    register(ResourceConsumptionSource.HDFS_FILE_COUNT)(_.hdfsFileCount)
    register(ResourceConsumptionSource.HDFS_BYTES_WRITTEN)(_.hdfsBytesWritten)
  }
}

/**
 * Returns the resource consumption for a user, or for one of the user's
 * applications when `applicationId` is non-null.
 *
 * User-level results are cached in `userResourceConsumptions` and reused while
 * younger than `masterResourceConsumptionInterval`; app-level lookups bypass
 * the cache entirely.
 *
 * NOTE(review): on the app-level branch, `subResourceConsumptions` may be null
 * (the case-class default) and `Map.get` returns null for an unknown app, so
 * this can NPE or return null — confirm callers tolerate this.
 */
private def computeResourceConsumption(
userIdentifier: UserIdentifier,
applicationId: String = null): ResourceConsumption = {
val newResourceConsumption = computeUserResourceConsumption(userIdentifier)
if (applicationId == null) {
val current = System.currentTimeMillis()
if (userResourceConsumptions.containsKey(userIdentifier)) {
val resourceConsumptionAndUpdateTime = userResourceConsumptions.get(userIdentifier)
// Cache hit still fresh: serve the cached value, skip the update below.
if (current - resourceConsumptionAndUpdateTime._2 <= masterResourceConsumptionInterval) {
return resourceConsumptionAndUpdateTime._1
}
}
// Cache miss or stale entry: store the new snapshot with its timestamp.
userResourceConsumptions.put(userIdentifier, (newResourceConsumption, current))
newResourceConsumption
} else {
newResourceConsumption.subResourceConsumptions.get(applicationId)
}
}

// TODO: Support calculate topN app resource consumption.
/**
 * Aggregates one user's resource consumption across all registered workers.
 * Workers with no entry for the user contribute nothing; the empty case
 * yields an all-zero consumption.
 */
private def computeUserResourceConsumption(
    userIdentifier: UserIdentifier): ResourceConsumption = {
  val perWorker = statusSystem.workersMap.values().asScala.flatMap { worker =>
    worker.userResourceConsumption.asScala.get(userIdentifier)
  }
  perWorker.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
}

private[master] def handleCheckQuota(
userIdentifier: UserIdentifier,
context: RpcCallContext): Unit = {
val userResourceConsumption = handleResourceConsumption(userIdentifier)
if (conf.quotaEnabled) {
val (isAvailable, reason) =
quotaManager.checkQuotaSpaceAvailable(userIdentifier, userResourceConsumption)
context.reply(CheckQuotaResponse(isAvailable, reason))
context.reply(quotaManager.checkUserQuotaStatus(userIdentifier))
} else {
context.reply(CheckQuotaResponse(true, ""))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ object MasterSource {
// Capacity
val DEVICE_CELEBORN_FREE_CAPACITY = "DeviceCelebornFreeBytes"
val DEVICE_CELEBORN_TOTAL_CAPACITY = "DeviceCelebornTotalBytes"

val UPDATE_RESOURCE_CONSUMPTION_TIME = "UpdateResourceConsumptionTime"
}
Loading

0 comments on commit 940609e

Please sign in to comment.