Browse Source

[Feature][JsonSplit] add relation method and refactor task method (#4737)

* add task query

* modify codestyle

* add task delete/update/swich method

* add task delete/update/swich method

* codestyle

* use updateById save task definition

* modify method name

* code style

* code style

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
763a50d0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  3. 84
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  4. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  5. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java
  6. 161
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  7. 91
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  8. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  9. 27
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
  10. 41
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  11. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  12. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
  13. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  14. 12
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  15. 33
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  16. 223
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  17. 8
      sql/dolphinscheduler-postgre.sql
  18. 4
      sql/dolphinscheduler_mysql.sql

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -254,6 +254,10 @@ public enum Status {
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
DELETE_TASK_DEFINE_BY_CODE_ERROR(50031, "delete task definition by code error", "删除任务定义错误"),
DELETE_PROCESS_TASK_RELATION_ERROR(50032, "delete process task relation error", "删除工作流任务关系错误"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -59,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -459,16 +458,6 @@ public class ProcessInstanceService extends BaseService {
/**
* sync definition according process instance
*
* @param loginUser
* @param project
* @param processInstanceJson
* @param locations
* @param connects
* @param processInstance
* @param processDefinition
* @param processData
* @return
*/
private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
ProcessInstance processInstance, ProcessDefinition processDefinition,
@ -491,13 +480,6 @@ public class ProcessInstanceService extends BaseService {
/**
* update process instance attributes
*
* @param processInstance
* @param tenant
* @param scheduleTime
* @param locations
* @param connects
* @param processInstanceJson
* @param processData
* @return false if check failed or
*/
private void setProcessInstance(ProcessInstance processInstance, Tenant tenant,
@ -747,6 +729,7 @@ public class ProcessInstanceService extends BaseService {
/**
* query process instance by processDefinitionId and stateArray
*
* @param processDefinitionId processDefinitionId
* @param states states array
* @return process instance list
@ -757,11 +740,12 @@ public class ProcessInstanceService extends BaseService {
/**
* query process instance by processDefinitionId
*
* @param processDefinitionId processDefinitionId
* @param size size
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size) {
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId, int size) {
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
}

84
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -26,29 +26,6 @@ import java.util.Map;
*/
public interface ProcessTaskRelationService {
/**
* create process task relation
*
* @param loginUser login user
* @param name relation name
* @param projectName process name
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @param conditionType condition type
* @param conditionParams condition params
* @return create result code
*/
Map<String, Object> createProcessTaskRelation(User loginUser,
String name,
String projectName,
Long processDefinitionCode,
Long preTaskCode,
Long postTaskCode,
String conditionType,
String conditionParams);
/**
* query process task relation
*
@ -59,66 +36,5 @@ public interface ProcessTaskRelationService {
Map<String, Object> queryProcessTaskRelation(User loginUser,
String projectName,
Long processDefinitionCode);
/**
* delete process task relation
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionCode process definition code
*/
Map<String, Object> deleteTaskDefinitionByProcessCode(User loginUser,
String projectName,
Long processDefinitionCode);
/**
* delete process task relation
*
* @param loginUser login user
* @param projectName project name
* @param preTaskCode pre task code
*/
Map<String, Object> deleteTaskDefinitionByTaskCode(User loginUser,
String projectName,
Long preTaskCode);
/**
* update process task relation
*
* @param loginUser login user
* @param id process task relation id
* @param name relation name
* @param projectName process name
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @param conditionType condition type
* @param conditionParams condition params
*/
Map<String, Object> updateTaskDefinition(User loginUser,
int id,
String name,
String projectName,
Long processDefinitionCode,
Long preTaskCode,
Long postTaskCode,
String conditionType,
String conditionParams);
/**
* switch process task relation version
*
* @param loginUser login user
* @param projectName project name
* @param processTaskRelationId process task relation id
* @param version the version user want to switch
* @return switch process task relation version result code
*/
Map<String, Object> switchVersion(User loginUser,
String projectName,
int processTaskRelationId,
int version);
}

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -26,10 +26,8 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -68,15 +66,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -132,12 +129,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired
private ProjectService projectService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private ProcessTaskRelationService processTaskRelationService;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@ -157,7 +148,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
private ProcessService processService;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
private ProcessTaskRelationMapper processTaskRelationMapper;
/**
* create process definition
@ -205,10 +196,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result;
}
ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData,
ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData,
project, desc, locations, connects);
processService.switchVersion(processDefinition, processDefinitionLog);
processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData);
// TODO relationName have ?
processService.createTaskAndRelation(loginUser, project.getCode(), processDefinition, processData);
// return processDefinition object with ID
result.put(Constants.DATA_LIST, processDefinition.getId());
@ -538,7 +530,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode);
int delete = processDefinitionMapper.deleteById(processDefinitionId);
processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
@ -1613,7 +1605,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
}
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
.queryByDefinitionCodeAndVersion(processDefinition.getCode(),version);
.queryByDefinitionCodeAndVersion(processDefinition.getCode(), version);
if (Objects.isNull(processDefinitionLog)) {
putMsg(result

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java

@ -66,17 +66,17 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements
long version = this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1;
ProcessDefinitionVersion processDefinitionVersion = ProcessDefinitionVersion
.newBuilder()
.processDefinitionId(processDefinition.getId())
.version(version)
.processDefinitionJson(processDefinition.getProcessDefinitionJson())
.description(processDefinition.getDescription())
.locations(processDefinition.getLocations())
.connects(processDefinition.getConnects())
.timeout(processDefinition.getTimeout())
.globalParams(processDefinition.getGlobalParams())
.createTime(processDefinition.getUpdateTime())
.warningGroupId(processDefinition.getWarningGroupId())
.newBuilder()
.processDefinitionId(processDefinition.getId())
.version(version)
.processDefinitionJson(processDefinition.getProcessDefinitionJson())
.description(processDefinition.getDescription())
.locations(processDefinition.getLocations())
.connects(processDefinition.getConnects())
.timeout(processDefinition.getTimeout())
.globalParams(processDefinition.getGlobalParams())
.createTime(processDefinition.getUpdateTime())
.warningGroupId(processDefinition.getWarningGroupId())
.resourceIds(processDefinition.getResourceIds())
.build();

161
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -22,28 +22,20 @@ import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* task definition service impl
@ -60,167 +52,28 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements
@Autowired
private ProjectService projectService;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
/**
* create process task relation
* query process task relation
*
* @param loginUser login user
* @param name relation name
* @param projectName process name
* @param projectName project name
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @param conditionType condition type
* @param conditionParams condition params
* @return create result code
*/
@Transactional
@Override
public Map<String, Object> createProcessTaskRelation(User loginUser,
String name,
String projectName,
Long processDefinitionCode,
Long preTaskCode,
Long postTaskCode,
String conditionType,
String conditionParams) {
public Map<String, Object> queryProcessTaskRelation(User loginUser, String projectName, Long processDefinitionCode) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
if (checkResult.get(Constants.STATUS) != Status.SUCCESS) {
return checkResult;
}
// check processDefinitionCode
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
// check preTaskCode and postTaskCode
checkTaskDefinitionCode(result, preTaskCode);
if (postTaskCode > 0) {
checkTaskDefinitionCode(result, postTaskCode);
}
resultStatus = (Status) result.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return result;
}
Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name,
1,
project.getCode(),
processDefinitionCode,
preTaskCode,
postTaskCode,
ConditionType.of(conditionType),
conditionParams,
now,
now);
// save process task relation
processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.set(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLogMapper.insert(processTaskRelationLog);
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(project.getCode(), processDefinitionCode);
result.put(Constants.DATA_LIST, processTaskRelationList);
putMsg(result, Status.SUCCESS);
return result;
}
private void checkTaskDefinitionCode(Map<String, Object> result, Long taskCode) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
}
}
/**
* query process task relation
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionCode process definition code
*/
@Override
public Map<String, Object> queryProcessTaskRelation(User loginUser, String projectName, Long processDefinitionCode) {
return null;
}
/**
* delete process task relation
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionCode process definition code
*/
@Override
public Map<String, Object> deleteTaskDefinitionByProcessCode(User loginUser, String projectName, Long processDefinitionCode) {
return null;
}
/**
* delete process task relation
*
* @param loginUser login user
* @param projectName project name
* @param preTaskCode pre task code
*/
@Override
public Map<String, Object> deleteTaskDefinitionByTaskCode(User loginUser, String projectName, Long preTaskCode) {
return null;
}
/**
* update process task relation
*
* @param loginUser login user
* @param id process task relation id
* @param name relation name
* @param projectName process name
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @param conditionType condition type
* @param conditionParams condition params
*/
@Override
public Map<String, Object> updateTaskDefinition(User loginUser,
int id,
String name,
String projectName,
Long processDefinitionCode,
Long preTaskCode,
Long postTaskCode,
String conditionType,
String conditionParams) {
return null;
}
/**
* switch process task relation version
*
* @param loginUser login user
* @param projectName project name
* @param processTaskRelationId process task relation id
* @param version the version user want to switch
* @return switch process task relation version result code
*/
@Override
public Map<String, Object> switchVersion(User loginUser, String projectName, int processTaskRelationId, int version) {
return null;
}
}

91
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -23,26 +23,16 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -86,9 +76,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessService processService;
@ -99,7 +86,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
* @param projectName project name
* @param taskDefinitionJson task definition json
*/
@Transactional(rollbackFor = Exception.class)
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> createTaskDefinition(User loginUser,
String projectName,
@ -119,9 +106,11 @@ public class TaskDefinitionServiceImpl extends BaseService implements
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
TaskDefinition taskDefinition = new TaskDefinition();
long code = 0L;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
}
@ -129,37 +118,10 @@ public class TaskDefinitionServiceImpl extends BaseService implements
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return result;
}
Date now = new Date();
TaskDefinition taskDefinition = new TaskDefinition(code,
taskNode.getName(),
1,
taskNode.getDesc(),
project.getCode(),
loginUser.getId(),
TaskType.of(taskNode.getType()),
taskNode.getParams(),
taskNode.isForbidden() ? Flag.NO : Flag.YES,
taskNode.getTaskInstancePriority(),
taskNode.getWorkerGroup(),
taskNode.getMaxRetryTimes(),
taskNode.getRetryInterval(),
taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE,
taskNode.getTaskTimeoutParameter().getStrategy(),
taskNode.getTaskTimeoutParameter().getInterval(),
now,
now);
taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition));
// save the new task definition
taskDefinitionMapper.insert(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
taskDefinitionLogMapper.insert(taskDefinitionLog);
int insert = processService.saveTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
// return taskDefinition object with code
result.put(Constants.DATA_LIST, code);
putMsg(result, Status.SUCCESS);
putMsg(result, Status.SUCCESS, insert);
return result;
}
@ -209,16 +171,20 @@ public class TaskDefinitionServiceImpl extends BaseService implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
checkTaskRelation(result, taskCode);
resultEnum = (Status) result.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet());
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, StringUtils.join(processDefinitionCodes, ","));
return result;
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
return result;
}
@ -231,6 +197,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements
* @param taskCode task code
* @param taskDefinitionJson task definition json
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, String projectName, Long taskCode, String taskDefinitionJson) {
Map<String, Object> result = new HashMap<>(5);
@ -241,8 +208,8 @@ public class TaskDefinitionServiceImpl extends BaseService implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
checkTaskRelation(result, taskCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
if (processService.isTaskOnline(taskCode)) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);
@ -255,30 +222,12 @@ public class TaskDefinitionServiceImpl extends BaseService implements
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
int update = processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
putMsg(result, Status.SUCCESS, update);
return result;
}
public void checkTaskRelation(Map<String, Object> result, Long taskCode) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode, taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getCode());
return;
}
}
}
}
public void checkTaskNode(Map<String, Object> result, TaskNode taskNode, String taskDefinitionJson) {
if (taskNode == null) {
@ -310,8 +259,8 @@ public class TaskDefinitionServiceImpl extends BaseService implements
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
checkTaskRelation(result, taskCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
if (processService.isTaskOnline(taskCode)) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -51,9 +51,9 @@ public class ProcessTaskRelation {
private String name;
/**
* version
* process version
*/
private int version;
private int processDefinitionVersion;
/**
* project code
@ -113,7 +113,7 @@ public class ProcessTaskRelation {
}
public ProcessTaskRelation(String name,
int version,
int processDefinitionVersion,
long projectCode,
long processDefinitionCode,
long preTaskCode,
@ -123,7 +123,7 @@ public class ProcessTaskRelation {
Date createTime,
Date updateTime) {
this.name = name;
this.version = version;
this.processDefinitionVersion = processDefinitionVersion;
this.projectCode = projectCode;
this.processDefinitionCode = processDefinitionCode;
this.preTaskCode = preTaskCode;
@ -201,12 +201,12 @@ public class ProcessTaskRelation {
this.conditionParamMap = conditionParamMap;
}
public int getVersion() {
return version;
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setVersion(int version) {
this.version = version;
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public long getProjectCode() {

27
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java

@ -17,12 +17,6 @@
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -33,6 +27,13 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* process task relation log
*/
@ -51,9 +52,9 @@ public class ProcessTaskRelationLog {
private String name;
/**
* version
* process version
*/
private int version;
private int processDefinitionVersion;
/**
* project code
@ -187,12 +188,12 @@ public class ProcessTaskRelationLog {
this.conditionParamMap = conditionParamMap;
}
public int getVersion() {
return version;
public int getProcessDefinitionVersion() {
return processDefinitionVersion;
}
public void setVersion(int version) {
this.version = version;
public void setProcessDefinitionVersion(int processDefinitionVersion) {
this.processDefinitionVersion = processDefinitionVersion;
}
public long getProjectCode() {
@ -253,7 +254,7 @@ public class ProcessTaskRelationLog {
public void set(ProcessTaskRelation processTaskRelation) {
this.name = processTaskRelation.getName();
this.version = processTaskRelation.getVersion();
this.processDefinitionVersion = processTaskRelation.getProcessDefinitionVersion();
this.projectCode = processTaskRelation.getProjectCode();
this.processDefinitionCode = processTaskRelation.getProcessDefinitionCode();
this.preTaskCode = processTaskRelation.getPreTaskCode();

41
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -167,47 +167,6 @@ public class TaskDefinition {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public TaskDefinition() {
}
public TaskDefinition(long code,
String name,
int version,
String description,
long projectCode,
int userId,
TaskType taskType,
String taskParams,
Flag flag,
Priority taskPriority,
String workerGroup,
int failRetryTimes,
int failRetryInterval,
TimeoutFlag timeoutFlag,
TaskTimeoutStrategy taskTimeoutStrategy,
int timeout,
Date createTime,
Date updateTime) {
this.code = code;
this.name = name;
this.version = version;
this.description = description;
this.projectCode = projectCode;
this.userId = userId;
this.taskType = taskType;
this.taskParams = taskParams;
this.flag = flag;
this.taskPriority = taskPriority;
this.workerGroup = workerGroup;
this.failRetryTimes = failRetryTimes;
this.failRetryInterval = failRetryInterval;
this.timeoutFlag = timeoutFlag;
this.taskTimeoutStrategy = taskTimeoutStrategy;
this.timeout = timeout;
this.createTime = createTime;
this.updateTime = updateTime;
}
public String getName() {
return name;
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
@ -35,7 +38,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* @return process definition log list
*/
List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") Long projectCode,
@Param("processDefinitionName") String name);
@Param("processDefinitionName") String name);
/**
* query process definition log list
@ -47,8 +50,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
/**
* query max version for definition
* @param processDefinitionCode
* @return
*/
int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode);

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java

@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
@ -26,4 +30,13 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/
public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRelationLog> {
/**
* query process task relation log
*
* @param processCode process definition code
* @param processVersion process version
* @return process task relation log
*/
List<ProcessTaskRelationLog> queryByProcessCodeAndVersion(@Param("processCode") long processCode,
@Param("processVersion") int processVersion);
}

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -31,20 +32,38 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> {
/**
* process task relation by processDefinitionCode
* process task relation by projectCode and processCode
*
* @param processDefinitionCode processDefinitionCode
* @param projectCode projectCode
* @param processCode processCode
* @return ProcessTaskRelation list
*/
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") Long projectCode,
@Param("processCode") Long processCode);
/**
* process task relation by taskCode
*
* @param taskCodes taskCode list
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryByProcessDefinitionCode(@Param("processDefinitionCode") String processDefinitionCode);
List<ProcessTaskRelation> queryByTaskCodes(@Param("taskCodes") Collection<Long> taskCodes);
/**
* process task relation by taskCode
*
* @param preTaskCode preTaskCode
* @param postTaskCode postTaskCode
* @param taskCode taskCode
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryByTaskCode(@Param("preTaskCode") Long preTaskCode,
@Param("postTaskCode") Long postTaskCode);
List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") Long taskCode);
/**
* delete process task relation by processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return int
*/
int deleteByCode(@Param("projectCode") Long projectCode,
@Param("processCode") Long processCode);
}

12
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -18,5 +18,15 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper">
<sql id="baseSql">
id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code,
condition_type, condition_params, operator, operate_time, create_time, update_time
</sql>
<select id="queryByProcessCodeAndVersion" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select
<include refid="baseSql"/>
from t_ds_process_task_relation_log
WHERE process_definition_code = #{processCode}
and process_definition_version = #{processVersion}
</select>
</mapper>

33
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -19,22 +19,45 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper">
<sql id="baseSql">
id, `name`, version, project_code, process_definition_code, pre_task_code, post_task_code,
id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code,
condition_type, condition_params, create_time, update_time
</sql>
<select id="queryByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
<select id="queryByProcessCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE process_definition_code = #{processDefinitionCode}
WHERE project_code = #{projectCode}
and process_definition_code = #{processCode}
</select>
<select id="queryByTaskCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE pre_task_code = #{preTaskCode}
WHERE pre_task_code = #{taskCode}
<if test="postTaskCode != 0">
or post_task_code = #{postTaskCode}
or post_task_code = #{taskCode}
</if>
</select>
<select id="queryByTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE 1 = 1
<if test="taskCodes != null and taskCodes.length != 0">
(and pre_task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
or post_task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
)
</if>
</select>
<delete id="deleteByCode">
delete from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and process_definition_code = #{processCode}
</delete>
</mapper>

223
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -31,6 +31,7 @@ import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -43,6 +44,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -52,6 +54,8 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@ -63,6 +67,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
@ -80,6 +86,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
@ -89,6 +97,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -139,8 +148,6 @@ public class ProcessService {
@Autowired
private ProcessDefinitionLogMapper processDefineLogMapper;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@ -181,7 +188,10 @@ public class ProcessService {
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
@ -381,8 +391,6 @@ public class ProcessService {
/**
* covert log to process definition
* @param processDefinitionLog
* @return
*/
public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) {
ProcessDefinition definition = null;
@ -743,10 +751,10 @@ public class ProcessService {
processInstance = this.findProcessInstanceDetailById(processInstanceId);
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime()));
}
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
@ -1249,14 +1257,12 @@ public class ProcessService {
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
ExecutionStatus state = taskInstance.getState();
if (
// running, delayed or killed
// the task already exists in task queue
// return state
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
) {
// running, delayed or killed
// the task already exists in task queue
// return state
if (state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL) {
return state;
}
//return pasue /stop if process instance state is ready pause / stop
@ -2058,8 +2064,6 @@ public class ProcessService {
/**
* solve the branch rename bug
*
* @param processData
* @param oldJson
* @return String
*/
public String changeJson(ProcessData processData, String oldJson) {
@ -2114,10 +2118,6 @@ public class ProcessService {
/**
* switch process definition version to process definition log version
*
* @param processDefinition
* @param processDefinitionLog
* @return
*/
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) {
@ -2132,23 +2132,29 @@ public class ProcessService {
int switchResult = 0;
if (0 == processDefinition.getId()) {
switchResult = processDefineMapper.insert(tmpDefinition);
} else {
switchResult = processDefineMapper.updateById(tmpDefinition);
}
//TODO... switch task relations
switchProcessTaskRelationVersion(processDefinition);
return switchResult;
}
public void switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
}
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) {
ProcessTaskRelation processTaskRelation = JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
ProcessTaskRelation.class);
processTaskRelationMapper.insert(processTaskRelation);
}
}
/**
* update task definition
*
* @param operator
* @param projectCode
* @param taskNode
* @param taskDefinition
* @return
*/
public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
@ -2159,12 +2165,24 @@ public class ProcessService {
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
Date now = new Date();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(version);
taskDefinition.setCode(taskDefinition.getCode());
taskDefinition.setUpdateTime(now);
setTaskFromTaskNode(taskNode, taskDefinition);
int update = taskDefinitionMapper.updateById(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setOperateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
return insert & update;
}
private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId());
taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(taskNode.getParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
@ -2175,16 +2193,7 @@ public class ProcessService {
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setUpdateTime(now);
taskDefinition.setResourceIds(getResourceIds(taskDefinition));
int update = taskDefinitionMapper.updateById(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setOperateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionLog);
return insert & update;
}
/**
@ -2212,15 +2221,7 @@ public class ProcessService {
}
/**
* @param operator
* @param name
* @param desc
* @param locations
* @param connects
* @param project
* @param processData
* @param processDefinition
* @return
*
*/
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
String connects, ProcessData processData, ProcessDefinition processDefinition) {
@ -2230,28 +2231,20 @@ public class ProcessService {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName());
updateTaskDefinition(operator, project.getCode(), task, taskDefinition);
}
createTaskAndRelation(operator, project.getName(), "", processDefinition, processData);
createTaskAndRelation(operator, project.getCode(), processDefinition, processData);
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects);
return switchVersion(processDefinition, processDefinitionLog);
}
/**
* @param operator
* @param processDefinitionCode
* @param processDefinitionName
* @param processData
* @param project
* @param desc
* @param locations
* @param connects
* @return
*
*/
public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName,
ProcessData processData, Project project,
String desc, String locations, String connects) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
int version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
int version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode());
processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(version);
processDefinitionLog.setName(processDefinitionName);
@ -2275,7 +2268,7 @@ public class ProcessService {
}
processDefinitionLog.setGlobalParamList(globalParamsList);
processDefinitionLog.setFlag(Flag.YES);
int insert = processDefinitionLogMapper.insert(processDefinitionLog);
int insert = processDefineLogMapper.insert(processDefinitionLog);
if (insert > 0) {
return processDefinitionLog;
}
@ -2283,24 +2276,96 @@ public class ProcessService {
}
/**
* create task defintion and task relations
*
* @param loginUser
* @param projectName
* @param relationName
* @param processDefinition
* @param processData
* @return
* create task definition and task relations
*/
public void createTaskAndRelation(User loginUser, String projectName, String relationName,
ProcessDefinition processDefinition,
ProcessData processData) {
public int createTaskAndRelation(User operator,
Long projectCode,
ProcessDefinition processDefinition,
ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
for (TaskNode task : taskNodeList) {
//TODO... task code exists, update task
//createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task));
for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
if (taskDefinition == null) {
long code;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinition = new TaskDefinition();
taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
return -1;
}
saveTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} else {
if (isTaskOnline(taskDefinition.getCode())) {
// TODO return something for fail
return -1;
}
updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
}
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode());
}
// TODO: query taskCode by projectCode and taskName
// TODO parse taskNodeList for preTaskCode and postTaskCode
List<TaskNodeRelation> taskNodeRelationList = DagHelper.getProcessDag(taskNodeList).getEdges();
Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation("",// todo relation name
processDefinition.getVersion(),
projectCode,
processDefinition.getCode(),
0L, // todo pre task code
0L, // todo post task code
ConditionType.of(""), // todo conditionType
"", // todo conditionParams
now,
now);
// save process task relation
int insert = processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.set(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
int logInsert = processTaskRelationLogMapper.insert(processTaskRelationLog);
return insert & logInsert;
}
public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
Date now = new Date();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(1);
taskDefinition.setUpdateTime(now);
taskDefinition.setCreateTime(now);
setTaskFromTaskNode(taskNode, taskDefinition);
// save the new task definition
int insert = taskDefinitionMapper.insert(taskDefinition);
// save task definition log
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.set(taskDefinition);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setOperateTime(now);
int logInsert = taskDefinitionLogMapper.insert(taskDefinitionLog);
return insert & logInsert;
}
public boolean isTaskOnline(Long taskCode) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList
.stream()
.map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes);
// check process definition is already online
for (ProcessDefinition processDefinition : processDefinitionList) {
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
return true;
}
}
}
return false;
}
}

