Browse Source

[New Feature] add conditions task #205 (#2003)

* add funtion conditions task

* update conditions tasks

* update conditions for ui

* update conditions

* update

* revert

* update
pull/2/head
bao liang 5 years ago committed by GitHub
parent
commit
0df7c6719d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  2. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
  3. 19
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  4. 79
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java
  5. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  6. 197
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  7. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  8. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  9. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  10. 145
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java
  11. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
  12. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
  13. 3
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  14. 32
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  15. 88
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  16. 13
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js
  17. 231
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue
  18. 265
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue
  19. 14
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js
  20. 5
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js
  21. BIN
      dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toobar_CONDITIONS.png
  22. 1
      dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js
  23. 8
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  24. 5
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -34,7 +34,8 @@ public enum TaskType {
* 8 FLINK * 8 FLINK
* 9 HTTP * 9 HTTP
* 10 DATAX * 10 DATAX
* 11 SQOOP * 11 CONDITIONS
* 12 SQOOP
*/ */
SHELL(0, "shell"), SHELL(0, "shell"),
SQL(1, "sql"), SQL(1, "sql"),
@ -47,7 +48,8 @@ public enum TaskType {
FLINK(8, "flink"), FLINK(8, "flink"),
HTTP(9, "http"), HTTP(9, "http"),
DATAX(10, "datax"), DATAX(10, "datax"),
SQOOP(11, "sqoop"); CONDITIONS(11, "conditions"),
SQOOP(12, "sqoop");
TaskType(int code, String descp){ TaskType(int code, String descp){
this.code = code; this.code = code;

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
/** /**
* dependent item * dependent item
@ -28,6 +29,7 @@ public class DependentItem {
private String cycle; private String cycle;
private String dateValue; private String dateValue;
private DependResult dependResult; private DependResult dependResult;
private ExecutionStatus status;
public String getKey(){ public String getKey(){
@ -77,4 +79,12 @@ public class DependentItem {
public void setDependResult(DependResult dependResult) { public void setDependResult(DependResult dependResult) {
this.dependResult = dependResult; this.dependResult = dependResult;
} }
public ExecutionStatus getStatus() {
return status;
}
public void setStatus(ExecutionStatus status) {
this.status = status;
}
} }

19
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -108,6 +109,11 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class) @JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String dependence; private String dependence;
@JsonDeserialize(using = JSONUtils.JsonDataDeserializer.class)
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String conditionResult;
/** /**
* task instance priority * task instance priority
*/ */
@ -230,6 +236,7 @@ public class TaskNode {
Objects.equals(extras, taskNode.extras) && Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) && Objects.equals(dependence, taskNode.dependence) &&
Objects.equals(conditionResult, taskNode.conditionResult) &&
Objects.equals(workerGroupId, taskNode.workerGroupId) && Objects.equals(workerGroupId, taskNode.workerGroupId) &&
CollectionUtils.equalLists(depList, taskNode.depList); CollectionUtils.equalLists(depList, taskNode.depList);
} }
@ -292,6 +299,10 @@ public class TaskNode {
return new TaskTimeoutParameter(false); return new TaskTimeoutParameter(false);
} }
public boolean isConditionsTask(){
return this.getType().toUpperCase().equals(TaskType.CONDITIONS.toString());
}
@Override @Override
public String toString() { public String toString() {
return "TaskNode{" + return "TaskNode{" +
@ -321,4 +332,12 @@ public class TaskNode {
public void setWorkerGroupId(int workerGroupId) { public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId; this.workerGroupId = workerGroupId;
} }
public String getConditionResult() {
return conditionResult;
}
public void setConditionResult(String conditionResult) {
this.conditionResult = conditionResult;
}
} }

79
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/conditions/ConditionsParameters.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.conditions;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import java.util.List;
public class ConditionsParameters extends AbstractParameters {
//depend node list and state, only need task name
private List<DependentTaskModel> dependTaskList;
private DependentRelation dependRelation;
// node list to run when success
private List<String> successNode;
// node list to run when failed
private List<String> failedNode;
@Override
public boolean checkParameters() {
return true;
}
@Override
public List<String> getResourceFilesList() {
return null;
}
public List<DependentTaskModel> getDependTaskList() {
return dependTaskList;
}
public void setDependTaskList(List<DependentTaskModel> dependTaskList) {
this.dependTaskList = dependTaskList;
}
public DependentRelation getDependRelation() {
return dependRelation;
}
public void setDependRelation(DependentRelation dependRelation) {
this.dependRelation = dependRelation;
}
public List<String> getSuccessNode() {
return successNode;
}
public void setSuccessNode(List<String> successNode) {
this.successNode = successNode;
}
public List<String> getFailedNode() {
return failedNode;
}
public void setFailedNode(List<String> failedNode) {
this.failedNode = failedNode;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
@ -72,6 +73,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, HttpParameters.class); return JSONUtils.parseObject(parameter, HttpParameters.class);
case DATAX: case DATAX:
return JSONUtils.parseObject(parameter, DataxParameters.class); return JSONUtils.parseObject(parameter, DataxParameters.class);
case CONDITIONS:
return JSONUtils.parseObject(parameter, ConditionsParameters.class);
case SQOOP: case SQOOP:
return JSONUtils.parseObject(parameter, SqoopParameters.class); return JSONUtils.parseObject(parameter, SqoopParameters.class);
default: default:

197
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
@ -109,6 +110,11 @@ public class MasterExecThread implements Runnable {
*/ */
private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>(); private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
/**
* skip task map
*/
private Map<String, TaskNode> skipTaskNodeList = new ConcurrentHashMap<>();
/** /**
* recover tolerance fault task list * recover tolerance fault task list
*/ */
@ -434,7 +440,7 @@ public class MasterExecThread implements Runnable {
* @return TaskInstance * @return TaskInstance
*/ */
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
TaskNode taskNode, String parentNodeName) { TaskNode taskNode) {
TaskInstance taskInstance = findTaskIfExists(nodeName); TaskInstance taskInstance = findTaskIfExists(nodeName);
if(taskInstance == null){ if(taskInstance == null){
@ -484,58 +490,140 @@ public class MasterExecThread implements Runnable {
} }
/** /**
* get post task instance by node * is there have conditions after the parent node
* @param dag dag * @param parentNodeName
* @param parentNodeName parent node name * @return
* @return task instance list
*/ */
private List<TaskInstance> getPostTaskInstanceByNode(DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName){ private boolean haveConditionsAfterNode(String parentNodeName){
List<TaskInstance> postTaskList = new ArrayList<>(); boolean result = false;
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList); Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
if(startVertex == null){ if(startVertex == null){
return postTaskList; return result;
}
for(String nodeName : startVertex){
TaskNode taskNode = dag.getNode(nodeName);
if(taskNode.getType().equals(TaskType.CONDITIONS.toString())){
result = true;
break;
}
}
return result;
} }
for (String nodeName : startVertex){ /**
// encapsulation task instance * if all of the task dependence are skip, skip it too.
TaskInstance taskInstance = createTaskInstance(processInstance, nodeName , * @param taskNode
dag.getNode(nodeName),parentNodeName); * @return
postTaskList.add(taskInstance); */
private boolean isTaskNodeNeedSkip(TaskNode taskNode){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
} }
return postTaskList; for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
return true;
} }
/** /**
* return start task node list * set task node skip if dependence all skip
* @return task instance list * @param taskNodesSkipList
*/ */
private List<TaskInstance> getStartSubmitTaskList(){ private void setTaskNodeSkip(List<String> taskNodesSkipList){
for(String skipNode : taskNodesSkipList){
skipTaskNodeList.putIfAbsent(skipNode, dag.getNode(skipNode));
Collection<String> postNodeList = DagHelper.getStartVertex(skipNode, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode)){
postSkipList.add(post);
}
}
setTaskNodeSkip(postSkipList);
}
}
List<TaskInstance> startTaskList = getPostTaskInstanceByNode(dag, null);
HashMap<String, TaskInstance> successTaskMaps = new HashMap<>(); /**
List<TaskInstance> resultList = new ArrayList<>(); * parse condition task find the branch process
while(Stopper.isRunning()){ * set skip flag for another one.
for(TaskInstance task : startTaskList){ * @param nodeName
if(task.getState().typeIsSuccess()){ * @return
successTaskMaps.put(task.getName(), task); */
}else if(!completeTaskList.containsKey(task.getName()) && !errorTaskList.containsKey(task.getName())){ private List<String> parseConditionTask(String nodeName){
resultList.add(task); List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if(!taskNode.isConditionsTask()){
return conditionTaskList;
} }
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
TaskInstance taskInstance = completeTaskList.get(nodeName);
if(taskInstance == null){
logger.error("task instance cannot find, please check it!", nodeName);
return conditionTaskList;
} }
startTaskList.clear();
if(successTaskMaps.size() == 0){ if(taskInstance.getState().typeIsSuccess()){
break; conditionTaskList = conditionsParameters.getSuccessNode();
setTaskNodeSkip(conditionsParameters.getFailedNode());
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
setTaskNodeSkip(conditionsParameters.getSuccessNode());
}else{
conditionTaskList.add(nodeName);
}
return conditionTaskList;
}
/**
* parse post node list of previous node
* if condition node: return process according to the settings
* if post node completed, return post nodes of the completed node
* @param previousNodeName
* @return
*/
private List<String> parsePostNodeList(String previousNodeName){
List<String> postNodeList = new ArrayList<>();
TaskNode taskNode = dag.getNode(previousNodeName);
if(taskNode != null && taskNode.isConditionsTask()){
return parseConditionTask(previousNodeName);
}
Collection<String> postNodeCollection = DagHelper.getStartVertex(previousNodeName, dag, completeTaskList);
List<String> postSkipList = new ArrayList<>();
// delete success node, parse the past nodes
// if conditions node,
// 1. parse the branch process according the conditions setting
// 2. set skip flag on anther branch process
for(String postNode : postNodeCollection){
if(completeTaskList.containsKey(postNode)){
TaskInstance postTaskInstance = completeTaskList.get(postNode);
if(dag.getNode(postNode).isConditionsTask()){
List<String> conditionTaskNodeList = parseConditionTask(postNode);
for(String conditions : conditionTaskNodeList){
postNodeList.addAll(parsePostNodeList(conditions));
}
}else if(postTaskInstance.getState().typeIsSuccess()){
postNodeList.addAll(parsePostNodeList(postNode));
}else{
postNodeList.add(postNode);
} }
Set<String> taskNameKeys = successTaskMaps.keySet(); }else if(isTaskNodeNeedSkip(dag.getNode(postNode))){
for(String taskName : taskNameKeys){ postSkipList.add(postNode);
startTaskList.addAll(getPostTaskInstanceByNode(dag, taskName)); setTaskNodeSkip(postSkipList);
postSkipList.clear();
}else{
postNodeList.add(postNode);
} }
successTaskMaps.clear();
} }
return resultList; return postNodeList;
} }
/** /**
@ -544,14 +632,17 @@ public class MasterExecThread implements Runnable {
*/ */
private void submitPostNode(String parentNodeName){ private void submitPostNode(String parentNodeName){
List<TaskInstance> submitTaskList = null; List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
if(parentNodeName == null){
submitTaskList = getStartSubmitTaskList(); List<TaskInstance> taskInstances = new ArrayList<>();
}else{ for(String taskNode : submitTaskNodeList){
submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName); taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode)));
} }
// if previous node success , post node submit // if previous node success , post node submit
for(TaskInstance task : submitTaskList){ for(TaskInstance task : taskInstances){
if(readyToSubmitTaskList.containsKey(task.getName())){ if(readyToSubmitTaskList.containsKey(task.getName())){
continue; continue;
} }
@ -575,27 +666,31 @@ public class MasterExecThread implements Runnable {
private DependResult isTaskDepsComplete(String taskName) { private DependResult isTaskDepsComplete(String taskName) {
Collection<String> startNodes = dag.getBeginNode(); Collection<String> startNodes = dag.getBeginNode();
// if the vertex returns true directly // if vertex,returns true directly
if(startNodes.contains(taskName)){ if(startNodes.contains(taskName)){
return DependResult.SUCCESS; return DependResult.SUCCESS;
} }
TaskNode taskNode = dag.getNode(taskName); TaskNode taskNode = dag.getNode(taskName);
List<String> depsNameList = taskNode.getDepList(); List<String> depNameList = taskNode.getDepList();
for(String depsNode : depsNameList ){ for(String depsNode : depNameList ){
if(forbiddenTaskList.containsKey(depsNode)){ if(forbiddenTaskList.containsKey(depsNode) ||
skipTaskNodeList.containsKey(depsNode)){
continue; continue;
} }
// dependencies must be fully completed // dependencies must be fully completed
if(!completeTaskList.containsKey(depsNode)){ if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING; return DependResult.WAITING;
} }
ExecutionStatus taskState = completeTaskList.get(depsNode).getState(); ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
if(taskState.typeIsFailure()){ // conditions task would not return failed.
if(depTaskState.typeIsFailure()){
if(!haveConditionsAfterNode(depsNode) && !dag.getNode(depsNode).isConditionsTask()){
return DependResult.FAILED; return DependResult.FAILED;
} }
if(taskState.typeIsPause() || taskState.typeIsCancel()){ }
if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){
return DependResult.WAITING; return DependResult.WAITING;
} }
} }
@ -878,13 +973,17 @@ public class MasterExecThread implements Runnable {
if(task.taskCanRetry()){ if(task.taskCanRetry()){
addTaskToStandByList(task); addTaskToStandByList(task);
}else{ }else{
// node failure, based on failure strategy
errorTaskList.put(task.getName(), task);
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);
if( task.getTaskType().equals(TaskType.CONDITIONS.toString()) ||
haveConditionsAfterNode(task.getName())) {
submitPostNode(task.getName());
}else{
errorTaskList.put(task.getName(), task);
if(processInstance.getFailureStrategy() == FailureStrategy.END){ if(processInstance.getFailureStrategy() == FailureStrategy.END){
killTheOtherTasks(); killTheOtherTasks();
} }
} }
}
continue; continue;
} }
// other status stop/pause // other status stop/pause

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -165,7 +165,6 @@ public class TaskScheduleThread implements Runnable {
new Date(), new Date(),
taskInstance.getId()); taskInstance.getId());
} }
/** /**
* get global paras map * get global paras map
* @return * @return
@ -212,22 +211,30 @@ public class TaskScheduleThread implements Runnable {
* @return log path * @return log path
*/ */
private String getTaskLogPath() { private String getTaskLogPath() {
String logPath;
try{
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT") .getLogger("ROOT")
.getAppender("TASKLOGFILE")) .getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase(); .getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){ if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH + logPath = baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log"; taskInstance.getId() + ".log";
} }else{
return System.getProperty("user.dir") + Constants.SINGLE_SLASH + logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log"; taskInstance.getId() + ".log";
} }
}catch (Exception e){
logger.error("logger" + e);
logPath = "";
}
return logPath;
}
/** /**
* set task timeout * set task timeout

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
@ -202,6 +203,9 @@ public abstract class AbstractTask {
case SQOOP: case SQOOP:
paramsClass = SqoopParameters.class; paramsClass = SqoopParameters.class;
break; break;
case CONDITIONS:
paramsClass = ConditionsParameters.class;
break;
default: default:
logger.error("not support this task type: {}", taskType); logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type"); throw new IllegalArgumentException("not support this task type");

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.worker.task.conditions.ConditionsTask;
import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
@ -71,6 +72,8 @@ public class TaskManager {
return new DataxTask(props, logger); return new DataxTask(props, logger);
case SQOOP: case SQOOP:
return new SqoopTask(props, logger); return new SqoopTask(props, logger);
case CONDITIONS:
return new ConditionsTask(props, logger);
default: default:
logger.error("unsupport task type: {}", taskType); logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type"); throw new IllegalArgumentException("not support task type");

145
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/conditions/ConditionsTask.java

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.conditions;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConditionsTask extends AbstractTask {
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
/**
* process dao
*/
private ProcessService processService;
/**
* taskInstance
*/
private TaskInstance taskInstance;
/**
* processInstance
*/
private ProcessInstance processInstance;
/**
*
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
/**
* constructor
*
* @param taskProps task props
* @param logger logger
*/
public ConditionsTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
}
@Override
public void init() throws Exception {
logger.info("conditions task initialize");
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), DependentParameters.class);
this.taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstId());
if(taskInstance == null){
throw new Exception("cannot find the task instance!");
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for(TaskInstance task : taskInstanceList){
this.completeTaskList.putIfAbsent(task.getName(), task.getState());
}
}
@Override
public void handle() throws Exception {
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){
List<DependResult> itemDependResult = new ArrayList<>();
for(DependentItem item : dependentTaskModel.getDependItemList()){
itemDependResult.add(getDependResultForItem(item));
}
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
DependResult result = DependentUtils.getDependResultForRelation(
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", result);
exitStatusCode = (result == DependResult.SUCCESS) ?
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
private DependResult getDependResultForItem(DependentItem item){
DependResult dependResult = DependResult.SUCCESS;
if(!completeTaskList.containsKey(item.getDepTasks())){
logger.info("depend item: {} have not completed yet.", item.getDepTasks());
dependResult = DependResult.FAILED;
return dependResult;
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if(executionStatus != item.getStatus()){
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus().toString(), executionStatus.toString());
dependResult = DependResult.FAILED;
}
logger.info("depend item: {}, depend result: {}",
item.getDepTasks(), dependResult);
return dependResult;
}
@Override
public AbstractParameters getParameters() {
return null;
}
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -151,4 +151,5 @@ public class MasterExecThreadTest {
schedulerList.add(schedule); schedulerList.add(schedule);
return schedulerList; return schedulerList;
} }
} }

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js

