improve test of DAGhelper
parent
d2a43b0c25
commit
3eeea22671
|
|
@ -265,7 +265,7 @@ public class ExecutorService2Test {
|
|||
.thenReturn(true);
|
||||
|
||||
Map<String, Object> result = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS);
|
||||
Assert.assertEquals(result.get(Constants.STATUS), Status.SUCCESS);
|
||||
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
|
||||
verify(processService, times(1)).createCommand(any(Command.class));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
|
@ -278,13 +279,7 @@ public class DagHelper {
|
|||
if (StringUtils.isNotEmpty(parentNodeName)) {
|
||||
TaskNode task = dag.getNode(parentNodeName);
|
||||
if (task.isConditionsTask() && completeTaskList.containsKey(parentNodeName)) {
|
||||
ConditionsParameters conditionsParameters = JSONUtils.parseObject(task.getConditionResult(), ConditionsParameters.class);
|
||||
TaskInstance taskInstance = completeTaskList.get(parentNodeName);
|
||||
if (taskInstance.getState().typeIsSuccess()) {
|
||||
startVertexs = conditionsParameters.getSuccessNode();
|
||||
} else if (taskInstance.getState().typeIsFailure()) {
|
||||
startVertexs = conditionsParameters.getFailedNode();
|
||||
}
|
||||
startVertexs = parseConditionTask(parentNodeName, task, completeTaskList);
|
||||
}
|
||||
else {
|
||||
startVertexs = dag.getSubsequentNodes(parentNodeName);
|
||||
|
|
@ -293,7 +288,7 @@ public class DagHelper {
|
|||
startVertexs = dag.getBeginNode();
|
||||
}
|
||||
|
||||
List<String> tmpStartVertexs = new ArrayList<>();
|
||||
Set<String> tmpStartVertexs = new HashSet<>();
|
||||
if (startVertexs != null) {
|
||||
tmpStartVertexs.addAll(startVertexs);
|
||||
}
|
||||
|
|
@ -317,6 +312,20 @@ public class DagHelper {
|
|||
return tmpStartVertexs;
|
||||
}
|
||||
|
||||
/**
|
||||
* parse condition post nodes
|
||||
*/
|
||||
private static Collection<String> parseConditionTask(String parentNodeName, TaskNode task, Map<String, TaskInstance> completeTaskList) {
|
||||
ConditionsParameters conditionsParameters = JSONUtils.parseObject(task.getConditionResult(), ConditionsParameters.class);
|
||||
TaskInstance taskInstance = completeTaskList.get(parentNodeName);
|
||||
if (taskInstance.getState().typeIsSuccess()) {
|
||||
return conditionsParameters.getSuccessNode();
|
||||
} else if (taskInstance.getState().typeIsFailure()) {
|
||||
return conditionsParameters.getFailedNode();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* the task can be submit when all the depends nodes are forbidden or complete
|
||||
* @param taskNode taskNode
|
||||
|
|
|
|||
|
|
@ -19,7 +19,9 @@ package org.apache.dolphinscheduler.dao.utils;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.enums.TaskDependType;
|
||||
import org.apache.dolphinscheduler.common.enums.TaskType;
|
||||
import org.apache.dolphinscheduler.common.graph.DAG;
|
||||
import org.apache.dolphinscheduler.common.model.TaskNode;
|
||||
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
|
||||
|
|
@ -34,6 +36,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* dag helper test
|
||||
|
|
@ -79,6 +82,7 @@ public class DagHelperTest {
|
|||
/**
|
||||
* 1->2->3->5
|
||||
* 4->3
|
||||
* 3->6
|
||||
* @return dag
|
||||
* @throws JsonProcessingException if error throws JsonProcessingException
|
||||
*/
|
||||
|
|
@ -120,6 +124,14 @@ public class DagHelperTest {
|
|||
node5.setDepList(dep5);
|
||||
taskNodeList.add(node5);
|
||||
|
||||
TaskNode node6 = new TaskNode();
|
||||
node6.setId("6");
|
||||
node6.setName("6");
|
||||
List<String> dep6 = new ArrayList<>();
|
||||
dep6.add("3");
|
||||
node6.setDepList(dep6);
|
||||
taskNodeList.add(node6);
|
||||
|
||||
List<String> startNodes = new ArrayList<>();
|
||||
List<String> recoveryNodes = new ArrayList<>();
|
||||
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
|
||||
|
|
@ -148,4 +160,28 @@ public class DagHelperTest {
|
|||
Assert.assertNotNull(dag);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetStartVertex() throws JsonProcessingException {
|
||||
// let 1->2->3->5, 3->5, 4->3
|
||||
DAG<String, TaskNode, TaskNodeRelation> dag = generateDag();
|
||||
|
||||
// test when completeList is null
|
||||
Assert.assertEquals(2, DagHelper.getStartVertex(null, dag, null).size());
|
||||
Assert.assertEquals(1, DagHelper.getStartVertex("1", dag, null).size());
|
||||
|
||||
// test when 3 is CONDITIONS and 1,2,3,4 all completed
|
||||
Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
|
||||
completeTaskList.putIfAbsent("1", new TaskInstance());
|
||||
completeTaskList.putIfAbsent("2", new TaskInstance());
|
||||
completeTaskList.putIfAbsent("4", new TaskInstance());
|
||||
TaskInstance task3 = new TaskInstance();
|
||||
task3.setState(ExecutionStatus.SUCCESS);
|
||||
completeTaskList.putIfAbsent("3", task3);
|
||||
|
||||
dag.getNode("3").setConditionResult("{\"successNode\":[\"5\"],\"failedNode\":[\"6\"]}");
|
||||
dag.getNode("3").setType(TaskType.CONDITIONS.toString());
|
||||
|
||||
Assert.assertEquals(1, DagHelper.getStartVertex(null, dag, completeTaskList).size());
|
||||
Assert.assertEquals(true, DagHelper.getStartVertex(null, dag, completeTaskList).contains("5"));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue