Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1630] Support to apply ratis peer operation with RESTful api #2804

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def haMasterRatisClientRpcWatchTimeout: Long = get(HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT)
def haMasterRatisFirstElectionTimeoutMin: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
def haMasterRatisFirstElectionTimeoutMax: Long = get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)
def hasMasterRatisLeaderElectionMemeberMajorityAdd: Boolean =
get(HA_MASTER_RATIS_LEADER_ELECTION_MEMBER_MAJORITY_ADD)
def haMasterRatisNotificationNoLeaderTimeout: Long =
get(HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT)
def haMasterRatisRpcSlownessTimeout: Long = get(HA_MASTER_RATIS_RPC_SLOWNESS_TIMEOUT)
Expand Down Expand Up @@ -2655,6 +2657,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")

val HA_MASTER_RATIS_LEADER_ELECTION_MEMBER_MAJORITY_ADD: ConfigEntry[Boolean] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe default as true is more reasonable then false, and do we need guarantee this value should be true? seems majority-add default is false in ratis.

buildConf("celeborn.master.ha.ratis.leader.election.member.majority.add")
.internal
.categories("ha")
.version("0.6.0")
.booleanConf
.createWithDefault(false)

val HA_MASTER_RATIS_NOTIFICATION_NO_LEADER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.raft.server.notification.no-leader.timeout")
.withAlternative("celeborn.ha.master.ratis.raft.server.notification.no-leader.timeout")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ private RaftProperties newRaftProperties(CelebornConf conf, RpcType rpc) {
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties, firstElectionTimeoutMin);
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties, firstElectionTimeoutMax);

boolean leaderElectionMemberMajorityAdd = conf.hasMasterRatisLeaderElectionMemeberMajorityAdd();
RaftServerConfigKeys.LeaderElection.setMemberMajorityAdd(
properties, leaderElectionMemberMajorityAdd);

// Set the rpc client timeout
TimeDuration clientRpcTimeout =
TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(), TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@

package org.apache.celeborn.service.deploy.master.http.api.v1

import javax.ws.rs.{Consumes, Path, POST, Produces}
import javax.ws.rs.{BadRequestException, Consumes, Path, POST, Produces}
import javax.ws.rs.core.MediaType

import scala.collection.JavaConverters._

import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.ratis.protocol.{LeaderElectionManagementRequest, RaftPeerId, TransferLeadershipRequest}
import org.apache.ratis.protocol.{LeaderElectionManagementRequest, RaftClientReply, RaftPeer, SetConfigurationRequest, TransferLeadershipRequest}
import org.apache.ratis.rpc.CallId

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.rest.v1.model.{HandleResponse, RatisElectionTransferRequest}
import org.apache.celeborn.rest.v1.model.{HandleResponse, RatisElectionTransferRequest, RatisPeerAddRequest, RatisPeerRemoveRequest, RatisPeerSetPriorityRequest}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAMasterMetaManager, HARaftServer}
Expand Down Expand Up @@ -92,8 +93,129 @@ class RatisResource extends ApiRequestContext with Logging {
applyElectionOp(new LeaderElectionManagementRequest.Resume)
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description = "Add new peers to the raft group.")
@POST
@Path("/peer/add")
def peerAdd(request: RatisPeerAddRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getPeers.isEmpty) {
throw new BadRequestException("No peers specified.")
}

val groupInfo = ratisServer.getGroupInfo

val remaining = getRaftPeers()
val adding = request.getPeers.asScala.map { peer =>
if (remaining.exists(e =>
e.getId.toString == peer.getId || e.getAddress == peer.getAddress)) {
throw new IllegalArgumentException(
s"Peer $peer with same id or address already exists in group $groupInfo.")
}
RaftPeer.newBuilder()
.setId(peer.getId)
.setAddress(peer.getAddress)
.setPriority(0)
.build()
}

val peers = (remaining ++ adding).distinct

logInfo(s"Adding peers: $adding to group $groupInfo.")
logInfo(s"New peers: $peers")

val reply = setConfiguration(peers)
if (reply.isSuccess) {
new HandleResponse().success(true).message(
s"Successfully added peers $adding to group $groupInfo.")
} else {
new HandleResponse().success(false).message(
s"Failed to add peers $adding to group $groupInfo. $reply")
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description = "Remove peers from the raft group.")
@POST
@Path("/peer/remove")
def peerRemove(request: RatisPeerRemoveRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getPeers.isEmpty) {
throw new BadRequestException("No peers specified.")
}

val groupInfo = ratisServer.getGroupInfo

val removing = request.getPeers.asScala.map { peer =>
getRaftPeers().find { raftPeer =>
raftPeer.getId.toString == peer.getId && raftPeer.getAddress == peer.getAddress
}.getOrElse(throw new IllegalArgumentException(
s"Peer $peer not found in group $groupInfo."))
}
val remaining = getRaftPeers().filterNot(removing.contains)

logInfo(s"Removing peers:$removing from group $groupInfo.")
logInfo(s"New peers: $remaining")

val reply = setConfiguration(remaining)
if (reply.isSuccess) {
new HandleResponse().success(true).message(
s"Successfully removed peers $removing from group $groupInfo.")
} else {
new HandleResponse().success(false).message(
s"Failed to remove peers $removing from group $groupInfo. $reply")
}
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))),
description = "Set the priority of the peers in the raft group.")
@POST
@Path("/peer/set_priority")
def peerSetPriority(request: RatisPeerSetPriorityRequest): HandleResponse =
ensureLeaderElectionMemberMajorityAddEnabled(master) {
if (request.getAddressPriorities.isEmpty) {
throw new BadRequestException("No peer priorities specified.")
}

val peers = getRaftPeers().map { peer =>
val newPriority = request.getAddressPriorities.get(peer.getAddress)
val priority: Int = if (newPriority != null) newPriority else peer.getPriority
RaftPeer.newBuilder(peer).setPriority(priority).build()
}

val peerPriorities =
request.getAddressPriorities.asScala.map { case (a, p) => s"$a:$p" }.mkString(", ")
logInfo(s"Setting peer priorities: $peerPriorities.")
logInfo(s"New peers: $peers")

val reply = setConfiguration(peers)
if (reply.isSuccess) {
new HandleResponse().success(true).message(
s"Successfully set peer priorities: $peerPriorities.")
} else {
new HandleResponse().success(false).message(
s"Failed to set peer priorities: $peerPriorities. $reply")
}
}

private def transferLeadership(peerAddress: String): HandleResponse = {
val newLeaderId = Option(peerAddress).map(getRaftPeerId).orNull
val newLeaderId = Option(peerAddress).map { addr =>
getRaftPeers().find(_.getAddress == addr).map(_.getId).getOrElse(
throw new IllegalArgumentException(
s"Peer $addr not found in group ${ratisServer.getGroupInfo}."))
}.orNull
val op =
if (newLeaderId == null) s"step down leader ${ratisServer.getLocalAddress}"
else s"transfer leadership from ${ratisServer.getLocalAddress} to $peerAddress"
Expand Down Expand Up @@ -129,11 +251,26 @@ class RatisResource extends ApiRequestContext with Logging {
}
}

private def getRaftPeerId(peerAddress: String): RaftPeerId = {
val groupInfo =
master.statusSystem.asInstanceOf[HAMasterMetaManager].getRatisServer.getGroupInfo
groupInfo.getCommitInfos.asScala.filter(peer => peer.getServer.getAddress == peerAddress)
.map(peer => RaftPeerId.valueOf(peer.getServer.getId)).headOption.getOrElse(
throw new IllegalArgumentException(s"Peer $peerAddress not found in group: $groupInfo"))
private def setConfiguration(peers: Seq[RaftPeer]): RaftClientReply = {
ratisServer.getServer.setConfiguration(new SetConfigurationRequest(
ratisServer.getClientId,
ratisServer.getServer.getId,
ratisServer.getGroupId,
CallId.getAndIncrement(),
SetConfigurationRequest.Arguments.newBuilder.setServersInNewConf(peers.asJava).build()))
}

private def getRaftPeers(): Seq[RaftPeer] = {
ratisServer.getGroupInfo.getGroup.getPeers.asScala.toSeq
}

private def ensureLeaderElectionMemberMajorityAddEnabled[T](master: Master)(f: => T): T = {
ensureMasterIsLeader(master) {
if (!master.conf.hasMasterRatisLeaderElectionMemeberMajorityAdd) {
throw new BadRequestException(s"This operation can only be done when" +
s" ${CelebornConf.HA_MASTER_RATIS_LEADER_ELECTION_MEMBER_MAJORITY_ADD.key} is true.")
}
f
}
}
}
Loading
Loading