@ -287,6 +287,10 @@ let tasksType = {
'SQOOP': { 'SQOOP': {
desc: 'SQOOP', desc: 'SQOOP',
color: '#E46F13' color: '#E46F13'
},
'CONDITIONS': {
desc: 'CONDITIONS',
color: '#E46F13'
} }
} }

3
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -107,6 +107,9 @@
.icos-SQOOP { .icos-SQOOP {
background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%; background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%;
} }
.icos-CONDITIONS {
background: url("../img/toobar_CONDITIONS.png") no-repeat 50% 50%;
}
.toolbar { .toolbar {
width: 60px; width: 60px;
height: 100%; height: 100%;

32
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -473,7 +473,35 @@
*/ */
_createNodes ({ id, type }) { _createNodes ({ id, type }) {
let self = this let self = this
let preNode = []
let rearNode = []
let rearList = []
$('div[data-targetarr*="' + id + '"]').each(function(){
rearNode.push($(this).attr("id"))
})
if (rearNode.length>0) {
rearNode.forEach(v => {
let rearobj = {}
rearobj.value = $(`#${v}`).find('.name-p').text()
rearobj.label = $(`#${v}`).find('.name-p').text()
rearList.push(rearobj)
})
} else {
rearList = []
}
let targetarr = $(`#${id}`).attr('data-targetarr')
if (targetarr) {
let nodearr = targetarr.split(',')
nodearr.forEach(v => {
let nodeobj = {}
nodeobj.value = $(`#${v}`).find('.name-p').text()
nodeobj.label = $(`#${v}`).find('.name-p').text()
preNode.push(nodeobj)
})
} else {
preNode = []
}
if (eventModel) { if (eventModel) {
eventModel.remove() eventModel.remove()
} }
@ -524,7 +552,9 @@
props: { props: {
id: id, id: id,
taskType: type, taskType: type,
self: self self: self,
preNode: preNode,
rearList: rearList
} }
}) })
}) })

