Skip to content

Commit

Permalink
[CELEBORN-1589] Ensure master is leader for some POST request APIs
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Ensure that `excludeWorker`, `removeWorkersUnavailableInfo`, and `sendWorkerEvents` can only happen from Master leader node.

### Why are the changes needed?
prevent inconsistencies from peers

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
tested against a cluster

Closes apache#2730 from akpatnam25/CELEBORN-1589.

Authored-by: Aravind Patnam <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
  • Loading branch information
Aravind Patnam authored and SteNicholas committed Sep 12, 2024
1 parent 30f7da5 commit 5f02f3e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 28 deletions.
5 changes: 5 additions & 0 deletions master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-scala-scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ private[celeborn] class Master(
}
}

private def isMasterActive: Int = {
private[master] def isMasterActive: Int = {
// use int rather than bool for better monitoring on dashboard
val isActive =
if (conf.haEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.http.api

import javax.ws.rs.BadRequestException

import org.apache.celeborn.service.deploy.master.Master

object MasterHttpResourceUtils {

def ensureMasterIsLeader[T](master: Master)(f: => T): T = {
if (master.isMasterActive != 1) {
throw new BadRequestException(
s"This operation can only be done from a master that has the LEADER role." +
s" The master group info is: \n${master.getMasterGroupInfo}")
}
f
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag

import org.apache.celeborn.rest.v1.model.{ExcludeWorkerRequest, HandleResponse, RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, WorkerEventData, WorkerEventInfoData, WorkerEventsResponse, WorkersResponse, WorkerTimestampData}
import org.apache.celeborn.rest.v1.model._
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.server.common.http.api.v1.ApiUtils
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.master.http.api.MasterHttpResourceUtils._

@Tag(name = "Worker")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -72,7 +73,7 @@ class WorkerResource extends ApiRequestContext {
"Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove.")
@POST
@Path("/exclude")
def excludeWorker(request: ExcludeWorkerRequest): HandleResponse = {
def excludeWorker(request: ExcludeWorkerRequest): HandleResponse = ensureMasterIsLeader(master) {
val (success, msg) = httpService.exclude(
request.getAdd.asScala.map(ApiUtils.toWorkerInfo).toSeq,
request.getRemove.asScala.map(ApiUtils.toWorkerInfo).toSeq)
Expand All @@ -87,11 +88,12 @@ class WorkerResource extends ApiRequestContext {
description = "Remove the workers unavailable info from the master.")
@POST
@Path("/remove_unavailable")
def removeWorkersUnavailableInfo(request: RemoveWorkersUnavailableInfoRequest): HandleResponse = {
val (success, msg) = master.removeWorkersUnavailableInfo(
request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq)
new HandleResponse().success(success).message(msg)
}
def removeWorkersUnavailableInfo(request: RemoveWorkersUnavailableInfoRequest): HandleResponse =
ensureMasterIsLeader(master) {
val (success, msg) = master.removeWorkersUnavailableInfo(
request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq)
new HandleResponse().success(success).message(msg)
}

@ApiResponse(
responseCode = "200",
Expand Down Expand Up @@ -123,24 +125,25 @@ class WorkerResource extends ApiRequestContext {
"For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'.")
@POST
@Path("/events")
def sendWorkerEvents(request: SendWorkerEventRequest): HandleResponse = {
if (request.getEventType == SendWorkerEventRequest.EventTypeEnum.NONE || request.getWorkers.isEmpty) {
throw new BadRequestException(
s"eventType(${request.getEventType}) and workers(${request.getWorkers}) are required")
}
val workers = request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq
val (filteredWorkers, unknownWorkers) = workers.partition(statusSystem.workers.contains)
if (filteredWorkers.isEmpty) {
throw new BadRequestException(
s"None of the workers are known: ${unknownWorkers.map(_.readableAddress).mkString(", ")}")
}
val (success, msg) = httpService.handleWorkerEvent(request.getEventType.toString, workers)
val finalMsg =
if (unknownWorkers.isEmpty) {
msg
} else {
s"${msg}\n(Unknown workers: ${unknownWorkers.map(_.readableAddress).mkString(", ")})"
def sendWorkerEvents(request: SendWorkerEventRequest): HandleResponse =
ensureMasterIsLeader(master) {
if (request.getEventType == SendWorkerEventRequest.EventTypeEnum.NONE || request.getWorkers.isEmpty) {
throw new BadRequestException(
s"eventType(${request.getEventType}) and workers(${request.getWorkers}) are required")
}
new HandleResponse().success(success).message(finalMsg)
}
val workers = request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq
val (filteredWorkers, unknownWorkers) = workers.partition(statusSystem.workers.contains)
if (filteredWorkers.isEmpty) {
throw new BadRequestException(
s"None of the workers are known: ${unknownWorkers.map(_.readableAddress).mkString(", ")}")
}
val (success, msg) = httpService.handleWorkerEvent(request.getEventType.toString, workers)
val finalMsg =
if (unknownWorkers.isEmpty) {
msg
} else {
s"${msg}\n(Unknown workers: ${unknownWorkers.map(_.readableAddress).mkString(", ")})"
}
new HandleResponse().success(success).message(finalMsg)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.master.http.api

import javax.ws.rs.BadRequestException

import org.mockito.MockitoSugar._

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.service.deploy.master.Master

class TestMasterHttpResourceUtils extends CelebornFunSuite {

test("ensureMasterIsLeader") {
val mockMaster = mock[Master]
when(mockMaster.isMasterActive).thenReturn(0)
assertThrows[BadRequestException] {
MasterHttpResourceUtils.ensureMasterIsLeader(mockMaster) {
"operation failed"
}
}
}
}
3 changes: 2 additions & 1 deletion project/CelebornBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,8 @@ object CelebornMaster {
Dependencies.ratisMetricsDefault,
Dependencies.ratisNetty,
Dependencies.ratisServer,
Dependencies.ratisShell
Dependencies.ratisShell,
Dependencies.scalatestMockito % "test",
) ++ commonUnitTestDependencies
)
}
Expand Down

0 comments on commit 5f02f3e

Please sign in to comment.