From acf9c84c503f326e16bb328ac3896a3ead66dc58 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Wed, 5 Jan 2022 22:08:42 +0800 Subject: [PATCH] [Feature][workflow list edit] add task update with upstream (#7829) * add task save and binds workflow * add task update with upstream --- .../controller/TaskDefinitionController.java | 34 +++- .../dolphinscheduler/api/enums/Status.java | 2 +- .../api/service/TaskDefinitionService.java | 16 ++ .../impl/TaskDefinitionServiceImpl.java | 163 +++++++++++++++--- .../dao/entity/TaskDefinition.java | 35 ++-- .../dao/entity/TaskMainInfo.java | 13 ++ .../dao/mapper/TaskDefinitionMapper.xml | 2 +- .../mysql/dolphinscheduler_ddl.sql | 4 + 8 files changed, 223 insertions(+), 46 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index 754ac2021b..d00a05282c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -152,6 +152,36 @@ public class TaskDefinitionController extends BaseController { return returnDataList(result); } + /** + * update task definition + * + * @param loginUser login user + * @param projectCode project code + * @param code task definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return update result code + */ + @ApiOperation(value = "updateWithUpstream", notes = "UPDATE_TASK_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), + @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"), + @ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String") + }) + @PutMapping(value = "/{code}/with-upstream") + @ResponseStatus(HttpStatus.OK) + @ApiException(UPDATE_TASK_DEFINITION_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result updateTaskWithUpstream(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code") long code, + @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, + @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { + Map result = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, taskDefinitionJsonObj, upstreamCodes); + return returnDataList(result); + } + /** * query task definition version paging list info * @@ -300,7 +330,7 @@ public class TaskDefinitionController extends BaseController { @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = false, type = "Long"), @ApiImplicitParam(name = "searchWorkflowName", value = "SEARCH_WORKFLOW_NAME", required = false, type = "String"), @ApiImplicitParam(name = "searchTaskName", value = "SEARCH_TASK_NAME", required = false, type = "String"), - @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, dataType = "TaskType", example = "SHELL"), + @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = false, dataType = "TaskType", example = "SHELL"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10") }) @@ -312,7 +342,7 @@ public class TaskDefinitionController extends BaseController { @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam(value = "searchWorkflowName", required = false) String searchWorkflowName, @RequestParam(value = "searchTaskName", required = false) String searchTaskName, - @RequestParam(value = "taskType", required = true) TaskType taskType, + @RequestParam(value = "taskType", required = false) TaskType taskType, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { Result result = checkPageParams(pageNo, pageSize); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 2e5f3fcac7..ec4931b7d6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -258,7 +258,7 @@ public enum Status { DATA_IS_NULL(50018, "data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"), PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"), - PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), + PROCESS_DEFINE_STATE_ONLINE(50021, "process definition [{0}] is already on line", "工作流定义[{0}]已上线"), DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"), SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"), DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 1b55d8b4c0..962e60a15a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -91,6 +91,22 @@ public interface TaskDefinitionService { long taskCode, String taskDefinitionJsonObj); + /** + * update task definition and upstream + * + * @param loginUser login user + * @param projectCode project code + * @param taskCode task definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return update result code + */ + Map updateTaskWithUpstream(User loginUser, + long projectCode, + long taskCode, + String taskDefinitionJsonObj, + String upstreamCodes); + /** * update task definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index c1342a17a4..094ce9f775 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -173,10 +174,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); return result; } + if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode); + return result; + } TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); if (taskDefinition == null) { logger.error("taskDefinitionJsonObj is not valid json"); @@ -216,14 +221,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion()); processTaskRelationLog.setPostTaskCode(taskCode); processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); processTaskRelationLogList.add(processTaskRelationLog); } + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + if (!processTaskRelationList.isEmpty()) { + processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList())); + } int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), processTaskRelationLogList, null); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { + if (insertResult != Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } @@ -233,6 +241,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, taskDefinition); return result; } @@ -286,7 +296,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } - if (taskDefinition.getFlag() == Flag.YES) { + if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); return result; } @@ -332,36 +342,81 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Transactional(rollbackFor = RuntimeException.class) @Override public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { + Map result = new HashMap<>(); + int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + if (version <= 0) { + return result; + } + List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); + if (!processTaskRelationList.isEmpty()) { + List processTaskRelationLogList = new ArrayList<>(); + int delete = 0; + int deleteLog = 0; + Date now = new Date(); + for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); + delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); + deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + if (processTaskRelationLog.getPreTaskCode() == taskCode) { + processTaskRelationLog.setPreTaskVersion(version); + } + if (processTaskRelationLog.getPostTaskCode() == taskCode) { + processTaskRelationLog.setPostTaskVersion(version); + } + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLogList.add(processTaskRelationLog); + } + if ((delete & deleteLog) == 0) { + throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); + } else { + int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); + int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList); + if ((insertRelation & insertRelationLog) == 0) { + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + } + } + result.put(Constants.DATA_LIST, taskCode); + putMsg(result, Status.SUCCESS); + return result; + } + + private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { Project project = projectMapper.queryByCode(projectCode); //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode)); if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; + return Constants.EXIT_CODE_FAILURE; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); - return result; + return Constants.EXIT_CODE_FAILURE; } if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); - return result; + return Constants.EXIT_CODE_FAILURE; } TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + if (taskDefinition.equals(taskDefinitionToUpdate)) { + return taskDefinition.getVersion(); + } if (taskDefinitionToUpdate == null) { logger.error("taskDefinitionJson is not valid json"); putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); - return result; + return Constants.EXIT_CODE_FAILURE; } if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) { logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName()); putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); - return result; + return Constants.EXIT_CODE_FAILURE; } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); if (version == null || version == 0) { putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); - return result; + return Constants.EXIT_CODE_FAILURE; } Date now = new Date(); taskDefinitionToUpdate.setCode(taskCode); @@ -381,43 +436,101 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } + return version; + } + + /** + * update task definition and upstream + * + * @param loginUser login user + * @param projectCode project code + * @param taskCode task definition code + * @param taskDefinitionJsonObj task definition json object + * @param upstreamCodes upstream task codes, sep comma + * @return update result code + */ + @Override + public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) { + Map result = new HashMap<>(); + int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + if (version <= 0) { + return result; + } + Map queryUpStreamTaskCodeMap; + if (StringUtils.isNotBlank(upstreamCodes)) { + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); + queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); + // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet + upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet()); + if (!upstreamTaskCodes.isEmpty()) { + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA)); + return result; + } + } else { + queryUpStreamTaskCodeMap = new HashMap<>(); + } List processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); + if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) { + putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA)); + throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST); + } if (!processTaskRelationList.isEmpty()) { - List processTaskRelationLogList = new ArrayList<>(); + List relationLogs = new ArrayList<>(); + Date now = new Date(); int delete = 0; int deleteLog = 0; for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); + processTaskRelationLog.setOperator(loginUser.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setUpdateTime(now); if (processTaskRelationLog.getPreTaskCode() == taskCode) { processTaskRelationLog.setPreTaskVersion(version); } if (processTaskRelationLog.getPostTaskCode() == taskCode) { processTaskRelationLog.setPostTaskVersion(version); + TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode()); + if (definition == null) { + processTaskRelationLog.setPreTaskCode(0L); + processTaskRelationLog.setPreTaskVersion(0); + } } - processTaskRelationLog.setOperator(loginUser.getId()); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setUpdateTime(now); - processTaskRelationLogList.add(processTaskRelationLog); + relationLogs.add(processTaskRelationLog); } if ((delete & deleteLog) == 0) { throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); - } else { - int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); - int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList); - if ((insertRelation & insertRelationLog) == 0) { - throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } + if (!queryUpStreamTaskCodeMap.isEmpty()) { + ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class); + assert taskRelationLogDeepCopy != null; + for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) { + taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode()); + taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion()); + relationLogs.add(taskRelationLogDeepCopy); } } + Map taskRelationLogMap = + relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog)); + if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) { + taskRelationLogMap.remove(0L); + } + int insertRelation = processTaskRelationMapper.batchInsert(relationLogs); + int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs); + if ((insertRelation & insertRelationLog) == 0) { + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); + } } result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS, update); + putMsg(result, Status.SUCCESS); return result; } /** - * update task definition + * Switch task definition * * @param loginUser login user * @param projectCode project code diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 699b2ecdef..4d31e235a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -459,24 +459,25 @@ public class TaskDefinition { } TaskDefinition that = (TaskDefinition) o; return failRetryTimes == that.failRetryTimes - && failRetryInterval == that.failRetryInterval - && timeout == that.timeout - && delayTime == that.delayTime - && Objects.equals(name, that.name) - && Objects.equals(description, that.description) - && Objects.equals(taskType, that.taskType) - && Objects.equals(taskParams, that.taskParams) - && flag == that.flag - && taskPriority == that.taskPriority - && Objects.equals(workerGroup, that.workerGroup) - && timeoutFlag == that.timeoutFlag - && timeoutNotifyStrategy == that.timeoutNotifyStrategy - && Objects.equals(resourceIds, that.resourceIds) - && environmentCode == that.environmentCode - && taskGroupId == that.taskGroupId - && taskGroupPriority == that.taskGroupPriority; + && failRetryInterval == that.failRetryInterval + && timeout == that.timeout + && delayTime == that.delayTime + && Objects.equals(name, that.name) + && Objects.equals(description, that.description) + && Objects.equals(taskType, that.taskType) + && Objects.equals(taskParams, that.taskParams) + && flag == that.flag + && taskPriority == that.taskPriority + && Objects.equals(workerGroup, that.workerGroup) + && timeoutFlag == that.timeoutFlag + && timeoutNotifyStrategy == that.timeoutNotifyStrategy + && (Objects.equals(resourceIds, that.resourceIds) + || (StringUtils.EMPTY.equals(resourceIds) && that.resourceIds == null) + || (StringUtils.EMPTY.equals(that.resourceIds) && resourceIds == null)) + && environmentCode == that.environmentCode + && taskGroupId == that.taskGroupId + && taskGroupPriority == that.taskGroupPriority; } - @Override public String toString() { return "TaskDefinition{" diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java index 580c683982..e17392fa78 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java @@ -44,6 +44,11 @@ public class TaskMainInfo { */ private int taskVersion; + /** + * task type + */ + private String taskType; + /** * create time */ @@ -115,6 +120,14 @@ public class TaskMainInfo { this.taskVersion = taskVersion; } + public String getTaskType() { + return taskType; + } + + public void setTaskType(String taskType) { + this.taskType = taskType; + } + public Date getTaskCreateTime() { return taskCreateTime; } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index 8dbff07e28..5c889d1b07 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -87,7 +87,7 @@