88
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -109,6 +109,43 @@
<span>({{$t('Minute')}})</span> <span>({{$t('Minute')}})</span>
</div> </div>
</div> </div>
<div class="clearfix list" v-if="taskType === 'CONDITIONS'">
<div class="text-box">
<span>{{$t('State')}}</span>
</div>
<div class="cont-box">
<span class="label-box" style="width: 193px;display: inline-block;">
<x-select style="width: 157px;" v-model="successNode" :disabled="true">
<x-option v-for="item in stateList" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
</span>
<span class="text-b" style="padding-left: 38px">{{$t('Branch flow')}}</span>
<x-select style="width: 157px;" v-model="successBranch" clearable>
<x-option v-for="item in rearList" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
</div>
</div>
<div class="clearfix list" v-if="taskType === 'CONDITIONS'">
<div class="text-box">
<span>{{$t('State')}}</span>
</div>
<div class="cont-box">
<span class="label-box" style="width: 193px;display: inline-block;">
<x-select style="width: 157px;" v-model="failedNode" :disabled="true">
<x-option v-for="item in stateList" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
</span>
<span class="text-b" style="padding-left: 38px">{{$t('Branch flow')}}</span>
<x-select style="width: 157px;" v-model="failedBranch" clearable>
<x-option v-for="item in rearList" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
</div>
</div>
<!-- Task timeout alarm --> <!-- Task timeout alarm -->
<m-timeout-alarm <m-timeout-alarm
@ -211,6 +248,13 @@
ref="SQOOP" ref="SQOOP"
:backfill-item="backfillItem"> :backfill-item="backfillItem">
</m-sqoop> </m-sqoop>
<m-conditions
v-if="taskType === 'CONDITIONS'"
ref="CONDITIONS"
@on-dependent="_onDependent"
:backfill-item="backfillItem"
:pre-node="preNode">
</m-conditions>
</div> </div>
</div> </div>
<div class="bottom-box"> <div class="bottom-box">
@ -236,6 +280,7 @@
import mDependent from './tasks/dependent' import mDependent from './tasks/dependent'
import mHttp from './tasks/http' import mHttp from './tasks/http'
import mDatax from './tasks/datax' import mDatax from './tasks/datax'
import mConditions from './tasks/CONDITIONS'
import mSqoop from './tasks/sqoop' import mSqoop from './tasks/sqoop'
import mSubProcess from './tasks/sub_process' import mSubProcess from './tasks/sub_process'
import mSelectInput from './_source/selectInput' import mSelectInput from './_source/selectInput'
@ -253,13 +298,21 @@
// loading // loading
spinnerLoading: false, spinnerLoading: false,
// node name // node name
name: ``, name: '',
// description // description
description: '', description: '',
// Node echo data // Node echo data
backfillItem: {}, backfillItem: {},
// Resource(list) // Resource(list)
resourcesList: [], resourcesList: [],
successNode: 'success',
failedNode: 'failed',
successBranch: '',
failedBranch: '',
conditionResult: {
'successNode': [],
'failedNode': []
},
// dependence // dependence
dependence: {}, dependence: {},
// cache dependence // cache dependence
@ -279,7 +332,17 @@
// Task priority // Task priority
taskInstancePriority: 'MEDIUM', taskInstancePriority: 'MEDIUM',
// worker group id // worker group id
workerGroupId: -1 workerGroupId: -1,
stateList:[
{
value: 'success',
label: `${i18n.$t('success')}`
},
{
value: 'failed',
label: `${i18n.$t('failed')}`
}
]
} }
}, },
/** /**
@ -290,7 +353,9 @@
props: { props: {
id: Number, id: Number,
taskType: String, taskType: String,
self: Object self: Object,
preNode: Array,
rearList: Array
}, },
methods: { methods: {
/** /**
@ -399,6 +464,10 @@
this.$message.warning(`${i18n.$t('Please enter name (required)')}`) this.$message.warning(`${i18n.$t('Please enter name (required)')}`)
return false return false
} }
if (this.successBranch !='' && this.successBranch == this.failedBranch) {
this.$message.warning(`${i18n.$t('Cannot select the same node for successful branch flow and failed branch flow')}`)
return false
}
if (this.name === this.backfillItem.name) { if (this.name === this.backfillItem.name) {
return true return true
} }
@ -427,6 +496,8 @@
} }
$(`#${this.id}`).find('span').text(this.name) $(`#${this.id}`).find('span').text(this.name)
this.conditionResult.successNode[0] = this.successBranch
this.conditionResult.failedNode[0] = this.failedBranch
// Store the corresponding node data structure // Store the corresponding node data structure
this.$emit('addTaskInfo', { this.$emit('addTaskInfo', {
item: { item: {
@ -436,12 +507,15 @@
params: this.params, params: this.params,
description: this.description, description: this.description,
runFlag: this.runFlag, runFlag: this.runFlag,
conditionResult: this.conditionResult,
dependence: this.dependence, dependence: this.dependence,
maxRetryTimes: this.maxRetryTimes, maxRetryTimes: this.maxRetryTimes,
retryInterval: this.retryInterval, retryInterval: this.retryInterval,
timeout: this.timeout, timeout: this.timeout,
taskInstancePriority: this.taskInstancePriority, taskInstancePriority: this.taskInstancePriority,
workerGroupId: this.workerGroupId workerGroupId: this.workerGroupId,
status: this.status,
branch: this.branch
}, },
fromThis: this fromThis: this
}) })
@ -526,7 +600,10 @@
this.description = o.description this.description = o.description
this.maxRetryTimes = o.maxRetryTimes this.maxRetryTimes = o.maxRetryTimes
this.retryInterval = o.retryInterval this.retryInterval = o.retryInterval
if(o.conditionResult) {
this.successBranch = o.conditionResult.successNode[0]
this.failedBranch = o.conditionResult.failedNode[0]
}
// If the workergroup has been deleted, set the default workergroup // If the workergroup has been deleted, set the default workergroup
var hasMatch = false; var hasMatch = false;
for (let i = 0; i < this.store.state.security.workerGroupsListAll.length; i++) { for (let i = 0; i < this.store.state.security.workerGroupsListAll.length; i++) {
@ -598,6 +675,7 @@
mHttp, mHttp,
mDatax, mDatax,
mSqoop, mSqoop,
mConditions,
mSelectInput, mSelectInput,
mTimeoutAlarm, mTimeoutAlarm,
mPriority, mPriority,

13
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/commcon.js

@ -232,6 +232,16 @@ const positionList = [
code: "Headers" code: "Headers"
} }
] ]
const nodeStatusList = [
{
value: 'SUCCESS',
label: `${i18n.$t('success')}`
},
{
value: 'FAILURE',
label: `${i18n.$t('failed')}`
}
]
export { export {
cycleList, cycleList,
@ -239,5 +249,6 @@ export {
typeList, typeList,
directList, directList,
sqlTypeList, sqlTypeList,
positionList positionList,
nodeStatusList
} }

231
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="dep-list-model">
<div v-for="(el,$index) in dependItemList" :key='$index' class="list" @click="itemIndex = $index">
<x-select style="width: 150px;" v-model="el.depTasks" :disabled="isDetails">
<x-option v-for="item in preNode" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
<x-select style="width: 116px;" v-model="el.status" :disabled="isDetails">
<x-option v-for="item in nodeStatusList || []" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
<template v-if="isInstance">
<span class="instance-state">
<i class="iconfont" :class="'icon-' + el.state" v-if="el.state === 'SUCCESS'" data-toggle="tooltip" data-container="body" :title="$t('success')">&#xe607;</i>
<i class="iconfont" :class="'icon-' + el.state" v-if="el.state === 'WAITING'" data-toggle="tooltip" data-container="body" :title="$t('waiting')">&#xe62a;</i>
<i class="iconfont" :class="'icon-' + el.state" v-if="el.state === 'FAILED'" data-toggle="tooltip" data-container="body" :title="$t('failed')">&#xe626;</i>
</span>
</template>
<span class="operation">
<a href="javascript:" class="delete" @click="!isDetails && _remove($index)">
<i class="iconfont" :class="_isDetails" data-toggle="tooltip" data-container="body" :title="$t('delete')" >&#xe611;</i>
</a>
<a href="javascript:" class="add" @click="!isDetails && _add()" v-if="$index === (dependItemList.length - 1)">
<i class="iconfont" :class="_isDetails" data-toggle="tooltip" data-container="body" :title="$t('Add')">&#xe636;</i>
</a>
</span>
</div>
</div>
</template>
<script>
import _ from 'lodash'
import { cycleList, dateValueList, nodeStatusList } from './commcon'
import disabledState from '@/module/mixin/disabledState'
export default {
name: 'node-status',
data () {
return {
list: [],
definitionList: [],
projectList: [],
cycleList: cycleList,
isInstance: false,
itemIndex: null,
nodeStatusList: nodeStatusList
}
},
mixins: [disabledState],
props: {
dependItemList: Array,
index: Number,
dependTaskList:Array,
preNode: Array
},
model: {
prop: 'dependItemList',
event: 'dependItemListEvent'
},
methods: {
/**
* add task
*/
_add () {
// btn loading
this.isLoading = true
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams()))
// remove tooltip
this._removeTip()
},
/**
* remove task
*/
_remove (i) {
this.dependTaskList[this.index].dependItemList.splice(i,1)
this._removeTip()
if (!this.dependItemList.length || this.dependItemList.length === 0) {
this.$emit('on-delete-all', {
index: this.index
})
}
},
_getProjectList () {
return new Promise((resolve, reject) => {
this.projectList = _.map(_.cloneDeep(this.store.state.dag.projectListS), v => {
return {
value: v.id,
label: v.name
}
})
resolve()
})
},
_getProcessByProjectId (id) {
return new Promise((resolve, reject) => {
this.store.dispatch('dag/getProcessByProjectId', { projectId: id }).then(res => {
this.definitionList = _.map(_.cloneDeep(res), v => {
return {
value: v.id,
label: v.name
}
})
resolve(res)
})
})
},
/**
* get dependItemList
*/
_getDependItemList (ids, is = true) {
return new Promise((resolve, reject) => {
if (is) {
this.store.dispatch('dag/getProcessTasksList', { processDefinitionId: ids }).then(res => {
resolve(['ALL'].concat(_.map(res, v => v.name)))
})
} else {
this.store.dispatch('dag/getTaskListDefIdAll', { processDefinitionIdList: ids }).then(res => {
resolve(res)
})
}
})
},
_rtNewParams () {
return {
depTasks: '',
status: ''
}
},
_rtOldParams (value,depTasksList, item) {
return {
depTasks: '',
status: ''
}
},
/**
* remove tip
*/
_removeTip () {
$('body').find('.tooltip.fade.top.in').remove()
}
},
watch: {
},
beforeCreate () {
},
created () {
// is type projects-instance-details
this.isInstance = this.router.history.current.name === 'projects-instance-details'
// get processlist
this._getProjectList().then(() => {
let projectId = this.projectList[0].value
if (!this.dependItemList.length) {
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams()))
} else {
// get definitionId ids
let ids = _.map(this.dependItemList, v => v.definitionId).join(',')
// get item list
this._getDependItemList(ids, false).then(res => {
_.map(this.dependItemList, (v, i) => {
this._getProcessByProjectId(v.projectId).then(definitionList => {
this.$set(this.dependItemList, i, this._rtOldParams(v.definitionId, ['ALL'].concat(_.map(res[v.definitionId] || [], v => v.name)), v))
})
})
})
}
})
},
mounted () {
},
components: {}
}
</script>
<style lang="scss" rel="stylesheet/scss">
.dep-list-model {
position: relative;
min-height: 32px;
.list {
margin-bottom: 6px;
.operation {
padding-left: 4px;
a {
i {
font-size: 18px;
vertical-align: middle;
}
}
.delete {
color: #ff0000;
}
.add {
color: #0097e0;
}
}
}
.instance-state {
display: inline-block;
width: 24px;
.iconfont {
font-size: 20px;
vertical-align: middle;
cursor: pointer;
margin-left: 6px;
&.icon-SUCCESS {
color: #33cc00;
}
&.icon-WAITING {
color: #888888;
}
&.icon-FAILED {
color: #F31322;
}
}
}
}
</style>

