diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e9c4cfa3b0a..aebe9d53cda 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -66,13 +66,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-metrics-cleaner") val roleLabel: (String, String) = "role" -> role - val instanceLabel: Map[String, String] = role match { - case Role.MASTER => - Map("instance" -> s"${Utils.localHostName(conf)}:${conf.masterHttpPort}") - case Role.WORKER => - Map("instance" -> s"${Utils.localHostName(conf)}:${conf.workerHttpPort}") - case _ => Map.empty - } + val instanceLabel: Map[String, String] = Option(Source.SOURCE_INSTANCE).map("instance" -> _).toMap val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel ++ instanceLabel val staticLabelsString: String = MetricLabels.labelString(staticLabels) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala index eff7cf744c8..c9bb12daae6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala @@ -29,3 +29,7 @@ trait Source { def getMetrics: String def destroy(): Unit } + +object Source { + var SOURCE_INSTANCE: String = _ +} diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala index 848b3cabb63..9c835600892 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala @@ -19,9 +19,12 @@ package org.apache.celeborn.common.rpc import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.AtomicLong + import scala.collection.JavaConverters._ + import com.codahale.metrics.{Histogram, UniformReservoir} import com.google.protobuf.GeneratedMessageV3 + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.RpcNameConstants diff --git a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala index 036ae6aefb9..4ff7c7f4ac5 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/Service.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/Service.scala @@ -20,6 +20,8 @@ package org.apache.celeborn.server.common import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.metrics.MetricsSystem +import org.apache.celeborn.common.metrics.source.Source +import org.apache.celeborn.common.util.Utils import org.apache.celeborn.server.common.service.config.{ConfigService, DynamicConfigServiceFactory} abstract class Service extends Logging { @@ -39,6 +41,15 @@ abstract class Service extends Logging { } def stop(exitKind: Int): Unit = {} + + // Set the metrics source instance for the service + serviceName match { + case Service.MASTER => + Source.SOURCE_INSTANCE = s"${Utils.localHostName(conf)}:${conf.masterHttpPort}" + case Service.WORKER => + Source.SOURCE_INSTANCE = s"${Utils.localHostName(conf)}:${conf.workerHttpPort}" + case _ => + } } object Service {