Skip to content

Commit

Permalink
[CELEBORN-852] Adding new metrics to record the number of registered …
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Adding new metrics to record the number of registered connections

### Why are the changes needed?
Monitor the number of active connections on worker nodes

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
no

Closes apache#1773 from JQ-Cao/852.

Authored-by: caojiaqing <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
JQ-Cao authored and zwangsheng committed Aug 4, 2023
1 parent 1e38560 commit 4e49105
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)

override def checkRegistered: Boolean = registered.get

/** Invoked when the channel associated with the given client is active. */
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}

override def channelInactive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
creditStreamManager.connectionTerminated(client.getChannel)
logDebug(s"channel inactive ${client.getSocketAddress}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging {
pushClientFactory.createClient(host, port, partitionId)
}
}

/**
* Invoked when the channel associated with the given client is active.
*/
override def channelActive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
super.channelActive(client)
}

/**
* Invoked when the channel associated with the given client is inactive.
* No further requests will come from this client.
*/
override def channelInactive(client: TransportClient): Unit = {
workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
super.channelInactive(client)
}
}

object PushDataHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)

// add Timers
addTimer(COMMIT_FILES_TIME)
Expand Down Expand Up @@ -94,6 +95,8 @@ object WorkerSource {
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"

val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount"

// memory
val NETTY_MEMORY = "NettyMemory"
val SORT_TIME = "SortTime"
Expand Down

0 comments on commit 4e49105

Please sign in to comment.