265
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue

@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="dependence-model">
<m-list-box>
<div slot="text">{{$t('Custom Parameters')}}</div>
<div slot="content">
<div class="dep-opt">
<a href="javascript:"
@click="!isDetails && _addDep()"
class="add-dep">
<i v-if="!isLoading" class="iconfont" :class="_isDetails" data-toggle="tooltip" :title="$t('Add')">
&#xe636;
</i>
<i v-if="isLoading" class="iconfont fa fa-spin" data-toggle="tooltip" :title="$t('Add')">
&#xe6af;
</i>
</a>
</div>
<div class="dep-box">
<span
class="dep-relation"
@click="!isDetails && _setGlobalRelation()"
v-if="dependTaskList.length">
{{relation === 'AND' ? $t('and') : $t('or')}}
</span>
<div class="dep-list" v-for="(el,$index) in dependTaskList" :key='$index'>
<span class="dep-line-pie"
v-if="el.dependItemList.length"
@click="!isDetails && _setRelation($index)">
{{el.relation === 'AND' ? $t('and') : $t('or')}}
</span>
<i class="iconfont dep-delete"
data-toggle="tooltip"
data-container="body"
:class="_isDetails"
@click="!isDetails && _deleteDep($index)"
:title="$t('delete')" >
&#xe611;
</i>
<m-node-status
:dependTaskList='dependTaskList'
v-model="el.dependItemList"
@on-delete-all="_onDeleteAll"
@getDependTaskList="getDependTaskList"
:index="$index"
:rear-list = "rearList"
:pre-node = "preNode">
</m-node-status>
</div>
</div>
</div>
</m-list-box>
</div>
</template>
<script>
import _ from 'lodash'
import mListBox from './_source/listBox'
import mNodeStatus from './_source/nodeStatus'
import disabledState from '@/module/mixin/disabledState'
export default {
name: 'dependence',
data () {
return {
relation: 'AND',
dependTaskList: [],
isLoading: false
}
},
mixins: [disabledState],
props: {
backfillItem: Object,
preNode: Array,
rearList: Array
},
methods: {
_addDep () {
if (!this.isLoading) {
this.isLoading = true
this.dependTaskList.push({
dependItemList: [],
relation: 'AND'
})
}
},
_deleteDep (i) {
// remove index dependent
this.dependTaskList.splice(i, 1)
// remove tootip
$('body').find('.tooltip.fade.top.in').remove()
},
_onDeleteAll (i) {
this.dependTaskList.map((item,i)=>{
if(item.dependItemList.length === 0){
this.dependTaskList.splice(i,1)
}
})
// this._deleteDep(i)
},
_setGlobalRelation () {
this.relation = this.relation === 'AND' ? 'OR' : 'AND'
},
getDependTaskList(i){
// console.log('getDependTaskList',i)
},
_setRelation (i) {
this.dependTaskList[i].relation = this.dependTaskList[i].relation === 'AND' ? 'OR' : 'AND'
},
_verification () {
this.$emit('on-dependent', {
relation: this.relation,
dependTaskList: _.map(this.dependTaskList, v => {
return {
relation: v.relation,
dependItemList: _.map(v.dependItemList, v1 => _.omit(v1, ['depTasksList', 'state', 'dateValueList']))
}
})
})
return true
}
},
watch: {
dependTaskList (e) {
setTimeout(() => {
this.isLoading = false
}, 600)
}
},
beforeCreate () {
},
created () {
let o = this.backfillItem
let dependentResult = $(`#${o.id}`).data('dependent-result') || {}
// Does not represent an empty object backfill
if (!_.isEmpty(o)) {
this.relation = _.cloneDeep(o.dependence.relation) || 'AND'
this.dependTaskList = _.cloneDeep(o.dependence.dependTaskList) || []
let defaultState = this.isDetails ? 'WAITING' : ''
// Process instance return status display matches by key
_.map(this.dependTaskList, v => _.map(v.dependItemList, v1 => v1.state = dependentResult[`${v1.definitionId}-${v1.depTasks}-${v1.cycle}-${v1.dateValue}`] || defaultState))
}
},
mounted () {
},
destroyed () {
},
computed: {},
components: { mListBox, mNodeStatus }
}
</script>
<style lang="scss" rel="stylesheet/scss">
.dependence-model {
margin-top: -10px;
.dep-opt {
margin-bottom: 10px;
padding-top: 3px;
line-height: 24px;
.add-dep {
color: #0097e0;
margin-right: 10px;
i {
font-size: 18px;
vertical-align: middle;
}
}
}
.dep-list {
margin-bottom: 16px;
position: relative;
border-left: 1px solid #eee;
padding-left: 16px;
margin-left: -16px;
transition: all 0.2s ease-out;
padding-bottom: 20px;
&:hover{
border-left: 1px solid #0097e0;
transition: all 0.2s ease-out;
.dep-line-pie {
transition: all 0.2s ease-out;
border: 1px solid #0097e0;
background: #0097e0;
color: #fff;
}
}
.dep-line-pie {
transition: all 0.2s ease-out;
position: absolute;
width: 20px;
height: 20px;
border: 1px solid #e2e2e2;
text-align: center;
top: 50%;
margin-top: -20px;
z-index: 1;
left: -10px;
border-radius: 10px;
background: #fff;
font-size: 12px;
cursor: pointer;
&::selection {
background:transparent;
}
&::-moz-selection {
background:transparent;
}
&::-webkit-selection {
background:transparent;
}
}
.dep-delete {
position: absolute;
bottom: -6px;
left: 14px;
font-size: 18px;
color: #ff0000;
cursor: pointer;
}
}
.dep-box {
border-left: 4px solid #eee;
margin-left: -46px;
padding-left: 42px;
position: relative;
.dep-relation {
position: absolute;
width: 20px;
height: 20px;
border: 1px solid #e2e2e2;
text-align: center;
top: 50%;
margin-top: -10px;
z-index: 1;
left: -12px;
border-radius: 10px;
background: #fff;
font-size: 12px;
cursor: pointer;
&::selection {
background:transparent;
}
&::-moz-selection {
background:transparent;
}
&::-webkit-selection {
background:transparent;
}
}
}
}
</style>

