Skip to content

Commit

Permalink
[CELEBORN-852] Adding new metrics to record the number of registered …
Browse files Browse the repository at this point in the history
…connections
  • Loading branch information
JQ-Cao committed Jul 28, 2023
1 parent 5f0295e commit e59fe84
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,20 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
}
}

override def decCounter(metricsName: String, decV: Long = 1): Unit = {
decCounter(metricsName, decV, Map.empty[String, String])
}

def decCounter(metricsName: String, incV: Long, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
val counter = namedCounters.get(metricNameWithLabel)
if (counter != null) {
counter.counter.dec(incV)
} else {
logWarning(s"Metric $metricNameWithLabel not found!")
}
}

private def clearOldValues(map: ConcurrentHashMap[String, Long]): Unit = {
if (map.size > 5000) {
// remove values has existed more than 15 min
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ trait Source {
def startTimer(metricsName: String, key: String): Unit
def stopTimer(metricsName: String, key: String): Unit
def incCounter(metricsName: String, incV: Long): Unit
def decCounter(metricsName: String, decV: Long)
def getMetrics: String
def destroy(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)

override def checkRegistered: Boolean = registered.get

/** Invoked when the channel associated with the given client is active. */
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}

override def channelInactive(client: TransportClient): Unit = {
workerSource.decCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
creditStreamManager.connectionTerminated(client.getChannel)
logDebug(s"channel inactive ${client.getSocketAddress}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging {
pushClientFactory.createClient(host, port, partitionId)
}
}

/**
* Invoked when the channel associated with the given client is active.
*/
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}

/**
* Invoked when the channel associated with the given client is inactive.
* No further requests will come from this client.
*/
override def channelInactive(client: TransportClient): Unit = {
workerSource.decCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelInactive(client)
}
}

object PushDataHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)

// add Timers
addTimer(COMMIT_FILES_TIME)
Expand Down Expand Up @@ -94,6 +95,8 @@ object WorkerSource {
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"

val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount"

// memory
val NETTY_MEMORY = "NettyMemory"
val SORT_TIME = "SortTime"
Expand Down

0 comments on commit e59fe84

Please sign in to comment.