Skip to content

Commit

Permalink
worker event type
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Sep 21, 2024
1 parent 578bd5d commit 5f6d35c
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,11 @@ object ControlMessages extends Logging {
object WorkerEventRequest {
def apply(
workers: util.List[WorkerInfo],
eventType: String,
eventType: WorkerEventType,
requestId: String): PbWorkerEventRequest =
PbWorkerEventRequest.newBuilder()
.setRequestId(requestId)
.setWorkerEventType(WorkerEventType.valueOf(eventType))
.setWorkerEventType(eventType)
.addAllWorkers(workers.asScala.map { workerInfo =>
PbSerDeUtils.toPbWorkerInfo(workerInfo, true, false)
}.toList.asJava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ private[celeborn] class Master(
}

override def handleWorkerEvent(
workerEventType: String,
workerEventType: WorkerEventType,
workers: Seq[WorkerInfo]): HandleResponse = {
val sb = new StringBuilder()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.commons.lang3.StringUtils

import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.protocol.WorkerEventType
import org.apache.celeborn.server.common.http.api.ApiRequestContext

@Tag(name = "Deprecated")
Expand Down Expand Up @@ -136,7 +137,9 @@ class ApiMasterResource extends ApiRequestContext {
}
sb.append("============================ Handle Worker Event =============================\n")
val workerList = workers.split(",").filter(_.nonEmpty).map(WorkerInfo.fromUniqueId)
sb.append(httpService.handleWorkerEvent(normalizeParam(eventType), workerList)._2)
sb.append(httpService.handleWorkerEvent(
WorkerEventType.valueOf(normalizeParam(eventType)),
workerList)._2)
sb.toString()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ class WorkerResource extends ApiRequestContext {
new HandleResponse().success(success).message(finalMsg)
}

private def toWorkerEventType(enum: EventTypeEnum): String = {
private def toWorkerEventType(enum: EventTypeEnum): WorkerEventType = {
enum match {
case EventTypeEnum.NONE => WorkerEventType.None.toString
case EventTypeEnum.IMMEDIATELY => WorkerEventType.Immediately.toString
case EventTypeEnum.DECOMMISSION => WorkerEventType.Decommission.toString
case EventTypeEnum.DECOMMISSIONTHENIDLE => WorkerEventType.DecommissionThenIdle.toString
case EventTypeEnum.GRACEFUL => WorkerEventType.Graceful.toString
case EventTypeEnum.RECOMMISSION => WorkerEventType.Recommission.toString
case _ => WorkerEventType.UNRECOGNIZED.toString
case EventTypeEnum.NONE => WorkerEventType.None
case EventTypeEnum.IMMEDIATELY => WorkerEventType.Immediately
case EventTypeEnum.DECOMMISSION => WorkerEventType.Decommission
case EventTypeEnum.DECOMMISSIONTHENIDLE => WorkerEventType.DecommissionThenIdle
case EventTypeEnum.GRACEFUL => WorkerEventType.Graceful
case EventTypeEnum.RECOMMISSION => WorkerEventType.Recommission
case _ => WorkerEventType.UNRECOGNIZED
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.eclipse.jetty.servlet.FilterHolder
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerInfo
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.protocol.{TransportModuleConstants, WorkerEventType}
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.server.common.http.HttpServer
import org.apache.celeborn.server.common.http.api.ApiRootResource
Expand Down Expand Up @@ -187,7 +187,9 @@ abstract class HttpService extends Service with Logging {

def exit(exitType: String): String = throw new UnsupportedOperationException()

def handleWorkerEvent(workerEventType: String, workers: Seq[WorkerInfo]): HandleResponse =
def handleWorkerEvent(
workerEventType: WorkerEventType,
workers: Seq[WorkerInfo]): HandleResponse =
throw new UnsupportedOperationException()

def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
Expand Down

0 comments on commit 5f6d35c

Please sign in to comment.