update zkclient
parent
9361cbbfdb
commit
3191eb92ff
|
|
@ -211,49 +211,11 @@ public abstract class AbstractZKClient {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* init system znode
|
||||
*/
|
||||
protected void initSystemZNode(){
|
||||
try {
|
||||
// read master node parent path from conf
|
||||
masterZNodeParentPath = getMasterZNodeParentPath();
|
||||
// read worker node parent path from conf
|
||||
workerZNodeParentPath = getWorkerZNodeParentPath();
|
||||
|
||||
// read server node parent path from conf
|
||||
deadServerZNodeParentPath = getDeadZNodeParentPath();
|
||||
|
||||
if(zkClient.checkExists().forPath(deadServerZNodeParentPath) == null){
|
||||
// create persistent dead server parent node
|
||||
zkClient.create().creatingParentContainersIfNeeded()
|
||||
.withMode(CreateMode.PERSISTENT).forPath(deadServerZNodeParentPath);
|
||||
}
|
||||
|
||||
if(zkClient.checkExists().forPath(masterZNodeParentPath) == null){
|
||||
// create persistent master parent node
|
||||
zkClient.create().creatingParentContainersIfNeeded()
|
||||
.withMode(CreateMode.PERSISTENT).forPath(masterZNodeParentPath);
|
||||
}
|
||||
|
||||
if(zkClient.checkExists().forPath(workerZNodeParentPath) == null){
|
||||
// create persistent worker parent node
|
||||
zkClient.create().creatingParentContainersIfNeeded()
|
||||
.withMode(CreateMode.PERSISTENT).forPath(workerZNodeParentPath);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("init system znode failed : " + e.getMessage(),e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void removeDeadServerByHost(String host, String serverType) throws Exception {
|
||||
List<String> deadServers = zkClient.getChildren().forPath(deadServerZNodeParentPath);
|
||||
for(String serverPath : deadServers){
|
||||
if(serverPath.startsWith(serverType+UNDERLINE+host)){
|
||||
|
||||
String server = deadServerZNodeParentPath + SINGLE_SLASH + serverPath;
|
||||
zkClient.delete().forPath(server);
|
||||
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
|
||||
|
|
@ -394,10 +356,11 @@ public abstract class AbstractZKClient {
|
|||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
|
||||
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
|
||||
String path = getZNodeParentPath(zkNodeType);
|
||||
if(StringUtils.isEmpty(path)){
|
||||
logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
|
||||
logger.error("check zk node exists error, host:{}, zk node type:{}",
|
||||
host, zkNodeType.toString());
|
||||
return false;
|
||||
}
|
||||
Map<String, String> serverMaps = getServerList(zkNodeType);
|
||||
|
|
@ -508,7 +471,31 @@ public abstract class AbstractZKClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* init system znode
|
||||
*/
|
||||
protected void initSystemZNode(){
|
||||
try {
|
||||
createNodePath(getMasterZNodeParentPath());
|
||||
createNodePath(getWorkerZNodeParentPath());
|
||||
createNodePath(getDeadZNodeParentPath());
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("init system znode failed : " + e.getMessage(),e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* create zookeeper node path if not exists
|
||||
* @param zNodeParentPath
|
||||
* @throws Exception
|
||||
*/
|
||||
private void createNodePath(String zNodeParentPath) throws Exception {
|
||||
if(null == zkClient.checkExists().forPath(zNodeParentPath)){
|
||||
zkClient.create().creatingParentContainersIfNeeded()
|
||||
.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
|
|
|
|||
|
|
@ -322,9 +322,9 @@ public class ZKMasterClient extends AbstractZKClient {
|
|||
// handle dead server
|
||||
handleDeadServer(path, Constants.WORKER_PREFIX, Constants.ADD_ZK_OP);
|
||||
|
||||
// create a distributed lock, and the root node path of the lock space is /escheduler/lock/failover/worker
|
||||
String znodeLock = zkMasterClient.getWorkerFailoverLockPath();
|
||||
mutex = new InterProcessMutex(zkMasterClient.getZkClient(), znodeLock);
|
||||
// create a distributed lock
|
||||
String znodeLock = getWorkerFailoverLockPath();
|
||||
mutex = new InterProcessMutex(getZkClient(), znodeLock);
|
||||
mutex.acquire();
|
||||
|
||||
String workerHost = getHostByEventDataPath(path);
|
||||
|
|
|
|||
Loading…
Reference in New Issue