[DS-9263][Improvement][master]optimize failover (#9281)
- add FailoverService.java - move failover method from MasterRegistryClient to FailoverService - move failover code from FailoverExecuteThread to FailoverService This closes #9263dependabot/maven/org.apache.hadoop-hadoop-common-3.2.3
parent
9d7223b038
commit
69923546a1
|
|
@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
|||
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
|
||||
import org.apache.dolphinscheduler.server.master.service.FailoverService;
|
||||
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
|
||||
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
|
|
@ -77,10 +78,10 @@ public class MasterRegistryClient {
|
|||
private static final Logger logger = LoggerFactory.getLogger(MasterRegistryClient.class);
|
||||
|
||||
/**
|
||||
* process service
|
||||
* failover service
|
||||
*/
|
||||
@Autowired
|
||||
private ProcessService processService;
|
||||
private FailoverService failoverService;
|
||||
|
||||
@Autowired
|
||||
private RegistryClient registryClient;
|
||||
|
|
@ -96,16 +97,11 @@ public class MasterRegistryClient {
|
|||
*/
|
||||
private ScheduledExecutorService heartBeatExecutor;
|
||||
|
||||
@Autowired
|
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
|
||||
|
||||
/**
|
||||
* master startup time, ms
|
||||
*/
|
||||
private long startupTime;
|
||||
|
||||
private String localNodePath;
|
||||
|
||||
public void init() {
|
||||
this.startupTime = System.currentTimeMillis();
|
||||
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
|
||||
|
|
@ -153,10 +149,7 @@ public class MasterRegistryClient {
|
|||
return;
|
||||
}
|
||||
|
||||
String failoverPath = getFailoverLockPath(nodeType, serverHost);
|
||||
try {
|
||||
registryClient.getLock(failoverPath);
|
||||
|
||||
if (!registryClient.exists(path)) {
|
||||
logger.info("path: {} not exists", path);
|
||||
// handle dead server
|
||||
|
|
@ -165,12 +158,10 @@ public class MasterRegistryClient {
|
|||
|
||||
//failover server
|
||||
if (failover) {
|
||||
failoverServerWhenDown(serverHost, nodeType);
|
||||
failoverService.failoverServerWhenDown(serverHost, nodeType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
|
||||
} finally {
|
||||
registryClient.releaseLock(failoverPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -199,287 +190,19 @@ public class MasterRegistryClient {
|
|||
}
|
||||
//failover server
|
||||
if (failover) {
|
||||
failoverServerWhenDown(serverHost, nodeType);
|
||||
failoverService.failoverServerWhenDown(serverHost, nodeType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("{} server failover failed", nodeType, e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration sessionTimeout) {
|
||||
long sessionTimeoutMillis = Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
|
||||
List<Server> serverList = registryClient.getServerList(nodeType);
|
||||
if (CollectionUtils.isEmpty(serverList)) {
|
||||
return true;
|
||||
}
|
||||
Date startupTime = getServerStartupTime(serverList, host);
|
||||
if (startupTime == null) {
|
||||
return true;
|
||||
}
|
||||
if (System.currentTimeMillis() - startupTime.getTime() > sessionTimeoutMillis) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* failover server when server down
|
||||
*
|
||||
* @param serverHost server host
|
||||
* @param nodeType zookeeper node type
|
||||
*/
|
||||
private void failoverServerWhenDown(String serverHost, NodeType nodeType) {
|
||||
switch (nodeType) {
|
||||
case MASTER:
|
||||
failoverMaster(serverHost);
|
||||
break;
|
||||
case WORKER:
|
||||
failoverWorker(serverHost);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get failover lock path
|
||||
*
|
||||
* @param nodeType zookeeper node type
|
||||
* @return fail over lock path
|
||||
*/
|
||||
public String getFailoverLockPath(NodeType nodeType, String host) {
|
||||
switch (nodeType) {
|
||||
case MASTER:
|
||||
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
|
||||
case WORKER:
|
||||
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
|
||||
default:
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* task needs failover if task start before worker starts
|
||||
*
|
||||
* @param workerServers worker servers
|
||||
* @param taskInstance task instance
|
||||
* @return true if task instance need fail over
|
||||
*/
|
||||
private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
|
||||
|
||||
boolean taskNeedFailover = true;
|
||||
|
||||
//now no host will execute this task instance,so no need to failover the task
|
||||
if (taskInstance.getHost() == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//if task start after worker starts, there is no need to failover the task.
|
||||
if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
|
||||
taskNeedFailover = false;
|
||||
}
|
||||
|
||||
return taskNeedFailover;
|
||||
}
|
||||
|
||||
/**
|
||||
* check task start after the worker server starts.
|
||||
*
|
||||
* @param taskInstance task instance
|
||||
* @return true if task instance start time after worker server start date
|
||||
*/
|
||||
private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
|
||||
if (StringUtils.isEmpty(taskInstance.getHost())) {
|
||||
return false;
|
||||
}
|
||||
Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost());
|
||||
if (workerServerStartDate != null) {
|
||||
if (taskInstance.getStartTime() == null) {
|
||||
return taskInstance.getSubmitTime().after(workerServerStartDate);
|
||||
} else {
|
||||
return taskInstance.getStartTime().after(workerServerStartDate);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* get server startup time
|
||||
*/
|
||||
private Date getServerStartupTime(List<Server> servers, String host) {
|
||||
if (CollectionUtils.isEmpty(servers)) {
|
||||
return null;
|
||||
}
|
||||
Date serverStartupTime = null;
|
||||
for (Server server : servers) {
|
||||
if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
|
||||
serverStartupTime = server.getCreateTime();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return serverStartupTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* get server startup time
|
||||
*/
|
||||
private Date getServerStartupTime(NodeType nodeType, String host) {
|
||||
if (StringUtils.isEmpty(host)) {
|
||||
return null;
|
||||
}
|
||||
List<Server> servers = registryClient.getServerList(nodeType);
|
||||
return getServerStartupTime(servers, host);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover worker tasks
|
||||
* <p>
|
||||
* 1. kill yarn job if there are yarn jobs in tasks.
|
||||
* 2. change task state from running to need failover.
|
||||
* 3. failover all tasks when workerHost is null
|
||||
*
|
||||
* @param workerHost worker host
|
||||
*/
|
||||
private void failoverWorker(String workerHost) {
|
||||
|
||||
if (StringUtils.isEmpty(workerHost)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
|
||||
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
|
||||
logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
|
||||
|
||||
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
|
||||
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
|
||||
if (processInstance == null) {
|
||||
processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
||||
if (processInstance == null) {
|
||||
logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
|
||||
taskInstance.getProcessInstanceId(), taskInstance.getId());
|
||||
continue;
|
||||
}
|
||||
processInstanceCacheMap.put(processInstance.getId(), processInstance);
|
||||
}
|
||||
|
||||
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// only failover the task owned myself if worker down.
|
||||
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
|
||||
failoverTaskInstance(processInstance, taskInstance);
|
||||
}
|
||||
logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover master
|
||||
* <p>
|
||||
* failover process instance and associated task instance
|
||||
*
|
||||
* @param masterHost master host
|
||||
*/
|
||||
public void failoverMaster(String masterHost) {
|
||||
|
||||
if (StringUtils.isEmpty(masterHost)) {
|
||||
return;
|
||||
}
|
||||
|
||||
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
|
||||
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
|
||||
logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
|
||||
|
||||
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
|
||||
if (Constants.NULL.equals(processInstance.getHost())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
|
||||
for (TaskInstance taskInstance : validTaskInstanceList) {
|
||||
if (Constants.NULL.equals(taskInstance.getHost())) {
|
||||
continue;
|
||||
}
|
||||
if (taskInstance.getState().typeIsFinished()) {
|
||||
continue;
|
||||
}
|
||||
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
|
||||
continue;
|
||||
}
|
||||
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
|
||||
failoverTaskInstance(processInstance, taskInstance);
|
||||
}
|
||||
|
||||
if (serverStartupTime != null && processInstance.getRestartTime() != null
|
||||
&& processInstance.getRestartTime().after(serverStartupTime)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.info("failover process instance id: {}", processInstance.getId());
|
||||
//updateProcessInstance host is null and insert into command
|
||||
processService.processNeedFailoverProcessInstances(processInstance);
|
||||
}
|
||||
|
||||
logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover task instance
|
||||
* <p>
|
||||
* 1. kill yarn job if there are yarn jobs in tasks.
|
||||
* 2. change task state from running to need failover.
|
||||
* 3. try to notify local master
|
||||
*/
|
||||
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
|
||||
if (taskInstance == null) {
|
||||
logger.error("failover task instance error, taskInstance is null");
|
||||
return;
|
||||
}
|
||||
|
||||
if (processInstance == null) {
|
||||
logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
|
||||
taskInstance.getProcessInstanceId(), taskInstance.getId());
|
||||
return;
|
||||
}
|
||||
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
|
||||
.buildTaskInstanceRelatedInfo(taskInstance)
|
||||
.buildProcessInstanceRelatedInfo(processInstance)
|
||||
.create();
|
||||
|
||||
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
|
||||
// only kill yarn job if exists , the local thread has exited
|
||||
ProcessUtils.killYarnJob(taskExecutionContext);
|
||||
}
|
||||
|
||||
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
processService.saveTaskInstance(taskInstance);
|
||||
|
||||
StateEvent stateEvent = new StateEvent();
|
||||
stateEvent.setTaskInstanceId(taskInstance.getId());
|
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
|
||||
stateEvent.setProcessInstanceId(processInstance.getId());
|
||||
stateEvent.setExecutionStatus(taskInstance.getState());
|
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
* registry
|
||||
*/
|
||||
public void registry() {
|
||||
String address = NetUtils.getAddr(masterConfig.getListenPort());
|
||||
localNodePath = getMasterPath();
|
||||
String localNodePath = getMasterPath();
|
||||
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
|
||||
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
|
||||
masterConfig.getMaxCpuLoadAvg(),
|
||||
|
|
@ -509,6 +232,7 @@ public class MasterRegistryClient {
|
|||
}
|
||||
|
||||
public void handleConnectionState(ConnectionState state) {
|
||||
String localNodePath = getMasterPath();
|
||||
switch (state) {
|
||||
case CONNECTED:
|
||||
logger.debug("registry connection state is {}", state);
|
||||
|
|
@ -545,7 +269,7 @@ public class MasterRegistryClient {
|
|||
/**
|
||||
* get master path
|
||||
*/
|
||||
public String getMasterPath() {
|
||||
private String getMasterPath() {
|
||||
String address = getLocalAddress();
|
||||
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
|
||||
}
|
||||
|
|
@ -553,7 +277,7 @@ public class MasterRegistryClient {
|
|||
/**
|
||||
* get local address
|
||||
*/
|
||||
public String getLocalAddress() {
|
||||
private String getLocalAddress() {
|
||||
return NetUtils.getAddr(masterConfig.getListenPort());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,18 +18,10 @@
|
|||
package org.apache.dolphinscheduler.server.master.runner;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.NodeType;
|
||||
import org.apache.dolphinscheduler.common.thread.Stopper;
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.registry.RegistryClient;
|
||||
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import org.apache.dolphinscheduler.server.master.service.FailoverService;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -41,20 +33,14 @@ public class FailoverExecuteThread extends Thread {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
|
||||
|
||||
@Autowired
|
||||
private MasterRegistryClient masterRegistryClient;
|
||||
|
||||
@Autowired
|
||||
private RegistryClient registryClient;
|
||||
|
||||
@Autowired
|
||||
private MasterConfig masterConfig;
|
||||
|
||||
/**
|
||||
* process service
|
||||
* failover service
|
||||
*/
|
||||
@Autowired
|
||||
private ProcessService processService;
|
||||
private FailoverService failoverService;
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
|
|
@ -67,23 +53,7 @@ public class FailoverExecuteThread extends Thread {
|
|||
logger.info("failover execute thread started");
|
||||
while (Stopper.isRunning()) {
|
||||
try {
|
||||
List<String> hosts = getNeedFailoverMasterServers();
|
||||
if (CollectionUtils.isEmpty(hosts)) {
|
||||
continue;
|
||||
}
|
||||
logger.info("need failover hosts:{}", hosts);
|
||||
|
||||
for (String host : hosts) {
|
||||
String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
|
||||
try {
|
||||
registryClient.getLock(failoverPath);
|
||||
masterRegistryClient.failoverMaster(host);
|
||||
} catch (Exception e) {
|
||||
logger.error("{} server failover failed, host:{}", NodeType.MASTER, host, e);
|
||||
} finally {
|
||||
registryClient.releaseLock(failoverPath);
|
||||
}
|
||||
}
|
||||
failoverService.checkMasterFailover();
|
||||
} catch (Exception e) {
|
||||
logger.error("failover execute error", e);
|
||||
} finally {
|
||||
|
|
@ -91,20 +61,4 @@ public class FailoverExecuteThread extends Thread {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getNeedFailoverMasterServers() {
|
||||
// failover myself && failover dead masters
|
||||
List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
|
||||
|
||||
Iterator<String> iterator = hosts.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String host = iterator.next();
|
||||
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
|
||||
if (!host.equals(masterRegistryClient.getLocalAddress())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,369 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.service;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.NodeType;
|
||||
import org.apache.dolphinscheduler.common.enums.StateEvent;
|
||||
import org.apache.dolphinscheduler.common.enums.StateEventType;
|
||||
import org.apache.dolphinscheduler.common.model.Server;
|
||||
import org.apache.dolphinscheduler.common.utils.NetUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
|
||||
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.registry.RegistryClient;
|
||||
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* failover service
|
||||
*/
|
||||
@Component
|
||||
public class FailoverService {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverService.class);
|
||||
private final RegistryClient registryClient;
|
||||
private final MasterConfig masterConfig;
|
||||
private final ProcessService processService;
|
||||
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
|
||||
|
||||
public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService,
|
||||
WorkflowExecuteThreadPool workflowExecuteThreadPool) {
|
||||
this.registryClient = registryClient;
|
||||
this.masterConfig = masterConfig;
|
||||
this.processService = processService;
|
||||
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* check master failover
|
||||
*/
|
||||
public void checkMasterFailover() {
|
||||
List<String> hosts = getNeedFailoverMasterServers();
|
||||
if (CollectionUtils.isEmpty(hosts)) {
|
||||
return;
|
||||
}
|
||||
LOGGER.info("need failover hosts:{}", hosts);
|
||||
|
||||
for (String host : hosts) {
|
||||
failoverMasterWithLock(host);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* failover server when server down
|
||||
*
|
||||
* @param serverHost server host
|
||||
* @param nodeType node type
|
||||
*/
|
||||
public void failoverServerWhenDown(String serverHost, NodeType nodeType) {
|
||||
switch (nodeType) {
|
||||
case MASTER:
|
||||
failoverMasterWithLock(serverHost);
|
||||
break;
|
||||
case WORKER:
|
||||
failoverWorker(serverHost);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private void failoverMasterWithLock(String masterHost) {
|
||||
String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
|
||||
try {
|
||||
registryClient.getLock(failoverPath);
|
||||
this.failoverMaster(masterHost);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
|
||||
} finally {
|
||||
registryClient.releaseLock(failoverPath);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* failover master
|
||||
* <p>
|
||||
* failover process instance and associated task instance
|
||||
*
|
||||
* @param masterHost master host
|
||||
*/
|
||||
private void failoverMaster(String masterHost) {
|
||||
if (StringUtils.isEmpty(masterHost)) {
|
||||
return;
|
||||
}
|
||||
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
|
||||
LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
|
||||
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
|
||||
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
|
||||
if (Constants.NULL.equals(processInstance.getHost())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
|
||||
for (TaskInstance taskInstance : validTaskInstanceList) {
|
||||
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
|
||||
failoverTaskInstance(processInstance, taskInstance, workerServers);
|
||||
}
|
||||
|
||||
if (serverStartupTime != null && processInstance.getRestartTime() != null
|
||||
&& processInstance.getRestartTime().after(serverStartupTime)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
LOGGER.info("failover process instance id: {}", processInstance.getId());
|
||||
//updateProcessInstance host is null and insert into command
|
||||
processInstance.setHost(Constants.NULL);
|
||||
processService.processNeedFailoverProcessInstances(processInstance);
|
||||
}
|
||||
|
||||
LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover worker tasks
|
||||
* <p>
|
||||
* 1. kill yarn job if there are yarn jobs in tasks.
|
||||
* 2. change task state from running to need failover.
|
||||
* 3. failover all tasks when workerHost is null
|
||||
*
|
||||
* @param workerHost worker host
|
||||
*/
|
||||
private void failoverWorker(String workerHost) {
|
||||
if (StringUtils.isEmpty(workerHost)) {
|
||||
return;
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
|
||||
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
|
||||
LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
|
||||
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
|
||||
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
|
||||
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
|
||||
if (processInstance == null) {
|
||||
processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
||||
if (processInstance == null) {
|
||||
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
|
||||
taskInstance.getProcessInstanceId(), taskInstance.getId());
|
||||
continue;
|
||||
}
|
||||
processInstanceCacheMap.put(processInstance.getId(), processInstance);
|
||||
}
|
||||
|
||||
// only failover the task owned myself if worker down.
|
||||
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
|
||||
failoverTaskInstance(processInstance, taskInstance, workerServers);
|
||||
}
|
||||
LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* failover task instance
|
||||
* <p>
|
||||
* 1. kill yarn job if there are yarn jobs in tasks.
|
||||
* 2. change task state from running to need failover.
|
||||
* 3. try to notify local master
|
||||
*/
|
||||
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> workerServers) {
|
||||
if (processInstance == null) {
|
||||
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
|
||||
taskInstance.getProcessInstanceId(), taskInstance.getId());
|
||||
return;
|
||||
}
|
||||
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
|
||||
return;
|
||||
}
|
||||
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
|
||||
.buildTaskInstanceRelatedInfo(taskInstance)
|
||||
.buildProcessInstanceRelatedInfo(processInstance)
|
||||
.create();
|
||||
|
||||
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
|
||||
// only kill yarn job if exists , the local thread has exited
|
||||
ProcessUtils.killYarnJob(taskExecutionContext);
|
||||
}
|
||||
|
||||
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
processService.saveTaskInstance(taskInstance);
|
||||
|
||||
StateEvent stateEvent = new StateEvent();
|
||||
stateEvent.setTaskInstanceId(taskInstance.getId());
|
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
|
||||
stateEvent.setProcessInstanceId(processInstance.getId());
|
||||
stateEvent.setExecutionStatus(taskInstance.getState());
|
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
* get need failover master servers
|
||||
*
|
||||
* @return need failover master servers
|
||||
*/
|
||||
private List<String> getNeedFailoverMasterServers() {
|
||||
// failover myself && failover dead masters
|
||||
List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
|
||||
|
||||
Iterator<String> iterator = hosts.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
String host = iterator.next();
|
||||
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
|
||||
if (!host.equals(getLocalAddress())) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
|
||||
/**
|
||||
* task needs failover if task start before worker starts
|
||||
*
|
||||
* @param workerServers worker servers
|
||||
* @param taskInstance task instance
|
||||
* @return true if task instance need fail over
|
||||
*/
|
||||
private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
|
||||
|
||||
boolean taskNeedFailover = true;
|
||||
|
||||
if (taskInstance == null) {
|
||||
LOGGER.error("failover task instance error, taskInstance is null");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Constants.NULL.equals(taskInstance.getHost())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
//now no host will execute this task instance,so no need to failover the task
|
||||
if (taskInstance.getHost() == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//if task start after worker starts, there is no need to failover the task.
|
||||
if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
|
||||
taskNeedFailover = false;
|
||||
}
|
||||
|
||||
return taskNeedFailover;
|
||||
}
|
||||
|
||||
/**
|
||||
* check task start after the worker server starts.
|
||||
*
|
||||
* @param taskInstance task instance
|
||||
* @return true if task instance start time after worker server start date
|
||||
*/
|
||||
private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
|
||||
if (StringUtils.isEmpty(taskInstance.getHost())) {
|
||||
return false;
|
||||
}
|
||||
Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost());
|
||||
if (workerServerStartDate != null) {
|
||||
if (taskInstance.getStartTime() == null) {
|
||||
return taskInstance.getSubmitTime().after(workerServerStartDate);
|
||||
} else {
|
||||
return taskInstance.getStartTime().after(workerServerStartDate);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* get failover lock path
|
||||
*
|
||||
* @param nodeType zookeeper node type
|
||||
* @return fail over lock path
|
||||
*/
|
||||
private String getFailoverLockPath(NodeType nodeType, String host) {
|
||||
switch (nodeType) {
|
||||
case MASTER:
|
||||
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
|
||||
case WORKER:
|
||||
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
|
||||
default:
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get server startup time
|
||||
*/
|
||||
private Date getServerStartupTime(NodeType nodeType, String host) {
|
||||
if (StringUtils.isEmpty(host)) {
|
||||
return null;
|
||||
}
|
||||
List<Server> servers = registryClient.getServerList(nodeType);
|
||||
return getServerStartupTime(servers, host);
|
||||
}
|
||||
|
||||
/**
|
||||
* get server startup time
|
||||
*/
|
||||
private Date getServerStartupTime(List<Server> servers, String host) {
|
||||
if (CollectionUtils.isEmpty(servers)) {
|
||||
return null;
|
||||
}
|
||||
Date serverStartupTime = null;
|
||||
for (Server server : servers) {
|
||||
if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
|
||||
serverStartupTime = server.getCreateTime();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return serverStartupTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* get local address
|
||||
*/
|
||||
String getLocalAddress() {
|
||||
return NetUtils.getAddr(masterConfig.getListenPort());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.service;
|
||||
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||
import org.apache.dolphinscheduler.common.enums.NodeType;
|
||||
import org.apache.dolphinscheduler.common.enums.StateEvent;
|
||||
import org.apache.dolphinscheduler.common.model.Server;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.registry.RegistryClient;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* MasterRegistryClientTest
|
||||
*/
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest({RegistryClient.class})
|
||||
@PowerMockIgnore({"javax.management.*"})
|
||||
public class FailoverServiceTest {
|
||||
@InjectMocks
|
||||
private FailoverService failoverService;
|
||||
|
||||
@Mock
|
||||
private MasterConfig masterConfig;
|
||||
|
||||
@Mock
|
||||
private RegistryClient registryClient;
|
||||
|
||||
@Mock
|
||||
private ProcessService processService;
|
||||
|
||||
@Mock
|
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
|
||||
|
||||
private String testHost;
|
||||
private ProcessInstance processInstance;
|
||||
private TaskInstance taskInstance;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
given(masterConfig.getListenPort()).willReturn(8080);
|
||||
|
||||
testHost = failoverService.getLocalAddress();
|
||||
String ip = testHost.split(":")[0];
|
||||
int port = Integer.valueOf(testHost.split(":")[1]);
|
||||
Assert.assertEquals(8080, port);
|
||||
|
||||
given(registryClient.getLock(Mockito.anyString())).willReturn(true);
|
||||
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
|
||||
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn(testHost);
|
||||
given(registryClient.getStoppable()).willReturn(cause -> {
|
||||
});
|
||||
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
|
||||
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
|
||||
|
||||
processInstance = new ProcessInstance();
|
||||
processInstance.setId(1);
|
||||
processInstance.setHost(testHost);
|
||||
processInstance.setRestartTime(new Date());
|
||||
processInstance.setHistoryCmd("xxx");
|
||||
processInstance.setCommandType(CommandType.STOP);
|
||||
|
||||
taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setStartTime(new Date());
|
||||
taskInstance.setHost(testHost);
|
||||
|
||||
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())).willReturn(Arrays.asList(taskInstance));
|
||||
given(processService.queryNeedFailoverProcessInstanceHost()).willReturn(Lists.newArrayList(testHost));
|
||||
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
|
||||
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
|
||||
given(processService.findValidTaskListByProcessId(Mockito.anyInt())).willReturn(Lists.newArrayList(taskInstance));
|
||||
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Server server = new Server();
|
||||
server.setHost(ip);
|
||||
server.setPort(port);
|
||||
server.setCreateTime(new Date());
|
||||
given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server));
|
||||
given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server));
|
||||
ReflectionTestUtils.setField(failoverService, "registryClient", registryClient);
|
||||
|
||||
doNothing().when(workflowExecuteThreadPool).submitStateEvent(Mockito.any(StateEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkMasterFailoverTest() {
|
||||
failoverService.checkMasterFailover();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void failoverMasterTest() {
|
||||
processInstance.setHost(Constants.NULL);
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
|
||||
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
|
||||
Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
|
||||
processInstance.setHost(testHost);
|
||||
taskInstance.setState(ExecutionStatus.SUCCESS);
|
||||
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
|
||||
Assert.assertNotEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
Assert.assertEquals(Constants.NULL, processInstance.getHost());
|
||||
|
||||
processInstance.setHost(testHost);
|
||||
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
|
||||
failoverService.failoverServerWhenDown(testHost, NodeType.MASTER);
|
||||
Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
Assert.assertEquals(Constants.NULL, processInstance.getHost());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void failoverWorkTest() {
|
||||
failoverService.failoverServerWhenDown(testHost, NodeType.WORKER);
|
||||
Assert.assertEquals(taskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue