diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index 56af2ac62c..754ac2021b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -95,6 +95,36 @@ public class TaskDefinitionController extends BaseController { return returnDataList(result); } + /** + * create single task definition that binds the workflow + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return create result code + */ + @ApiOperation(value = "saveSingle", notes = "CREATE_SINGLE_TASK_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, type = "processDefinitionCode"), + @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"), + @ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String") + }) + @PostMapping("/save-single") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(CREATE_TASK_DEFINITION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result createTaskBindsWorkFlow(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, + @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, + @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { + Map result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, upstreamCodes); + return returnDataList(result); + } + /** * update task definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index b1661c8721..1b55d8b4c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -40,6 +40,22 @@ public interface TaskDefinitionService { long projectCode, String taskDefinitionJson); + /** + * create single task definition that binds the workflow + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return create result code + */ + Map createTaskBindsWorkFlow(User loginUser, + long projectCode, + long processDefinitionCode, + String taskDefinitionJsonObj, + String upstreamCodes); + /** * query task definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 7ddff44f81..c1342a17a4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; @@ -39,16 +40,16 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; @@ -101,7 +102,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe private ProcessService processService; @Autowired - private UserMapper userMapper; + private ProcessDefinitionMapper processDefinitionMapper; /** * create task definition @@ -148,6 +149,93 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } + /** + * create single task definition that binds the workflow + * + * @param loginUser login user + * @param projectCode project code + * @param processDefinitionCode process definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return create result code + */ + @Transactional(rollbackFor = RuntimeException.class) + @Override + public Map createTaskBindsWorkFlow(User loginUser, + long projectCode, + long processDefinitionCode, + String taskDefinitionJsonObj, + String upstreamCodes) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); + return result; + } + TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + if (taskDefinition == null) { + logger.error("taskDefinitionJsonObj is not valid json"); + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); + return result; + } + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) { + logger.error("task definition {} parameter invalid", taskDefinition.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); + return result; + } + long taskCode = taskDefinition.getCode(); + if (taskCode == 0) { + try { + taskCode = CodeGenerateUtils.getInstance().genCode(); + taskDefinition.setCode(taskCode); + } catch (CodeGenerateException e) { + logger.error("Task code get error, ", e); + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, taskDefinitionJsonObj); + return result; + } + } + if (StringUtils.isNotBlank(upstreamCodes)) { + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); + Set queryUpStreamTaskCodes = upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); + // upstreamTaskCodes - queryUpStreamTaskCodes + Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet()); + if (!diffCode.isEmpty()) { + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(diffCode, Constants.COMMA)); + return result; + } + List processTaskRelationLogList = Lists.newArrayList(); + for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setPreTaskCode(upstreamTask.getCode()); + processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion()); + processTaskRelationLog.setPostTaskCode(taskCode); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLogList.add(processTaskRelationLog); + } + int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), + processTaskRelationLogList, null); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + } + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition)); + if (saveTaskResult == Constants.DEFINITION_FAILURE) { + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); + } + return result; + } + /** * query task definition * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index a12887dc7d..304e623a0a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.ibatis.annotations.MapKey; import org.apache.ibatis.annotations.Param; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -117,4 +118,12 @@ public interface TaskDefinitionMapper extends BaseMapper { @Param("searchWorkflowName") String searchWorkflowName, @Param("searchTaskName") String searchTaskName, @Param("taskType") String taskType); + + /** + * query task definition by code list + * + * @param codes taskDefinitionCode list + * @return task definition list + */ + List queryByCodeList(@Param("codes") Collection codes); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index e21d608064..8dbff07e28 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -106,4 +106,16 @@ order by td.update_time desc +