14
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/jsPlumbHandle.js

@ -198,7 +198,8 @@ JSP.prototype.jsonHandle = function ({ largeJson, locations }) {
targetarr: locations[v.id]['targetarr'], targetarr: locations[v.id]['targetarr'],
isAttachment: this.config.isAttachment, isAttachment: this.config.isAttachment,
taskType: v.type, taskType: v.type,
runFlag: v.runFlag runFlag: v.runFlag,
nodenumber: locations[v.id]['nodenumber'],
})) }))
// contextmenu event // contextmenu event
@ -517,6 +518,9 @@ JSP.prototype.removeConnect = function ($connect) {
targetarr = _.filter(targetarr, v => v !== sourceId) targetarr = _.filter(targetarr, v => v !== sourceId)
$(`#${targetId}`).attr('data-targetarr', targetarr.toString()) $(`#${targetId}`).attr('data-targetarr', targetarr.toString())
} }
if ($(`#${sourceId}`).attr('data-tasks-type')=='CONDITIONS') {
$(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))-1)
}
this.JspInstance.deleteConnection($connect) this.JspInstance.deleteConnection($connect)
this.selectedElement = {} this.selectedElement = {}
@ -572,6 +576,7 @@ JSP.prototype.copyNodes = function ($id) {
[newId]: { [newId]: {
name: newName, name: newName,
targetarr: '', targetarr: '',
nodenumber: 0,
x: newX, x: newX,
y: newY y: newY
} }
@ -658,6 +663,7 @@ JSP.prototype.saveStore = function () {
locations[v.id] = { locations[v.id] = {
name: v.name, name: v.name,
targetarr: v.targetarr, targetarr: v.targetarr,
nodenumber: v.nodenumber,
x: v.x, x: v.x,
y: v.y y: v.y
} }
@ -711,6 +717,12 @@ JSP.prototype.handleEvent = function () {
return false return false
} }
if ($(`#${sourceId}`).attr('data-tasks-type')=='CONDITIONS' && $(`#${sourceId}`).attr('data-nodenumber')==2) {
return false
} else {
$(`#${sourceId}`).attr('data-nodenumber',Number($(`#${sourceId}`).attr('data-nodenumber'))+1)
}
// Storage node dependency information // Storage node dependency information
saveTargetarr(sourceId, targetId) saveTargetarr(sourceId, targetId)

5
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/plugIn/util.js

@ -43,9 +43,9 @@ const rtBantpl = () => {
/** /**
* return node html * return node html
*/ */
const rtTasksTpl = ({ id, name, x, y, targetarr, isAttachment, taskType, runFlag }) => { const rtTasksTpl = ({ id, name, x, y, targetarr, isAttachment, taskType, runFlag, nodenumber }) => {
let tpl = `` let tpl = ``
tpl += `<div class="w jtk-draggable jtk-droppable jtk-endpoint-anchor jtk-connected ${isAttachment ? 'jtk-ep' : ''}" data-targetarr="${targetarr || ''}" data-tasks-type="${taskType}" id="${id}" style="left: ${x}px; top: ${y}px;">` tpl += `<div class="w jtk-draggable jtk-droppable jtk-endpoint-anchor jtk-connected ${isAttachment ? 'jtk-ep' : ''}" data-targetarr="${targetarr || ''}" data-nodenumber="${nodenumber || 0}" data-tasks-type="${taskType}" id="${id}" style="left: ${x}px; top: ${y}px;">`
tpl += `<div>` tpl += `<div>`
tpl += `<div class="state-p"></div>` tpl += `<div class="state-p"></div>`
tpl += `<div class="icos icos-${taskType}"></div>` tpl += `<div class="icos icos-${taskType}"></div>`
@ -73,6 +73,7 @@ const tasksAll = () => {
id: e.attr('id'), id: e.attr('id'),
name: e.find('.name-p').text(), name: e.find('.name-p').text(),
targetarr: e.attr('data-targetarr') || '', targetarr: e.attr('data-targetarr') || '',
nodenumber: e.attr('data-nodenumber'),
x: parseInt(e.css('left'), 10), x: parseInt(e.css('left'), 10),
y: parseInt(e.css('top'), 10) y: parseInt(e.css('top'), 10)
}) })

BIN
dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toobar_CONDITIONS.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

1
dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js

@ -134,6 +134,7 @@ export default {
state.locations[payload.id] = _.assign(state.locations[payload.id], { state.locations[payload.id] = _.assign(state.locations[payload.id], {
name: dom.find('.name-p').text(), name: dom.find('.name-p').text(),
targetarr: dom.attr('data-targetarr'), targetarr: dom.attr('data-targetarr'),
nodenumber: dom.attr('data-nodenumber'),
x: parseInt(dom.css('left'), 10), x: parseInt(dom.css('left'), 10),
y: parseInt(dom.css('top'), 10) y: parseInt(dom.css('top'), 10)
}) })

8
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -520,7 +520,6 @@ export default {
'0 means unlimited by byte': '0 means unlimited', '0 means unlimited by byte': '0 means unlimited',
'0 means unlimited by count': '0 means unlimited', '0 means unlimited by count': '0 means unlimited',
'Modify User': 'Modify User', 'Modify User': 'Modify User',
'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)', 'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)',
'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)', 'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)',
'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)', 'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)',
@ -566,7 +565,8 @@ export default {
'Data Source': 'Data Source', 'Data Source': 'Data Source',
'Data Target': 'Data Target', 'Data Target': 'Data Target',
'All Columns': 'All Columns', 'All Columns': 'All Columns',
'Some Columns': 'Some Columns' 'Some Columns': 'Some Columns',
'Modify User': 'Modify User',
'Branch flow': 'Branch flow',
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow'
} }

5
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -565,5 +565,8 @@ export default {
'Data Source': '数据来源', 'Data Source': '数据来源',
'Data Target': '数据目的', 'Data Target': '数据目的',
'All Columns': '全表导入', 'All Columns': '全表导入',
'Some Columns': '选择列' 'Some Columns': '选择列',
'Modify User': '修改用户',
'Branch flow': '分支流转',
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点'
} }

Loading…
Cancel
Save