* dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * #2486 bug fix * host and workergroup compatible * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * #2499 bug fix * add comment * revert comment * revert comment * #2499 buf fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * No master don't create command #2571 * No master don't create command #2571 Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>pull/2/head
parent
a12103b08c
commit
5666e6b75a
|
|
@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType;
|
|||
import org.apache.dolphinscheduler.api.enums.Status;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.*;
|
||||
import org.apache.dolphinscheduler.common.model.Server;
|
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.DateUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
|
|
@ -59,7 +60,7 @@ public class ExecutorService extends BaseService{
|
|||
private ProcessDefinitionMapper processDefinitionMapper;
|
||||
|
||||
@Autowired
|
||||
private ProcessDefinitionService processDefinitionService;
|
||||
private MonitorService monitorService;
|
||||
|
||||
|
||||
@Autowired
|
||||
|
|
@ -123,6 +124,14 @@ public class ExecutorService extends BaseService{
|
|||
return result;
|
||||
}
|
||||
|
||||
// check master server exists
|
||||
List<Server> masterServers = monitorService.getServerListFromZK(true);
|
||||
|
||||
|
||||
if (masterServers.size() == 0) {
|
||||
putMsg(result, Status.MASTER_NOT_EXISTS);
|
||||
return result;
|
||||
}
|
||||
/**
|
||||
* create command
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -365,6 +365,7 @@ public class SchedulerService extends BaseService {
|
|||
|
||||
if (masterServers.size() == 0) {
|
||||
putMsg(result, Status.MASTER_NOT_EXISTS);
|
||||
return result;
|
||||
}
|
||||
|
||||
// set status
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
|
|||
import org.apache.dolphinscheduler.common.enums.Priority;
|
||||
import org.apache.dolphinscheduler.common.enums.ReleaseState;
|
||||
import org.apache.dolphinscheduler.common.enums.RunMode;
|
||||
import org.apache.dolphinscheduler.common.model.Server;
|
||||
import org.apache.dolphinscheduler.dao.entity.*;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
||||
|
|
@ -63,6 +64,9 @@ public class ExecutorService2Test {
|
|||
@Mock
|
||||
private ProjectService projectService;
|
||||
|
||||
@Mock
|
||||
private MonitorService monitorService;
|
||||
|
||||
private int processDefinitionId = 1;
|
||||
|
||||
private int tenantId = 1;
|
||||
|
|
@ -102,6 +106,7 @@ public class ExecutorService2Test {
|
|||
Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
|
||||
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
|
||||
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
|
||||
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -121,7 +126,6 @@ public class ExecutorService2Test {
|
|||
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
|
||||
verify(processService, times(1)).createCommand(any(Command.class));
|
||||
}catch (Exception e){
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,7 +146,6 @@ public class ExecutorService2Test {
|
|||
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
|
||||
verify(processService, times(0)).createCommand(any(Command.class));
|
||||
}catch (Exception e){
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -163,7 +166,6 @@ public class ExecutorService2Test {
|
|||
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
|
||||
verify(processService, times(1)).createCommand(any(Command.class));
|
||||
}catch (Exception e){
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -184,7 +186,6 @@ public class ExecutorService2Test {
|
|||
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
|
||||
verify(processService, times(31)).createCommand(any(Command.class));
|
||||
}catch (Exception e){
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -205,10 +206,42 @@ public class ExecutorService2Test {
|
|||
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
|
||||
verify(processService, times(15)).createCommand(any(Command.class));
|
||||
}catch (Exception e){
|
||||
Assert.assertTrue(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testNoMsterServers() throws ParseException{
|
||||
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<Server>());
|
||||
|
||||
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
|
||||
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
|
||||
null, null,
|
||||
null, null, 0,
|
||||
"", "", RunMode.RUN_MODE_PARALLEL,
|
||||
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
|
||||
Assert.assertEquals(result.get(Constants.STATUS),Status.MASTER_NOT_EXISTS);
|
||||
|
||||
}
|
||||
|
||||
private List<Server> getMasterServersList(){
|
||||
List<Server> masterServerList = new ArrayList<>();
|
||||
Server masterServer1 = new Server();
|
||||
masterServer1.setId(1);
|
||||
masterServer1.setHost("192.168.220.188");
|
||||
masterServer1.setPort(1121);
|
||||
masterServerList.add(masterServer1);
|
||||
|
||||
Server masterServer2 = new Server();
|
||||
masterServer2.setId(2);
|
||||
masterServer2.setHost("192.168.220.189");
|
||||
masterServer2.setPort(1122);
|
||||
masterServerList.add(masterServer2);
|
||||
|
||||
return masterServerList;
|
||||
|
||||
}
|
||||
|
||||
private List<Schedule> zeroSchedulerList(){
|
||||
return Collections.EMPTY_LIST;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue