Browse Source

update taskParams/add task delayTime/fix conditionType bug (#5385)

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
c3558965c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 20
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  3. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  4. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  5. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  6. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  7. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  8. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  9. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  10. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  11. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  12. 2
      sql/dolphinscheduler_mysql.sql
  13. 8
      sql/dolphinscheduler_postgre.sql

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -789,6 +789,8 @@ public final class Constants {
public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId"; public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId";
public static final String PROCESS_INSTANCE_STATE = "processInstanceState"; public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance"; public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String CONDITION_RESULT = "conditionResult";
public static final String DEPENDENCE = "dependence";
public static final String TASK_TYPE = "taskType"; public static final String TASK_TYPE = "taskType";
public static final String TASK_LIST = "taskList"; public static final String TASK_LIST = "taskList";
public static final String RWXR_XR_X = "rwxr-xr-x"; public static final String RWXR_XR_X = "rwxr-xr-x";

20
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -25,7 +25,9 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@ -371,6 +373,24 @@ public class TaskNode {
this.preTaskNodeList = preTaskNodeList; this.preTaskNodeList = preTaskNodeList;
} }
public String getTaskParams() {
Map<String, Object> taskParams = JSONUtils.toMap(this.params, String.class, Object.class);
if (taskParams == null) {
taskParams = new HashMap<>();
}
taskParams.put(Constants.CONDITION_RESULT, this.conditionResult);
taskParams.put(Constants.DEPENDENCE, this.dependence);
return JSONUtils.toJsonString(taskParams);
}
public Map<String, Object> taskParamsToJsonObj(String taskParams) {
Map<String, Object> taskParamsMap = JSONUtils.toMap(taskParams, String.class, Object.class);
if (taskParamsMap == null) {
taskParamsMap = new HashMap<>();
}
return taskParamsMap;
}
@Override @Override
public String toString() { public String toString() {
return "TaskNode{" return "TaskNode{"

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -39,8 +39,7 @@ public class VarPoolUtils {
*/ */
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) { public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams(); String taskParamsJson = taskNode.getParams();
Map<String,Object> taskParams = JSONUtils.parseObject(taskParamsJson, HashMap.class); Map<String,Object> taskParams = JSONUtils.toMap(taskParamsJson, String.class, Object.class);
Object localParamsObject = taskParams.get(LOCALPARAMS); Object localParamsObject = taskParams.get(LOCALPARAMS);
if (null != localParamsObject && null != propToValue && propToValue.size() > 0) { if (null != localParamsObject && null != propToValue && propToValue.size() > 0) {
ArrayList<Object> localParams = (ArrayList)localParamsObject; ArrayList<Object> localParams = (ArrayList)localParamsObject;

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -152,6 +152,11 @@ public class TaskDefinition {
*/ */
private int timeout; private int timeout;
/**
* delay execution time.
*/
private int delayTime;
/** /**
* resource ids * resource ids
*/ */
@ -381,6 +386,14 @@ public class TaskDefinition {
this.resourceIds = resourceIds; this.resourceIds = resourceIds;
} }
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
@Override @Override
public String toString() { public String toString() {
return "TaskDefinition{" return "TaskDefinition{"
@ -405,6 +418,7 @@ public class TaskDefinition {
+ ", timeoutFlag=" + timeoutFlag + ", timeoutFlag=" + timeoutFlag
+ ", timeoutNotifyStrategy=" + timeoutNotifyStrategy + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy
+ ", timeout=" + timeout + ", timeout=" + timeout
+ ", delayTime=" + delayTime
+ ", resourceIds='" + resourceIds + '\'' + ", resourceIds='" + resourceIds + '\''
+ ", createTime=" + createTime + ", createTime=" + createTime
+ ", updateTime=" + updateTime + ", updateTime=" + updateTime

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -63,6 +63,7 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setTimeoutNotifyStrategy(taskDefinition.getTimeoutNotifyStrategy()); this.setTimeoutNotifyStrategy(taskDefinition.getTimeoutNotifyStrategy());
this.setTaskType(taskDefinition.getTaskType()); this.setTaskType(taskDefinition.getTaskType());
this.setTimeout(taskDefinition.getTimeout()); this.setTimeout(taskDefinition.getTimeout());
this.setDelayTime(taskDefinition.getDelayTime());
this.setTimeoutFlag(taskDefinition.getTimeoutFlag()); this.setTimeoutFlag(taskDefinition.getTimeoutFlag());
this.setUpdateTime(taskDefinition.getUpdateTime()); this.setUpdateTime(taskDefinition.getUpdateTime());
this.setCreateTime(taskDefinition.getCreateTime()); this.setCreateTime(taskDefinition.getCreateTime());

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
@ -415,7 +416,8 @@ public class TaskInstance implements Serializable {
public DependentParameters getDependency() { public DependentParameters getDependency() {
if (this.dependency == null) { if (this.dependency == null) {
this.dependency = JSONUtils.parseObject(this.getTaskParams(), DependentParameters.class); Map<String, Object> taskParamsMap = JSONUtils.toMap(this.getTaskParams(), String.class, Object.class);
this.dependency = JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), DependentParameters.class);
} }
return this.dependency; return this.dependency;
} }

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -20,13 +20,13 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper">
<sql id="baseSql"> <sql id="baseSql">
id, code, `name`, version, description, project_code, user_id, task_type, task_params, flag, task_priority, id, code, `name`, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time resource_ids, operator, operate_time, create_time, update_time
</sql> </sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog"> <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params, select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout_notify_strategy, td.timeout, td.resource_ids, td.operator,td.operate_time, td.create_time, td.update_time, td.timeout, td.delay_time, td.resource_ids, td.operator,td.operate_time, td.create_time, td.update_time,
u.user_name,p.name as project_name u.user_name,p.name as project_name
from t_ds_task_definition_log td from t_ds_task_definition_log td
JOIN t_ds_user u ON td.user_id = u.id JOIN t_ds_user u ON td.user_id = u.id

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -20,7 +20,7 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper">
<sql id="baseSql"> <sql id="baseSql">
id, code, `name`, version, description, project_code, user_id, task_type, task_params, flag, task_priority, id, code, `name`, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time resource_ids, create_time, update_time
</sql> </sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
@ -63,8 +63,8 @@
</select> </select>
<select id="queryByDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryByDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params, select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout_notify_strategy, td.timeout, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name td.timeout, td.delay_time, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name
from t_ds_task_definition td from t_ds_task_definition td
JOIN t_ds_user u ON td.user_id = u.id JOIN t_ds_user u ON td.user_id = u.id
JOIN t_ds_project p ON td.project_code = p.code JOIN t_ds_project p ON td.project_code = p.code

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -65,7 +65,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
@Override @Override
public Boolean submitWaitComplete() { public Boolean submitWaitComplete() {
try{ try {
this.taskInstance = submit(); this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(),
@ -78,33 +78,28 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
logger.info("dependent task start"); logger.info("dependent task start");
waitTaskQuit(); waitTaskQuit();
updateTaskState(); updateTaskState();
}catch (Exception e){ } catch (Exception e) {
logger.error("conditions task run exception" , e); logger.error("conditions task run exception", e);
} }
return true; return true;
} }
private void waitTaskQuit() { private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId( List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstanceId() for (TaskInstance task : taskInstances) {
);
for(TaskInstance task : taskInstances){
completeTaskList.putIfAbsent(task.getName(), task.getState()); completeTaskList.putIfAbsent(task.getName(), task.getState());
} }
List<DependResult> modelResultList = new ArrayList<>(); List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){ for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
List<DependResult> itemDependResult = new ArrayList<>(); List<DependResult> itemDependResult = new ArrayList<>();
for(DependentItem item : dependentTaskModel.getDependItemList()){ for (DependentItem item : dependentTaskModel.getDependItemList()) {
itemDependResult.add(getDependResultForItem(item)); itemDependResult.add(getDependResultForItem(item));
} }
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult); modelResultList.add(modelResult);
} }
conditionResult = DependentUtils.getDependResultForRelation( conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", conditionResult); logger.info("the conditions task depend result : {}", conditionResult);
} }
@ -113,9 +108,9 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
*/ */
private void updateTaskState() { private void updateTaskState() {
ExecutionStatus status; ExecutionStatus status;
if(this.cancel){ if (this.cancel) {
status = ExecutionStatus.KILL; status = ExecutionStatus.KILL;
}else{ } else {
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE; status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
} }
taskInstance.setState(status); taskInstance.setState(status);
@ -137,20 +132,18 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
/** /**
* depend result for depend item * depend result for depend item
* @param item
* @return
*/ */
private DependResult getDependResultForItem(DependentItem item){ private DependResult getDependResultForItem(DependentItem item) {
DependResult dependResult = DependResult.SUCCESS; DependResult dependResult = DependResult.SUCCESS;
if(!completeTaskList.containsKey(item.getDepTasks())){ if (!completeTaskList.containsKey(item.getDepTasks())) {
logger.info("depend item: {} have not completed yet.", item.getDepTasks()); logger.info("depend item: {} have not completed yet.", item.getDepTasks());
dependResult = DependResult.FAILED; dependResult = DependResult.FAILED;
return dependResult; return dependResult;
} }
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks()); ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if(executionStatus != item.getStatus()){ if (executionStatus != item.getStatus()) {
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus); logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTasks(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED; dependResult = DependResult.FAILED;
} }
logger.info("dependent item complete {} {},{}", logger.info("dependent item complete {} {},{}",

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -175,7 +174,7 @@ public class MasterExecThread implements Runnable {
* *
* @param parentNodeName parent node name * @param parentNodeName parent node name
*/ */
private Map<String, Object> propToValue = new ConcurrentHashMap<String, Object>(); private Map<String, Object> propToValue = new ConcurrentHashMap<>();
/** /**
* constructor of MasterExecThread * constructor of MasterExecThread
@ -519,11 +518,7 @@ public class MasterExecThread implements Runnable {
} else { } else {
taskInstance.setWorkerGroup(taskWorkerGroup); taskInstance.setWorkerGroup(taskWorkerGroup);
} }
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) { taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
taskInstance.setTaskParams(taskNode.getDependence());
} else {
taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getParams()));
}
// delay execution time // delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime()); taskInstance.setDelayTime(taskNode.getDelayTime());
} }
@ -545,8 +540,11 @@ public class MasterExecThread implements Runnable {
if (localParams != null) { if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) { for (Property info : allParam) {
if (info.getDirect().equals(Direct.IN)) {
String paramName = info.getProp(); String paramName = info.getProp();
if (StringUtils.isNotEmpty(paramName) && propToValue.containsKey(paramName)) {
info.setValue((String) propToValue.get(paramName));
}
if (info.getDirect().equals(Direct.IN)) {
String value = globalMap.get(paramName); String value = globalMap.get(paramName);
if (StringUtils.isNotEmpty(value)) { if (StringUtils.isNotEmpty(value)) {
info.setValue(value); info.setValue(value);
@ -569,9 +567,6 @@ public class MasterExecThread implements Runnable {
throw new RuntimeException(); throw new RuntimeException();
} }
TaskNode taskNodeObject = dag.getNode(taskNode); TaskNode taskNodeObject = dag.getNode(taskNode);
if (!TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNodeObject.getType())) {
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
}
taskInstances.add(createTaskInstance(processInstance, taskNodeObject)); taskInstances.add(createTaskInstance(processInstance, taskNodeObject));
} }

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
@ -110,8 +109,6 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.commons.collections.map.HashedMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
@ -2200,7 +2197,7 @@ public class ProcessService {
taskDefinition.setName(taskNode.getName()); taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc()); taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setTaskType(taskNode.getType().toUpperCase()); taskDefinition.setTaskType(taskNode.getType().toUpperCase());
taskDefinition.setTaskParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType()) ? taskNode.getDependence() : taskNode.getParams()); taskDefinition.setTaskParams(taskNode.getTaskParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES); taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority()); taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup()); taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
@ -2209,6 +2206,7 @@ public class ProcessService {
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy()); taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval()); taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setDelayTime(taskNode.getDelayTime());
taskDefinition.setResourceIds(getResourceIds(taskDefinition)); taskDefinition.setResourceIds(getResourceIds(taskDefinition));
} }
@ -2221,7 +2219,6 @@ public class ProcessService {
public String getResourceIds(TaskDefinition taskDefinition) { public String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null; Set<Integer> resourceIds = null;
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams()); AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList(). resourceIds = params.getResourceFilesList().
stream() stream()
@ -2341,7 +2338,8 @@ public class ProcessService {
List<String> depList = taskNode.getDepList(); List<String> depList = taskNode.getDepList();
if (CollectionUtils.isNotEmpty(depList)) { if (CollectionUtils.isNotEmpty(depList)) {
for (String preTaskName : depList) { for (String preTaskName : depList) {
builderRelationList.add(new ProcessTaskRelation("", builderRelationList.add(new ProcessTaskRelation(
StringUtils.EMPTY,
processDefinition.getVersion(), processDefinition.getVersion(),
projectCode, projectCode,
processDefinition.getCode(), processDefinition.getCode(),
@ -2350,12 +2348,13 @@ public class ProcessService {
taskDefinitionMap.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getCode(),
taskDefinitionMap.get(taskNode.getName()).getVersion(), taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.NONE, ConditionType.NONE,
taskNode.getConditionResult(), StringUtils.EMPTY,
now, now,
now)); now));
} }
} else { } else {
builderRelationList.add(new ProcessTaskRelation("", builderRelationList.add(new ProcessTaskRelation(
StringUtils.EMPTY,
processDefinition.getVersion(), processDefinition.getVersion(),
projectCode, projectCode,
processDefinition.getCode(), processDefinition.getCode(),
@ -2363,8 +2362,8 @@ public class ProcessService {
0, 0,
taskDefinitionMap.get(taskNode.getName()).getCode(), taskDefinitionMap.get(taskNode.getName()).getCode(),
taskDefinitionMap.get(taskNode.getName()).getVersion(), taskDefinitionMap.get(taskNode.getName()).getVersion(),
ConditionType.of("none"), ConditionType.NONE,
taskNode.getConditionResult(), StringUtils.EMPTY,
now, now,
now)); now));
} }
@ -2464,7 +2463,6 @@ public class ProcessService {
v = new TaskNode(); v = new TaskNode();
v.setCode(processTaskRelation.getPostTaskCode()); v.setCode(processTaskRelation.getPostTaskCode());
v.setVersion(processTaskRelation.getPostTaskVersion()); v.setVersion(processTaskRelation.getPostTaskVersion());
v.setConditionResult(processTaskRelation.getConditionParams());
List<PreviousTaskNode> preTaskNodeList = new ArrayList<>(); List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
if (processTaskRelation.getPreTaskCode() > 0) { if (processTaskRelation.getPreTaskCode() > 0) {
preTaskNodeList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion())); preTaskNodeList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreTaskVersion()));
@ -2489,13 +2487,18 @@ public class ProcessService {
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes()); v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
v.setParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? null : taskDefinitionLog.getTaskParams()); Map<String, Object> taskParamsMap = v.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
v.setDependence(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? taskDefinitionLog.getTaskParams() : null); v.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
v.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE));
taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE);
v.setParams(JSONUtils.toJsonString(taskParamsMap));
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTimeoutNotifyStrategy(), taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout()))); taskDefinitionLog.getTimeout())));
v.setDelayTime(taskDefinitionLog.getDelayTime());
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName())); v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()))); v.setPreTasks(JSONUtils.toJsonString(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList())));
}); });
@ -2503,7 +2506,7 @@ public class ProcessService {
} }
/** /**
* find task definition by code and verision * find task definition by code and version
* *
* @param taskCode * @param taskCode
* @param taskDefinitionVersion * @param taskDefinitionVersion
@ -2514,7 +2517,7 @@ public class ProcessService {
} }
/** /**
* query taks definition list by process code and process version * query tasks definition list by process code and process version
* *
* @param processCode * @param processCode
* @param processVersion * @param processVersion
@ -2523,7 +2526,7 @@ public class ProcessService {
public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) { public List<TaskDefinitionLog> queryTaskDefinitionList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> processTaskRelationLogs = List<ProcessTaskRelationLog> processTaskRelationLogs =
processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion); processTaskRelationLogMapper.queryByProcessCodeAndVersion(processCode, processVersion);
Map<Long, TaskDefinition> postTaskDefinitionMap = new HashedMap(); Map<Long, TaskDefinition> postTaskDefinitionMap = new HashMap<>();
processTaskRelationLogs.forEach(processTaskRelationLog -> { processTaskRelationLogs.forEach(processTaskRelationLog -> {
Long code = processTaskRelationLog.getPostTaskCode(); Long code = processTaskRelationLog.getPostTaskCode();
int version = processTaskRelationLog.getPostTaskVersion(); int version = processTaskRelationLog.getPostTaskVersion();

2
sql/dolphinscheduler_mysql.sql

@ -465,6 +465,7 @@ CREATE TABLE `t_ds_task_definition` (
`timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
`timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail', `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
`timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute', `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma', `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
@ -494,6 +495,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open', `timeout_flag` tinyint(2) DEFAULT '0' COMMENT 'timeout flag:0 close, 1 open',
`timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail', `timeout_notify_strategy` tinyint(4) DEFAULT NULL COMMENT 'timeout notification policy: 0 warning, 1 fail',
`timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute', `timeout` int(11) DEFAULT '0' COMMENT 'timeout length,unit: minute',
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma', `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource id, separated by comma',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time', `operate_time` datetime DEFAULT NULL COMMENT 'operate time',

8
sql/dolphinscheduler_postgre.sql

@ -330,8 +330,7 @@ CREATE TABLE t_ds_process_definition_log (
operate_time timestamp DEFAULT NULL , operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id), PRIMARY KEY (id)
CONSTRAINT process_definition_unique UNIQUE (name, project_id)
) ; ) ;
DROP TABLE IF EXISTS t_ds_task_definition; DROP TABLE IF EXISTS t_ds_task_definition;
@ -353,6 +352,7 @@ CREATE TABLE t_ds_task_definition (
timeout_flag int DEFAULT NULL , timeout_flag int DEFAULT NULL ,
timeout_notify_strategy int DEFAULT NULL , timeout_notify_strategy int DEFAULT NULL ,
timeout int DEFAULT '0' , timeout int DEFAULT '0' ,
delay_time int DEFAULT '0' ,
resource_ids varchar(255) DEFAULT NULL , resource_ids varchar(255) DEFAULT NULL ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
@ -381,13 +381,13 @@ CREATE TABLE t_ds_task_definition_log (
timeout_flag int DEFAULT NULL , timeout_flag int DEFAULT NULL ,
timeout_notify_strategy int DEFAULT NULL , timeout_notify_strategy int DEFAULT NULL ,
timeout int DEFAULT '0' , timeout int DEFAULT '0' ,
delay_time int DEFAULT '0' ,
resource_ids varchar(255) DEFAULT NULL , resource_ids varchar(255) DEFAULT NULL ,
operator int DEFAULT NULL , operator int DEFAULT NULL ,
operate_time timestamp DEFAULT NULL , operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id), PRIMARY KEY (id)
CONSTRAINT task_definition_unique UNIQUE (name, project_id)
) ; ) ;
DROP TABLE IF EXISTS t_ds_process_task_relation; DROP TABLE IF EXISTS t_ds_process_task_relation;

Loading…
Cancel
Save