From e59fe84f0f627540978fbe4178b7694f9c4dae59 Mon Sep 17 00:00:00 2001 From: caojiaqing Date: Fri, 28 Jul 2023 11:45:23 +0800 Subject: [PATCH] [CELEBORN-852] Adding new metrics to record the number of registered connections --- .../common/metrics/source/AbstractSource.scala | 14 ++++++++++++++ .../celeborn/common/metrics/source/Source.scala | 1 + .../service/deploy/worker/FetchHandler.scala | 7 +++++++ .../service/deploy/worker/PushDataHandler.scala | 17 +++++++++++++++++ .../service/deploy/worker/WorkerSource.scala | 3 +++ 5 files changed, 42 insertions(+) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 21fd68d3368..1fce83df0aa 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -257,6 +257,20 @@ abstract class AbstractSource(conf: CelebornConf, role: String) } } + override def decCounter(metricsName: String, decV: Long = 1): Unit = { + decCounter(metricsName, decV, Map.empty[String, String]) + } + + def decCounter(metricsName: String, decV: Long, labels: Map[String, String]): Unit = { + val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels) + val counter = namedCounters.get(metricNameWithLabel) + if (counter != null) { + counter.counter.dec(decV) + } else { + logWarning(s"Metric $metricNameWithLabel not found!") + } + } + private def clearOldValues(map: ConcurrentHashMap[String, Long]): Unit = { if (map.size > 5000) { // remove values has existed more than 15 min diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala index eff7cf744c8..f8bc7ecf31b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala +++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Source.scala @@ -26,6 +26,7 @@ trait Source { def startTimer(metricsName: String, key: String): Unit def stopTimer(metricsName: String, key: String): Unit def incCounter(metricsName: String, incV: Long): Unit + def decCounter(metricsName: String, decV: Long): Unit def getMetrics: String def destroy(): Unit } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 9b4d08f8c25..0560b198be8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf) override def checkRegistered: Boolean = registered.get + /** Invoked when the channel associated with the given client is active. */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + override def channelInactive(client: TransportClient): Unit = { + workerSource.decCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) creditStreamManager.connectionTerminated(client.getChannel) logDebug(s"channel inactive ${client.getSocketAddress}") } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index f93d46d60a7..9befcc6f143 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging { pushClientFactory.createClient(host, port, partitionId) } } + + /** + * Invoked when the 
channel associated with the given client is active. + */ + override def channelActive(client: TransportClient): Unit = { + workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelActive(client) + } + + /** + * Invoked when the channel associated with the given client is inactive; decrements + * the ActiveConnectionCount counter. No further requests will come from this client. + */ + override def channelInactive(client: TransportClient): Unit = { + workerSource.decCounter(WorkerSource.ACTIVE_CONNECTION_COUNT) + super.channelInactive(client) + } } object PushDataHandler { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index 87c755517c7..edcebbb2e08 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT) addCounter(REGION_START_FAIL_COUNT) addCounter(REGION_FINISH_FAIL_COUNT) + addCounter(ACTIVE_CONNECTION_COUNT) // add Timers addTimer(COMMIT_FILES_TIME) @@ -94,6 +95,8 @@ object WorkerSource { // slots val SLOTS_ALLOCATED = "SlotsAllocated" + val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount" + // memory val NETTY_MEMORY = "NettyMemory" val SORT_TIME = "SortTime"