diff --git a/dolphinscheduler-api/src/main/resources/logback-api.xml b/dolphinscheduler-api/src/main/resources/logback-api.xml
index 2df90d839..c1142ed8e 100644
--- a/dolphinscheduler-api/src/main/resources/logback-api.xml
+++ b/dolphinscheduler-api/src/main/resources/logback-api.xml
@@ -55,7 +55,7 @@
-
+
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
index 1133cadbe..e2da27d85 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
+import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -238,52 +239,6 @@ public class DagHelper {
return null;
}
-
- /**
- * get start vertex in one dag
- * it would find the post node if the start vertex is forbidden running
- * @param parentNodeName previous node
- * @param dag dag
- * @param completeTaskList completeTaskList
- * @return start Vertex list
- */
- public static Collection getStartVertex(String parentNodeName, DAG dag,
- Map completeTaskList){
-
- if(completeTaskList == null){
- completeTaskList = new HashMap<>();
- }
- Collection startVertexs = null;
- if(StringUtils.isNotEmpty(parentNodeName)){
- startVertexs = dag.getSubsequentNodes(parentNodeName);
- }else{
- startVertexs = dag.getBeginNode();
- }
-
- List tmpStartVertexs = new ArrayList<>();
- if(startVertexs!= null){
- tmpStartVertexs.addAll(startVertexs);
- }
-
- for(String start : startVertexs){
- TaskNode startNode = dag.getNode(start);
- if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
- // the start can be submit if not forbidden and not in complete tasks
- continue;
- }
- // then submit the post nodes
- Collection postNodes = getStartVertex(start, dag, completeTaskList);
- for(String post : postNodes){
- TaskNode postNode = dag.getNode(post);
- if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
- tmpStartVertexs.add(post);
- }
- }
- tmpStartVertexs.remove(start);
- }
- return tmpStartVertexs;
- }
-
/**
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
@@ -291,24 +246,145 @@ public class DagHelper {
* @param completeTaskList completeTaskList
* @return can submit
*/
- public static boolean taskNodeCanSubmit(TaskNode taskNode,
- DAG dag,
- Map completeTaskList) {
-
+ public static boolean allDependsForbiddenOrEnd(TaskNode taskNode,
+ DAG dag,
+ Map skipTaskNodeList,
+ Map completeTaskList) {
List dependList = taskNode.getDepList();
- if(dependList == null){
+ if (dependList == null) {
return true;
}
-
- for(String dependNodeName : dependList){
+ for (String dependNodeName : dependList) {
TaskNode dependNode = dag.getNode(dependNodeName);
- if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
+ if (completeTaskList.containsKey(dependNodeName)
+ || dependNode.isForbidden()
+ || skipTaskNodeList.containsKey(dependNodeName)) {
+ continue;
+ } else {
return false;
}
}
return true;
}
+ /**
+ * parse the successor nodes of previous node.
+ * this function parse the condition node to find the right branch.
+ * also check all the depends nodes forbidden or complete
+ * @param preNodeName
+ * @return successor nodes
+ */
+ public static Set parsePostNodes(String preNodeName,
+ Map skipTaskNodeList,
+ DAG dag,
+ Map completeTaskList) {
+ Set postNodeList = new HashSet<>();
+ Collection startVertexes = new ArrayList<>();
+ if (preNodeName == null) {
+ startVertexes = dag.getBeginNode();
+ } else if (dag.getNode(preNodeName).isConditionsTask()) {
+ List conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
+ startVertexes.addAll(conditionTaskList);
+ } else {
+ startVertexes = dag.getSubsequentNodes(preNodeName);
+ }
+ for (String subsequent : startVertexes) {
+ TaskNode taskNode = dag.getNode(subsequent);
+ if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
+ setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList );
+ continue;
+ }
+ if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) {
+ continue;
+ }
+ if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) {
+ postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList));
+ continue;
+ }
+ postNodeList.add(subsequent);
+ }
+ return postNodeList;
+ }
+
+ /**
+ * if all of the task dependence are skipped, skip it too.
+ * @param taskNode
+ * @return
+ */
+ private static boolean isTaskNodeNeedSkip(TaskNode taskNode,
+ Map skipTaskNodeList
+ ){
+ if(CollectionUtils.isEmpty(taskNode.getDepList())){
+ return false;
+ }
+ for(String depNode : taskNode.getDepList()){
+ if(!skipTaskNodeList.containsKey(depNode)){
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * parse condition task find the branch process
+ * set skip flag for another one.
+ * @param nodeName
+ * @return
+ */
+ public static List parseConditionTask(String nodeName,
+ Map skipTaskNodeList,
+ DAG dag,
+ Map completeTaskList){
+ List conditionTaskList = new ArrayList<>();
+ TaskNode taskNode = dag.getNode(nodeName);
+ if (!taskNode.isConditionsTask()){
+ return conditionTaskList;
+ }
+ if (!completeTaskList.containsKey(nodeName)){
+ return conditionTaskList;
+ }
+ TaskInstance taskInstance = completeTaskList.get(nodeName);
+ ConditionsParameters conditionsParameters =
+ JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
+ List skipNodeList = new ArrayList<>();
+ if(taskInstance.getState().typeIsSuccess()){
+ conditionTaskList = conditionsParameters.getSuccessNode();
+ skipNodeList = conditionsParameters.getFailedNode();
+ }else if(taskInstance.getState().typeIsFailure()){
+ conditionTaskList = conditionsParameters.getFailedNode();
+ skipNodeList = conditionsParameters.getSuccessNode();
+ }else{
+ conditionTaskList.add(nodeName);
+ }
+ for(String failedNode : skipNodeList){
+ setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
+ }
+ return conditionTaskList;
+ }
+
+ /**
+ * set task node and the post nodes skip flag
+ * @param skipNodeName
+ * @param dag
+ * @param completeTaskList
+ * @param skipTaskNodeList
+ */
+ private static void setTaskNodeSkip(String skipNodeName,
+ DAG dag,
+ Map completeTaskList,
+ Map skipTaskNodeList){
+ skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
+ Collection postNodeList = dag.getSubsequentNodes(skipNodeName);
+ for(String post : postNodeList){
+ TaskNode postNode = dag.getNode(post);
+ if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){
+ setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
+ }
+ }
+ }
+
+
/***
* build dag graph
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
index 95c7d2f08..a1ec7b52c 100644
--- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+
import org.junit.Assert;
import org.junit.Test;
@@ -34,6 +36,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
/**
* dag helper test
@@ -41,15 +45,17 @@ import java.util.Map;
public class DagHelperTest {
/**
* test task node can submit
+ *
* @throws JsonProcessingException if error throws JsonProcessingException
*/
@Test
public void testTaskNodeCanSubmit() throws JsonProcessingException {
- //1->2->3->5
- //4->3
+ //1->2->3->5->7
+ //4->3->6
DAG dag = generateDag();
TaskNode taskNode3 = dag.getNode("3");
- Map completeTaskList = new HashMap<>();
+ Map completeTaskList = new HashMap<>();
+ Map skipNodeList = new HashMap<>();
completeTaskList.putIfAbsent("1", new TaskInstance());
Boolean canSubmit = false;
@@ -58,27 +64,199 @@ public class DagHelperTest {
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode nodex = dag.getNode("4");
nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
- canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
+ canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, true);
// 2forbidden, 3 cannot be submit
completeTaskList.putIfAbsent("2", new TaskInstance());
TaskNode nodey = dag.getNode("4");
nodey.setRunFlag("");
- canSubmit = DagHelper.taskNodeCanSubmit(taskNode3, dag, completeTaskList);
+ canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, false);
// 2/3 forbidden submit 5
TaskNode node3 = dag.getNode("3");
node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+ TaskNode node8 = dag.getNode("8");
+ node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode node5 = dag.getNode("5");
- canSubmit = DagHelper.taskNodeCanSubmit(node5, dag, completeTaskList);
+ canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList);
Assert.assertEquals(canSubmit, true);
- }
+ }
/**
- * 1->2->3->5
- * 4->3
+ * test parse post node list
+ */
+ @Test
+ public void testParsePostNodeList() throws JsonProcessingException {
+ DAG dag = generateDag();
+ Map completeTaskList = new HashMap<>();
+ Map skipNodeList = new HashMap<>();
+
+ Set postNodes = null;
+ //complete : null
+ // expect post: 1/4
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("1"));
+ Assert.assertTrue(postNodes.contains("4"));
+
+ //complete : 1
+ // expect post: 2/4
+ completeTaskList.put("1", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("2"));
+ Assert.assertTrue(postNodes.contains("4"));
+
+ // complete : 1/2
+ // expect post: 4
+ completeTaskList.put("2", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("4"));
+ Assert.assertTrue(postNodes.contains("8"));
+
+ // complete : 1/2/4
+ // expect post: 3
+ completeTaskList.put("4", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("3"));
+ Assert.assertTrue(postNodes.contains("8"));
+
+ // complete : 1/2/4/3
+ // expect post: 8/6
+ completeTaskList.put("3", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("8"));
+ Assert.assertTrue(postNodes.contains("6"));
+
+ // complete : 1/2/4/3/8
+ // expect post: 6/5
+ completeTaskList.put("8", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("5"));
+ Assert.assertTrue(postNodes.contains("6"));
+ // complete : 1/2/4/3/5/6/8
+ // expect post: 7
+ completeTaskList.put("6", new TaskInstance());
+ completeTaskList.put("5", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
+ Assert.assertTrue(postNodes.contains("7"));
+ }
+
+ /**
+ * test forbidden post node
+ * @throws JsonProcessingException
+ */
+ @Test
+ public void testForbiddenPostNode() throws JsonProcessingException {
+ DAG dag = generateDag();
+ Map completeTaskList = new HashMap<>();
+ Map skipNodeList = new HashMap<>();
+ Set postNodes = null;
+ // dag: 1-2-3-5-7 4-3-6 2-8-5-7
+ // forbid:2 complete:1 post:4/8
+ completeTaskList.put("1", new TaskInstance());
+ TaskNode node2 = dag.getNode("2");
+ node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("4"));
+ Assert.assertTrue(postNodes.contains("8"));
+
+ //forbid:2/4 complete:1 post:3/8
+ TaskNode node4 = dag.getNode("4");
+ node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(2, postNodes.size());
+ Assert.assertTrue(postNodes.contains("3"));
+ Assert.assertTrue(postNodes.contains("8"));
+
+ //forbid:2/4/5 complete:1/8 post:3
+ completeTaskList.put("8", new TaskInstance());
+ TaskNode node5 = dag.getNode("5");
+ node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
+ Assert.assertTrue(postNodes.contains("3"));
+ }
+
+ /**
+ * test condition post node
+ * @throws JsonProcessingException
+ */
+ @Test
+ public void testConditionPostNode() throws JsonProcessingException {
+ DAG dag = generateDag();
+ Map completeTaskList = new HashMap<>();
+ Map skipNodeList = new HashMap<>();
+ Set postNodes = null;
+ // dag: 1-2-3-5-7 4-3-6 2-8-5-7
+ // 3-if
+ completeTaskList.put("1", new TaskInstance());
+ completeTaskList.put("2", new TaskInstance());
+ completeTaskList.put("4", new TaskInstance());
+ TaskNode node3 = dag.getNode("3");
+ node3.setType("CONDITIONS");
+ node3.setConditionResult("{\n" +
+ " \"successNode\": [5\n" +
+ " ],\n" +
+ " \"failedNode\": [6\n" +
+ " ]\n" +
+ " }");
+ completeTaskList.remove("3");
+ TaskInstance taskInstance = new TaskInstance();
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ //complete 1/2/3/4 expect:8
+ completeTaskList.put("3", taskInstance);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
+ Assert.assertTrue(postNodes.contains("8"));
+
+ //2.complete 1/2/3/4/8 expect:5 skip:6
+ completeTaskList.put("8", new TaskInstance());
+ postNodes = DagHelper.parsePostNodes(null ,skipNodeList, dag, completeTaskList);
+ Assert.assertTrue(postNodes.contains("5"));
+ Assert.assertEquals(1, skipNodeList.size());
+ Assert.assertTrue(skipNodeList.containsKey("6"));
+
+ // 3.complete 1/2/3/4/5/8 expect post:7 skip:6
+ skipNodeList.clear();
+ TaskInstance taskInstance1 = new TaskInstance();
+ taskInstance.setState(ExecutionStatus.SUCCESS);
+ completeTaskList.put("5", taskInstance1);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
+ Assert.assertTrue(postNodes.contains("7"));
+ Assert.assertEquals(1, skipNodeList.size());
+ Assert.assertTrue(skipNodeList.containsKey("6"));
+
+ // dag: 1-2-3-5-7 4-3-6
+ // 3-if , complete:1/2/3/4
+ // 1.failure:3 expect post:6 skip:5/7
+ skipNodeList.clear();
+ completeTaskList.remove("3");
+ taskInstance = new TaskInstance();
+ taskInstance.setState(ExecutionStatus.FAILURE);
+ completeTaskList.put("3", taskInstance);
+ postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
+ Assert.assertEquals(1, postNodes.size());
+ Assert.assertTrue(postNodes.contains("6"));
+ Assert.assertEquals(2, skipNodeList.size());
+ Assert.assertTrue(skipNodeList.containsKey("5"));
+ Assert.assertTrue(skipNodeList.containsKey("7"));
+ }
+
+ /**
+ * 1->2->3->5->7
+ * 4->3->6
+ * 2->8->5->7
+ *
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
*/
@@ -87,11 +265,13 @@ public class DagHelperTest {
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
+ node1.setType("SHELL");
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
+ node2.setType("SHELL");
List dep2 = new ArrayList<>();
dep2.add("1");
node2.setDepList(dep2);
@@ -101,11 +281,13 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
+ node4.setType("SHELL");
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
+ node3.setType("SHELL");
List dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
@@ -115,20 +297,48 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
+ node5.setType("SHELL");
List dep5 = new ArrayList<>();
dep5.add("3");
+ dep5.add("8");
node5.setDepList(dep5);
taskNodeList.add(node5);
+ TaskNode node6 = new TaskNode();
+ node6.setId("6");
+ node6.setName("6");
+ node6.setType("SHELL");
+ List dep6 = new ArrayList<>();
+ dep6.add("3");
+ node6.setDepList(dep6);
+ taskNodeList.add(node6);
+
+ TaskNode node7 = new TaskNode();
+ node7.setId("7");
+ node7.setName("7");
+ node7.setType("SHELL");
+ List dep7 = new ArrayList<>();
+ dep7.add("5");
+ node7.setDepList(dep7);
+ taskNodeList.add(node7);
+
+ TaskNode node8 = new TaskNode();
+ node8.setId("8");
+ node8.setName("8");
+ node8.setType("SHELL");
+ List dep8 = new ArrayList<>();
+ dep8.add("2");
+ node8.setDepList(dep8);
+ taskNodeList.add(node8);
+
List startNodes = new ArrayList<>();
- List recoveryNodes = new ArrayList<>();
+ List recoveryNodes = new ArrayList<>();
List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
- List taskNodeRelations =DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
+ List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(destTaskNodeList);
-
return DagHelper.buildDagGraph(processDag);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
index 7e3950df1..1b65fbf06 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
-
/**
* dependent parameters
*/
@@ -131,7 +130,6 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
}
-
/**
* depend result for depend item
* @param item
@@ -155,5 +153,4 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
return dependResult;
}
-
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 4b626e85d..db53d0003 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
-import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
@@ -39,6 +38,7 @@ import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -497,144 +497,22 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
-
-
- /**
- * if all of the task dependence are skip, skip it too.
- * @param taskNode
- * @return
- */
- private boolean isTaskNodeNeedSkip(TaskNode taskNode){
- if(CollectionUtils.isEmpty(taskNode.getDepList())){
- return false;
- }
- for(String depNode : taskNode.getDepList()){
- if(!skipTaskNodeList.containsKey(depNode)){
- return false;
- }
- }
- return true;
- }
-
- /**
- * set task node skip if dependence all skip
- * @param taskNodesSkipList
- */
- private void setTaskNodeSkip(List taskNodesSkipList){
- for(String skipNode : taskNodesSkipList){
- skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
- Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList);
- List postSkipList = new ArrayList<>();
- for(String post : postNodeList){
- TaskNode postNode = dag.getNode(post);
- if(isTaskNodeNeedSkip(postNode)){
- postSkipList.add(post);
- }
- }
- setTaskNodeSkip(postSkipList);
- }
- }
-
-
- /**
- * parse condition task find the branch process
- * set skip flag for another one.
- * @param nodeName
- * @return
- */
- private List parseConditionTask(String nodeName){
- List conditionTaskList = new ArrayList<>();
- TaskNode taskNode = dag.getNode(nodeName);
- if(!taskNode.isConditionsTask()){
- return conditionTaskList;
- }
- ConditionsParameters conditionsParameters =
- JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
-
- TaskInstance taskInstance = completeTaskList.get(nodeName);
- if(taskInstance == null){
- logger.error("task instance {} cannot find, please check it!", nodeName);
- return conditionTaskList;
- }
-
- if(taskInstance.getState().typeIsSuccess()){
- conditionTaskList = conditionsParameters.getSuccessNode();
- setTaskNodeSkip(conditionsParameters.getFailedNode());
- }else if(taskInstance.getState().typeIsFailure()){
- conditionTaskList = conditionsParameters.getFailedNode();
- setTaskNodeSkip(conditionsParameters.getSuccessNode());
- }else{
- conditionTaskList.add(nodeName);
- }
- return conditionTaskList;
- }
-
- /**
- * parse post node list of previous node
- * if condition node: return process according to the settings
- * if post node completed, return post nodes of the completed node
- * @param previousNodeName
- * @return
- */
- private List parsePostNodeList(String previousNodeName){
- List postNodeList = new ArrayList<>();
-
- TaskNode taskNode = dag.getNode(previousNodeName);
- if(taskNode != null && taskNode.isConditionsTask()){
- return parseConditionTask(previousNodeName);
- }
- Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList);
- List postSkipList = new ArrayList<>();
- // delete success node, parse the past nodes
- // if conditions node,
- // 1. parse the branch process according the conditions setting
- // 2. set skip flag on anther branch process
- for(String postNode : postNodeCollection){
- if(completeTaskList.containsKey(postNode)){
- TaskInstance postTaskInstance = completeTaskList.get(postNode);
- if(dag.getNode(postNode).isConditionsTask()){
- List conditionTaskNodeList = parseConditionTask(postNode);
- for(String conditions : conditionTaskNodeList){
- postNodeList.addAll(parsePostNodeList(conditions));
- }
- }else if(postTaskInstance.getState().typeIsSuccess()){
- postNodeList.addAll(parsePostNodeList(postNode));
- }else{
- postNodeList.add(postNode);
- }
-
- }else if(isTaskNodeNeedSkip(dag.getNode(postNode))){
- postSkipList.add(postNode);
- setTaskNodeSkip(postSkipList);
- postSkipList.clear();
- }else{
- postNodeList.add(postNode);
- }
- }
- return postNodeList;
- }
-
/**
* submit post node
* @param parentNodeName parent node name
*/
private void submitPostNode(String parentNodeName){
-
- List submitTaskNodeList = parsePostNodeList(parentNodeName);
-
+ Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
List taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode)));
}
-
// if previous node success , post node submit
for(TaskInstance task : taskInstances){
-
if(readyToSubmitTaskList.containsKey(task.getName())){
continue;
}
-
if(completeTaskList.containsKey(task.getName())){
logger.info("task {} has already run success", task.getName());
continue;
@@ -695,7 +573,7 @@ public class MasterExecThread implements Runnable {
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){
if(dag.getNode(dependNodeName).isConditionsTask()){
//condition task need check the branch to run
- List nextTaskList = parseConditionTask(dependNodeName);
+ List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
if(!nextTaskList.contains(nextNodeName)){
return false;
}
@@ -708,7 +586,6 @@ public class MasterExecThread implements Runnable {
return true;
}
-
/**
* query task instance by complete state
* @param state state
diff --git a/dolphinscheduler-server/src/main/resources/logback-master.xml b/dolphinscheduler-server/src/main/resources/logback-master.xml
index 7410c01f0..2b986ddad 100644
--- a/dolphinscheduler-server/src/main/resources/logback-master.xml
+++ b/dolphinscheduler-server/src/main/resources/logback-master.xml
@@ -74,7 +74,6 @@
-
diff --git a/dolphinscheduler-server/src/main/resources/logback-worker.xml b/dolphinscheduler-server/src/main/resources/logback-worker.xml
index c2a578d16..f96bef9bb 100644
--- a/dolphinscheduler-server/src/main/resources/logback-worker.xml
+++ b/dolphinscheduler-server/src/main/resources/logback-worker.xml
@@ -75,7 +75,6 @@
-
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
index d541f43a3..d402afcee 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterCommandTest.java
@@ -111,29 +111,4 @@ public class MasterCommandTest {
}
- @Test
- public void testDagHelper(){
-
- ProcessDefinition processDefinition = processDefinitionMapper.selectById(19);
-
- try {
- ProcessDag processDag = DagHelper.generateFlowDag(processDefinition.getProcessDefinitionJson(),
- new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
-
- DAG dag = DagHelper.buildDagGraph(processDag);
- Collection start = DagHelper.getStartVertex("1", dag, null);
-
- System.out.println(start.toString());
-
- Map forbidden = DagHelper.getForbiddenTaskNodeMaps(processDefinition.getProcessDefinitionJson());
- System.out.println(forbidden);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
-
-
-
-
}