[Fix-9717] The failure policy of the task flow takes effect (#9718)
* Failure policy takes effect. * Coverage on New Code * correct description logic * Compatible with all scenarios * clearer logic Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>dev
parent
e6dade71bb
commit
7bcec7115a
|
|
@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
|
||||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
|
||||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
||||||
|
|
@ -39,6 +40,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.spi.utils.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
@ -534,19 +536,7 @@ public class DagHelper {
|
||||||
public static boolean haveConditionsAfterNode(String parentNodeCode,
|
public static boolean haveConditionsAfterNode(String parentNodeCode,
|
||||||
DAG<String, TaskNode, TaskNodeRelation> dag
|
DAG<String, TaskNode, TaskNodeRelation> dag
|
||||||
) {
|
) {
|
||||||
boolean result = false;
|
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS);
|
||||||
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
|
|
||||||
if (CollectionUtils.isEmpty(subsequentNodes)) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
for (String nodeCode : subsequentNodes) {
|
|
||||||
TaskNode taskNode = dag.getNode(nodeCode);
|
|
||||||
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
|
|
||||||
if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -565,19 +555,38 @@ public class DagHelper {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* is there have blocking node after the parent node
|
* is there have blocking node after the parent node
|
||||||
*/
|
*/
|
||||||
public static boolean haveBlockingAfterNode(String parentNodeCode,
|
public static boolean haveBlockingAfterNode(String parentNodeCode,
|
||||||
DAG<String,TaskNode,TaskNodeRelation> dag) {
|
DAG<String,TaskNode,TaskNodeRelation> dag) {
|
||||||
|
return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* is there have all node after the parent node
|
||||||
|
*/
|
||||||
|
public static boolean haveAllNodeAfterNode(String parentNodeCode,
|
||||||
|
DAG<String,TaskNode,TaskNodeRelation> dag) {
|
||||||
|
return haveSubAfterNode(parentNodeCode, dag, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether there is a specified type of child node after the parent node
|
||||||
|
*/
|
||||||
|
public static boolean haveSubAfterNode(String parentNodeCode,
|
||||||
|
DAG<String,TaskNode,TaskNodeRelation> dag, String filterNodeType) {
|
||||||
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
|
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
|
||||||
if (CollectionUtils.isEmpty(subsequentNodes)) {
|
if (CollectionUtils.isEmpty(subsequentNodes)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (StringUtils.isBlank(filterNodeType)){
|
||||||
|
return true;
|
||||||
|
}
|
||||||
for (String nodeName : subsequentNodes) {
|
for (String nodeName : subsequentNodes) {
|
||||||
TaskNode taskNode = dag.getNode(nodeName);
|
TaskNode taskNode = dag.getNode(nodeName);
|
||||||
List<String> preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class);
|
if (taskNode.getType().equalsIgnoreCase(filterNodeType)){
|
||||||
if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
|
||||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||||
import org.apache.dolphinscheduler.dao.entity.ProcessData;
|
import org.apache.dolphinscheduler.dao.entity.ProcessData;
|
||||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||||
|
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
|
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
|
||||||
|
|
@ -48,6 +49,57 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
* dag helper test
|
* dag helper test
|
||||||
*/
|
*/
|
||||||
public class DagHelperTest {
|
public class DagHelperTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHaveSubAfterNode(){
|
||||||
|
String parentNodeCode = "5293789969856";
|
||||||
|
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
|
||||||
|
TaskNodeRelation relation = new TaskNodeRelation();
|
||||||
|
relation.setStartNode("5293789969856");
|
||||||
|
relation.setEndNode("5293789969857");
|
||||||
|
taskNodeRelations.add(relation);
|
||||||
|
|
||||||
|
TaskNodeRelation relationNext = new TaskNodeRelation();
|
||||||
|
relationNext.setStartNode("5293789969856");
|
||||||
|
relationNext.setEndNode("5293789969858");
|
||||||
|
taskNodeRelations.add(relationNext);
|
||||||
|
|
||||||
|
List<TaskNode> taskNodes = new ArrayList<>();
|
||||||
|
TaskNode node = new TaskNode();
|
||||||
|
node.setCode(5293789969856L);
|
||||||
|
node.setType("SHELL");
|
||||||
|
|
||||||
|
TaskNode subNode = new TaskNode();
|
||||||
|
subNode.setCode(5293789969857L);
|
||||||
|
subNode.setType("BLOCKING");
|
||||||
|
subNode.setPreTasks("[5293789969856]");
|
||||||
|
|
||||||
|
TaskNode subNextNode = new TaskNode();
|
||||||
|
subNextNode.setCode(5293789969858L);
|
||||||
|
subNextNode.setType("CONDITIONS");
|
||||||
|
subNextNode.setPreTasks("[5293789969856]");
|
||||||
|
|
||||||
|
taskNodes.add(node);
|
||||||
|
taskNodes.add(subNode);
|
||||||
|
taskNodes.add(subNextNode);
|
||||||
|
|
||||||
|
ProcessDag processDag = new ProcessDag();
|
||||||
|
processDag.setEdges(taskNodeRelations);
|
||||||
|
processDag.setNodes(taskNodes);
|
||||||
|
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
|
||||||
|
boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag);
|
||||||
|
Assert.assertTrue(canSubmit);
|
||||||
|
|
||||||
|
boolean haveBlocking = DagHelper.haveBlockingAfterNode(parentNodeCode, dag);
|
||||||
|
Assert.assertTrue(haveBlocking);
|
||||||
|
|
||||||
|
boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag);
|
||||||
|
Assert.assertTrue(haveConditions);
|
||||||
|
|
||||||
|
boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_DEPENDENT);
|
||||||
|
Assert.assertFalse(dependent);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* test task node can submit
|
* test task node can submit
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -456,9 +456,9 @@ public class WorkflowExecuteThread {
|
||||||
retryTaskInstance(taskInstance);
|
retryTaskInstance(taskInstance);
|
||||||
} else if (taskInstance.getState().typeIsFailure()) {
|
} else if (taskInstance.getState().typeIsFailure()) {
|
||||||
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
||||||
if (taskInstance.isConditionsTask()
|
// There are child nodes and the failure policy is: CONTINUE
|
||||||
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|
||||||
|| DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
|
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
|
||||||
submitPostNode(Long.toString(taskInstance.getTaskCode()));
|
submitPostNode(Long.toString(taskInstance.getTaskCode()));
|
||||||
} else {
|
} else {
|
||||||
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue