diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index b4838c731c..9a25cc2b1d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.controller; -import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION; +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR; @@ -55,6 +55,8 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; + + /** * task definition controller */ @@ -81,7 +83,7 @@ public class TaskDefinitionController extends BaseController { }) @PostMapping(value = "/save") @ResponseStatus(HttpStatus.CREATED) - @ApiException(CREATE_TASK_DEFINITION) + @ApiException(CREATE_TASK_DEFINITION_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result createTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 268dd8ef25..cc5a3809f0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -270,7 +270,7 @@ public enum Status { PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"), CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"), - CREATE_TASK_DEFINITION(50037, "create task definition", "创建任务错误"), + CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"), UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"), QUERY_TASK_DEFINITION_VERSIONS_ERROR(50039, "query task definition versions error", "查询任务历史版本信息出错"), SWITCH_TASK_DEFINITION_VERSION_ERROR(50040, "Switch task definition version error", "切换任务版本出错"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 66bff859de..4b2a9634e2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; @@ -40,7 +41,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,28 +100,48 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); - checkTaskNode(result, taskNode, taskDefinitionJson); - if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID - || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { - return result; - } - TaskDefinition taskDefinition = new TaskDefinition(); - long code = 0L; - try { - code = SnowFlakeUtils.getInstance().nextId(); - taskDefinition.setCode(code); - } catch (SnowFlakeException e) { - logger.error("Task code get error, ", e); + List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); + int totalSuccessNumber = 0; + List totalSuccessCode = new ArrayList<>(); + List taskDefinitionLogsList = new ArrayList<>(); + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + checkTaskDefinition(result, taskDefinitionLog); + if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID + || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { + return result; + } + taskDefinitionLog.setProjectCode(projectCode); + taskDefinitionLog.setUserId(loginUser.getId()); + taskDefinitionLog.setVersion(1); + Date now = new Date(); + taskDefinitionLog.setCreateTime(now); + taskDefinitionLog.setUpdateTime(now); + long code = 0L; + try { + code = SnowFlakeUtils.getInstance().nextId(); + taskDefinitionLog.setCode(code); + } catch (SnowFlakeException e) { + logger.error("Task code get error, ", e); + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); + return result; + } + taskDefinitionLog.setOperator(loginUser.getId()); + taskDefinitionLog.setOperateTime(now); + taskDefinitionLogsList.add(taskDefinitionLog); + totalSuccessCode.add(code); + totalSuccessNumber++; } - if (code == 0L) { - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); + int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogsList); + int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogsList); + if ((logInsert & insert) == 0) { + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); return result; } - int insert = processService.saveTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition); - // return taskDefinition object with code - result.put(Constants.DATA_LIST, code); - putMsg(result, Status.SUCCESS, insert); + Map resData = new HashMap<>(); + resData.put("total", totalSuccessNumber); + resData.put("code",totalSuccessCode); + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, resData); return result; } @@ -232,6 +255,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } } + private void checkTaskDefinition(Map result, TaskDefinition taskDefinition) { + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) { + logger.error("task definition {} parameter invalid", taskDefinition.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); + } + } /** * update task definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index aca977125e..1149aa86bd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import java.text.MessageFormat; import java.util.HashMap; @@ -134,7 +135,23 @@ public class CheckUtils { if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) { abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence()); } else { - abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams()); + abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getTaskParams()); + } + + if (abstractParameters != null) { + return abstractParameters.checkParameters(); + } + + return false; + } + + public static boolean checkTaskDefinitionParameters(TaskDefinition taskDefinition) { + AbstractParameters abstractParameters; + String taskType = taskDefinition.getTaskType(); + if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) { + abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskDefinition.getDependence()); + } else { + abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskDefinition.getTaskParams()); } if (abstractParameters != null) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index b78fd82bcb..d58c509a8d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import java.text.MessageFormat; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.Assert; @@ -49,7 +50,6 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class TaskDefinitionServiceImplTest { - String taskDefinitionJson = "{\n" + " \"type\": \"SQL\",\n" + " \"id\": \"tasks-27297\",\n" @@ -132,16 +132,34 @@ public class TaskDefinitionServiceImplTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); - TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); - - Mockito.when(processService.saveTaskDefinition(Mockito.eq(loginUser) - , Mockito.eq(project.getCode()) - , Mockito.eq(taskNode) - , Mockito.any(TaskDefinition.class))) - .thenReturn(1); - + String createTaskDefinitionJson = "[{\n" + + "\"name\": \"test12111\",\n" + + "\"description\": \"test\",\n" + + "\"taskType\": \"SHELL\",\n" + + "\"flag\": 0,\n" + + "\"taskParams\": \n" + + "\"{\\\"resourceList\\\":[],\n" + + "\\\"localParams\\\":[],\n" + + "\\\"rawScript\\\":\\\"echo 11\\\",\n" + + "\\\"conditionResult\\\":\n" + + "{\\\"successNode\\\":[\\\"\\\"],\n" + + "\\\"failedNode\\\":[\\\"\\\"]},\n" + + "\\\"dependence\\\":{}}\",\n" + + "\"taskPriority\": 0,\n" + + "\"workerGroup\": \"default\",\n" + + "\"failRetryTimes\": 0,\n" + + "\"failRetryInterval\": 1,\n" + + "\"timeoutFlag\": 1, \n" + + "\"timeoutNotifyStrategy\": 0,\n" + + "\"timeout\": 0, \n" + + "\"delayTime\": 0,\n" + + "\"resourceIds\":\"\" \n" + + "}] "; + List taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class); + Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1); + Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1); Map relation = taskDefinitionService - .createTaskDefinition(loginUser, projectCode, taskDefinitionJson); + .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); @@ -201,38 +219,6 @@ public class TaskDefinitionServiceImplTest { } - @Test - public void updateTaskDefinition() { - long projectCode = 1L; - - Project project = getProject(projectCode); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); - - TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class); - - Mockito.when(processService.updateTaskDefinition(Mockito.eq(loginUser) - , Mockito.eq(project.getCode()) - , Mockito.eq(taskNode) - , Mockito.any(TaskDefinition.class))) - .thenReturn(1); - - Mockito.when(taskDefinitionMapper.queryByDefinitionCode(11L)) - .thenReturn(new TaskDefinition()); - - Map relation = taskDefinitionService - .updateTaskDefinition(loginUser, projectCode, 11L, taskDefinitionJson); - - Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); - } - @Test public void switchVersion() { int version = 1; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 08ca28d896..28859d47a4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; @@ -395,6 +396,10 @@ public class TaskDefinition { this.delayTime = delayTime; } + public String getDependence() { + return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); + } + @Override public String toString() { return "TaskDefinition{" diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 1ad40e0124..eb29b647bc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -67,4 +67,12 @@ public interface TaskDefinitionLogMapper extends BaseMapper { */ List queryByTaskDefinitions(@Param("taskDefinitions") Collection taskDefinitions); + /** + * batch insert task definition logs + * + * @param taskDefinitionLogs taskDefinitionLogs + * @return int + */ + int batchInsert(@Param("taskDefinitionLogs") List taskDefinitionLogs); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index b200d9a523..99503b2de5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; - +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.ibatis.annotations.MapKey; import org.apache.ibatis.annotations.Param; @@ -106,4 +106,12 @@ public interface TaskDefinitionMapper extends BaseMapper { * @return int */ int deleteByCode(@Param("code") Long code); + + /** + * batch insert task definitions + * + * @param taskDefinitions taskDefinitions + * @return int + */ + int batchInsert(@Param("taskDefinitions") List taskDefinitions); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index 7f31843b9d..df9d115890 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -59,4 +59,18 @@ + + insert into t_ds_task_definition_log (code, `name`, version, description, project_code, user_id, + task_type, task_params, flag, task_priority, worker_group, fail_retry_times, fail_retry_interval, + timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time) + values + + (#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description}, + #{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams}, + #{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.failRetryTimes}, + #{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},#{taskDefinitionLog.timeout}, + #{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime}, + #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}) + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index 89801effff..cc8e18067a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -91,4 +91,17 @@ delete from t_ds_task_definition where code = #{code} + + insert into t_ds_task_definition (code, `name`, version, description, project_code, user_id, + task_type, task_params, flag, task_priority, worker_group, fail_retry_times, fail_retry_interval, + timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time) + values + + (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description}, + #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams}, + #{taskDefinition.flag},#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.failRetryTimes}, + #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout}, + #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}) + +