Refactor worker (#2037)
* Refactor worker (#10)
* Refactor worker (#2000)
* Refactor worker (#2)
* Refactor worker (#1993)
* Refactor worker (#1)
* add TaskResponseProcessor (#1983)
* 1. master persistent task; 2. extract master and worker communication model (#1992)
* add license
* modify javadoc error
* updates
* TaskExecutionContext create modify (#1994)
* add- register processor
* buildAckCommand taskInstanceId not set modify (#2002)
* java doc error modify (#2004)
* add comment (#2006)
* ExecutorManager interface add generic type (#2012)
* Refactor worker (#2018)
* Refactor worker (#7)
* Refactor worker (#8)
* add kill command
* add TaskInstanceCacheManager receive Worker report result, modify master polling db transform to cache (#2021)
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add dispatch task method
* AbstractCommandExecutor remove db access
* refactor heartbeat logic
* update registry and add worker group

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
refactor-worker
parent 8c545ffa9b
commit 1ce2dd2eec
@@ -22,14 +22,15 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -45,8 +46,6 @@ import org.springframework.context.annotation.ComponentScan;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * master server
@@ -65,11 +64,6 @@ public class MasterServer implements IStoppable {
    @Autowired
    private ZKMasterClient zkMasterClient = null;

    /**
     * heartbeat thread pool
     */
    private ScheduledExecutorService heartbeatMasterService;

    /**
     * process service
     */
@@ -87,6 +81,11 @@ public class MasterServer implements IStoppable {
    @Autowired
    private MasterConfig masterConfig;

    /**
     * zookeeper registry center
     */
    @Autowired
    private ZookeeperRegistryCenter zookeeperRegistryCenter;

    /**
     * spring application context
@@ -95,8 +94,15 @@ public class MasterServer implements IStoppable {
    @Autowired
    private SpringApplicationContext springApplicationContext;

    /**
     * netty remote server
     */
    private NettyRemotingServer nettyRemotingServer;

    /**
     * master registry
     */
    private MasterRegistry masterRegistry;

    /**
     * master server startup
@@ -115,7 +121,6 @@ public class MasterServer implements IStoppable {
    @PostConstruct
    public void run(){

        //
        //init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(45678);
@@ -124,23 +129,17 @@ public class MasterServer implements IStoppable {
        this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
        this.nettyRemotingServer.start();

        //
        this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval());
        this.masterRegistry.registry();

        //
        zkMasterClient.init();

        masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");

        heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM);

        // heartbeat thread implement
        Runnable heartBeatThread = heartBeatThread();

        zkMasterClient.setStoppable(this);

        // regular heartbeat
        // delay 5 seconds, send heartbeat every 30 seconds
        heartbeatMasterService.
                scheduleAtFixedRate(heartBeatThread, 5, masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS);

        // master scheduler thread
        MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
                zkMasterClient,
@@ -206,13 +205,8 @@ public class MasterServer implements IStoppable {
            }catch (Exception e){
                logger.warn("thread sleep exception ", e);
            }
            try {
                heartbeatMasterService.shutdownNow();
            }catch (Exception e){
                logger.warn("heartbeat service stopped exception");
            }

            logger.info("heartbeat service stopped");
            this.nettyRemotingServer.close();
            this.masterRegistry.unRegistry();

            //close quartz
            try{
@@ -247,35 +241,10 @@ public class MasterServer implements IStoppable {

            logger.info("zookeeper service stopped");


        } catch (Exception e) {
            logger.error("master server stop exception ", e);
            System.exit(-1);
        }
    }


    /**
     * heartbeat thread implement
     * @return
     */
    private Runnable heartBeatThread(){
        logger.info("start master heart beat thread...");
        Runnable heartBeatThread = new Runnable() {
            @Override
            public void run() {
                if(Stopper.isRunning()) {
                    // send heartbeat to zk
                    if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                        logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                        return;
                    }

                    zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
                }
            }
        };
        return heartBeatThread;
    }
}


@@ -19,11 +19,21 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;

/**
 * master registry
 */
@@ -41,14 +51,32 @@ public class MasterRegistry {
     */
    private final int port;

    /**
     * heartbeat interval
     */
    private final long heartBeatInterval;

    /**
     * heartbeat executor
     */
    private final ScheduledExecutorService heartBeatExecutor;

    /**
     * worker start time
     */
    private final String startTime;

    /**
     * construct
     * @param zookeeperRegistryCenter zookeeperRegistryCenter
     * @param port port
     */
    public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
    public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
        this.port = port;
        this.heartBeatInterval = heartBeatInterval;
        this.startTime = DateUtils.dateToString(new Date());
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
    }

    /**
@@ -56,8 +84,8 @@ public class MasterRegistry {
     */
    public void registry() {
        String address = Constants.LOCAL_ADDRESS;
        String localNodePath = getWorkerPath();
        zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
        String localNodePath = getMasterPath();
        zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
        zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
@@ -65,13 +93,14 @@ public class MasterRegistry {
                    logger.error("master : {} connection lost from zookeeper", address);
                } else if(newState == ConnectionState.RECONNECTED){
                    logger.info("master : {} reconnected to zookeeper", address);
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
                } else if(newState == ConnectionState.SUSPENDED){
                    logger.warn("master : {} connection SUSPENDED ", address);
                }
            }
        });
        logger.info("master node : {} registry to ZK successfully.", address);
        this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS);
        logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval);
    }

    /**
@@ -79,18 +108,18 @@ public class MasterRegistry {
     */
    public void unRegistry() {
        String address = getLocalAddress();
        String localNodePath = getWorkerPath();
        String localNodePath = getMasterPath();
        zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
        logger.info("worker node : {} unRegistry to ZK.", address);
        logger.info("master node : {} unRegistry to ZK.", address);
    }

    /**
     * get worker path
     * get master path
     * @return
     */
    private String getWorkerPath() {
    private String getMasterPath() {
        String address = getLocalAddress();
        String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
        String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
        return localNodePath;
    }

@@ -101,4 +130,26 @@ public class MasterRegistry {
    private String getLocalAddress(){
        return Constants.LOCAL_ADDRESS + ":" + port;
    }

    /**
     * hear beat task
     */
    class HeartBeatTask implements Runnable{

        @Override
        public void run() {
            try {
                StringBuilder builder = new StringBuilder(100);
                builder.append(OSUtils.cpuUsage()).append(COMMA);
                builder.append(OSUtils.memoryUsage()).append(COMMA);
                builder.append(OSUtils.loadAverage()).append(COMMA);
                builder.append(startTime).append(COMMA);
                builder.append(DateUtils.dateToString(new Date()));
                String masterPath = getMasterPath();
                zookeeperRegistryCenter.getZookeeperCachedOperator().update(masterPath, builder.toString());
            } catch (Throwable ex){
                logger.error("error write master heartbeat info", ex);
            }
        }
    }
}
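
The heartbeat introduced in MasterRegistry can be read as two pieces: a five-field, comma-separated payload (CPU usage, memory usage, load average, start time, report time) and a single-thread scheduler that rewrites it at a fixed rate. The following is a minimal sketch of that shape using only the JDK. The class name `HeartbeatSketch`, the in-memory `NODES` map standing in for ZooKeeper, the node path, the placeholder metric values, and the timestamp pattern are all assumptions made for illustration; they are not taken from the commit.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {

    static final String COMMA = ",";

    // Hypothetical in-memory stand-in for the ZooKeeper node store.
    static final Map<String, String> NODES = new ConcurrentHashMap<>();

    // Joins the five fields in the order HeartBeatTask appends them:
    // cpu usage, memory usage, load average, start time, report time.
    static String buildPayload(double cpu, double memory, double load,
                               String startTime, String reportTime) {
        return new StringBuilder(100)
                .append(cpu).append(COMMA)
                .append(memory).append(COMMA)
                .append(load).append(COMMA)
                .append(startTime).append(COMMA)
                .append(reportTime)
                .toString();
    }

    // Assumed timestamp pattern; the project's DateUtils.dateToString may differ.
    static String format(Date date) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
    }

    public static void main(String[] args) throws InterruptedException {
        String path = "/masters/127.0.0.1:45678"; // hypothetical node path
        String startTime = format(new Date());
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // Catch everything inside the task: scheduleAtFixedRate suppresses
        // all later runs once a single run throws.
        Runnable task = () -> {
            try {
                // Placeholder metrics instead of real OS readings.
                NODES.put(path, buildPayload(0.25, 0.5, 1.0, startTime, format(new Date())));
            } catch (Throwable ex) {
                System.err.println("error write heartbeat info: " + ex);
            }
        };

        // Millisecond interval so the demo ends quickly; the commit schedules in seconds.
        executor.scheduleAtFixedRate(task, 100, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        executor.shutdownNow();
        System.out.println(NODES.get(path));
    }
}
```

Catching `Throwable` inside the task, as the commit's `HeartBeatTask` also does, matters because `scheduleAtFixedRate` stops rescheduling a task after any uncaught exception, which would silently end the heartbeat.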

@@ -150,7 +150,6 @@ public class WorkerServer implements IStoppable {
     */
    public static void main(String[] args) {
        Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
        System.setProperty("spring.profiles.active","worker");
        new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
    }

@@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable {
        this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
        this.nettyRemotingServer.start();

        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval());
        this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
        this.workerRegistry.registry();

        this.zkWorkerClient.init();
@@ -188,22 +187,12 @@ public class WorkerServer implements IStoppable {
        // submit kill process thread
        killExecutorService.execute(killProcessThread);

        // new fetch task thread
        // FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
        //
        // // submit fetch task thread
        // fetchTaskExecutorService.execute(fetchTaskThread);

        /**
         * register hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                // worker server exit alert
                if (zkWorkerClient.getActiveMasterNum() <= 1) {
                    alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server");
                }
                stop("shutdownhook");
            }
        }));

@@ -34,9 +34,20 @@ public class WorkerConfig {
    @Value("${worker.max.cpuload.avg}")
    private int workerMaxCpuloadAvg;

    @Value("${master.reserved.memory}")
    @Value("${worker.reserved.memory}")
    private double workerReservedMemory;

    @Value("${worker.group: DEFAULT}")
    private String workerGroup;

    public String getWorkerGroup() {
        return workerGroup;
    }

    public void setWorkerGroup(String workerGroup) {
        this.workerGroup = workerGroup;
    }

    public int getWorkerExecThreads() {
        return workerExecThreads;
    }

@@ -21,6 +21,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH;


/**
@@ -42,6 +44,8 @@ public class WorkerRegistry {

    private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);

    private static final String DEFAULT_GROUP = "DEFAULT";

    /**
     * zookeeper registry center
     */
@@ -67,15 +71,30 @@ public class WorkerRegistry {
     */
    private final String startTime;

    /**
     * worker group
     */
    private final String workerGroup;

    /**
     * construct
     * @param zookeeperRegistryCenter zookeeperRegistryCenter
     * @param port port
     */
    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
        this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP);
    }

    /**
     * construct
     * @param zookeeperRegistryCenter zookeeperRegistryCenter
     * @param port port
     */
    public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
        this.port = port;
        this.heartBeatInterval = heartBeatInterval;
        this.workerGroup = workerGroup;
        this.startTime = DateUtils.dateToString(new Date());
        this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
    }
@@ -86,7 +105,7 @@ public class WorkerRegistry {
    public void registry() {
        String address = Constants.LOCAL_ADDRESS;
        String localNodePath = getWorkerPath();
        zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
        zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
        zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
@@ -94,7 +113,7 @@ public class WorkerRegistry {
                    logger.error("worker : {} connection lost from zookeeper", address);
                } else if(newState == ConnectionState.RECONNECTED){
                    logger.info("worker : {} reconnected to zookeeper", address);
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
                    zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
                } else if(newState == ConnectionState.SUSPENDED){
                    logger.warn("worker : {} connection SUSPENDED ", address);
                }
@@ -122,8 +141,14 @@ public class WorkerRegistry {
     */
    private String getWorkerPath() {
        String address = getLocalAddress();
        String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
        return localNodePath;
        StringBuilder builder = new StringBuilder(100);
        String workerPath = this.zookeeperRegistryCenter.getWorkerPath();
        builder.append(workerPath).append(SLASH);
        if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){
            builder.append(workerGroup.trim()).append(SLASH);
        }
        builder.append(address);
        return builder.toString();
    }

    /**
@@ -149,7 +174,7 @@ public class WorkerRegistry {
                builder.append(startTime).append(COMMA);
                builder.append(DateUtils.dateToString(new Date()));
                String workerPath = getWorkerPath();
                zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath, builder.toString());
                zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString());
            } catch (Throwable ex){
                logger.error("error write worker heartbeat info", ex);
            }
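
The new getWorkerPath inserts a group segment between the worker root and the address unless the group is empty or equals DEFAULT. The sketch below mirrors that logic with JDK-only code so its behaviour on a few inputs is easy to check. The class name `WorkerPathSketch`, the root path `/nodes/worker`, and the sample address are hypothetical, and the null/empty check is assumed to match what the project's `StringUtils.isNotEmpty` does. The second method, `workerPathTrimFirst`, is not part of the commit; it is an illustrative variant that trims before comparing. That difference only matters if the configured group can arrive with surrounding whitespace, for example if the default in `${worker.group: DEFAULT}` is taken verbatim including the space, which is an assumption worth verifying against the Spring version in use.

```java
class WorkerPathSketch {

    static final String SLASH = "/";
    static final String DEFAULT_GROUP = "DEFAULT";

    // Mirrors the shape of getWorkerPath in the hunk above:
    // the comparison against DEFAULT happens before the trim.
    static String workerPath(String root, String group, String address) {
        StringBuilder builder = new StringBuilder(100);
        builder.append(root).append(SLASH);
        if (group != null && !group.isEmpty() && !DEFAULT_GROUP.equalsIgnoreCase(group)) {
            builder.append(group.trim()).append(SLASH);
        }
        builder.append(address);
        return builder.toString();
    }

    // Hypothetical variant (not in the commit): trim first, then compare.
    static String workerPathTrimFirst(String root, String group, String address) {
        String normalized = group == null ? "" : group.trim();
        StringBuilder builder = new StringBuilder(100);
        builder.append(root).append(SLASH);
        if (!normalized.isEmpty() && !DEFAULT_GROUP.equalsIgnoreCase(normalized)) {
            builder.append(normalized).append(SLASH);
        }
        builder.append(address);
        return builder.toString();
    }

    public static void main(String[] args) {
        String root = "/nodes/worker";        // hypothetical registry root
        String address = "10.0.0.1:1234";     // hypothetical host:port
        System.out.println(workerPath(root, "DEFAULT", address));
        System.out.println(workerPath(root, "etl", address));
        System.out.println(workerPath(root, " DEFAULT", address));
        System.out.println(workerPathTrimFirst(root, " DEFAULT", address));
    }
}
```

With a padded value such as `" DEFAULT"`, the first method produces a `DEFAULT/` segment while the trim-first variant does not, so workers in the default group could end up under two different parent nodes depending on how the value was supplied.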

@@ -100,9 +100,6 @@ public class ZKMasterClient extends AbstractZKClient {
            // init system znode
            this.initSystemZNode();

            // register master
            this.registerMaster();

            // check if fault tolerance is required,failure and tolerance
            if (getActiveMasterNum() == 1) {
                failoverWorker(null, true);
@@ -132,25 +129,6 @@ public class ZKMasterClient extends AbstractZKClient {
        return alertDao;
    }




    /**
     * register master znode
     */
    public void registerMaster(){
        try {
            String serverPath = registerServer(ZKNodeType.MASTER);
            if(StringUtils.isEmpty(serverPath)){
                System.exit(-1);
            }
            masterZNode = serverPath;
        } catch (Exception e) {
            logger.error("register master failure ",e);
            System.exit(-1);
        }
    }

    /**
     * handle path events that this class cares about
     * @param client zkClient

@@ -55,24 +55,6 @@ public class ZKWorkerClient extends AbstractZKClient {
        // init system znode
        this.initSystemZNode();

        // register worker
        this.registWorker();
    }

    /**
     * register worker
     */
    private void registWorker(){
        try {
            String serverPath = registerServer(ZKNodeType.WORKER);
            if(StringUtils.isEmpty(serverPath)){
                System.exit(-1);
            }
            workerZNode = serverPath;
        } catch (Exception e) {
            logger.error("register worker failure",e);
            System.exit(-1);
        }
    }

    /**

@@ -46,40 +46,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
     */
    protected IStoppable stoppable = null;

    /**
     * heartbeat for zookeeper
     * @param znode zookeeper node
     * @param serverType server type
     */
    public void heartBeatForZk(String znode, String serverType){
        try {

            //check dead or not in zookeeper
            if(zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)){
                stoppable.stop("i was judged to death, release resources and stop myself");
                return;
            }

            String resInfoStr = super.get(znode);
            String[] splits = resInfoStr.split(Constants.COMMA);
            if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
                return;
            }
            String str = splits[0] + Constants.COMMA
                    + splits[1] + Constants.COMMA
                    + OSUtils.cpuUsage() + Constants.COMMA
                    + OSUtils.memoryUsage() + Constants.COMMA
                    + OSUtils.loadAverage() + Constants.COMMA
                    + splits[5] + Constants.COMMA
                    + DateUtils.dateToString(new Date());
            zkClient.setData().forPath(znode,str.getBytes());

        } catch (Exception e) {
            logger.error("heartbeat for zk failed", e);
            stoppable.stop("heartbeat for zk exception, release resources and stop myself");
        }
    }

    /**
     * check dead server or not , if dead, stop self
     *
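
For contrast with the new registry heartbeat, the heartBeatForZk method shown in this hunk read the existing node value, kept fields 0, 1 and 5, and overwrote the metrics and the trailing timestamp. A minimal JDK-only sketch of that read-splice-write step follows. The class name `LegacyHeartbeatSketch` and the sample values are hypothetical, and the field count of seven is an assumption inferred from the indices used in the hunk; the actual value of `Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH` is not shown in this diff.

```java
import java.util.Optional;

class LegacyHeartbeatSketch {

    static final String COMMA = ",";

    // Assumption: seven fields (indices 0..5 plus a trailing timestamp).
    static final int EXPECTED_FIELDS = 7;

    // Returns the refreshed payload, or empty when the stored value does not
    // have the expected number of fields (that case is skipped silently).
    static Optional<String> refresh(String current, double cpu, double memory,
                                    double load, String reportTime) {
        String[] splits = current.split(COMMA);
        if (splits.length != EXPECTED_FIELDS) {
            return Optional.empty();
        }
        return Optional.of(String.join(COMMA,
                splits[0],
                splits[1],
                String.valueOf(cpu),
                String.valueOf(memory),
                String.valueOf(load),
                splits[5],
                reportTime));
    }

    public static void main(String[] args) {
        // Hypothetical stored value and new readings.
        String stored = "host,1234,0.1,0.2,0.3,t0,t1";
        System.out.println(refresh(stored, 0.5, 0.25, 1.5, "t2").orElse("skipped"));
        System.out.println(refresh("too,short", 0.5, 0.25, 1.5, "t2").orElse("skipped"));
    }
}
```

Because this step depends on whatever is already stored in the node, a malformed value meant the heartbeat was skipped; the registry-based heartbeat added in this commit builds the whole payload from local state instead.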