From 645847c096ecf26fab04d11fb4905ff84f2bd47e Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 13 Apr 2021 22:47:36 +0800 Subject: [PATCH] Fix quoted bug about processDefineId of processInstance (#5263) Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../api/service/ExecutorService.java | 4 +- .../api/service/impl/ExecutorServiceImpl.java | 10 ++--- .../impl/ProcessInstanceServiceImpl.java | 2 +- .../api/service/ExecutorService2Test.java | 1 - .../service/ProcessInstanceServiceTest.java | 8 ++-- .../dao/entity/ProcessInstance.java | 16 ------- .../dao/mapper/ProcessInstanceMapperTest.java | 1 - .../master/runner/MasterExecThread.java | 11 ++--- .../server/zk/ZKMasterClient.java | 3 +- .../server/master/ConditionsTaskTest.java | 1 - .../server/master/DependentTaskTest.java | 20 ++++----- .../server/master/MasterExecThreadTest.java | 1 - .../server/master/SubProcessTaskTest.java | 2 - .../service/process/ProcessService.java | 42 ++++--------------- .../service/process/ProcessServiceTest.java | 1 - 15 files changed, 35 insertions(+), 88 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 6bed9790b..72a0089bc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -66,10 +66,10 @@ public interface ExecutorService { * check whether the process definition can be executed * * @param processDefinition process definition - * @param processDefineId process definition id + * @param processDefineCode process definition code * @return check result code */ - Map checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId); + Map checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode); /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index be8b9ffde..87c5123ca 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -195,18 +195,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * check whether the process definition can be executed * * @param processDefinition process definition - * @param processDefineId process definition id + * @param processDefineCode process definition code * @return check result code */ @Override - public Map checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) { + public Map checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode) { Map result = new HashMap<>(); if (processDefinition == null) { // check process definition exists - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode); } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { // check process definition online - putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId); + putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode); } else { result.put(Constants.STATUS, Status.SUCCESS); } @@ -246,7 +246,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { - result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId()); + result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionCode()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index fa805f26d..afbbb1f6d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -204,11 +204,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); } else { + processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); ProcessData processData = processService.genProcessData(processDefinition); processInstance.setProcessInstanceJson(JSONUtils.toJsonString(processData)); result.put(DATA_LIST, processInstance); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index b0af84f41..83d188924 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -117,7 +117,6 @@ public class ExecutorService2Test { // processInstance processInstance.setId(processInstanceId); - processInstance.setProcessDefinitionId(processDefinitionId); processInstance.setState(ExecutionStatus.FAILURE); processInstance.setExecutorId(userId); processInstance.setTenantId(tenantId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 121c832f3..f6777fdb9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -241,14 +241,14 @@ public class ProcessInstanceServiceTest { //project auth success ProcessInstance processInstance = getProcessInstance(); - processInstance.setProcessDefinitionId(46); putMsg(result, Status.SUCCESS, projectName); Project project = getProject(projectName); ProcessDefinition processDefinition = getProcessDefinition(); when(projectMapper.queryByName(projectName)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); - when(processService.findProcessDefineById(processInstance.getProcessDefinitionId())).thenReturn(processDefinition); + when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition); Map successRes = processInstanceService.queryProcessInstanceById(loginUser, projectName, 1); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); @@ -395,7 +395,6 @@ public class ProcessInstanceServiceTest { Tenant tenant = new Tenant(); tenant.setId(1); tenant.setTenantCode("test_tenant"); - when(processService.findProcessDefineById(processInstance.getProcessDefinitionId())).thenReturn(processDefinition); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.updateProcessInstance(processInstance)).thenReturn(1); when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result); @@ -555,6 +554,8 @@ public class ProcessInstanceServiceTest { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(1); processInstance.setName("test_process_instance"); + processInstance.setProcessDefinitionCode(46L); + processInstance.setProcessDefinitionVersion(1); processInstance.setStartTime(new Date()); processInstance.setEndTime(new Date()); return processInstance; @@ -568,6 +569,7 @@ public class ProcessInstanceServiceTest { private ProcessDefinition getProcessDefinition() { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(46L); + processDefinition.setVersion(1); processDefinition.setId(46); processDefinition.setName("test_pdf"); processDefinition.setProjectId(2); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 83ecadf76..1eba3234e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -48,13 +48,6 @@ public class ProcessInstance { @TableId(value = "id", type = IdType.AUTO) private int id; - /** - * process definition id - * TODO delete - */ - @TableField(exist = false) - private int processDefinitionId; - /** * process definition code */ @@ -290,14 +283,6 @@ public class ProcessInstance { this.id = id; } - public int getProcessDefinitionId() { - return processDefinitionId; - } - - public void setProcessDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; - } - public ExecutionStatus getState() { return state; } @@ -616,7 +601,6 @@ public class ProcessInstance { public String toString() { return "ProcessInstance{" + "id=" + id - + ", processDefinitionId=" + processDefinitionId + ", state=" + state + ", recovery=" + recovery + ", startTime=" + startTime diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index 6ff1778ed..1232e7b56 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -275,7 +275,6 @@ public class ProcessInstanceMapperTest { processDefinitionMapper.insert(processDefinition); ProcessInstance processInstance = insertOne(); - processInstance.setProcessDefinitionId(processDefinition.getId()); int update = processInstanceMapper.updateById(processInstance); Long[] projectCodes = new Long[]{processDefinition.getProjectCode()}; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index b9fab4d26..d89ded788 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable { processService.saveProcessInstance(processInstance); // get schedules - int processDefinitionId = processInstance.getProcessDefinitionId(); + int processDefinitionId = processInstance.getProcessDefinition().getId(); List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); List listDate = Lists.newLinkedList(); if (!CollectionUtils.isEmpty(schedules)) { @@ -268,7 +268,7 @@ public class MasterExecThread implements Runnable { } // get first fire date Iterator iterator = null; - Date scheduleDate = null; + Date scheduleDate; if (!CollectionUtils.isEmpty(listDate)) { iterator = listDate.iterator(); scheduleDate = iterator.next(); @@ -282,9 +282,7 @@ public class MasterExecThread implements Runnable { } while (Stopper.isRunning()) { - - logger.info("process {} start to complement {} data", - processInstance.getId(), DateUtils.dateToString(scheduleDate)); + logger.info("process {} start to complement {} data", processInstance.getId(), DateUtils.dateToString(scheduleDate)); // prepare dag and other info prepareProcess(); @@ -302,8 +300,7 @@ public class MasterExecThread implements Runnable { endProcess(); // process instance failure ,no more complements if (!processInstance.getState().typeIsSuccess()) { - logger.info("process {} state {}, complement not completely!", - processInstance.getId(), processInstance.getState()); + logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); break; } // current process instance success ,next execute diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index f19bfa220..d7c5e529b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -347,8 +347,7 @@ public class ZKMasterClient extends AbstractZKClient { logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size()); //updateProcessInstance host is null and insert into command for (ProcessInstance processInstance : needFailoverProcessInstanceList) { - logger.info("failover process instance id: {} host:{}", - processInstance.getId(), processInstance.getHost()); + logger.info("failover process instance id: {} host:{}", processInstance.getId(), processInstance.getHost()); if (Constants.NULL.equals(processInstance.getHost())) { continue; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index 66f9d4515..6837a8727 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -162,7 +162,6 @@ public class ConditionsTaskTest { private ProcessInstance getProcessInstance() { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(1000); - processInstance.setProcessDefinitionId(1000); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); return processInstance; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 17aa05e7a..abe841fc4 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -145,7 +145,7 @@ public class DependentTaskTest { public void testBasicSuccess() throws Exception { testBasicInit(); ProcessInstance dependentProcessInstance = - getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE); // for DependentExecute.findLastProcessInterval Mockito.when(processService .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) @@ -168,7 +168,7 @@ public class DependentTaskTest { public void testBasicFailure() throws Exception { testBasicInit(); ProcessInstance dependentProcessInstance = - getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS); + getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS); // for DependentExecute.findLastProcessInterval Mockito.when(processService .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) @@ -219,9 +219,9 @@ public class DependentTaskTest { setupTaskInstance(taskNode); ProcessInstance processInstance200 = - getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE); ProcessInstance processInstance300 = - getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS); + getProcessInstanceForFindLastRunningProcess(300, ExecutionStatus.SUCCESS); // for DependentExecute.findLastProcessInterval Mockito.when(processService @@ -276,7 +276,7 @@ public class DependentTaskTest { // for DependentExecute.findLastProcessInterval Mockito.when(processService .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) - .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS)); + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS)); DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); taskExecThread.call(); @@ -289,7 +289,7 @@ public class DependentTaskTest { // for DependentExecute.findLastProcessInterval Mockito.when(processService .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) - .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE)); + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE)); DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance); dependentTask.call(); @@ -323,7 +323,7 @@ public class DependentTaskTest { setupTaskInstance(taskNode); ProcessInstance dependentProcessInstance = - getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION); + getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.RUNNING_EXECUTION); // for DependentExecute.findLastProcessInterval Mockito.when(processService .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) @@ -349,7 +349,6 @@ public class DependentTaskTest { private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(processInstanceId); - processInstance.setProcessDefinitionId(processDefinitionId); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); return processInstance; } @@ -403,12 +402,9 @@ public class DependentTaskTest { return dependentItem; } - private ProcessInstance getProcessInstanceForFindLastRunningProcess( - int processInstanceId, int processDefinitionId, ExecutionStatus state - ) { + private ProcessInstance getProcessInstanceForFindLastRunningProcess(int processInstanceId, ExecutionStatus state) { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(processInstanceId); - processInstance.setProcessDefinitionId(processDefinitionId); processInstance.setState(state); return processInstance; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index cdea2526a..34522d446 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -88,7 +88,6 @@ public class MasterExecThreadTest { Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); processInstance = mock(ProcessInstance.class); - Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId); Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 4ddc6f426..3912d2e5a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -130,7 +130,6 @@ public class SubProcessTaskTest { private ProcessInstance getProcessInstance() { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(100); - processInstance.setProcessDefinitionId(1); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); return processInstance; @@ -139,7 +138,6 @@ public class SubProcessTaskTest { private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(102); - processInstance.setProcessDefinitionId(2); processInstance.setState(executionStatus); return processInstance; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d05a0a52e..dcf850267 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -236,7 +236,7 @@ public class ProcessService { processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); this.setSubProcessParam(processInstance); - delCommandById(command.getId()); + this.commandMapper.deleteById(command.getId()); return processInstance; } @@ -250,7 +250,7 @@ public class ProcessService { public void moveToErrorCommand(Command command, String message) { ErrorCommand errorCommand = new ErrorCommand(command, message); this.errorCommandMapper.insert(errorCommand); - delCommandById(command.getId()); + this.commandMapper.deleteById(command.getId()); } /** @@ -538,7 +538,7 @@ public class ProcessService { processInstance.getTaskDependType(), processInstance.getFailureStrategy(), processInstance.getExecutorId(), - processInstance.getProcessDefinitionId(), + processInstance.getProcessDefinition().getId(), JSONUtils.toJsonString(cmdParam), processInstance.getWarningType(), processInstance.getWarningGroupId(), @@ -600,7 +600,7 @@ public class ProcessService { processInstance.setStartTime(new Date()); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); - processInstance.setProcessDefinitionId(command.getProcessDefinitionId()); + //processInstance.setProcessDefinitionId(command.getProcessDefinitionId()); processInstance.setCommandParam(command.getCommandParam()); processInstance.setCommandType(command.getCommandType()); processInstance.setIsSubProcess(Flag.NO); @@ -719,7 +719,6 @@ public class ProcessService { * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { - ProcessInstance processInstance; CommandType commandType = command.getCommandType(); Map cmdParam = JSONUtils.toMap(command.getCommandParam()); @@ -764,16 +763,13 @@ public class ProcessService { } // Recalculate global parameters after rerun. - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), commandTypeIfComplement, processInstance.getScheduleTime())); + processInstance.setProcessDefinition(processDefinition); } - processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); - processInstance.setProcessDefinition(processDefinition); - //reset command parameter if (processInstance.getCommandParam() != null) { Map processCmdParam = JSONUtils.toMap(processInstance.getCommandParam()); @@ -1345,25 +1341,12 @@ public class ProcessService { return true; } - /** - * create a new process instance - * - * @param processInstance processInstance - */ - public void createProcessInstance(ProcessInstance processInstance) { - - if (processInstance != null) { - processInstanceMapper.insert(processInstance); - } - } - /** * insert or update work process instance to data base * * @param processInstance processInstance */ public void saveProcessInstance(ProcessInstance processInstance) { - if (processInstance == null) { logger.error("save error, process instance is null!"); return; @@ -1371,7 +1354,7 @@ public class ProcessService { if (processInstance.getId() != 0) { processInstanceMapper.updateById(processInstance); } else { - createProcessInstance(processInstance); + processInstanceMapper.insert(processInstance); } } @@ -1425,15 +1408,6 @@ public class ProcessService { return count > 0; } - /** - * delete a command by id - * - * @param id id - */ - public void delCommandById(int id) { - commandMapper.deleteById(id); - } - /** * find task instance by id * @@ -1772,9 +1746,11 @@ public class ProcessService { processInstance.setHost(Constants.NULL); processInstanceMapper.updateById(processInstance); + ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + //2 insert into recover command Command cmd = new Command(); - cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId()); + cmd.setProcessDefinitionId(processDefinition.getId()); cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId())); cmd.setExecutorId(processInstance.getExecutorId()); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 577432277..9c4b8c4f4 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -109,7 +109,6 @@ public class ProcessServiceTest { public void testCreateSubCommand() { ProcessService processService = new ProcessService(); ProcessInstance parentInstance = new ProcessInstance(); - parentInstance.setProcessDefinitionId(1); parentInstance.setWarningType(WarningType.SUCCESS); parentInstance.setWarningGroupId(0);