[Fix-16942] Fix global master failover might cause master dead #16953

Merged
@@ -28,6 +28,8 @@
@SuperBuilder
public abstract class BaseServerMetadata implements IClusters.IServerMetadata {

private final int processId;

// The server startup time in milliseconds.
private final long serverStartupTime;

@@ -33,6 +33,7 @@ public class MasterServerMetadata extends BaseServerMetadata implements Comparab

public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder()
.processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
.address(masterHeartBeat.getHost() + Constants.COLON + masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage())
@@ -41,6 +41,7 @@ public class WorkerServerMetadata extends BaseServerMetadata {

public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder()
.processId(workerHeartBeat.getProcessId())
.serverStartupTime(workerHeartBeat.getStartupTime())
.address(workerHeartBeat.getHost() + Constants.COLON + workerHeartBeat.getPort())
.workerGroup(workerHeartBeat.getWorkerGroup())
@@ -29,6 +29,7 @@
public class MasterFailoverEvent extends AbstractSystemEvent {

private final MasterServerMetadata masterServerMetadata;
// The time when the event occurred. This might be different at different nodes.
private final Date eventTime;

private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata,
@@ -44,7 +44,6 @@

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Component
@@ -65,9 +64,6 @@ public class FailoverCoordinator implements IFailoverCoordinator {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

@Autowired
private PlatformTransactionManager platformTransactionManager;

@Autowired
private WorkflowFailover workflowFailover;

@@ -81,13 +77,21 @@ public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFai
final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
// If the master is alive, then we use the alive master's startup time as the failover deadline.
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata);
doMasterFailover(aliveMasterServerMetadata.getAddress(),
aliveMasterServerMetadata.getServerStartupTime());
doMasterFailover(
masterAddress,
aliveMasterServerMetadata.getServerStartupTime(),
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(
masterAddress));
} else {
// If the master is not alive, then we use the event time as the failover deadline.
log.info("The master[{}] is not alive, do global master failover on it", masterAddress);
doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime());
doMasterFailover(
masterAddress,
globalMasterFailoverEvent.getEventTime().getTime(),
RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown(masterAddress));
}
}

@@ -99,53 +103,55 @@ public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFai
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata();
log.info("Master[{}] failover starting", masterServerMetadata);
final String masterAddress = masterServerMetadata.getAddress();

final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
clusterManager.getMasterClusters().getServer(masterAddress);
if (aliveMasterOptional.isPresent()) {
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) {
log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata);
} else {
log.info("The master[{}] is alive, but the startup time is different, will failover on {}",
masterServerMetadata,
aliveMasterServerMetadata);
doMasterFailover(aliveMasterServerMetadata.getAddress(),
aliveMasterServerMetadata.getServerStartupTime());
return;
}
} else {
log.info("The master[{}] is not alive, will failover", masterServerMetadata);
doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime());
}
doMasterFailover(
masterServerMetadata.getAddress(),
masterFailoverEvent.getEventTime().getTime(),
RegistryUtils.getFailoveredNodePath(
masterServerMetadata.getAddress(),
masterServerMetadata.getServerStartupTime(),
masterServerMetadata.getProcessId()));
}

/**
* Do master failover.
* <p> Will failover the workflow which is scheduled by the master and the workflow's fire time is before the maxWorkflowFireTime.
*/
private void doMasterFailover(final String masterAddress, final long masterStartupTime) {
private void doMasterFailover(final String masterAddress,
final long workflowFailoverDeadline,
final String masterFailoverNodePath) {
// We use a lock to avoid multiple masters failing over the same master at the same time.
// Once a workflow has been failovered, its state will be changed to FAILOVER.
// Once a FAILOVER workflow has been refired, its host will be changed to the new master and it will get a new
// start time.
// So even if a master is failovered multiple times, there is no problem.
final StopWatch failoverTimeCost = StopWatch.createStarted();
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
registryClient.getLock(RegistryUtils.getMasterFailoverLockPath(masterAddress));
try {
final String failoverFinishedNodePath =
RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime);
if (registryClient.exists(failoverFinishedNodePath)) {
log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover",
// If the master has already been failovered, then we skip the failover.
if (registryClient.exists(masterFailoverNodePath)
&& String.valueOf(workflowFailoverDeadline).equals(registryClient.get(masterFailoverNodePath))) {
log.error("The master[{}/{}] is exist at: {}, means it has already been failovered, skip failover",
masterAddress,
masterStartupTime,
failoverFinishedNodePath);
workflowFailoverDeadline,
masterFailoverNodePath);
return;
}
final List<WorkflowInstance> needFailoverWorkflows =
getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime));
getFailoverWorkflowsForMaster(masterAddress, new Date(workflowFailoverDeadline));
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
registryClient.persist(masterFailoverNodePath, String.valueOf(workflowFailoverDeadline));
failoverTimeCost.stop();
registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis()));
log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
masterAddress,
needFailoverWorkflows.size(),
@@ -190,28 +196,30 @@ public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) {
final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get();
if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) {
log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata);
} else {
log.info("The worker[{}] is alive, but the startup time is different, will failover on {}",
workerServerMetadata,
aliveWorkerServerMetadata);
doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
aliveWorkerServerMetadata.getServerStartupTime());
return;
}
} else {
log.info("The worker[{}] is not alive, will failover", workerServerMetadata);
doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime());
}
doWorkerFailover(
workerServerMetadata.getAddress(),
System.currentTimeMillis(),
RegistryUtils.getFailoveredNodePath(
workerServerMetadata.getAddress(),
workerServerMetadata.getServerStartupTime(),
workerServerMetadata.getProcessId()));
}

private void doWorkerFailover(final String workerAddress, final long workerCrashTime) {
private void doWorkerFailover(final String workerAddress,
final long taskFailoverDeadline,
final String workerFailoverNodePath) {
final StopWatch failoverTimeCost = StopWatch.createStarted();
// We don't check whether the workerFailoverNodePath already exists, since the worker may be failovered by multiple masters.

final List<ITaskExecutionRunnable> needFailoverTasks =
getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime));
getFailoverTaskForWorker(workerAddress, new Date(taskFailoverDeadline));
needFailoverTasks.forEach(taskFailover::failoverTask);

registryClient.persist(
RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime),
workerFailoverNodePath,
String.valueOf(System.currentTimeMillis()));
failoverTimeCost.stop();
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
@@ -221,7 +229,7 @@ private void doWorkerFailover(final String workerAddress, final long workerCrash
}

private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String workerAddress,
final Date workerCrashTime) {
final Date taskFailoverDeadline) {
return workflowRepository.getAll()
.stream()
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
@@ -237,7 +245,7 @@ private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String worke
// The submitTime should not be null.
// This is a bad case unless someone manually set the submitTime to null.
final Date submitTime = taskExecutionRunnable.getTaskInstance().getSubmitTime();
return submitTime != null && submitTime.before(workerCrashTime);
return submitTime != null && submitTime.before(taskFailoverDeadline);
})
.collect(Collectors.toList());
}
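Taken together, the new doMasterFailover follows a lock / check-marker / failover / persist-marker sequence: take a per-master lock, skip if a marker for the same deadline already exists, failover the matching workflows, then persist the marker. A minimal sketch of that sequence, assuming a hypothetical Registry interface that mirrors the registryClient calls visible in this diff (getLock, exists, get, persist); the releaseLock call is assumed, since the lock-release code sits outside the visible hunks:

    import java.util.Date;
    import java.util.List;

    // Hypothetical stand-in for the real registryClient; only the calls that
    // appear in this diff are assumed, plus an assumed releaseLock.
    interface Registry {
        void getLock(String lockPath);
        void releaseLock(String lockPath);
        boolean exists(String path);
        String get(String path);
        void persist(String path, String value);
    }

    final class MasterFailoverSketch {

        private final Registry registry;

        MasterFailoverSketch(final Registry registry) {
            this.registry = registry;
        }

        void doMasterFailover(final String masterAddress,
                              final long workflowFailoverDeadline,
                              final String masterFailoverNodePath) {
            // Per-master lock: concurrent failovers of the same master serialize,
            // while failovers of different masters proceed in parallel.
            final String lockPath = "/lock/master-failover/" + masterAddress;
            registry.getLock(lockPath);
            try {
                // Skip only when a marker exists for this exact deadline; a marker
                // written with a different deadline does not block a re-failover.
                if (registry.exists(masterFailoverNodePath)
                        && String.valueOf(workflowFailoverDeadline).equals(registry.get(masterFailoverNodePath))) {
                    return;
                }
                for (final String workflow : findWorkflowsFiredBefore(masterAddress,
                        new Date(workflowFailoverDeadline))) {
                    failoverWorkflow(workflow);
                }
                // Persist the marker only after every workflow is failovered, so a crash
                // mid-failover causes a retry rather than a silent skip.
                registry.persist(masterFailoverNodePath, String.valueOf(workflowFailoverDeadline));
            } finally {
                registry.releaseLock(lockPath);
            }
        }

        private List<String> findWorkflowsFiredBefore(final String masterAddress, final Date deadline) {
            return List.of(); // stands in for the real workflowInstanceDao query
        }

        private void failoverWorkflow(final String workflow) {
            // stands in for workflowFailover::failoverWorkflow
        }
    }
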
@@ -86,7 +86,7 @@ public MasterHeartBeat getHeartBeat() {

@Override
public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
final String failoverNodePath = RegistryUtils.getFailoveredNodePath(masterHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The master: {} is under {}, means it has been failover will close myself",
masterHeartBeat,
@@ -26,6 +26,7 @@ public enum RegistryNodeType {

FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"),

GLOBAL_MASTER_FAILOVER_LOCK("GlobalMasterFailoverLock", "/lock/global-master-failover"),
MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_COORDINATOR("MasterCoordinator", "/nodes/master-coordinator"),
@@ -20,14 +20,30 @@
import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

import com.google.common.base.Preconditions;

public class RegistryUtils {

public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) {
return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
baseHeartBeat.getStartupTime());
public static String getMasterFailoverLockPath(final String masterAddress) {
Preconditions.checkNotNull(masterAddress, "master address cannot be null");
return RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterAddress;
}

public static String getFailoveredNodePathWhichStartupTimeIsUnknown(final String serverAddress) {
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + "unknown" + "-"
+ "unknown";
}

public static String getFailoveredNodePath(final BaseHeartBeat baseHeartBeat) {
return getFailoveredNodePath(
baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
baseHeartBeat.getStartupTime(),
baseHeartBeat.getProcessId());
}

public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) {
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime;
public static String getFailoveredNodePath(final String serverAddress, final long serverStartupTime,
final int processId) {
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + serverAddress + "-" + serverStartupTime
+ "-" + processId;
}
}
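Given the registry roots declared in RegistryNodeType (/nodes/failover-finish-nodes and /lock/master-failover), the helpers above produce paths such as the following. The address, startup time, and process id here are hypothetical example values, and the import of RegistryUtils is assumed:

    // Hypothetical example values; the printed paths follow directly from the helpers above.
    public class RegistryPathExamples {
        public static void main(String[] args) {
            System.out.println(RegistryUtils.getMasterFailoverLockPath("192.168.1.10:5678"));
            // -> /lock/master-failover/192.168.1.10:5678
            System.out.println(RegistryUtils.getFailoveredNodePath("192.168.1.10:5678", 1700000000000L, 4242));
            // -> /nodes/failover-finish-nodes/192.168.1.10:5678-1700000000000-4242
            System.out.println(RegistryUtils.getFailoveredNodePathWhichStartupTimeIsUnknown("192.168.1.10:5678"));
            // -> /nodes/failover-finish-nodes/192.168.1.10:5678-unknown-unknown
        }
    }

Because the marker path now embeds the startup time and process id, a marker written for a crashed incarnation can no longer match the path a freshly restarted master checks for itself, and the -unknown-unknown form used by global failover never collides with a live master's own path; this appears to be how the PR avoids the "master dead" symptom named in the title.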
@@ -85,7 +85,7 @@ public WorkerHeartBeat getHeartBeat() {

@Override
public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
final String failoverNodePath = RegistryUtils.getFailoveredNodePath(workerHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The worker: {} is under {}, means it has been failover will close myself",
workerHeartBeat,
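Both registry clients guard each heartbeat write with the same self-fencing check, as the two writeHeartBeat hunks above show. A minimal sketch of the pattern, reusing the hypothetical Registry interface from the earlier sketch; the stopServer callback is an assumption, since the shutdown logic lies outside the visible hunks:

    // Self-fencing heartbeat write shared by the master and worker registry clients.
    // The server derives selfFailoveredPath from its OWN address, startup time, and
    // process id, so only a failover marker for this exact incarnation triggers a
    // shutdown; a stale marker left for a previous incarnation at the same address
    // no longer matches.
    final class HeartBeatWriterSketch {

        private final Registry registry;          // hypothetical interface from the earlier sketch
        private final String selfFailoveredPath;  // e.g. RegistryUtils.getFailoveredNodePath(ownHeartBeat)
        private final Runnable stopServer;        // assumed shutdown hook; real code is outside the hunk

        HeartBeatWriterSketch(final Registry registry, final String selfFailoveredPath, final Runnable stopServer) {
            this.registry = registry;
            this.selfFailoveredPath = selfFailoveredPath;
            this.stopServer = stopServer;
        }

        void writeHeartBeat(final String heartBeatPath, final String heartBeatJson) {
            if (registry.exists(selfFailoveredPath)) {
                // This incarnation has already been failovered; continuing to heartbeat
                // would risk split-brain, so the server closes itself instead.
                stopServer.run();
                return;
            }
            registry.persist(heartBeatPath, heartBeatJson);
        }
    }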