diff --git a/README.md b/README.md index 97e1a95b18..ede405baa7 100644 --- a/README.md +++ b/README.md @@ -17,32 +17,33 @@ Dolphin Scheduler Official Website ### Design features: -A distributed and easy-to-extend visual DAG workflow scheduling system. Dedicated to solving the complex dependencies in data processing, making the scheduling system `out of the box` for data processing. +Dolphin Scheduler is a distributed and easy-to-extend visual DAG workflow scheduling system. It dedicates to solving the complex dependencies in data processing to make the scheduling system `out of the box` for the data processing process. + Its main objectives are as follows: - - Associate the Tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of task in real time. - - Support for many task types: Shell, MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Sub_Process, Procedure, etc. + - Associate the tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of the task in real-time. + - Support many task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc. - Support process scheduling, dependency scheduling, manual scheduling, manual pause/stop/recovery, support for failed retry/alarm, recovery from specified nodes, Kill task, etc. - - Support process priority, task priority and task failover and task timeout alarm/failure - - Support process global parameters and node custom parameter settings - - Support online upload/download of resource files, management, etc. Support online file creation and editing + - Support the priority of process & task, task failover, and task timeout alarm or failure. + - Support process global parameters and node custom parameter settings. + - Support online upload/download of resource files, management, etc. Support online file creation and editing. - Support task log online viewing and scrolling, online download log, etc. - - Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper - - Support online viewing of `Master/Worker` cpu load, memory - - Support process running history tree/gantt chart display, support task status statistics, process status statistics - - Support backfilling data - - Support multi-tenant - - Support internationalization - - There are more waiting partners to explore + - Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper. + - Support the viewing of Master/Worker CPU load, memory, and CPU usage metrics. + - Support presenting tree or Gantt chart of workflow history as well as the statistics results of task & process status in each workflow. + - Support backfilling data. + - Support multi-tenant. + - Support internationalization. + - There are more waiting for partners to explore... ### What's in Dolphin Scheduler Stability | Easy to use | Features | Scalability | -- | -- | -- | -- -Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance.  |  Support pause, recover operation | support custom task types -HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. -Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | | +Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables, and so on at a glance.  |  Support pause, recover operation | Support custom task types +HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the API mode operation is provided. | Users on Dolphin Scheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline. +Overload processing: Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | | ### System partial screenshot @@ -55,17 +56,14 @@ Overload processing: Task queue mechanism, the number of schedulable tasks on a ![monitor](https://user-images.githubusercontent.com/59273635/75625839-c698a480-5bfc-11ea-8bbe-895b561b337f.png) ![security](https://user-images.githubusercontent.com/15833811/75236441-bfd2f180-57f8-11ea-88bd-f24311e01b7e.png) ![treeview](https://user-images.githubusercontent.com/15833811/75217191-3fe56100-57d1-11ea-8856-f19180d9a879.png) -### Online Demo -- Online Demo ### Recent R&D plan -Work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), Under the `In Develop` card is what is currently being developed, TODO card is to be done (including feature ideas) +The work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), which `In Develop` card shows the features that are currently being developed and TODO card lists what needs to be done(including feature ideas). ### How to contribute -Welcome to participate in contributing, please refer to the process of submitting the code: -[[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)] +Welcome to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)] ### How to Build @@ -82,15 +80,15 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release ### Thanks -Dolphin Scheduler uses a lot of excellent open source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open source projects of apache, etc. -It is because of the shoulders of these open source projects that the birth of the Dolphin Scheduler is possible. We are very grateful for all the open source software used! We also hope that we will not only be the beneficiaries of open source, but also be open source contributors. We also hope that partners who have the same passion and conviction for open source will join in and contribute to open source! +Dolphin Scheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on. +We would like to express our deep gratitude to all the open-source projects which contribute to making the dream of Dolphin Scheduler comes true. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we expect the partners who have the same passion and conviction to open-source will join in and contribute to the open-source community! + ### Get Help 1. Submit an issue -1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html. then send mail to dev@dolphinscheduler.apache.org -1. Slack channel: [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://join.slack.com/share/zt-do3gvfhj-UUhrAX2GxkVX_~JJt1jpKA) -1. Contact WeChat(dailidong66). This is just for Mandarin(CN) discussion. +1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org + ### License -Please refer to [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file. +Please refer to the [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file. diff --git a/README_zh_CN.md b/README_zh_CN.md index a02a2c3376..4f2a8f0bf8 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -50,10 +50,6 @@ Dolphin Scheduler Official Website ![security](https://user-images.githubusercontent.com/15833811/75209633-baa28200-57b9-11ea-9def-94bef2e212a7.jpg) -### 我要体验 - -- 我要体验 - ### 近期研发计划 @@ -86,7 +82,6 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g ### 获得帮助 1. 提交issue 1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org. -1. 联系微信群助手(ID:dailidong66). 微信仅用于中国用户讨论. ### 版权 请参考 [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) 文件. diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 1c13c1374c..8cfbc32fa9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.User; import java.util.ArrayList; @@ -372,7 +373,7 @@ public class ProcessDefinitionController extends BaseController { } /** - * query datail of process definition + * query datail of process definition by id * * @param loginUser login user * @param projectName project name @@ -396,6 +397,29 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } + /** + * query datail of process definition by name + * + * @param loginUser login user + * @param projectName project name + * @param processDefinitionName process definition name + * @return process definition detail + */ + @ApiOperation(value = "queryProcessDefinitionByName", notes = "QUERY_PROCESS_DEFINITION_BY_NAME_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionName", value = "PROCESS_DEFINITION_ID", required = true, dataType = "String") + }) + @GetMapping(value = "/select-by-name") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) + public Result queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam("processDefinitionName") String processDefinitionName + ) { + Map result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectName, processDefinitionName); + return returnDataList(result); + } + /** * query Process definition list * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index 56e7ef2087..a07478315a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -127,4 +129,27 @@ public class TaskInstanceController extends BaseController { return returnDataListPaging(result); } + /** + * change one task instance's state from FAILURE to FORCED_SUCCESS + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + @ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS") + @ApiImplicitParams({ + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12") + }) + @PostMapping(value = "/force-success") + @ResponseStatus(HttpStatus.OK) + @ApiException(FORCE_TASK_SUCCESS_ERROR) + public Result forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam(value = "taskInstanceId") Integer taskInstanceId) { + logger.info("force task success, project: {}, task instance id: {}", projectName, taskInstanceId); + Map result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId); + return returnDataList(result); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java index 6f42ba9f56..41c7abba85 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java @@ -57,6 +57,16 @@ public class TaskCountDto { .sum(); } + // remove the specified state + public void removeStateFromCountList(ExecutionStatus status) { + for (TaskStateCount count : this.taskCountDtos) { + if (count.getTaskStateType().equals(status)) { + this.taskCountDtos.remove(count); + break; + } + } + } + public List getTaskCountDtos() { return taskCountDtos; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java index cc1797295a..f4dac5aecd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java @@ -30,7 +30,7 @@ public enum ExecuteType { * 4 stop * 5 pause */ - NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; + NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE; public static ExecuteType getEnum(int value){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 86b815cf9a..52d246dcb9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -196,6 +196,9 @@ public enum Status { QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), + FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), + TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), + UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), @@ -247,7 +250,7 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), - BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), + BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index f6f786b6b1..37d856059c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -95,6 +95,19 @@ public interface ProcessDefinitionService { String projectName, Integer processId); + /** + * query datail of process definition + * + * @param loginUser login user + * @param projectName project name + * @param processDefinitionName process definition name + * @return process definition detail + */ + + Map queryProcessDefinitionByName(User loginUser, + String projectName, + String processDefinitionName); + /** * batch copy process definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 012af8fd38..dd2caff3b6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -145,6 +145,50 @@ public class TaskInstanceService extends BaseService { return result; } + /** + * change one task instance's state from failure to forced success + * + * @param loginUser login user + * @param projectName project name + * @param taskInstanceId task instance id + * @return the result code and msg + */ + public Map forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) { + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + // check user auth + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status status = (Status) checkResult.get(Constants.STATUS); + if (status != Status.SUCCESS) { + return checkResult; + } + + // check whether the task instance can be found + TaskInstance task = taskInstanceMapper.selectById(taskInstanceId); + if (task == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return result; + } + + // check whether the task instance state type is failure + if (!task.getState().typeIsFailure()) { + putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); + return result; + } + + // change the state of the task instance + task.setState(ExecutionStatus.FORCED_SUCCESS); + int changedNum = taskInstanceMapper.updateById(task); + if (changedNum > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR); + } + + return result; + } + /*** * generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name * @param result exist result map diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java index c9560e1c50..7254fc1a88 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis * @return process instance state count data */ public Map countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) { - return this.countStateByProject( + Map result = this.countStateByProject( loginUser, projectId, startDate, endDate, (start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds)); + // process state count needs to remove state of forced success + if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) { + ((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS); + } + return result; } private Map countStateByProject(User loginUser, int projectId, String startDate, String endDate diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index e4b01e45e7..245d33bb19 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -349,7 +349,29 @@ public class ProcessDefinitionServiceImpl extends BaseService implements ProcessDefinition processDefinition = processDefineMapper.selectById(processId); if (processDefinition == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); + } else { + result.put(Constants.DATA_LIST, processDefinition); + putMsg(result, Status.SUCCESS); + } + return result; + } + + @Override + public Map queryProcessDefinitionByName(User loginUser, String projectName, String processDefinitionName) { + + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); } else { result.put(Constants.DATA_LIST, processDefinition); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index 4d5055b775..5837cfa929 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -172,6 +172,7 @@ PROCESS_DEFINITION_ID=process definition id PROCESS_DEFINITION_IDS=process definition ids RELEASE_PROCESS_DEFINITION_NOTES=release process definition QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id +QUERY_PROCESS_DEFINITION_BY_NAME_NOTES=query process definition by name QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index b97c7c192f..031d7819cb 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -18,8 +18,13 @@ package org.apache.dolphinscheduler.api.controller; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.TaskInstanceService; @@ -27,24 +32,28 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; import java.util.HashMap; import java.util.Map; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; /** * task instance controller test */ -@RunWith(MockitoJUnitRunner.Silent.class) -public class TaskInstanceControllerTest { +public class TaskInstanceControllerTest extends AbstractControllerTest { @InjectMocks private TaskInstanceController taskInstanceController; @@ -67,7 +76,27 @@ public class TaskInstanceControllerTest { Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "", "", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize); Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); - } + @Ignore + @Test + public void testForceTaskSuccess() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + paramsMap.add("taskInstanceId", "104"); + + Map mockResult = new HashMap<>(5); + mockResult.put(Constants.STATUS, Status.SUCCESS); + mockResult.put(Constants.MSG, Status.SUCCESS.getMsg()); + when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult); + + MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113") + .header(SESSION_ID, sessionId) + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 9f8d1b21f0..09b9f7d2e5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -82,12 +84,16 @@ public class ExecutorService2Test { private int processDefinitionId = 1; + private int processInstanceId = 1; + private int tenantId = 1; private int userId = 1; private ProcessDefinition processDefinition = new ProcessDefinition(); + private ProcessInstance processInstance = new ProcessInstance(); + private User loginUser = new User(); private String projectName = "projectName"; @@ -107,6 +113,13 @@ public class ExecutorService2Test { processDefinition.setTenantId(tenantId); processDefinition.setUserId(userId); + // processInstance + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); + processInstance.setState(ExecutionStatus.FAILURE); + processInstance.setExecutorId(userId); + processInstance.setTenantId(tenantId); + // project project.setName(projectName); @@ -120,6 +133,8 @@ public class ExecutorService2Test { Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList()); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); + Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition); } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index e197e56e1f..89d8b86841 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -341,7 +341,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser, "project_test1", 1); - Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); + Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); //instance exit Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition()); @@ -350,6 +350,41 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } + @Test + public void testQueryProcessDefinitionByName() { + String projectName = "project_test1"; + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); + + Project project = getProject(projectName); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + + Map result = new HashMap<>(); + putMsg(result, Status.PROJECT_NOT_FOUNT, projectName); + + //project check auth fail + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); + Map map = processDefinitionService.queryProcessDefinitionByName(loginUser, + "project_test1", "test_def"); + Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS)); + + //project check auth success, instance not exist + putMsg(result, Status.SUCCESS, projectName); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); + Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_def")).thenReturn(null); + Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, + "project_test1", "test_def"); + Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS)); + + //instance exit + Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test")).thenReturn(getProcessDefinition()); + Map successRes = processDefinitionService.queryProcessDefinitionByName(loginUser, + "project_test1", "test"); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } + @Test public void testBatchCopyProcessDefinition() { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 199b34cc1b..b1989b4e31 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -214,4 +214,48 @@ public class TaskInstanceServiceTest { result.put(Constants.MSG, status.getMsg()); } } -} \ No newline at end of file + + @Test + public void forceTaskSuccess() { + User user = getAdminUser(); + String projectName = "test"; + Project project = getProject(projectName); + int taskId = 1; + TaskInstance task = getTaskInstance(); + + Map mockSuccess = new HashMap<>(5); + putMsg(mockSuccess, Status.SUCCESS); + when(projectMapper.queryByName(projectName)).thenReturn(project); + + // user auth failed + Map mockFailure = new HashMap<>(5); + putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName); + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure); + Map authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS)); + + // test task not found + when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess); + when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); + Map taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS)); + + // test task instance state error + task.setState(ExecutionStatus.SUCCESS); + when(taskInstanceMapper.selectById(1)).thenReturn(task); + Map taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS)); + + // test error + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(0); + Map errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS)); + + // test success + task.setState(ExecutionStatus.FAILURE); + when(taskInstanceMapper.updateById(task)).thenReturn(1); + Map successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index f6ac2cf5ab..2f5c022333 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -41,6 +41,7 @@ public enum ExecutionStatus { * 10 waiting thread * 11 waiting depend node complete * 12 delay execution + * 13 forced success */ SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), @@ -54,7 +55,8 @@ public enum ExecutionStatus { KILL(9, "kill"), WAITTING_THREAD(10, "waiting thread"), WAITTING_DEPEND(11, "waiting depend node complete"), - DELAY_EXECUTION(12, "delay execution"); + DELAY_EXECUTION(12, "delay execution"), + FORCED_SUCCESS(13, "forced success"); ExecutionStatus(int code, String descp) { this.code = code; @@ -79,7 +81,7 @@ public enum ExecutionStatus { * @return status */ public boolean typeIsSuccess() { - return this == SUCCESS; + return this == SUCCESS || this == FORCED_SUCCESS; } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java index 89a8605a99..3d4f65ab50 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java @@ -63,7 +63,7 @@ public class VarPoolUtils { * @throws ParseException ParseException */ public static void convertVarPoolToMap(Map propToValue, String varPool) throws ParseException { - if (varPool == null || propToValue == null) { + if (propToValue == null || StringUtils.isEmpty(varPool)) { return; } String[] splits = varPool.split("\\$VarPool\\$"); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index a225f7654c..4bef7f1b68 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao.mapper; +package org.apache.dolphinscheduler.dao.mapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -27,6 +27,10 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,9 +40,6 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; - @RunWith(SpringRunner.class) @SpringBootTest @Transactional @@ -55,20 +56,38 @@ public class TaskInstanceMapperTest { @Autowired ProcessInstanceMapper processInstanceMapper; + @Autowired + ProcessInstanceMapMapper processInstanceMapMapper; + /** * insert + * * @return TaskInstance */ - private TaskInstance insertOne(){ + private TaskInstance insertOne() { //insertOne + return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString()); + } + + /** + * construct a task instance and then insert + * + * @param taskName + * @param processInstanceId + * @param state + * @param taskType + * @return + */ + private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setFlag(Flag.YES); - taskInstance.setName("ut task"); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setName(taskName); + taskInstance.setState(state); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); taskInstance.setTaskJson("{}"); - taskInstance.setTaskType(TaskType.SHELL.toString()); + taskInstance.setProcessInstanceId(processInstanceId); + taskInstance.setTaskType(taskType); taskInstanceMapper.insert(taskInstance); return taskInstance; } @@ -90,7 +109,7 @@ public class TaskInstanceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { TaskInstance taskInstance = insertOne(); int delete = taskInstanceMapper.deleteById(taskInstance.getId()); Assert.assertEquals(1, delete); @@ -149,7 +168,7 @@ public class TaskInstanceMapperTest { taskInstanceMapper.deleteById(task2.getId()); taskInstanceMapper.deleteById(task.getId()); Assert.assertNotEquals(taskInstances.size(), 0); - Assert.assertNotEquals(taskInstances1.size(), 0 ); + Assert.assertNotEquals(taskInstances1.size(), 0); } /** @@ -298,4 +317,4 @@ public class TaskInstanceMapperTest { Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index e68f9d5937..f5946d5614 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable { // updateProcessInstance completed task status // failure priority is higher than pause // if a task fails, other suspended tasks need to be reset kill + // check if there exists forced success nodes in errorTaskList if (errorTaskList.size() > 0) { for (Map.Entry entry : completeTaskList.entrySet()) { TaskInstance completeTask = entry.getValue(); @@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable { processService.updateTaskInstance(completeTask); } } + for (Map.Entry entry : errorTaskList.entrySet()) { + TaskInstance errorTask = entry.getValue(); + TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId()); + if (currentTask == null) { + continue; + } + // for nodes that have been forced success + if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + // update state in this thread and remove from errorTaskList + errorTask.setState(currentTask.getState()); + logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName()); + errorTaskList.remove(errorTask.getName()); + // submit post nodes + submitPostNode(errorTask.getName()); + } + } } if (canSubmitTaskToQueue()) { submitStandByTask(); @@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable { int length = readyToSubmitTaskQueue.size(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue.peek(); + // stop tasks which is retrying if forced success happens + if (task.taskCanRetry()) { + TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); + if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + task.setState(retryTask.getState()); + logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + removeTaskFromStandbyList(task); + completeTaskList.put(task.getName(), task); + submitPostNode(task.getName()); + continue; + } + } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { if (retryTaskIntervalOverTime(task)) { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 69d1a4c503..980a755120 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -17,7 +17,7 @@