8
sql/dolphinscheduler-postgre.sql

@ -394,12 +394,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation;
CREATE TABLE t_ds_process_task_relation (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
version int DEFAULT NULL ,
process_definition_version int DEFAULT NULL ,
project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
pre_project_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL ,
post_project_code bigint DEFAULT NULL ,
post_task_code bigint DEFAULT NULL ,
condition_type int DEFAULT NULL ,
condition_params text ,
@ -412,12 +410,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
version int DEFAULT NULL ,
process_definition_version int DEFAULT NULL ,
project_code bigint DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
pre_project_code bigint DEFAULT NULL ,
pre_task_code bigint DEFAULT NULL ,
post_project_code bigint DEFAULT NULL ,
post_task_code bigint DEFAULT NULL ,
condition_type int DEFAULT NULL ,
condition_params text ,

4
sql/dolphinscheduler_mysql.sql

@ -509,7 +509,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation`;
CREATE TABLE `t_ds_process_task_relation` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
`version` int(11) DEFAULT NULL COMMENT 'relation version',
`process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',
@ -528,7 +528,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation_log`;
CREATE TABLE `t_ds_process_task_relation_log` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id',
`name` varchar(200) DEFAULT NULL COMMENT 'relation name',
`version` int(11) DEFAULT NULL COMMENT 'relation version',
`process_definition_version` int(11) DEFAULT NULL COMMENT 'process version',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process code',
`pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',

Loading…
Cancel
Save