diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index b56e72560c..1f85432bd2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -34,7 +34,8 @@ public enum TaskType { * 8 FLINK * 9 HTTP * 10 DATAX - * 11 SQOOP + * 11 CONDITIONS + * 12 SQOOP */ SHELL(0, "shell"), SQL(1, "sql"), @@ -47,7 +48,8 @@ public enum TaskType { FLINK(8, "flink"), HTTP(9, "http"), DATAX(10, "datax"), - SQOOP(11, "sqoop"); + CONDITIONS(11, "conditions"), + SQOOP(12, "sqoop"); TaskType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java index 484a2f7ac8..6c09064eae 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.common.model; import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; /** * dependent item @@ -28,6 +29,7 @@ public class DependentItem { private String cycle; private String dateValue; private DependResult dependResult; + private ExecutionStatus status; public String getKey(){ @@ -77,4 +79,12 @@ public class DependentItem { public void setDependResult(DependResult dependResult) { this.dependResult = dependResult; } + + public ExecutionStatus getStatus() { + return status; + } + + public void setStatus(ExecutionStatus status) { + this.status = status; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index 40efd0a24f..b45bd8aeb8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.model; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -108,6 +109,11 @@ public class TaskNode { @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) private String dependence; + + @JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class) + @JsonSerialize(using = JSONUtils.JsonDataSerializer.class) + private String conditionResult; + /** * task instance priority */ @@ -230,6 +236,7 @@ public class TaskNode { Objects.equals(extras, taskNode.extras) && Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(dependence, taskNode.dependence) && + Objects.equals(conditionResult, taskNode.conditionResult) && Objects.equals(workerGroupId, taskNode.workerGroupId) && CollectionUtils.equalLists(depList, taskNode.depList); } @@ -292,6 +299,10 @@ public class TaskNode { return new TaskTimeoutParameter(false); } + public boolean isConditionsTask(){ + return this.getType().toUpperCase().equals(TaskType.CONDITIONS.toString()); + } + @Override public String toString() { return "TaskNode{" + @@ -321,4 +332,12 @@ public class TaskNode { public void setWorkerGroupId(int workerGroupId) { this.workerGroupId = workerGroupId; } + + public String getConditionResult() { + return conditionResult; + } + + public void setConditionResult(String conditionResult) { + this.conditionResult = conditionResult; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java new file mode 100644 index 0000000000..5714b5ef3e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.conditions; + +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +import java.util.List; + +public class ConditionsParameters extends AbstractParameters { + + //depend node list and state, only need task name + private List dependTaskList; + private DependentRelation dependRelation; + + // node list to run when success + private List successNode; + + // node list to run when failed + private List failedNode; + + + @Override + public boolean checkParameters() { + return true; + } + + @Override + public List getResourceFilesList() { + return null; + } + + public List getDependTaskList() { + return dependTaskList; + } + + public void setDependTaskList(List dependTaskList) { + this.dependTaskList = dependTaskList; + } + + public DependentRelation getDependRelation() { + return dependRelation; + } + + public void setDependRelation(DependentRelation dependRelation) { + this.dependRelation = dependRelation; + } + + public List getSuccessNode() { + return successNode; + } + + public void setSuccessNode(List successNode) { + this.successNode = successNode; + } + + public List getFailedNode() { + return failedNode; + } + + public void setFailedNode(List failedNode) { + this.failedNode = failedNode; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 7a0e069a9a..a3492f49fa 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; @@ -72,6 +73,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, HttpParameters.class); case DATAX: return JSONUtils.parseObject(parameter, DataxParameters.class); + case CONDITIONS: + return JSONUtils.parseObject(parameter, ConditionsParameters.class); case SQOOP: return JSONUtils.parseObject(parameter, SqoopParameters.class); default: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index f5e31210a0..4b22b27bec 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; @@ -109,6 +110,11 @@ public class MasterExecThread implements Runnable { */ private Map forbiddenTaskList = new ConcurrentHashMap<>(); + /** + * skip task map + */ + private Map skipTaskNodeList = new ConcurrentHashMap<>(); + /** * recover tolerance fault task list */ @@ -434,7 +440,7 @@ public class MasterExecThread implements Runnable { * @return TaskInstance */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, - TaskNode taskNode, String parentNodeName) { + TaskNode taskNode) { TaskInstance taskInstance = findTaskIfExists(nodeName); if(taskInstance == null){ @@ -484,58 +490,140 @@ public class MasterExecThread implements Runnable { } /** - * get post task instance by node - * @param dag dag - * @param parentNodeName parent node name - * @return task instance list + * is there have conditions after the parent node + * @param parentNodeName + * @return */ - private List getPostTaskInstanceByNode(DAG dag, String parentNodeName){ + private boolean haveConditionsAfterNode(String parentNodeName){ - List postTaskList = new ArrayList<>(); + boolean result = false; Collection startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList); if(startVertex == null){ - return postTaskList; + return result; + } + for(String nodeName : startVertex){ + TaskNode taskNode = dag.getNode(nodeName); + if(taskNode.getType().equals(TaskType.CONDITIONS.toString())){ + result = true; + break; + } } + return result; + } - for (String nodeName : startVertex){ - // encapsulation task instance - TaskInstance taskInstance = createTaskInstance(processInstance, nodeName , - dag.getNode(nodeName),parentNodeName); - postTaskList.add(taskInstance); + /** + * if all of the task dependence are skip, skip it too. + * @param taskNode + * @return + */ + private boolean isTaskNodeNeedSkip(TaskNode taskNode){ + if(CollectionUtils.isEmpty(taskNode.getDepList())){ + return false; } - return postTaskList; + for(String depNode : taskNode.getDepList()){ + if(!skipTaskNodeList.containsKey(depNode)){ + return false; + } + } + return true; } /** - * return start task node list - * @return task instance list + * set task node skip if dependence all skip + * @param taskNodesSkipList */ - private List getStartSubmitTaskList(){ + private void setTaskNodeSkip(List taskNodesSkipList){ + for(String skipNode : taskNodesSkipList){ + skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode)); + Collection postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList); + List postSkipList = new ArrayList<>(); + for(String post : postNodeList){ + TaskNode postNode = dag.getNode(post); + if(isTaskNodeNeedSkip(postNode)){ + postSkipList.add(post); + } + } + setTaskNodeSkip(postSkipList); + } + } - List startTaskList = getPostTaskInstanceByNode(dag, null); - HashMap successTaskMaps = new HashMap<>(); - List resultList = new ArrayList<>(); - while(Stopper.isRunning()){ - for(TaskInstance task : startTaskList){ - if(task.getState().typeIsSuccess()){ - successTaskMaps.put(task.getName(), task); - }else if(!completeTaskList.containsKey(task.getName()) && !errorTaskList.containsKey(task.getName())){ - resultList.add(task); + /** + * parse condition task find the branch process + * set skip flag for another one. + * @param nodeName + * @return + */ + private List parseConditionTask(String nodeName){ + List conditionTaskList = new ArrayList<>(); + TaskNode taskNode = dag.getNode(nodeName); + if(!taskNode.isConditionsTask()){ + return conditionTaskList; + } + ConditionsParameters conditionsParameters = + JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); + + TaskInstance taskInstance = completeTaskList.get(nodeName); + if(taskInstance == null){ + logger.error("task instance cannot find, please check it!", nodeName); + return conditionTaskList; + } + + if(taskInstance.getState().typeIsSuccess()){ + conditionTaskList = conditionsParameters.getSuccessNode(); + setTaskNodeSkip(conditionsParameters.getFailedNode()); + }else if(taskInstance.getState().typeIsFailure()){ + conditionTaskList = conditionsParameters.getFailedNode(); + setTaskNodeSkip(conditionsParameters.getSuccessNode()); + }else{ + conditionTaskList.add(nodeName); + } + return conditionTaskList; + } + + /** + * parse post node list of previous node + * if condition node: return process according to the settings + * if post node completed, return post nodes of the completed node + * @param previousNodeName + * @return + */ + private List parsePostNodeList(String previousNodeName){ + List postNodeList = new ArrayList<>(); + + TaskNode taskNode = dag.getNode(previousNodeName); + if(taskNode != null && taskNode.isConditionsTask()){ + return parseConditionTask(previousNodeName); + } + Collection postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList); + List postSkipList = new ArrayList<>(); + // delete success node, parse the past nodes + // if conditions node, + // 1. parse the branch process according the conditions setting + // 2. set skip flag on anther branch process + for(String postNode : postNodeCollection){ + if(completeTaskList.containsKey(postNode)){ + TaskInstance postTaskInstance = completeTaskList.get(postNode); + if(dag.getNode(postNode).isConditionsTask()){ + List conditionTaskNodeList = parseConditionTask(postNode); + for(String conditions : conditionTaskNodeList){ + postNodeList.addAll(parsePostNodeList(conditions)); + } + }else if(postTaskInstance.getState().typeIsSuccess()){ + postNodeList.addAll(parsePostNodeList(postNode)); + }else{ + postNodeList.add(postNode); } - } - startTaskList.clear(); - if(successTaskMaps.size() == 0){ - break; - } - Set taskNameKeys = successTaskMaps.keySet(); - for(String taskName : taskNameKeys){ - startTaskList.addAll(getPostTaskInstanceByNode(dag, taskName)); + }else if(isTaskNodeNeedSkip(dag.getNode(postNode))){ + postSkipList.add(postNode); + setTaskNodeSkip(postSkipList); + postSkipList.clear(); + }else{ + postNodeList.add(postNode); } - successTaskMaps.clear(); } - return resultList; + return postNodeList; } /** @@ -544,14 +632,17 @@ public class MasterExecThread implements Runnable { */ private void submitPostNode(String parentNodeName){ - List submitTaskList = null; - if(parentNodeName == null){ - submitTaskList = getStartSubmitTaskList(); - }else{ - submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName); + List submitTaskNodeList = parsePostNodeList(parentNodeName); + + List taskInstances = new ArrayList<>(); + for(String taskNode : submitTaskNodeList){ + taskInstances.add(createTaskInstance(processInstance, taskNode, + dag.getNode(taskNode))); } + // if previous node success , post node submit - for(TaskInstance task : submitTaskList){ + for(TaskInstance task : taskInstances){ + if(readyToSubmitTaskList.containsKey(task.getName())){ continue; } @@ -575,27 +666,31 @@ public class MasterExecThread implements Runnable { private DependResult isTaskDepsComplete(String taskName) { Collection startNodes = dag.getBeginNode(); - // if the vertex returns true directly + // if vertex,returns true directly if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } TaskNode taskNode = dag.getNode(taskName); - List depsNameList = taskNode.getDepList(); - for(String depsNode : depsNameList ){ + List depNameList = taskNode.getDepList(); + for(String depsNode : depNameList ){ - if(forbiddenTaskList.containsKey(depsNode)){ + if(forbiddenTaskList.containsKey(depsNode) || + skipTaskNodeList.containsKey(depsNode)){ continue; } // dependencies must be fully completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } - ExecutionStatus taskState = completeTaskList.get(depsNode).getState(); - if(taskState.typeIsFailure()){ - return DependResult.FAILED; + ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); + // conditions task would not return failed. + if(depTaskState.typeIsFailure()){ + if(!haveConditionsAfterNode(depsNode) && !dag.getNode(depsNode).isConditionsTask()){ + return DependResult.FAILED; + } } - if(taskState.typeIsPause() || taskState.typeIsCancel()){ + if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } } @@ -878,11 +973,15 @@ public class MasterExecThread implements Runnable { if(task.taskCanRetry()){ addTaskToStandByList(task); }else{ - // node failure, based on failure strategy - errorTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task); - if(processInstance.getFailureStrategy() == FailureStrategy.END){ - killTheOtherTasks(); + if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) || + haveConditionsAfterNode(task.getName())) { + submitPostNode(task.getName()); + }else{ + errorTaskList.put(task.getName(), task); + if(processInstance.getFailureStrategy() == FailureStrategy.END){ + killTheOtherTasks(); + } } } continue; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index a69cffd58d..5e68acf94e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -165,7 +165,6 @@ public class TaskScheduleThread implements Runnable { new Date(), taskInstance.getId()); } - /** * get global paras map * @return @@ -212,21 +211,29 @@ public class TaskScheduleThread implements Runnable { * @return log path */ private String getTaskLogPath() { - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - return baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + String logPath; + try{ + String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) + .getLogger("ROOT") + .getAppender("TASKLOGFILE")) + .getDiscriminator()).getLogBase(); + if (baseLog.startsWith(Constants.SINGLE_SLASH)){ + logPath = baseLog + Constants.SINGLE_SLASH + + taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + + taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInstance.getId() + ".log"; + }else{ + logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH + + baseLog + Constants.SINGLE_SLASH + + taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + + taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInstance.getId() + ".log"; + } + }catch (Exception e){ + logger.error("logger" + e); + logPath = ""; } - return System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + - taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInstance.getId() + ".log"; + return logPath; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 5c50df2265..3795506b78 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; @@ -202,6 +203,9 @@ public abstract class AbstractTask { case SQOOP: paramsClass = SqoopParameters.class; break; + case CONDITIONS: + paramsClass = ConditionsParameters.class; + break; default: logger.error("not support this task type: {}", taskType); throw new IllegalArgumentException("not support this task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 1ab6f2f3da..ad62b77655 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; +import org.apache.dolphinscheduler.server.worker.task.conditions.ConditionsTask; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; @@ -71,6 +72,8 @@ public class TaskManager { return new DataxTask(props, logger); case SQOOP: return new SqoopTask(props, logger); + case CONDITIONS: + return new ConditionsTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java new file mode 100644 index 0000000000..cbe82ce20a --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.conditions; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ConditionsTask extends AbstractTask { + + + /** + * dependent parameters + */ + private DependentParameters dependentParameters; + + /** + * process dao + */ + private ProcessService processService; + + /** + * taskInstance + */ + private TaskInstance taskInstance; + + /** + * processInstance + */ + private ProcessInstance processInstance; + + /** + * + */ + private Map completeTaskList = new ConcurrentHashMap<>(); + + /** + * constructor + * + * @param taskProps task props + * @param logger logger + */ + public ConditionsTask(TaskProps taskProps, Logger logger) { + super(taskProps, logger); + } + + @Override + public void init() throws Exception { + logger.info("conditions task initialize"); + + this.processService = SpringApplicationContext.getBean(ProcessService.class); + + this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), DependentParameters.class); + + this.taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstId()); + + if(taskInstance == null){ + throw new Exception("cannot find the task instance!"); + } + + List taskInstanceList = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId()); + for(TaskInstance task : taskInstanceList){ + this.completeTaskList.putIfAbsent(task.getName(), task.getState()); + } + } + + @Override + public void handle() throws Exception { + + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + List modelResultList = new ArrayList<>(); + for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ + + List itemDependResult = new ArrayList<>(); + for(DependentItem item : dependentTaskModel.getDependItemList()){ + itemDependResult.add(getDependResultForItem(item)); + } + DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); + modelResultList.add(modelResult); + } + DependResult result = DependentUtils.getDependResultForRelation( + dependentParameters.getRelation(), modelResultList + ); + logger.info("the conditions task depend result : {}", result); + exitStatusCode = (result == DependResult.SUCCESS) ? + Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; + } + + private DependResult getDependResultForItem(DependentItem item){ + + DependResult dependResult = DependResult.SUCCESS; + if(!completeTaskList.containsKey(item.getDepTasks())){ + logger.info("depend item: {} have not completed yet.", item.getDepTasks()); + dependResult = DependResult.FAILED; + return dependResult; + } + ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); + if(executionStatus != item.getStatus()){ + logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus().toString(), executionStatus.toString()); + dependResult = DependResult.FAILED; + } + logger.info("depend item: {}, depend result: {}", + item.getDepTasks(), dependResult); + return dependResult; + } + + @Override + public AbstractParameters getParameters() { + return null; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index 770ab3cff6..d2a0fb2407 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -151,4 +151,5 @@ public class MasterExecThreadTest { schedulerList.add(schedule); return schedulerList; } + } \ No newline at end of file diff --git a/dolphinscheduler-ui/.env b/dolphinscheduler-ui/.env index 4c7e96e795..e676be6059 100644 --- a/dolphinscheduler-ui/.env +++ b/dolphinscheduler-ui/.env @@ -17,4 +17,4 @@ API_BASE = http://192.168.xx.xx:12345 # If IP access is required for local development, remove the "#" -#DEV_HOST = 192.168.xx.xx \ No newline at end of file +#DEV_HOST = 192.168.xx.xx diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js old mode 100644 new mode 100755 index be4daa6595..a9a51aa2b1 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -287,6 +287,10 @@ let tasksType = { 'SQOOP': { desc: 'SQOOP', color: '#E46F13' + }, + 'CONDITIONS': { + desc: 'CONDITIONS', + color: '#E46F13' } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss old mode 100644 new mode 100755 index 08918c969f..6d97856960 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -107,6 +107,9 @@ .icos-SQOOP { background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%; } + .icos-CONDITIONS { + background: url("../img/toobar_CONDITIONS.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue old mode 100644 new mode 100755 index a21889f4ac..d912a9a884 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -473,7 +473,35 @@ */ _createNodes ({ id, type }) { let self = this + let preNode = [] + let rearNode = [] + let rearList = [] + $('div[data-targetarr*="' + id + '"]').each(function(){ + rearNode.push($(this).attr("id")) + }) + if (rearNode.length>0) { + rearNode.forEach(v => { + let rearobj = {} + rearobj.value = $(`#${v}`).find('.name-p').text() + rearobj.label = $(`#${v}`).find('.name-p').text() + rearList.push(rearobj) + }) + } else { + rearList = [] + } + let targetarr = $(`#${id}`).attr('data-targetarr') + if (targetarr) { + let nodearr = targetarr.split(',') + nodearr.forEach(v => { + let nodeobj = {} + nodeobj.value = $(`#${v}`).find('.name-p').text() + nodeobj.label = $(`#${v}`).find('.name-p').text() + preNode.push(nodeobj) + }) + } else { + preNode = [] + } if (eventModel) { eventModel.remove() } @@ -524,7 +552,9 @@ props: { id: id, taskType: type, - self: self + self: self, + preNode: preNode, + rearList: rearList } }) }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue old mode 100644 new mode 100755 index 177b252693..2500ce5772 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -109,6 +109,43 @@ ({{$t('Minute')}}) +
+
+ {{$t('State')}} +
+
+ + + + + + + {{$t('Branch flow')}} + + + + +
+
+ +
+
+ {{$t('State')}} +
+
+ + + + + + + {{$t('Branch flow')}} + + + + +
+
+ +
@@ -236,6 +280,7 @@ import mDependent from './tasks/dependent' import mHttp from './tasks/http' import mDatax from './tasks/datax' + import mConditions from './tasks/CONDITIONS' import mSqoop from './tasks/sqoop' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' @@ -253,13 +298,21 @@ // loading spinnerLoading: false, // node name - name: ``, + name: '', // description description: '', // Node echo data backfillItem: {}, // Resource(list) resourcesList: [], + successNode: 'success', + failedNode: 'failed', + successBranch: '', + failedBranch: '', + conditionResult: { + 'successNode': [], + 'failedNode': [] + }, // dependence dependence: {}, // cache dependence @@ -279,7 +332,17 @@ // Task priority taskInstancePriority: 'MEDIUM', // worker group id - workerGroupId: -1 + workerGroupId: -1, + stateList:[ + { + value: 'success', + label: `${i18n.$t('success')}` + }, + { + value: 'failed', + label: `${i18n.$t('failed')}` + } + ] } }, /** @@ -290,7 +353,9 @@ props: { id: Number, taskType: String, - self: Object + self: Object, + preNode: Array, + rearList: Array }, methods: { /** @@ -399,6 +464,10 @@ this.$message.warning(`${i18n.$t('Please enter name (required)')}`) return false } + if (this.successBranch !='' && this.successBranch == this.failedBranch) { + this.$message.warning(`${i18n.$t('Cannot select the same node for successful branch flow and failed branch flow')}`) + return false + } if (this.name === this.backfillItem.name) { return true } @@ -427,6 +496,8 @@ } $(`#${this.id}`).find('span').text(this.name) + this.conditionResult.successNode[0] = this.successBranch + this.conditionResult.failedNode[0] = this.failedBranch // Store the corresponding node data structure this.$emit('addTaskInfo', { item: { @@ -436,12 +507,15 @@ params: this.params, description: this.description, runFlag: this.runFlag, + conditionResult: this.conditionResult, dependence: this.dependence, maxRetryTimes: this.maxRetryTimes, retryInterval: this.retryInterval, timeout: this.timeout, taskInstancePriority: this.taskInstancePriority, - workerGroupId: this.workerGroupId + workerGroupId: this.workerGroupId, + status: this.status, + branch: this.branch }, fromThis: this }) @@ -526,7 +600,10 @@ this.description = o.description this.maxRetryTimes = o.maxRetryTimes this.retryInterval = o.retryInterval - + if(o.conditionResult) { + this.successBranch = o.conditionResult.successNode[0] + this.failedBranch = o.conditionResult.failedNode[0] + } // If the workergroup has been deleted, set the default workergroup var hasMatch = false; for (let i = 0; i < this.store.state.security.workerGroupsListAll.length; i++) { @@ -598,6 +675,7 @@ mHttp, mDatax, mSqoop, + mConditions, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js old mode 100644 new mode 100755 index fc8fe654d2..cdf632f13d --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js @@ -232,6 +232,16 @@ const positionList = [ code: "Headers" } ] +const nodeStatusList = [ + { + value: 'SUCCESS', + label: `${i18n.$t('success')}` + }, + { + value: 'FAILURE', + label: `${i18n.$t('failed')}` + } +] export { cycleList, @@ -239,5 +249,6 @@ export { typeList, directList, sqlTypeList, - positionList + positionList, + nodeStatusList } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue new file mode 100644 index 0000000000..4afb8b46c5 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue new file mode 100644 index 0000000000..4ac04d91a6 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js old mode 100644 new mode 100755 index b0a7a64a47..598c94209e --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js @@ -198,7 +198,8 @@ JSP.prototype.jsonHandle = function ({ largeJson, locations }) { targetarr: locations[v.id]['targetarr'], isAttachment: this.config.isAttachment, taskType: v.type, - runFlag: v.runFlag + runFlag: v.runFlag, + nodenumber: locations[v.id]['nodenumber'], })) // contextmenu event @@ -517,6 +518,9 @@ JSP.prototype.removeConnect = function ($connect) { targetarr = _.filter(targetarr, v => v !== sourceId) $(`#${targetId}`).attr('data-targetarr', targetarr.toString()) } + if ($(`#${sourceId}`).attr('data-tasks-type')=='CONDITIONS') { + $(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))-1) + } this.JspInstance.deleteConnection($connect) this.selectedElement = {} @@ -572,6 +576,7 @@ JSP.prototype.copyNodes = function ($id) { [newId]: { name: newName, targetarr: '', + nodenumber: 0, x: newX, y: newY } @@ -658,6 +663,7 @@ JSP.prototype.saveStore = function () { locations[v.id] = { name: v.name, targetarr: v.targetarr, + nodenumber: v.nodenumber, x: v.x, y: v.y } @@ -711,6 +717,12 @@ JSP.prototype.handleEvent = function () { return false } + if ($(`#${sourceId}`).attr('data-tasks-type')=='CONDITIONS' && $(`#${sourceId}`).attr('data-nodenumber')==2) { + return false + } else { + $(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1) + } + // Storage node dependency information saveTargetarr(sourceId, targetId) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js old mode 100644 new mode 100755 index c10dfda5d6..4b485fec0b --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js @@ -43,9 +43,9 @@ const rtBantpl = () => { /** * return node html */ -const rtTasksTpl = ({ id, name, x, y, targetarr, isAttachment, taskType, runFlag }) => { +const rtTasksTpl = ({ id, name, x, y, targetarr, isAttachment, taskType, runFlag, nodenumber }) => { let tpl = `` - tpl += `
` + tpl += `
` tpl += `
` tpl += `
` tpl += `
` @@ -73,6 +73,7 @@ const tasksAll = () => { id: e.attr('id'), name: e.find('.name-p').text(), targetarr: e.attr('data-targetarr') || '', + nodenumber: e.attr('data-nodenumber'), x: parseInt(e.css('left'), 10), y: parseInt(e.css('top'), 10) }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toobar_CONDITIONS.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toobar_CONDITIONS.png new file mode 100644 index 0000000000..e8c5e38339 Binary files /dev/null and b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toobar_CONDITIONS.png differ diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js old mode 100644 new mode 100755 index 6ceabed8c1..b914b86740 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js @@ -134,6 +134,7 @@ export default { state.locations[payload.id] = _.assign(state.locations[payload.id], { name: dom.find('.name-p').text(), targetarr: dom.attr('data-targetarr'), + nodenumber: dom.attr('data-nodenumber'), x: parseInt(dom.css('left'), 10), y: parseInt(dom.css('top'), 10) }) diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js old mode 100644 new mode 100755 index a96ee8e145..e8ac57adc0 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -520,7 +520,6 @@ export default { '0 means unlimited by byte': '0 means unlimited', '0 means unlimited by count': '0 means unlimited', 'Modify User': 'Modify User', - 'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)', 'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)', 'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)', @@ -566,7 +565,8 @@ export default { 'Data Source': 'Data Source', 'Data Target': 'Data Target', 'All Columns': 'All Columns', - 'Some Columns': 'Some Columns' - - + 'Some Columns': 'Some Columns', + 'Modify User': 'Modify User', + 'Branch flow': 'Branch flow', + 'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow' } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js old mode 100644 new mode 100755 index 72c978d2e1..c72090657b --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -565,5 +565,8 @@ export default { 'Data Source': '数据来源', 'Data Target': '数据目的', 'All Columns': '全表导入', - 'Some Columns': '选择列' + 'Some Columns': '选择列', + 'Modify User': '修改用户', + 'Branch flow': '分支流转', + 'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点' }