Browse Source

[Feature][JsonSplit-api]taskDefinition save interface (#5828)

* create task definition api

create task definition api

create task definition api

* fix code smell

* use taskdefinitionlogs not taskdefinition

* fix code smell

* trigger GitHub actions

* fix unit test question

* fix unit test question

* fix unit test question
2.0.7-release
sky 3 years ago committed by GitHub
parent
commit
741d757dcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 67
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  4. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  5. 70
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  6. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  7. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  8. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  9. 14
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  10. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR;
@ -55,6 +55,8 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
/**
* task definition controller
*/
@ -81,7 +83,7 @@ public class TaskDefinitionController extends BaseController {
})
@PostMapping(value = "/save")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(CREATE_TASK_DEFINITION)
@ApiException(CREATE_TASK_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -270,7 +270,7 @@ public enum Status {
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
CREATE_TASK_DEFINITION(50037, "create task definition", "创建任务错误"),
CREATE_TASK_DEFINITION_ERROR(50037, "create task definition error", "创建任务错误"),
UPDATE_TASK_DEFINITION_ERROR(50038, "update task definition error", "更新任务定义错误"),
QUERY_TASK_DEFINITION_VERSIONS_ERROR(50039, "query task definition versions error", "查询任务历史版本信息出错"),
SWITCH_TASK_DEFINITION_VERSION_ERROR(50040, "Switch task definition version error", "切换任务版本出错"),

67
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
@ -40,7 +41,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -97,28 +100,48 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
checkTaskNode(result, taskNode, taskDefinitionJson);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result;
}
TaskDefinition taskDefinition = new TaskDefinition();
long code = 0L;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinition.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
int totalSuccessNumber = 0;
List<Long> totalSuccessCode = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogsList = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
checkTaskDefinition(result, taskDefinitionLog);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result;
}
taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUserId(loginUser.getId());
taskDefinitionLog.setVersion(1);
Date now = new Date();
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
long code = 0L;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinitionLog.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return result;
}
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
taskDefinitionLogsList.add(taskDefinitionLog);
totalSuccessCode.add(code);
totalSuccessNumber++;
}
if (code == 0L) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogsList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogsList);
if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
}
int insert = processService.saveTaskDefinition(loginUser, project.getCode(), taskNode, taskDefinition);
// return taskDefinition object with code
result.put(Constants.DATA_LIST, code);
putMsg(result, Status.SUCCESS, insert);
Map<String, Object> resData = new HashMap<>();
resData.put("total", totalSuccessNumber);
resData.put("code",totalSuccessCode);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
return result;
}
@ -232,6 +255,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
}
private void checkTaskDefinition(Map<String, Object> result, TaskDefinition taskDefinition) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) {
logger.error("task definition {} parameter invalid", taskDefinition.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
}
}
/**
* update task definition
*

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import java.text.MessageFormat;
import java.util.HashMap;
@ -134,7 +135,23 @@ public class CheckUtils {
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getDependence());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getParams());
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskNode.getTaskParams());
}
if (abstractParameters != null) {
return abstractParameters.checkParameters();
}
return false;
}
public static boolean checkTaskDefinitionParameters(TaskDefinition taskDefinition) {
AbstractParameters abstractParameters;
String taskType = taskDefinition.getTaskType();
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)) {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskDefinition.getDependence());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskType.toUpperCase(), taskDefinition.getTaskParams());
}
if (abstractParameters != null) {

70
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
@ -49,7 +50,6 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TaskDefinitionServiceImplTest {
String taskDefinitionJson = "{\n"
+ " \"type\": \"SQL\",\n"
+ " \"id\": \"tasks-27297\",\n"
@ -132,16 +132,34 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
Mockito.when(processService.saveTaskDefinition(Mockito.eq(loginUser)
, Mockito.eq(project.getCode())
, Mockito.eq(taskNode)
, Mockito.any(TaskDefinition.class)))
.thenReturn(1);
String createTaskDefinitionJson = "[{\n"
+ "\"name\": \"test12111\",\n"
+ "\"description\": \"test\",\n"
+ "\"taskType\": \"SHELL\",\n"
+ "\"flag\": 0,\n"
+ "\"taskParams\": \n"
+ "\"{\\\"resourceList\\\":[],\n"
+ "\\\"localParams\\\":[],\n"
+ "\\\"rawScript\\\":\\\"echo 11\\\",\n"
+ "\\\"conditionResult\\\":\n"
+ "{\\\"successNode\\\":[\\\"\\\"],\n"
+ "\\\"failedNode\\\":[\\\"\\\"]},\n"
+ "\\\"dependence\\\":{}}\",\n"
+ "\"taskPriority\": 0,\n"
+ "\"workerGroup\": \"default\",\n"
+ "\"failRetryTimes\": 0,\n"
+ "\"failRetryInterval\": 1,\n"
+ "\"timeoutFlag\": 1, \n"
+ "\"timeoutNotifyStrategy\": 0,\n"
+ "\"timeout\": 0, \n"
+ "\"delayTime\": 0,\n"
+ "\"resourceIds\":\"\" \n"
+ "}] ";
List<TaskDefinition> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class);
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
@ -201,38 +219,6 @@ public class TaskDefinitionServiceImplTest {
}
@Test
public void updateTaskDefinition() {
long projectCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result);
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
Mockito.when(processService.updateTaskDefinition(Mockito.eq(loginUser)
, Mockito.eq(project.getCode())
, Mockito.eq(taskNode)
, Mockito.any(TaskDefinition.class)))
.thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByDefinitionCode(11L))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
.updateTaskDefinition(loginUser, projectCode, 11L, taskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void switchVersion() {
int version = 1;

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@ -395,6 +396,10 @@ public class TaskDefinition {
this.delayTime = delayTime;
}
public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}
@Override
public String toString() {
return "TaskDefinition{"

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -67,4 +67,12 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
*/
List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions") Collection<TaskDefinition> taskDefinitions);
/**
* batch insert task definition logs
*
* @param taskDefinitionLogs taskDefinitionLogs
* @return int
*/
int batchInsert(@Param("taskDefinitionLogs") List<TaskDefinitionLog> taskDefinitionLogs);
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
@ -106,4 +106,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @return int
*/
int deleteByCode(@Param("code") Long code);
/**
* batch insert task definitions
*
* @param taskDefinitions taskDefinitions
* @return int
*/
int batchInsert(@Param("taskDefinitions") List<TaskDefinitionLog> taskDefinitions);
}

14
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -59,4 +59,18 @@
</foreach>
</if>
</select>
<insert id="batchInsert">
insert into t_ds_task_definition_log (code, `name`, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
#{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams},
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.failRetryTimes},
#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},#{taskDefinitionLog.timeout},
#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime})
</foreach>
</insert>
</mapper>

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -91,4 +91,17 @@
delete from t_ds_task_definition
where code = #{code}
</delete>
<insert id="batchInsert">
insert into t_ds_task_definition (code, `name`, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time)
values
<foreach collection="taskDefinitions" item="taskDefinition" separator=",">
(#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description},
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},
#{taskDefinition.flag},#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.failRetryTimes},
#{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime})
</foreach>
</insert>
</mapper>

Loading…
Cancel
Save