From d3bd7309fb623d87ae7f35a01bdaec727a672be4 Mon Sep 17 00:00:00 2001 From: wind Date: Wed, 29 Dec 2021 15:35:03 +0800 Subject: [PATCH] [Bug-7686][Server]fix restart server after kill force (#7688) * [DS-7686][Server]fix restart server after kill force * update registry logic Co-authored-by: caishunfeng <534328519@qq.com> --- .../master/registry/MasterRegistryClient.java | 21 +++++++++------- .../worker/registry/WorkerRegistryClient.java | 24 +++++++++++++++---- 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index d97c8c241..d23ab3600 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -118,14 +118,6 @@ public class MasterRegistryClient { registryClient.getLock(nodeLock); // master registry registry(); - String registryPath = getMasterPath(); - registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP); - - // init system node - - while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { - ThreadUtils.sleep(SLEEP_TIME_MILLIS); - } registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); } catch (Exception e) { @@ -500,7 +492,20 @@ public class MasterRegistryClient { Constants.MASTER_TYPE, registryClient); + // remove before persist + registryClient.remove(localNodePath); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); + + while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + + // sleep 1s, waiting master failover remove + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + + // delete dead server + registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); + registryClient.addConnectionStateListener(this::handleConnectionState); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 037db2a32..e218b0a4a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; @@ -99,11 +101,6 @@ public class WorkerRegistryClient { Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getHeartbeatInterval(); - for (String workerZKPath : workerZkPaths) { - registryClient.persistEphemeral(workerZKPath, ""); - logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); - } - HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, workerConfig.getMaxCpuLoadAvg(), workerConfig.getReservedMemory(), @@ -115,6 +112,23 @@ public class WorkerRegistryClient { workerManagerThread.getThreadPoolQueueSize() ); + for (String workerZKPath : workerZkPaths) { + // remove before persist + registryClient.remove(workerZKPath); + registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo()); + logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); + } + + while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } + + // sleep 1s, waiting master failover remove + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + + // delete dead server + registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); }