diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 6571930c92..510b4254dd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -254,6 +254,10 @@ public enum Status { BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"), + DELETE_TASK_DEFINE_BY_CODE_ERROR(50031, "delete task definition by code error", "删除任务定义错误"), + DELETE_PROCESS_TASK_RELATION_ERROR(50032, "delete process task relation error", "删除工作流任务关系错误"), + PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), + PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 7976882f56..3e99dc4bd1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -59,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.process.ProcessService; - import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -459,16 +458,6 @@ public class ProcessInstanceService extends BaseService { /** * sync definition according process instance - * - * @param loginUser - * @param project - * @param processInstanceJson - * @param locations - * @param connects - * @param processInstance - * @param processDefinition - * @param processData - * @return */ private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects, ProcessInstance processInstance, ProcessDefinition processDefinition, @@ -491,13 +480,6 @@ public class ProcessInstanceService extends BaseService { /** * update process instance attributes * - * @param processInstance - * @param tenant - * @param scheduleTime - * @param locations - * @param connects - * @param processInstanceJson - * @param processData * @return false if check failed or */ private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, @@ -747,6 +729,7 @@ public class ProcessInstanceService extends BaseService { /** * query process instance by processDefinitionId and stateArray + * * @param processDefinitionId processDefinitionId * @param states states array * @return process instance list @@ -757,11 +740,12 @@ public class ProcessInstanceService extends BaseService { /** * query process instance by processDefinitionId + * * @param processDefinitionId processDefinitionId * @param size size * @return process instance list */ - public List queryByProcessDefineId(int processDefinitionId,int size) { + public List queryByProcessDefineId(int processDefinitionId, int size) { return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java index 0d1d0eaea1..76b6389c46 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java @@ -26,29 +26,6 @@ import java.util.Map; */ public interface ProcessTaskRelationService { - /** - * create process task relation - * - * @param loginUser login user - * @param name relation name - * @param projectName process name - * @param processDefinitionCode process definition code - * @param preTaskCode pre task code - * @param postTaskCode post task code - * @param conditionType condition type - * @param conditionParams condition params - * @return create result code - */ - Map createProcessTaskRelation(User loginUser, - String name, - String projectName, - Long processDefinitionCode, - Long preTaskCode, - Long postTaskCode, - String conditionType, - String conditionParams); - - /** * query process task relation * @@ -59,66 +36,5 @@ public interface ProcessTaskRelationService { Map queryProcessTaskRelation(User loginUser, String projectName, Long processDefinitionCode); - - /** - * delete process task relation - * - * @param loginUser login user - * @param projectName project name - * @param processDefinitionCode process definition code - */ - Map deleteTaskDefinitionByProcessCode(User loginUser, - String projectName, - Long processDefinitionCode); - - /** - * delete process task relation - * - * @param loginUser login user - * @param projectName project name - * @param preTaskCode pre task code - */ - Map deleteTaskDefinitionByTaskCode(User loginUser, - String projectName, - Long preTaskCode); - - - /** - * update process task relation - * - * @param loginUser login user - * @param id process task relation id - * @param name relation name - * @param projectName process name - * @param processDefinitionCode process definition code - * @param preTaskCode pre task code - * @param postTaskCode post task code - * @param conditionType condition type - * @param conditionParams condition params - */ - Map updateTaskDefinition(User loginUser, - int id, - String name, - String projectName, - Long processDefinitionCode, - Long preTaskCode, - Long postTaskCode, - String conditionType, - String conditionParams); - - - /** - * switch process task relation version - * - * @param loginUser login user - * @param projectName project name - * @param processTaskRelationId process task relation id - * @param version the version user want to switch - * @return switch process task relation version result code - */ - Map switchVersion(User loginUser, - String projectName, - int processTaskRelationId, - int version); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 03515e325d..575cf19afb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -26,10 +26,8 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; -import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; -import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -68,15 +66,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; - import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -132,12 +129,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProjectService projectService; - @Autowired - private TaskDefinitionService taskDefinitionService; - - @Autowired - private ProcessTaskRelationService processTaskRelationService; - @Autowired private ProcessDefinitionLogMapper processDefinitionLogMapper; @@ -157,7 +148,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements private ProcessService processService; @Autowired - private TaskDefinitionMapper taskDefinitionMapper; + private ProcessTaskRelationMapper processTaskRelationMapper; /** * create process definition @@ -205,10 +196,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements putMsg(result, Status.CREATE_PROCESS_DEFINITION); return result; } - ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData, + ProcessDefinitionLog processDefinitionLog = processService.insertProcessDefinitionLog(loginUser, processDefinitionCode, processDefinitionName, processData, project, desc, locations, connects); processService.switchVersion(processDefinition, processDefinitionLog); - processService.createTaskAndRelation(loginUser, projectName, "", processDefinition, processData); + // TODO relationName have ? + processService.createTaskAndRelation(loginUser, project.getCode(), processDefinition, processData); // return processDefinition object with ID result.put(Constants.DATA_LIST, processDefinition.getId()); @@ -538,7 +530,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements // TODO: replace id to code // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode); int delete = processDefinitionMapper.deleteById(processDefinitionId); - + processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); if (delete > 0) { putMsg(result, Status.SUCCESS); } else { @@ -1613,7 +1605,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper - .queryByDefinitionCodeAndVersion(processDefinition.getCode(),version); + .queryByDefinitionCodeAndVersion(processDefinition.getCode(), version); if (Objects.isNull(processDefinitionLog)) { putMsg(result diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java index 0776688121..fdc9d1f8fe 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java @@ -66,17 +66,17 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements long version = this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1; ProcessDefinitionVersion processDefinitionVersion = ProcessDefinitionVersion - .newBuilder() - .processDefinitionId(processDefinition.getId()) - .version(version) - .processDefinitionJson(processDefinition.getProcessDefinitionJson()) - .description(processDefinition.getDescription()) - .locations(processDefinition.getLocations()) - .connects(processDefinition.getConnects()) - .timeout(processDefinition.getTimeout()) - .globalParams(processDefinition.getGlobalParams()) - .createTime(processDefinition.getUpdateTime()) - .warningGroupId(processDefinition.getWarningGroupId()) + .newBuilder() + .processDefinitionId(processDefinition.getId()) + .version(version) + .processDefinitionJson(processDefinition.getProcessDefinitionJson()) + .description(processDefinition.getDescription()) + .locations(processDefinition.getLocations()) + .connects(processDefinition.getConnects()) + .timeout(processDefinition.getTimeout()) + .globalParams(processDefinition.getGlobalParams()) + .createTime(processDefinition.getUpdateTime()) + .warningGroupId(processDefinition.getWarningGroupId()) .resourceIds(processDefinition.getResourceIds()) .build(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index f7c57063ac..5ea01bd9d1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -22,28 +22,20 @@ import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ConditionType; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; -import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; /** * task definition service impl @@ -60,167 +52,28 @@ public class ProcessTaskRelationServiceImpl extends BaseService implements @Autowired private ProjectService projectService; - @Autowired - private ProcessDefinitionMapper processDefineMapper; - @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired - private ProcessTaskRelationLogMapper processTaskRelationLogMapper; - - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; - /** - * create process task relation + * query process task relation * * @param loginUser login user - * @param name relation name - * @param projectName process name + * @param projectName project name * @param processDefinitionCode process definition code - * @param preTaskCode pre task code - * @param postTaskCode post task code - * @param conditionType condition type - * @param conditionParams condition params - * @return create result code */ - @Transactional @Override - public Map createProcessTaskRelation(User loginUser, - String name, - String projectName, - Long processDefinitionCode, - Long preTaskCode, - Long postTaskCode, - String conditionType, - String conditionParams) { + public Map queryProcessTaskRelation(User loginUser, String projectName, Long processDefinitionCode) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); // check project auth Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { + if (checkResult.get(Constants.STATUS) != Status.SUCCESS) { return checkResult; } - // check processDefinitionCode - ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); - return result; - } - // check preTaskCode and postTaskCode - checkTaskDefinitionCode(result, preTaskCode); - if (postTaskCode > 0) { - checkTaskDefinitionCode(result, postTaskCode); - } - resultStatus = (Status) result.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return result; - } - Date now = new Date(); - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(name, - 1, - project.getCode(), - processDefinitionCode, - preTaskCode, - postTaskCode, - ConditionType.of(conditionType), - conditionParams, - now, - now); - // save process task relation - processTaskRelationMapper.insert(processTaskRelation); - // save process task relation log - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.set(processTaskRelation); - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLogMapper.insert(processTaskRelationLog); + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(project.getCode(), processDefinitionCode); + result.put(Constants.DATA_LIST, processTaskRelationList); putMsg(result, Status.SUCCESS); return result; } - - private void checkTaskDefinitionCode(Map result, Long taskCode) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode); - if (taskDefinition == null) { - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); - } - } - - /** - * query process task relation - * - * @param loginUser login user - * @param projectName project name - * @param processDefinitionCode process definition code - */ - @Override - public Map queryProcessTaskRelation(User loginUser, String projectName, Long processDefinitionCode) { - return null; - } - - /** - * delete process task relation - * - * @param loginUser login user - * @param projectName project name - * @param processDefinitionCode process definition code - */ - @Override - public Map deleteTaskDefinitionByProcessCode(User loginUser, String projectName, Long processDefinitionCode) { - return null; - } - - /** - * delete process task relation - * - * @param loginUser login user - * @param projectName project name - * @param preTaskCode pre task code - */ - @Override - public Map deleteTaskDefinitionByTaskCode(User loginUser, String projectName, Long preTaskCode) { - return null; - } - - /** - * update process task relation - * - * @param loginUser login user - * @param id process task relation id - * @param name relation name - * @param projectName process name - * @param processDefinitionCode process definition code - * @param preTaskCode pre task code - * @param postTaskCode post task code - * @param conditionType condition type - * @param conditionParams condition params - */ - @Override - public Map updateTaskDefinition(User loginUser, - int id, - String name, - String projectName, - Long processDefinitionCode, - Long preTaskCode, - Long postTaskCode, - String conditionType, - String conditionParams) { - return null; - } - - /** - * switch process task relation version - * - * @param loginUser login user - * @param projectName project name - * @param processTaskRelationId process task relation id - * @param version the version user want to switch - * @return switch process task relation version result code - */ - @Override - public Map switchVersion(User loginUser, String projectName, int processTaskRelationId, int version) { - return null; - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index e140052807..8b1e6720ae 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -23,26 +23,16 @@ import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; @@ -86,9 +76,6 @@ public class TaskDefinitionServiceImpl extends BaseService implements @Autowired private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired - private ProcessDefinitionMapper processDefinitionMapper; - @Autowired private ProcessService processService; @@ -99,7 +86,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements * @param projectName project name * @param taskDefinitionJson task definition json */ - @Transactional(rollbackFor = Exception.class) + @Transactional(rollbackFor = RuntimeException.class) @Override public Map createTaskDefinition(User loginUser, String projectName, @@ -119,9 +106,11 @@ public class TaskDefinitionServiceImpl extends BaseService implements if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } + TaskDefinition taskDefinition = new TaskDefinition(); long code = 0L; try { code = SnowFlakeUtils.getInstance().nextId(); + taskDefinition.setCode(code); } catch (SnowFlakeException e) { logger.error("Task code get error, ", e); } @@ -129,37 +118,10 @@ public class TaskDefinitionServiceImpl extends BaseService implements putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); return result; } - Date now = new Date(); - TaskDefinition taskDefinition = new TaskDefinition(code, - taskNode.getName(), - 1, - taskNode.getDesc(), - project.getCode(), - loginUser.getId(), - TaskType.of(taskNode.getType()), - taskNode.getParams(), - taskNode.isForbidden() ? Flag.NO : Flag.YES, - taskNode.getTaskInstancePriority(), - taskNode.getWorkerGroup(), - taskNode.getMaxRetryTimes(), - taskNode.getRetryInterval(), - taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE, - taskNode.getTaskTimeoutParameter().getStrategy(), - taskNode.getTaskTimeoutParameter().getInterval(), - now, - now); - taskDefinition.setResourceIds(processService.getResourceIds(taskDefinition)); - // save the new task definition - taskDefinitionMapper.insert(taskDefinition); - // save task definition log - TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); - taskDefinitionLog.set(taskDefinition); - taskDefinitionLog.setOperator(loginUser.getId()); - taskDefinitionLog.setOperateTime(now); - taskDefinitionLogMapper.insert(taskDefinitionLog); + int insert = processService.saveTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); // return taskDefinition object with code result.put(Constants.DATA_LIST, code); - putMsg(result, Status.SUCCESS); + putMsg(result, Status.SUCCESS, insert); return result; } @@ -209,16 +171,20 @@ public class TaskDefinitionServiceImpl extends BaseService implements if (resultEnum != Status.SUCCESS) { return checkResult; } - checkTaskRelation(result, taskCode); - resultEnum = (Status) result.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { + List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); + if (!processTaskRelationList.isEmpty()) { + Set processDefinitionCodes = processTaskRelationList + .stream() + .map(ProcessTaskRelation::getProcessDefinitionCode) + .collect(Collectors.toSet()); + putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, StringUtils.join(processDefinitionCodes, ",")); return result; } int delete = taskDefinitionMapper.deleteByCode(taskCode); if (delete > 0) { putMsg(result, Status.SUCCESS); } else { - putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); + putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); } return result; } @@ -231,6 +197,7 @@ public class TaskDefinitionServiceImpl extends BaseService implements * @param taskCode task code * @param taskDefinitionJson task definition json */ + @Transactional(rollbackFor = RuntimeException.class) @Override public Map updateTaskDefinition(User loginUser, String projectName, Long taskCode, String taskDefinitionJson) { Map result = new HashMap<>(5); @@ -241,8 +208,8 @@ public class TaskDefinitionServiceImpl extends BaseService implements if (resultEnum != Status.SUCCESS) { return checkResult; } - checkTaskRelation(result, taskCode); - if (result.get(Constants.STATUS) != Status.SUCCESS) { + if (processService.isTaskOnline(taskCode)) { + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE); return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode); @@ -255,30 +222,12 @@ public class TaskDefinitionServiceImpl extends BaseService implements if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - - processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); + int update = processService.updateTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS); + putMsg(result, Status.SUCCESS, update); return result; } - public void checkTaskRelation(Map result, Long taskCode) { - List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode, taskCode); - if (!processTaskRelationList.isEmpty()) { - Set processDefinitionCodes = processTaskRelationList - .stream() - .map(ProcessTaskRelation::getProcessDefinitionCode) - .collect(Collectors.toSet()); - List processDefinitionList = processDefinitionMapper.queryByCodes(processDefinitionCodes); - // check process definition is already online - for (ProcessDefinition processDefinition : processDefinitionList) { - if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getCode()); - return; - } - } - } - } public void checkTaskNode(Map result, TaskNode taskNode, String taskDefinitionJson) { if (taskNode == null) { @@ -310,8 +259,8 @@ public class TaskDefinitionServiceImpl extends BaseService implements if (resultEnum != Status.SUCCESS) { return checkResult; } - checkTaskRelation(result, taskCode); - if (result.get(Constants.STATUS) != Status.SUCCESS) { + if (processService.isTaskOnline(taskCode)) { + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE); return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index 3f8b25640f..13a804d2b2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -51,9 +51,9 @@ public class ProcessTaskRelation { private String name; /** - * version + * process version */ - private int version; + private int processDefinitionVersion; /** * project code @@ -113,7 +113,7 @@ public class ProcessTaskRelation { } public ProcessTaskRelation(String name, - int version, + int processDefinitionVersion, long projectCode, long processDefinitionCode, long preTaskCode, @@ -123,7 +123,7 @@ public class ProcessTaskRelation { Date createTime, Date updateTime) { this.name = name; - this.version = version; + this.processDefinitionVersion = processDefinitionVersion; this.projectCode = projectCode; this.processDefinitionCode = processDefinitionCode; this.preTaskCode = preTaskCode; @@ -201,12 +201,12 @@ public class ProcessTaskRelation { this.conditionParamMap = conditionParamMap; } - public int getVersion() { - return version; + public int getProcessDefinitionVersion() { + return processDefinitionVersion; } - public void setVersion(int version) { - this.version = version; + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; } public long getProjectCode() { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java index e25e2dfc79..523bf4eba0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java @@ -17,12 +17,6 @@ package org.apache.dolphinscheduler.dao.entity; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableField; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import com.baomidou.mybatisplus.core.toolkit.StringUtils; -import com.fasterxml.jackson.annotation.JsonFormat; import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -33,6 +27,13 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.core.toolkit.StringUtils; +import com.fasterxml.jackson.annotation.JsonFormat; + /** * process task relation log */ @@ -51,9 +52,9 @@ public class ProcessTaskRelationLog { private String name; /** - * version + * process version */ - private int version; + private int processDefinitionVersion; /** * project code @@ -187,12 +188,12 @@ public class ProcessTaskRelationLog { this.conditionParamMap = conditionParamMap; } - public int getVersion() { - return version; + public int getProcessDefinitionVersion() { + return processDefinitionVersion; } - public void setVersion(int version) { - this.version = version; + public void setProcessDefinitionVersion(int processDefinitionVersion) { + this.processDefinitionVersion = processDefinitionVersion; } public long getProjectCode() { @@ -253,7 +254,7 @@ public class ProcessTaskRelationLog { public void set(ProcessTaskRelation processTaskRelation) { this.name = processTaskRelation.getName(); - this.version = processTaskRelation.getVersion(); + this.processDefinitionVersion = processTaskRelation.getProcessDefinitionVersion(); this.projectCode = processTaskRelation.getProjectCode(); this.processDefinitionCode = processTaskRelation.getProcessDefinitionCode(); this.preTaskCode = processTaskRelation.getPreTaskCode(); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index a7482cbd9e..1158aae5ad 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -167,47 +167,6 @@ public class TaskDefinition { @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date updateTime; - public TaskDefinition() { - } - - public TaskDefinition(long code, - String name, - int version, - String description, - long projectCode, - int userId, - TaskType taskType, - String taskParams, - Flag flag, - Priority taskPriority, - String workerGroup, - int failRetryTimes, - int failRetryInterval, - TimeoutFlag timeoutFlag, - TaskTimeoutStrategy taskTimeoutStrategy, - int timeout, - Date createTime, - Date updateTime) { - this.code = code; - this.name = name; - this.version = version; - this.description = description; - this.projectCode = projectCode; - this.userId = userId; - this.taskType = taskType; - this.taskParams = taskParams; - this.flag = flag; - this.taskPriority = taskPriority; - this.workerGroup = workerGroup; - this.failRetryTimes = failRetryTimes; - this.failRetryInterval = failRetryInterval; - this.timeoutFlag = timeoutFlag; - this.taskTimeoutStrategy = taskTimeoutStrategy; - this.timeout = timeout; - this.createTime = createTime; - this.updateTime = updateTime; - } - public String getName() { return name; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index 18c3b969b1..c6d3a54fd8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -18,8 +18,11 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; + import org.apache.ibatis.annotations.Param; + import java.util.List; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; /** @@ -35,7 +38,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper queryByDefinitionName(@Param("projectCode") Long projectCode, - @Param("processDefinitionName") String name); + @Param("processDefinitionName") String name); /** * query process definition log list @@ -47,8 +50,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper { + /** + * query process task relation log + * + * @param processCode process definition code + * @param processVersion process version + * @return process task relation log + */ + List queryByProcessCodeAndVersion(@Param("processCode") long processCode, + @Param("processVersion") int processVersion); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 1612fec1e5..5e5fd3138f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.ibatis.annotations.Param; +import java.util.Collection; import java.util.List; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -31,20 +32,38 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; public interface ProcessTaskRelationMapper extends BaseMapper { /** - * process task relation by processDefinitionCode + * process task relation by projectCode and processCode * - * @param processDefinitionCode processDefinitionCode + * @param projectCode projectCode + * @param processCode processCode + * @return ProcessTaskRelation list + */ + List queryByProcessCode(@Param("projectCode") Long projectCode, + @Param("processCode") Long processCode); + + /** + * process task relation by taskCode + * + * @param taskCodes taskCode list * @return ProcessTaskRelation */ - List queryByProcessDefinitionCode(@Param("processDefinitionCode") String processDefinitionCode); + List queryByTaskCodes(@Param("taskCodes") Collection taskCodes); /** * process task relation by taskCode * - * @param preTaskCode preTaskCode - * @param postTaskCode postTaskCode + * @param taskCode taskCode * @return ProcessTaskRelation */ - List queryByTaskCode(@Param("preTaskCode") Long preTaskCode, - @Param("postTaskCode") Long postTaskCode); + List queryByTaskCode(@Param("taskCode") Long taskCode); + + /** + * delete process task relation by processCode + * + * @param projectCode projectCode + * @param processCode processCode + * @return int + */ + int deleteByCode(@Param("projectCode") Long projectCode, + @Param("processCode") Long processCode); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index 94990d4518..e719af6d24 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -18,5 +18,15 @@ - + + id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code, + condition_type, condition_params, operator, operate_time, create_time, update_time + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 65c69063bf..8249db5531 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -19,22 +19,45 @@ - id, `name`, version, project_code, process_definition_code, pre_task_code, post_task_code, + id, `name`, process_definition_version, project_code, process_definition_code, pre_task_code, post_task_code, condition_type, condition_params, create_time, update_time - select from t_ds_process_task_relation - WHERE process_definition_code = #{processDefinitionCode} + WHERE project_code = #{projectCode} + and process_definition_code = #{processCode} + + + delete from t_ds_process_task_relation + WHERE project_code = #{projectCode} + and process_definition_code = #{processCode} + diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 4acf6fe6cd..b6a6026879 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -31,6 +31,7 @@ import static java.util.stream.Collectors.toSet; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -43,6 +44,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -52,6 +54,8 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.Command; @@ -63,6 +67,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Resource; @@ -80,6 +86,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; @@ -89,6 +97,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -139,8 +148,6 @@ public class ProcessService { @Autowired private ProcessDefinitionLogMapper processDefineLogMapper; - - @Autowired private ProcessInstanceMapper processInstanceMapper; @@ -181,7 +188,10 @@ public class ProcessService { private TaskDefinitionLogMapper taskDefinitionLogMapper; @Autowired - private ProcessDefinitionLogMapper processDefinitionLogMapper; + private ProcessTaskRelationMapper processTaskRelationMapper; + + @Autowired + private ProcessTaskRelationLogMapper processTaskRelationLogMapper; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction @@ -381,8 +391,6 @@ public class ProcessService { /** * covert log to process definition - * @param processDefinitionLog - * @return */ public ProcessDefinition convertFromLog(ProcessDefinitionLog processDefinitionLog) { ProcessDefinition definition = null; @@ -743,10 +751,10 @@ public class ProcessService { processInstance = this.findProcessInstanceDetailById(processInstanceId); // Recalculate global parameters after rerun. processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime())); } processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId()); processInstance.setProcessDefinition(processDefinition); @@ -1249,14 +1257,12 @@ public class ProcessService { */ public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) { ExecutionStatus state = taskInstance.getState(); - if ( - // running, delayed or killed - // the task already exists in task queue - // return state - state == ExecutionStatus.RUNNING_EXECUTION - || state == ExecutionStatus.DELAY_EXECUTION - || state == ExecutionStatus.KILL - ) { + // running, delayed or killed + // the task already exists in task queue + // return state + if (state == ExecutionStatus.RUNNING_EXECUTION + || state == ExecutionStatus.DELAY_EXECUTION + || state == ExecutionStatus.KILL) { return state; } //return pasue /stop if process instance state is ready pause / stop @@ -2058,8 +2064,6 @@ public class ProcessService { /** * solve the branch rename bug * - * @param processData - * @param oldJson * @return String */ public String changeJson(ProcessData processData, String oldJson) { @@ -2114,10 +2118,6 @@ public class ProcessService { /** * switch process definition version to process definition log version - * - * @param processDefinition - * @param processDefinitionLog - * @return */ public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) { @@ -2132,23 +2132,29 @@ public class ProcessService { int switchResult = 0; if (0 == processDefinition.getId()) { - switchResult = processDefineMapper.insert(tmpDefinition); } else { switchResult = processDefineMapper.updateById(tmpDefinition); } - //TODO... switch task relations + switchProcessTaskRelationVersion(processDefinition); return switchResult; } + public void switchProcessTaskRelationVersion(ProcessDefinition processDefinition) { + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + if (!processTaskRelationList.isEmpty()) { + processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode()); + } + List processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); + for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogList) { + ProcessTaskRelation processTaskRelation = JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), + ProcessTaskRelation.class); + processTaskRelationMapper.insert(processTaskRelation); + } + } + /** * update task definition - * - * @param operator - * @param projectCode - * @param taskNode - * @param taskDefinition - * @return */ public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { @@ -2159,12 +2165,24 @@ public class ProcessService { .max((x, y) -> x > y ? x : y) .orElse(0) + 1; Date now = new Date(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setUserId(operator.getId()); taskDefinition.setVersion(version); - taskDefinition.setCode(taskDefinition.getCode()); + taskDefinition.setUpdateTime(now); + setTaskFromTaskNode(taskNode, taskDefinition); + int update = taskDefinitionMapper.updateById(taskDefinition); + // save task definition log + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + taskDefinitionLog.set(taskDefinition); + taskDefinitionLog.setOperator(operator.getId()); + taskDefinitionLog.setOperateTime(now); + int insert = taskDefinitionLogMapper.insert(taskDefinitionLog); + return insert & update; + } + + private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) { taskDefinition.setName(taskNode.getName()); taskDefinition.setDescription(taskNode.getDesc()); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setUserId(operator.getId()); taskDefinition.setTaskType(TaskType.of(taskNode.getType())); taskDefinition.setTaskParams(taskNode.getParams()); taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES); @@ -2175,16 +2193,7 @@ public class ProcessService { taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy()); taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval()); - taskDefinition.setUpdateTime(now); taskDefinition.setResourceIds(getResourceIds(taskDefinition)); - int update = taskDefinitionMapper.updateById(taskDefinition); - // save task definition log - TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); - taskDefinitionLog.set(taskDefinition); - taskDefinitionLog.setOperator(operator.getId()); - taskDefinitionLog.setOperateTime(now); - int insert = taskDefinitionLogMapper.insert(taskDefinitionLog); - return insert & update; } /** @@ -2212,15 +2221,7 @@ public class ProcessService { } /** - * @param operator - * @param name - * @param desc - * @param locations - * @param connects - * @param project - * @param processData - * @param processDefinition - * @return + * */ public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, String connects, ProcessData processData, ProcessDefinition processDefinition) { @@ -2230,28 +2231,20 @@ public class ProcessService { TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), task.getName()); updateTaskDefinition(operator, project.getCode(), task, taskDefinition); } - createTaskAndRelation(operator, project.getName(), "", processDefinition, processData); + createTaskAndRelation(operator, project.getCode(), processDefinition, processData); ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), name, processData, project, desc, locations, connects); return switchVersion(processDefinition, processDefinitionLog); } /** - * @param operator - * @param processDefinitionCode - * @param processDefinitionName - * @param processData - * @param project - * @param desc - * @param locations - * @param connects - * @return + * */ public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName, ProcessData processData, Project project, String desc, String locations, String connects) { ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(); - int version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode()); + int version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode()); processDefinitionLog.setCode(processDefinitionCode); processDefinitionLog.setVersion(version); processDefinitionLog.setName(processDefinitionName); @@ -2275,7 +2268,7 @@ public class ProcessService { } processDefinitionLog.setGlobalParamList(globalParamsList); processDefinitionLog.setFlag(Flag.YES); - int insert = processDefinitionLogMapper.insert(processDefinitionLog); + int insert = processDefineLogMapper.insert(processDefinitionLog); if (insert > 0) { return processDefinitionLog; } @@ -2283,24 +2276,96 @@ public class ProcessService { } /** - * create task defintion and task relations - * - * @param loginUser - * @param projectName - * @param relationName - * @param processDefinition - * @param processData - * @return + * create task definition and task relations */ - public void createTaskAndRelation(User loginUser, String projectName, String relationName, - ProcessDefinition processDefinition, - ProcessData processData) { + public int createTaskAndRelation(User operator, + Long projectCode, + ProcessDefinition processDefinition, + ProcessData processData) { List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - for (TaskNode task : taskNodeList) { - //TODO... task code exists, update task - //createTaskDefinition(loginUser, projectName, JSONUtils.toJsonString(task)); + for (TaskNode taskNode : taskNodeList) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); + if (taskDefinition == null) { + long code; + try { + code = SnowFlakeUtils.getInstance().nextId(); + taskDefinition = new TaskDefinition(); + taskDefinition.setCode(code); + } catch (SnowFlakeException e) { + logger.error("Task code get error, ", e); + return -1; + } + saveTaskDefinition(operator, projectCode, taskNode, taskDefinition); + } else { + if (isTaskOnline(taskDefinition.getCode())) { + // TODO return something for fail + return -1; + } + updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); + } + } + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + if (!processTaskRelationList.isEmpty()) { + processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode()); } - // TODO: query taskCode by projectCode and taskName + // TODO parse taskNodeList for preTaskCode and postTaskCode + List taskNodeRelationList = DagHelper.getProcessDag(taskNodeList).getEdges(); + Date now = new Date(); + ProcessTaskRelation processTaskRelation = new ProcessTaskRelation("",// todo relation name + processDefinition.getVersion(), + projectCode, + processDefinition.getCode(), + 0L, // todo pre task code + 0L, // todo post task code + ConditionType.of(""), // todo conditionType + "", // todo conditionParams + now, + now); + // save process task relation + int insert = processTaskRelationMapper.insert(processTaskRelation); + // save process task relation log + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.set(processTaskRelation); + processTaskRelationLog.setOperator(operator.getId()); + processTaskRelationLog.setOperateTime(now); + int logInsert = processTaskRelationLogMapper.insert(processTaskRelationLog); + return insert & logInsert; + } + + public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { + Date now = new Date(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setUserId(operator.getId()); + taskDefinition.setVersion(1); + taskDefinition.setUpdateTime(now); + taskDefinition.setCreateTime(now); + setTaskFromTaskNode(taskNode, taskDefinition); + // save the new task definition + int insert = taskDefinitionMapper.insert(taskDefinition); + // save task definition log + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + taskDefinitionLog.set(taskDefinition); + taskDefinitionLog.setOperator(operator.getId()); + taskDefinitionLog.setOperateTime(now); + int logInsert = taskDefinitionLogMapper.insert(taskDefinitionLog); + return insert & logInsert; } + public boolean isTaskOnline(Long taskCode) { + List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); + if (!processTaskRelationList.isEmpty()) { + Set processDefinitionCodes = processTaskRelationList + .stream() + .map(ProcessTaskRelation::getProcessDefinitionCode) + .collect(Collectors.toSet()); + List processDefinitionList = processDefineMapper.queryByCodes(processDefinitionCodes); + // check process definition is already online + for (ProcessDefinition processDefinition : processDefinitionList) { + if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { + return true; + } + } + } + return false; + } } diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index d1439f6be5..6f5088cb1d 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -394,12 +394,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation; CREATE TABLE t_ds_process_task_relation ( id int NOT NULL , name varchar(255) DEFAULT NULL , - version int DEFAULT NULL , + process_definition_version int DEFAULT NULL , project_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL , - pre_project_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL , - post_project_code bigint DEFAULT NULL , post_task_code bigint DEFAULT NULL , condition_type int DEFAULT NULL , condition_params text , @@ -412,12 +410,10 @@ DROP TABLE IF EXISTS t_ds_process_task_relation_log; CREATE TABLE t_ds_process_task_relation_log ( id int NOT NULL , name varchar(255) DEFAULT NULL , - version int DEFAULT NULL , + process_definition_version int DEFAULT NULL , project_code bigint DEFAULT NULL , process_definition_code bigint DEFAULT NULL , - pre_project_code bigint DEFAULT NULL , pre_task_code bigint DEFAULT NULL , - post_project_code bigint DEFAULT NULL , post_task_code bigint DEFAULT NULL , condition_type int DEFAULT NULL , condition_params text , diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index a9d4752a86..640b55a866 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -509,7 +509,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation`; CREATE TABLE `t_ds_process_task_relation` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `name` varchar(200) DEFAULT NULL COMMENT 'relation name', - `version` int(11) DEFAULT NULL COMMENT 'relation version', + `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version', `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code', @@ -528,7 +528,7 @@ DROP TABLE IF EXISTS `t_ds_process_task_relation_log`; CREATE TABLE `t_ds_process_task_relation_log` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'self-increasing id', `name` varchar(200) DEFAULT NULL COMMENT 'relation name', - `version` int(11) DEFAULT NULL COMMENT 'relation version', + `process_definition_version` int(11) DEFAULT NULL COMMENT 'process version', `project_code` bigint(20) NOT NULL COMMENT 'project code', `process_definition_code` bigint(20) NOT NULL COMMENT 'process code', `pre_task_code` bigint(20) NOT NULL COMMENT 'pre task code',