
[Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194)

* Supports task instance cache operation

* add task plugin cache

* use SHA-256 to generate key

* Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

Co-authored-by: Jay Chung <zhongjiajie955@gmail.com>

* Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

Co-authored-by: Jay Chung <zhongjiajie955@gmail.com>

* Optimize database scripts

* Optimize clear cache operation

Co-authored-by: Jay Chung <zhongjiajie955@gmail.com>
Branch: 3.2.0-release
Author: JieguangZhou, committed via GitHub 2 years ago
Commit: 66e20271ad
The 77 changed files, each shown with its number of changed lines:
  1. 15
      docs/docs/en/faq.md
  2. 1
      docs/docs/en/guide/task/appendix.md
  3. 15
      docs/docs/zh/faq.md
  4. 1
      docs/docs/zh/guide/task/appendix.md
  5. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
  6. 44
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java
  7. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  8. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  9. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  10. 31
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  11. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
  12. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
  13. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  14. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  15. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  16. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  17. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
  18. 20
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
  19. 161
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
  20. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  21. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  22. 16
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  23. 4
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  24. 7
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  25. 5
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  26. 98
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
  27. 14
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
  28. 172
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
  29. 49
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  30. 109
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java
  31. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  32. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  33. 112
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java
  34. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java
  35. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  36. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  37. 4
      dolphinscheduler-ui/src/locales/en_US/project.ts
  38. 4
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  39. 7
      dolphinscheduler-ui/src/service/modules/task-instances/index.ts
  40. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  41. 29
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts
  42. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts
  43. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  44. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts
  45. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts
  46. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts
  47. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
  48. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts
  49. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts
  50. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
  51. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
  52. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts
  53. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
  54. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts
  55. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts
  56. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts
  57. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
  58. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
  59. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
  60. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts
  61. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts
  62. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts
  63. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
  64. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts
  65. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts
  66. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
  67. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
  68. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
  69. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
  70. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
  71. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
  72. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
  73. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
  74. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
  75. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  76. 31
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx
  77. 8
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx

15
docs/docs/en/faq.md

@ -752,4 +752,19 @@ start API server. If you want disabled when Python gateway service you could cha
---
## Q: When cache execution is enabled, how is it determined whether a task has already been cached, i.e., whether a task can reuse the running result of another task?
A: For a task marked `Cache Execution`, a cache key is generated when the task starts. The key is a hash of the following fields:
- task definition: the id of the task definition corresponding to the task instance
- task version: the version of the task definition corresponding to the task instance
- task input parameters: among the parameters passed in by upstream nodes and the global parameters, those referenced in the task definition's parameter list or referenced in the task definition via `${}`
- environment configuration: the actual configuration content under the environment name, i.e., the configuration maintained in `Security` - `Environment Management`
When a task marked for caching runs, the database is searched for data with the same cache key:
- If found, the cached task instance is copied and the corresponding data is updated
- If not, the task runs as usual, and the task instance data is stored in the cache when the task completes
If you do not need the cache, you can right-click the node in the workflow instance and run `Clear cache`, which clears the cached data for the current input parameters under this task version.
We will collect more FAQs later.
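The key-generation idea described above can be summarized in a few lines. The sketch below is illustrative only and mirrors TaskCacheUtils.generateCacheKey introduced later in this commit; the class and method names here are hypothetical.

import org.apache.commons.codec.digest.DigestUtils;
import java.util.Arrays;

public class CacheKeySketch {
    // Joins the fields listed above with "_" and hashes them with SHA-256,
    // mirroring the approach of TaskCacheUtils.generateCacheKey in this commit.
    public static String cacheKey(long taskCode, int taskVersion, int isCache,
                                  String environmentConfig, String inputParamsJson) {
        String data = String.join("_", Arrays.asList(
                String.valueOf(taskCode),
                String.valueOf(taskVersion),
                String.valueOf(isCache),
                String.valueOf(environmentConfig),
                inputParamsJson));
        return DigestUtils.sha256Hex(data);
    }
}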

1
docs/docs/en/guide/task/appendix.md

@ -8,6 +8,7 @@ DolphinScheduler task plugins share some common default parameters. Each type of
|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Node Name | The name of the task. Node names within the same workflow must be unique. |
| Run Flag | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch. |
| Cache Execution | Indicating whether this node should be cached. If enabled, tasks with the same identifier (same task definition, same task version, same input parameters) share the cache. When a cached result already exists for the task, it is not executed again and the result is reused directly. |
| Description | Describing the function of this node. |
| Task Priority | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion. |
| Worker Group | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker. |

15
docs/docs/zh/faq.md

@ -720,4 +720,19 @@ A:在 3.0.0-alpha 版本之后,Python gateway server 集成到 api server
---
## Q: 缓存执行时怎么判断任务已经存在缓存过的任务,即如何判断一个任务可以使用另外一个任务的运行结果?
A: 对于标识为`缓存执行`的任务, 当任务启动时会生成一个缓存key, 该key由以下字段组合哈希得到:
- 任务定义:任务实例对应的任务定义的id
- 任务的版本:任务实例对应的任务定义的版本
- 任务输入的参数:包括上游节点和全局参数传入的参数中,被任务定义的参数列表所引用和任务定义中使用`${}`引用的参数
- 环境配置: 环境名称下具体的环境配置内容,具体为安全中心环境管理中的实际配置内容
当缓存标识的任务运行时,会查找数据库中是否有相同缓存key的数据,
- 若有则复制该任务实例并进行相应数据的更新
- 若无,则任务照常运行,并在任务完成时将任务实例的数据存入缓存
若不需要缓存时,可以在工作流实例中右键运行清除缓存,则会清除该版本下当前输入的参数的缓存数据。
我们会持续收集更多的 FAQ。

1
docs/docs/zh/guide/task/appendix.md

@ -8,6 +8,7 @@
|----------|--------------------------------------------------------------------------------------------------------------------------------------|
| 任务名称 | 任务的名称,同一个工作流定义中的节点名称不能重复。 |
| 运行标志 | 标识这个节点是否需要调度执行,如果不需要执行,可以打开禁止执行开关。 |
| 缓存执行 | 标识这个节点是否需要进行缓存,如果缓存,则对于相同标识(相同任务版本,相同任务定义,相同参数传入)的任务进行缓存,运行时若已经存在缓存过的任务时,不再重复执行,直接复用结果。 |
| 描述 | 当前节点的功能描述。 |
| 任务优先级 | worker线程数不足时,根据优先级从高到低依次执行任务,优先级一样时根据先到先得原则执行。 |
| Worker分组 | 设置分组后,任务会被分配给worker组的机器执行。若选择Default,则会随机选择一个worker执行。 |

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java

@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
@ -34,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@ -188,4 +191,26 @@ public class TaskInstanceController extends BaseController {
@PathVariable(value = "id") Integer id) {
return taskInstanceService.stopTask(loginUser, projectCode, id);
}
/**
* remove task instance cache
*
* @param loginUser login user
* @param projectCode project code
* @param id task instance id
* @return the result code and msg
*/
@Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE")
@Parameters({
@Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
})
@DeleteMapping(value = "/{id}/remove-cache")
@ResponseStatus(HttpStatus.OK)
@ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "id") Integer id) {
return taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id);
}
}
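For reference, a hedged usage sketch of the new endpoint follows. The controller's class-level request mapping is not part of this diff, so the /projects/{projectCode}/task-instances prefix, the host and port, and the token header below are assumptions for illustration only.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RemoveCacheExample {
    public static void main(String[] args) throws Exception {
        // Assumed base path; the controller's class-level mapping is not shown in this diff.
        String url = "http://localhost:12345/dolphinscheduler/projects/123456/task-instances/12/remove-cache";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("token", "<session-token>") // illustrative auth header
                .DELETE()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected body (per TaskInstanceRemoveCacheResponse): code, msg, and the cleared cacheKey
        System.out.println(response.body());
    }
}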

44
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.taskInstance;
import org.apache.dolphinscheduler.api.utils.Result;
import lombok.Data;
/**
* task instance remove cache response
*/
@Data
public class TaskInstanceRemoveCacheResponse extends Result {
private String cacheKey;
public TaskInstanceRemoveCacheResponse(Result result) {
super();
this.setCode(result.getCode());
this.setMsg(result.getMsg());
}
public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) {
super();
this.setCode(result.getCode());
this.setMsg(result.getMsg());
this.cacheKey = cacheKey;
}
}

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -278,6 +278,7 @@ public enum Status {
UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"),
RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"),
PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"),
RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016,
"resource not exist or no permission,please view the task node and remove error resource",
"请检查任务节点并移除无权限或者已删除的资源"),
@ -285,6 +286,8 @@ public enum Status {
"资源文件已授权其他用户[{0}],后缀不允许修改"),
RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),
REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),
USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
USER_NO_WRITE_PROJECT_PERM(30003, "user [{0}] does not have write permission for project [{1}]",

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -100,4 +101,13 @@ public interface TaskInstanceService {
* @return the result code and msg
*/
TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);
/**
* remove task instance cache
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @return remove task instance cache response, including the cleared cache key
*/
TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);
}

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@ -38,6 +40,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
@ -45,6 +49,9 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@ -81,6 +88,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
TaskInstanceDao taskInstanceDao;
@Autowired
ProcessInstanceService processInstanceService;
@ -319,4 +329,30 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
}
return taskInstance;
}
@Override
public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
Integer taskInstanceId) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE);
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return new TaskInstanceRemoveCacheResponse(result);
}
String tagCacheKey = taskInstance.getCacheKey();
Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
String cacheKey = taskIdAndCacheKey.getRight();
if (StringUtils.isNotEmpty(cacheKey)) {
taskInstanceDao.clearCacheByCacheKey(cacheKey);
}
putMsg(result, Status.SUCCESS);
return new TaskInstanceRemoveCacheResponse(result, cacheKey);
}
}

31
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
@ -40,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -93,6 +95,9 @@ public class TaskInstanceServiceTest {
@Mock
TaskDefinitionMapper taskDefinitionMapper;
@Mock
TaskInstanceDao taskInstanceDao;
@Test
public void queryTaskListPaging() {
long projectCode = 1L;
@ -341,4 +346,30 @@ public class TaskInstanceServiceTest {
Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue());
}
@Test
public void testRemoveTaskInstanceCache() {
User user = getAdminUser();
long projectCode = 1L;
Project project = getProject(projectCode);
int taskId = 1;
TaskInstance task = getTaskInstance();
String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750";
task.setCacheKey(cacheKey);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null);
when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true);
TaskInstanceRemoveCacheResponse response =
taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode());
when(taskInstanceMapper.selectById(1)).thenReturn(null);
TaskInstanceRemoveCacheResponse responseNotFoundTask =
taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode());
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java

@ -26,6 +26,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
* have_arr_variables
* have_map_variables
* have_alert
* is_cache
*/
public enum Flag {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java

@ -22,5 +22,6 @@ public enum TaskEventType {
DELAY,
RUNNING,
RESULT,
WORKER_REJECT
WORKER_REJECT,
CACHE,
}

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -115,6 +115,11 @@ public class TaskDefinition {
*/
private Flag flag;
/**
* task is cache: yes/no
*/
private Flag isCache;
/**
* task priority
*/
@ -281,6 +286,7 @@ public class TaskDefinition {
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& isCache == that.isCache
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -68,6 +68,7 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryInterval(taskDefinition.getFailRetryInterval());
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setIsCache(taskDefinition.getIsCache());
this.setModifyBy(taskDefinition.getModifyBy());
this.setCpuQuota(taskDefinition.getCpuQuota());
this.setMemoryMax(taskDefinition.getMemoryMax());

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -40,6 +40,7 @@ import java.util.Map;
import lombok.Data;
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
@ -187,6 +188,17 @@ public class TaskInstance implements Serializable {
*/
private Flag flag;
/**
* task is cache: yes/no
*/
private Flag isCache;
/**
* cache_key
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String cacheKey;
/**
* dependency
*/
@ -409,4 +421,5 @@ public class TaskInstance implements Serializable {
// task retry does not over time, return false
return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -57,6 +57,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey);
Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey);
List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java

@ -88,6 +88,20 @@ public interface TaskInstanceDao {
*/
TaskInstance findTaskInstanceById(Integer taskId);
/**
* find task instance by cache_key
* @param cacheKey cache key
* @return task instance
*/
TaskInstance findTaskInstanceByCacheKey(String cacheKey);
/**
* clear task instance cache by cache_key
* @param cacheKey cache key
* @return whether the cache was cleared successfully
*/
Boolean clearCacheByCacheKey(String cacheKey);
/**
* find task instance list by id list
* @param idList task id list

20
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Date;
@ -164,6 +165,25 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
return taskInstanceMapper.selectById(taskId);
}
@Override
public TaskInstance findTaskInstanceByCacheKey(String cacheKey) {
if (StringUtils.isEmpty(cacheKey)) {
return null;
}
return taskInstanceMapper.queryByCacheKey(cacheKey);
}
@Override
public Boolean clearCacheByCacheKey(String cacheKey) {
try {
taskInstanceMapper.clearCacheByCacheKey(cacheKey);
return true;
} catch (Exception e) {
logger.error("clear cache by cacheKey failed", e);
return false;
}
}
@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {

161
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JsonNode;
public class TaskCacheUtils {
private TaskCacheUtils() {
throw new IllegalStateException("Utility class");
}
public static final String MERGE_TAG = "-";
/**
* generate cache key for task instance
* the following fields are used to generate the cache key:
* 1. task code
* 2. task version
* 3. task is cache
* 4. environment config
* 5. input VarPool, from upstream tasks and workflow global parameters
* @param taskInstance task instance
* @param taskExecutionContext taskExecutionContext
* @return cache key
*/
public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
List<String> keyElements = new ArrayList<>();
keyElements.add(String.valueOf(taskInstance.getTaskCode()));
keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig()));
keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
String data = StringUtils.join(keyElements, "_");
return DigestUtils.sha256Hex(data);
}
/**
* generate the tagged cache key for a task instance that executes from cache
* the tag records which source task instance the cached data is copied from, while keeping the cache key itself
* tagCacheKey = sourceTaskId + "-" + cacheKey
* @param sourceTaskId source task id
* @param cacheKey cache key
* @return tagCacheKey
*/
public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) {
return sourceTaskId + MERGE_TAG + cacheKey;
}
/**
* revert cache key tag to source task id and cache key
* @param tagCacheKey cache key
* @return Pair<Integer, String>, first is source task id, second is cache key
*/
public static Pair<Integer, String> revertCacheKey(String tagCacheKey) {
Pair<Integer, String> taskIdAndCacheKey;
if (tagCacheKey == null) {
taskIdAndCacheKey = Pair.of(-1, "");
return taskIdAndCacheKey;
}
if (tagCacheKey.contains(MERGE_TAG)) {
String[] split = tagCacheKey.split(MERGE_TAG);
if (split.length == 2) {
taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]);
} else {
taskIdAndCacheKey = Pair.of(-1, "");
}
return taskIdAndCacheKey;
} else {
return Pair.of(-1, tagCacheKey);
}
}
/**
* get hash data of task input var pool
* there are two parts of task input var pool: from upstream task and workflow global parameters
* @param taskInstance task instance
* @param context taskExecutionContext
* @return the input var pool, filtered and sorted, as a JSON string
*/
public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
// The set of input values considered from localParams in the taskParams
Set<String> propertyInSet = JSONUtils.toList(taskParams.get("localParams").toString(), Property.class).stream()
.filter(property -> property.getDirect().equals(Direct.IN))
.map(Property::getProp).collect(Collectors.toSet());
// The set of input values considered from `${var}` form task definition
propertyInSet.addAll(getScriptVarInSet(taskInstance));
// var pool value from upstream task
List<Property> varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
// var pool value from workflow global parameters
if (context.getPrepareParamsMap() != null) {
Set<String> taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
List<Property> globalContextVarPool = context.getPrepareParamsMap().entrySet().stream()
.filter(entry -> !taskVarPoolSet.contains(entry.getKey()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
varPool.addAll(globalContextVarPool);
}
// only consider var pool value which is in propertyInSet
varPool = varPool.stream()
.filter(property -> property.getDirect().equals(Direct.IN))
.filter(property -> propertyInSet.contains(property.getProp()))
.sorted(Comparator.comparing(Property::getProp))
.collect(Collectors.toList());
return JSONUtils.toJsonString(varPool);
}
/**
* get var in set from task definition
* @param taskInstance task instance
* @return var in set
*/
public static List<String> getScriptVarInSet(TaskInstance taskInstance) {
Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
Matcher matcher = pattern.matcher(taskInstance.getTaskParams());
List<String> varInSet = new ArrayList<>();
while (matcher.find()) {
varInSet.add(matcher.group(1));
}
return varInSet;
}
}
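A brief usage sketch of the tag helpers above; the expected values match the assertions in TaskCacheUtilsTest further down in this commit.

import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;

public class TagCacheKeyExample {
    public static void main(String[] args) {
        // Tag a cache key with the id of the source task instance whose result is reused.
        String tagged = TaskCacheUtils.generateTagCacheKey(1, "123");   // "1-123"
        // Split the tag back into (sourceTaskId, cacheKey); null or malformed tags fall back to a task id of -1.
        Pair<Integer, String> parts = TaskCacheUtils.revertCacheKey(tagged);
        System.out.println(parts.getLeft() + " / " + parts.getRight()); // prints: 1 / 123
    }
}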

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper">
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type
</sql>
@ -50,14 +50,14 @@
</select>
<insert id="batchInsert">
insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time,
update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type)
values
<foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
(#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
#{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams},
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
#{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
#{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -19,13 +19,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper">
<sql id="baseSql">
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority,
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.is_cache, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max, ${alias}.task_execute_type
@ -96,12 +96,12 @@
<insert id="batchInsert">
insert into t_ds_task_definition (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time,task_group_id, task_execute_type)
values
<foreach collection="taskDefinitions" item="taskDefinition" separator=",">
(#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description},
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},
#{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},#{taskDefinition.isCache},
#{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes},
#{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime},

16
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -21,13 +21,13 @@
<sql id="baseSql">
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
</sql>
<update id="setFailoverByHostAndStateArray">
@ -200,6 +200,18 @@
and flag = 1
limit 1
</select>
<select id="queryByCacheKey" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where cache_key = #{cacheKey}
limit 1
</select>
<update id="clearCacheByCacheKey">
update t_ds_task_instance
set cache_key = null
where cache_key = #{cacheKey}
</update>
<select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>

4
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -481,6 +481,7 @@ CREATE TABLE t_ds_task_definition
task_execute_type int(11) DEFAULT '0',
task_params longtext,
flag tinyint(2) DEFAULT NULL,
is_cache tinyint(2) DEFAULT '0',
task_priority tinyint(4) DEFAULT '2',
worker_group varchar(200) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
@ -517,6 +518,7 @@ CREATE TABLE t_ds_task_definition_log
task_execute_type int(11) DEFAULT '0',
task_params text,
flag tinyint(2) DEFAULT NULL,
is_cache tinyint(2) DEFAULT '0',
task_priority tinyint(4) DEFAULT '2',
worker_group varchar(200) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
@ -883,6 +885,8 @@ CREATE TABLE t_ds_task_instance
app_link text,
task_params longtext,
flag tinyint(4) DEFAULT '1',
is_cache tinyint(2) DEFAULT '0',
cache_key varchar(200) DEFAULT NULL,
retry_interval int(4) DEFAULT NULL,
max_retry_times int(2) DEFAULT NULL,
task_instance_priority int(11) DEFAULT NULL,

7
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -481,6 +481,7 @@ CREATE TABLE `t_ds_task_definition` (
`task_execute_type` int(11) DEFAULT '0' COMMENT 'task execute type: 0-batch, 1-stream',
`task_params` longtext COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
@ -516,6 +517,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`task_execute_type` int(11) DEFAULT '0' COMMENT 'task execute type: 0-batch, 1-stream',
`task_params` longtext COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
`worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
@ -877,6 +879,8 @@ CREATE TABLE `t_ds_task_instance` (
`app_link` text COMMENT 'yarn app id',
`task_params` longtext COMMENT 'job custom parameters',
`flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
`cache_key` varchar(200) DEFAULT NULL COMMENT 'cache_key',
`retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ',
`max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',
`task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
@ -894,7 +898,8 @@ CREATE TABLE `t_ds_task_instance` (
`test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
KEY `idx_cache_key` (`cache_key`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ----------------------------

5
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -398,6 +398,7 @@ CREATE TABLE t_ds_task_definition (
task_execute_type int DEFAULT '0',
task_params text ,
flag int DEFAULT NULL ,
is_cache int DEFAULT '0',
task_priority int DEFAULT '2' ,
worker_group varchar(255) DEFAULT NULL ,
environment_code bigint DEFAULT '-1',
@ -436,6 +437,7 @@ CREATE TABLE t_ds_task_definition_log (
task_execute_type int DEFAULT '0',
task_params text ,
flag int DEFAULT NULL ,
is_cache int DEFAULT '0' ,
task_priority int DEFAULT '2' ,
worker_group varchar(255) DEFAULT NULL ,
environment_code bigint DEFAULT '-1',
@ -777,6 +779,8 @@ CREATE TABLE t_ds_task_instance (
app_link text ,
task_params text ,
flag int DEFAULT '1' ,
is_cache int DEFAULT '0',
cache_key varchar(200) DEFAULT NULL,
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
@ -796,6 +800,7 @@ CREATE TABLE t_ds_task_instance (
) ;
create index idx_task_instance_code_version on t_ds_task_instance (task_code, task_definition_version);
create index idx_cache_key on t_ds_task_instance (cache_key);
--
-- Table structure for table t_ds_tenant

98
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql

@ -120,3 +120,101 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag;
-- uc_dolphin_T_t_ds_task_definition_R_is_cache
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_R_is_cache;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_R_is_cache()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='is_cache')
THEN
ALTER TABLE t_ds_task_definition ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_definition_R_is_cache;
DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_R_is_cache;
-- uc_dolphin_T_t_ds_task_definition_log_R_is_cache
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_log_R_is_cache()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_definition_log'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='is_cache')
THEN
ALTER TABLE t_ds_task_definition_log ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
-- uc_dolphin_T_t_ds_task_instance_R_is_cache
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_is_cache;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_is_cache()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='is_cache')
THEN
ALTER TABLE t_ds_task_instance ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_is_cache;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_is_cache;
-- uc_dolphin_T_t_ds_task_instance_R_cache_key
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_cache_key;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_cache_key()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='cache_key')
THEN
ALTER TABLE t_ds_task_instance ADD `cache_key` varchar(255) DEFAULT null COMMENT 'cache key';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_cache_key;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_cache_key;
-- ALTER TABLE `t_ds_task_instance` ADD KEY `cache_key`( `cache_key`);
drop PROCEDURE if EXISTS add_t_ds_task_instance_idx_cache_key;
delimiter d//
CREATE PROCEDURE add_t_ds_task_instance_idx_cache_key()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND INDEX_NAME='cache_key')
THEN
ALTER TABLE `t_ds_task_instance` ADD KEY `cache_key`( `cache_key` );
END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_instance_idx_cache_key;
DROP PROCEDURE add_t_ds_task_instance_idx_cache_key;

14
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -119,3 +119,17 @@ d//
delimiter ;
select uc_dolphin_T_t_ds_task_instance_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag();
ALTER TABLE t_ds_task_definition DROP COLUMN IF EXISTS is_cache;
ALTER TABLE t_ds_task_definition ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
ALTER TABLE t_ds_task_definition_log DROP COLUMN IF EXISTS is_cache;
ALTER TABLE t_ds_task_definition_log ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS is_cache;
ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS cache_key varchar(200) DEFAULT NULL;
ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS cacke_key;
CREATE INDEX IF NOT EXISTS idx_cache_key ON t_ds_task_instance USING Btree("cache_key");

172
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class TaskCacheUtilsTest {
private TaskInstance taskInstance;
private TaskExecutionContext taskExecutionContext;
@BeforeEach
void setUp() {
String taskParams = "{\n" +
" \"localParams\": [\n" +
" {\n" +
" \"prop\": \"a\",\n" +
" \"direct\": \"IN\",\n" +
" \"type\": \"VARCHAR\",\n" +
" \"value\": \"\"\n" +
" },\n" +
" {\n" +
" \"prop\": \"b\",\n" +
" \"direct\": \"IN\",\n" +
" \"type\": \"VARCHAR\",\n" +
" \"value\": \"bb\"\n" +
" }\n" +
" ],\n" +
" \"rawScript\": \"echo ${c}\\necho ${d}\",\n" +
" \"resourceList\": []\n" +
"}";
String varPool = "[\n" +
" {\n" +
" \"prop\": \"c\",\n" +
" \"direct\": \"IN\",\n" +
" \"type\": \"VARCHAR\",\n" +
" \"value\": \"cc\"\n" +
" },\n" +
" {\n" +
" \"prop\": \"k\",\n" +
" \"direct\": \"IN\",\n" +
" \"type\": \"VARCHAR\",\n" +
" \"value\": \"kk\"\n" +
" }\n" +
"]";
taskInstance = new TaskInstance();
taskInstance.setTaskParams(taskParams);
taskInstance.setVarPool(varPool);
taskInstance.setTaskCode(123L);
taskInstance.setTaskDefinitionVersion(1);
taskInstance.setIsCache(Flag.YES);
taskExecutionContext = new TaskExecutionContext();
Property property = new Property();
property.setProp("a");
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue("aa");
Map<String, Property> prepareParamsMap = new HashMap<>();
prepareParamsMap.put("a", property);
taskExecutionContext.setPrepareParamsMap(prepareParamsMap);
}
@Test
void testRevertCacheKey() {
Pair<Integer, String> taskIdAndCacheKey1 = TaskCacheUtils.revertCacheKey(null);
Assertions.assertEquals(Pair.of(-1, ""), taskIdAndCacheKey1);
Pair<Integer, String> taskIdAndCacheKey2 = TaskCacheUtils.revertCacheKey("123");
Assertions.assertEquals(Pair.of(-1, "123"), taskIdAndCacheKey2);
Pair<Integer, String> taskIdAndCacheKey3 = TaskCacheUtils.revertCacheKey("1-123");
Assertions.assertEquals(Pair.of(1, "123"), taskIdAndCacheKey3);
Pair<Integer, String> taskIdAndCacheKey4 = TaskCacheUtils.revertCacheKey("1-123-4");
Assertions.assertEquals(Pair.of(-1, ""), taskIdAndCacheKey4);
}
@Test
void testGetScriptVarInSet() {
List<String> scriptVarInSet = TaskCacheUtils.getScriptVarInSet(taskInstance);
List<String> except = new ArrayList<>(Arrays.asList("c", "d"));
Assertions.assertEquals(except, scriptVarInSet);
}
@Test
void TestGetTaskInputVarPoolData() {
TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext);
// only a=aa and c=cc will influence the result,
// b=bb is a fixed value, will be considered in task version
// k=kk is not in task params, will be ignored
String except =
"[{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"aa\"},{\"prop\":\"c\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"cc\"}]";
Assertions.assertEquals(except, TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext));
}
@Test
void TestGenerateCacheKey() {
String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
Property propertyI = new Property();
propertyI.setProp("i");
propertyI.setDirect(Direct.IN);
propertyI.setType(DataType.VARCHAR);
propertyI.setValue("ii");
taskExecutionContext.getPrepareParamsMap().put("i", propertyI);
String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
// i will not influence the result, because the task instance does not use it
Assertions.assertEquals(cacheKeyBase, cacheKeyNew);
Property propertyD = new Property();
propertyD.setProp("d");
propertyD.setDirect(Direct.IN);
propertyD.setType(DataType.VARCHAR);
propertyD.setValue("dd");
taskExecutionContext.getPrepareParamsMap().put("i", propertyD);
String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
// d will influence the result, because the task instance uses it
Assertions.assertNotEquals(cacheKeyBase, cacheKeyD);
taskInstance.setTaskDefinitionVersion(100);
String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
// task definition version is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyD, cacheKeyE);
taskInstance.setEnvironmentConfig("export PYTHON_HOME=/bin/python3");
String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
// EnvironmentConfig is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyE, cacheKeyF);
}
@Test
void testGenerateTagCacheKey() {
String cacheKey = TaskCacheUtils.generateTagCacheKey(1, "123");
Assertions.assertEquals("1-123", cacheKey);
}
}
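
The tests above pin down the tagged cache key format: a task instance's cache key is stored as "taskInstanceId-cacheKey", and revertCacheKey falls back to (-1, ...) for null, untagged, or malformed input. The standalone sketch below only mirrors the convention the assertions describe so it can be read in isolation; the class and method names are made up and this is not the project's TaskCacheUtils implementation.

import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

// Illustrative re-implementation of the "taskInstanceId-cacheKey" tag convention asserted above.
public class CacheKeyTagSketch {

    // e.g. tag(1, "123") -> "1-123"
    static String tag(int sourceTaskInstanceId, String cacheKey) {
        return sourceTaskInstanceId + "-" + cacheKey;
    }

    // e.g. "1-123" -> (1, "123"); "123" -> (-1, "123"); null or malformed input -> (-1, "")
    static Entry<Integer, String> revert(String taggedKey) {
        if (taggedKey == null) {
            return new SimpleEntry<>(-1, "");
        }
        String[] parts = taggedKey.split("-");
        if (parts.length == 1) {
            return new SimpleEntry<>(-1, parts[0]);
        }
        if (parts.length == 2) {
            return new SimpleEntry<>(Integer.parseInt(parts[0]), parts[1]);
        }
        return new SimpleEntry<>(-1, "");
    }

    public static void main(String[] args) {
        System.out.println(tag(1, "123"));      // 1-123
        System.out.println(revert("1-123"));    // 1=123
        System.out.println(revert("123"));      // -1=123
        System.out.println(revert("1-123-4"));  // -1=
    }
}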

49
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -18,11 +18,13 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@ -217,6 +219,11 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
}
// check whether the task hits an existing cache execution, and decide whether to dispatch
if (checkIsCacheExecution(taskInstance, context)) {
return true;
}
result = dispatcher.dispatch(executionContext);
if (result) {
@ -276,4 +283,46 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
return false;
}
/**
* check whether this task should be handled as a cache execution
* if the task is defined as a cache task and a successfully finished cache task instance already exists, this task will not be dispatched
* @param taskInstance taskInstance
* @param context context
* @return true if this task will not be dispatched, false if it will be dispatched
*/
private boolean checkIsCacheExecution(TaskInstance taskInstance, TaskExecutionContext context) {
try {
// check if task is defined as a cache task
if (taskInstance.getIsCache().equals(Flag.NO)) {
return false;
}
// look up a finished task instance with the same cache key
String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
// if a cached task instance is found, add a cache event and return true
if (cacheTaskInstance != null) {
logger.info("Task {} is cache, no need to dispatch, task instance id: {}",
taskInstance.getName(), taskInstance.getId());
addCacheEvent(taskInstance, cacheTaskInstance);
taskInstance.setCacheKey(TaskCacheUtils.generateTagCacheKey(cacheTaskInstance.getId(), cacheKey));
return true;
} else {
// if no cached task instance is found, tag the cache key with this task's id and return false; the task will be dispatched
taskInstance.setCacheKey(TaskCacheUtils.generateTagCacheKey(taskInstance.getId(), cacheKey));
}
} catch (Exception e) {
logger.error("checkIsCacheExecution error", e);
}
return false;
}
private void addCacheEvent(TaskInstance taskInstance, TaskInstance cacheTaskInstance) {
if (cacheTaskInstance == null) {
return;
}
TaskEvent taskEvent = TaskEvent.newCacheEvent(taskInstance.getProcessInstanceId(), taskInstance.getId(),
cacheTaskInstance.getId());
taskEventService.addEvent(taskEvent);
}
}
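
checkIsCacheExecution above leans on TaskInstanceDao.findTaskInstanceByCacheKey, which appears in this commit's file index but is not expanded in this excerpt. Below is a minimal sketch of the contract the consumer assumes, plus a recap of the three possible outcomes; everything except the findTaskInstanceByCacheKey name is illustrative.

import org.apache.dolphinscheduler.dao.entity.TaskInstance;

// Sketch of the lookup contract assumed by checkIsCacheExecution: given a freshly
// generated cache key, return a previously finished task instance that persisted the
// same key, or null when there is no cache hit.
interface CacheHitLookupSketch {

    TaskInstance findTaskInstanceByCacheKey(String cacheKey);
}

// Outcomes of the check above:
//   cache flag is NO             -> return false, dispatch as usual
//   lookup returns an instance X -> tag cacheKey as "<X.id>-<key>", queue a CACHE event, return true (skip dispatch)
//   lookup returns null          -> tag cacheKey as "<own id>-<key>", return false, dispatch as usual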

109
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskCacheEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private DataQualityResultOperator dataQualityResultOperator;
@Autowired
private ProcessService processService;
@Autowired
private TaskInstanceDao taskInstanceDao;
/**
* handle CACHE task event
* copy the data of a cache task instance that has already run successfully onto the current task instance
* @param taskEvent task event
*/
@Override
public void handleTaskEvent(TaskEvent taskEvent) {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
return;
}
TaskInstance taskInstance = taskInstanceOptional.get();
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceById(taskEvent.getCacheTaskInstanceId());
// keep the current task instance's own identity fields
cacheTaskInstance.setId(taskInstance.getId());
cacheTaskInstance.setProcessInstanceId(processInstanceId);
cacheTaskInstance.setProcessInstanceName(taskInstance.getProcessInstanceName());
cacheTaskInstance.setProcessInstance(taskInstance.getProcessInstance());
cacheTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
cacheTaskInstance.setStartTime(taskInstance.getSubmitTime());
cacheTaskInstance.setSubmitTime(taskInstance.getSubmitTime());
cacheTaskInstance.setEndTime(new Date());
cacheTaskInstance.setFlag(Flag.YES);
TaskInstanceUtils.copyTaskInstance(cacheTaskInstance, taskInstance);
processService.changeOutParam(taskInstance);
taskInstanceDao.updateTaskInstance(taskInstance);
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(taskEvent.getProcessInstanceId())
.taskInstanceId(taskEvent.getTaskInstanceId())
.status(taskEvent.getState())
.type(StateEventType.TASK_STATE_CHANGE)
.build();
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.CACHE;
}
}
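
TaskInstanceUtils.copyTaskInstance is imported above but its body is not part of this diff. The handler's ordering only makes sense if that helper copies the source instance's fields onto the target, which is why the identity fields (id, process instance, submit time, flag) are written back onto cacheTaskInstance before the copy. A hedged sketch of that assumed behaviour:

import org.apache.dolphinscheduler.dao.entity.TaskInstance;

// Assumed behaviour of TaskInstanceUtils.copyTaskInstance(source, target) as it is used
// above; a sketch only, the real helper copies every field of the entity.
final class CopyTaskInstanceSketch {

    static void copy(TaskInstance source, TaskInstance target) {
        target.setId(source.getId());
        target.setProcessInstanceId(source.getProcessInstanceId());
        target.setProcessInstanceName(source.getProcessInstanceName());
        target.setTaskParams(source.getTaskParams());
        target.setVarPool(source.getVarPool());
        target.setSubmitTime(source.getSubmitTime());
        target.setStartTime(source.getStartTime());
        target.setEndTime(source.getEndTime());
        target.setFlag(source.getFlag());
        // ...remaining fields are copied in the same way
    }
}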

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -90,6 +90,8 @@ public class TaskEvent {
*/
private String varPool;
private int cacheTaskInstanceId;
/**
* channel
*/
@ -147,4 +149,13 @@ public class TaskEvent {
event.setEvent(TaskEventType.WORKER_REJECT);
return event;
}
public static TaskEvent newCacheEvent(int processInstanceId, int taskInstanceId, int cacheTaskInstanceId) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(processInstanceId);
event.setTaskInstanceId(taskInstanceId);
event.setCacheTaskInstanceId(cacheTaskInstanceId);
event.setEvent(TaskEventType.CACHE);
return event;
}
}

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -95,6 +96,7 @@ import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.Arrays;
@ -411,6 +413,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processInstanceDao.upsertProcessInstance(processInstance);
// save the cache key only if the task is defined as a cache task and finished successfully
if (taskInstance.getIsCache().equals(Flag.YES)) {
saveCacheTaskInstance(taskInstance);
}
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
@ -1196,6 +1202,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.setCpuQuota(taskNode.getCpuQuota());
taskInstance.setMemoryMax(taskNode.getMemoryMax());
taskInstance.setIsCache(taskNode.getIsCache() == Flag.YES.getCode() ? Flag.YES : Flag.NO);
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);
@ -2125,6 +2133,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
}
private void saveCacheTaskInstance(TaskInstance taskInstance) {
Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(taskInstance.getCacheKey());
Integer taskId = taskIdAndCacheKey.getLeft();
if (taskId.equals(taskInstance.getId())) {
taskInstance.setCacheKey(taskIdAndCacheKey.getRight());
try {
taskInstanceDao.updateTaskInstance(taskInstance);
} catch (Exception e) {
logger.error("update task instance cache key failed", e);
}
}
}
private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
;
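
saveCacheTaskInstance above is where the tag written at dispatch time pays off: only the instance whose own id is embedded in the tag, i.e. the one that actually produced the result rather than reused it, persists the raw cache key, so later lookups by cache key resolve to the producing run. A short walk-through with made-up ids and keys; only revertCacheKey is the real utility here.

import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;

public class SaveCacheKeyWalkthrough {

    public static void main(String[] args) {
        // producer: task 7 ran the work, so its cacheKey was tagged with its own id
        Pair<Integer, String> producer = TaskCacheUtils.revertCacheKey("7-abc123");
        System.out.println(producer.getLeft().equals(7));  // true -> raw key "abc123" is persisted for task 7

        // consumer: task 9 reused the hit, so its cacheKey still carries the producer's id
        int reusingTaskInstanceId = 9;
        Pair<Integer, String> reused = TaskCacheUtils.revertCacheKey("7-abc123");
        System.out.println(reused.getLeft().equals(reusingTaskInstanceId));  // false -> nothing persisted for task 9
    }
}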

112
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.HashMap;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class TaskCacheEventHandlerTest {
@InjectMocks
private TaskCacheEventHandler taskCacheEventHandler;
@Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Mock
private DataQualityResultOperator dataQualityResultOperator;
@Mock
private ProcessService processService;
@Mock
private TaskInstanceDao taskInstanceDao;
@Test
void testHandleTaskEvent() {
TaskEvent taskEvent = Mockito.mock(TaskEvent.class);
int processInstanceId = 1;
int taskInstanceId = 2;
int cacheTaskInstanceId = 3;
int cacheProcessInstanceId = 4;
Mockito.when(taskEvent.getTaskInstanceId()).thenReturn(taskInstanceId);
Mockito.when(taskEvent.getProcessInstanceId()).thenReturn(processInstanceId);
Mockito.when(taskEvent.getCacheTaskInstanceId()).thenReturn(cacheTaskInstanceId);
TaskInstance cacheTaskInstance = new TaskInstance();
cacheTaskInstance.setId(cacheTaskInstanceId);
cacheTaskInstance.setProcessInstanceId(cacheProcessInstanceId);
cacheTaskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
Mockito.when(taskInstanceDao.findTaskInstanceById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance);
WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
Mockito.when(processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId))
.thenReturn(workflowExecuteRunnable);
Optional<TaskInstance> taskInstanceOptional = Mockito.mock(Optional.class);
Mockito.when(workflowExecuteRunnable.getTaskInstance(taskInstanceId)).thenReturn(taskInstanceOptional);
Mockito.when(taskInstanceOptional.isPresent()).thenReturn(true);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
taskInstance.setId(taskInstanceId);
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setProcessInstanceName("test");
ProcessInstance processInstance = new ProcessInstance();
taskInstance.setProcessInstance(processInstance);
ProcessDefinition processDefinition = new ProcessDefinition();
taskInstance.setProcessDefine(processDefinition);
taskInstance.setSubmitTime(new Date());
Mockito.when(taskInstanceOptional.get()).thenReturn(taskInstance);
taskCacheEventHandler.handleTaskEvent(taskEvent);
Assertions.assertEquals(Flag.YES, taskInstance.getFlag());
Assertions.assertEquals(taskInstanceId, taskInstance.getId());
Assertions.assertEquals(processInstanceId, taskInstance.getProcessInstanceId());
}
}

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java

@ -78,6 +78,8 @@ public class TaskNode {
*/
private String runFlag;
private int isCache;
/**
* the front field
*/
@ -278,6 +280,14 @@ public class TaskNode {
this.runFlag = runFlag;
}
public int getIsCache() {
return isCache;
}
public void setIsCache(int isCache) {
this.isCache = isCache;
}
public boolean isForbidden() {
// skip stream tasks when running the DAG
if (taskExecuteType == TaskExecuteType.STREAM) {

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2328,6 +2328,7 @@ public class ProcessServiceImpl implements ProcessService {
taskNode.setCpuQuota(taskDefinitionLog.getCpuQuota());
taskNode.setMemoryMax(taskDefinitionLog.getMemoryMax());
taskNode.setTaskExecuteType(taskDefinitionLog.getTaskExecuteType());
taskNode.setIsCache(taskDefinitionLog.getIsCache().getCode());
taskNodeList.add(taskNode);
}
}

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
@ -653,6 +654,7 @@ public class ProcessServiceTest {
taskDefinition.setVersion(2);
taskDefinition.setCreateTime(new Date());
taskDefinition.setUpdateTime(new Date());
taskDefinition.setIsCache(Flag.NO);
TaskDefinitionLog td2 = new TaskDefinitionLog();
td2.setCode(2L);

4
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -277,7 +277,8 @@ export default {
alarm_group: 'Alarm group',
startup_parameter: 'Startup Parameter',
whether_dry_run: 'Whether Dry-Run',
please_choose: 'Please Choose'
please_choose: 'Please Choose',
remove_task_cache: 'Clear cache'
},
dag: {
create: 'Create Workflow',
@ -325,6 +326,7 @@ export default {
online: 'Online'
},
node: {
is_cache: "Cache Execution",
jvm_args: 'Java VM Parameters',
jvm_args_tips: 'Please enter virtual machine parameters',
run_type: 'Run Type',

4
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -278,7 +278,8 @@ export default {
alarm_group: '告警组',
startup_parameter: '启动参数',
whether_dry_run: '是否空跑',
please_choose: '请选择'
please_choose: '请选择',
remove_task_cache: '清除缓存'
},
dag: {
create: '创建工作流',
@ -326,6 +327,7 @@ export default {
online: '已上线'
},
node: {
is_cache: "缓存执行",
is_module_path: '使用模块路径',
run_type: '运行类型',
jvm_args: '虚拟机参数',

7
dolphinscheduler-ui/src/service/modules/task-instances/index.ts

@ -54,3 +54,10 @@ export function savePoint(projectCode: number, taskId: number): any {
method: 'post'
})
}
export function removeTaskInstanceCache(projectCode: number, taskId: number): any {
return axios({
url: `projects/${projectCode}/task-instances/${taskId}/remove-cache`,
method: 'delete'
})
}

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -17,6 +17,7 @@
export { useName } from './use-name'
export { useRunFlag } from './use-run-flag'
export { useCache } from './use-cache'
export { useDescription } from './use-description'
export { useTaskPriority } from './use-task-priority'
export { useWorkerGroup } from './use-worker-group'

29
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
export function useCache(): IJsonItem {
const { t } = useI18n()
return {
type: 'switch',
field: 'isCache',
name: t('project.node.is_cache'),
span: 12
}
}

3
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts

@ -34,6 +34,7 @@ export function useRunFlag(): IJsonItem {
type: 'radio',
field: 'flag',
name: t('project.node.run_flag'),
options: options
options: options,
span: 12
}
}

2
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -478,6 +478,7 @@ export function formatParams(data: INodeData): {
: '0',
failRetryTimes: data.failRetryTimes ? String(data.failRetryTimes) : '0',
flag: data.flag,
isCache: data.isCache ? "YES" : "NO",
name: data.name,
taskGroupId: data.taskGroupId,
taskGroupPriority: data.taskGroupPriority,
@ -526,6 +527,7 @@ export function formatModel(data: ITaskData) {
...omit(data.taskParams, ['resourceList', 'mainJar', 'localParams']),
environmentCode: data.environmentCode === -1 ? null : data.environmentCode,
timeoutFlag: data.timeoutFlag === 'OPEN',
isCache: data.isCache === 'YES',
timeoutNotifyStrategy: data.timeoutNotifyStrategy
? [data.timeoutNotifyStrategy]
: [],

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts

@ -56,6 +56,7 @@ export function useChunjun({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts

@ -65,6 +65,7 @@ export function useDataQuality({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

3
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts

@ -65,6 +65,7 @@ export function useDatasync({
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
@ -79,4 +80,4 @@ export function useDatasync({
] as IJsonItem[],
model
}
}
}

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts

@ -59,6 +59,7 @@ export function useDataX({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts

@ -51,6 +51,7 @@ export function useDinky({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts

@ -66,6 +66,7 @@ export function useDms({
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts

@ -52,6 +52,7 @@ export function useDvc({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts

@ -53,6 +53,7 @@ export function useEmr({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts

@ -61,6 +61,7 @@ export function useFlinkStream({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts

@ -61,6 +61,7 @@ export function useFlink({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts

@ -65,6 +65,7 @@ export function useHiveCli({
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts

@ -58,6 +58,7 @@ export function useHttp({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts

@ -73,6 +73,7 @@ export function useJava({
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts

@ -53,6 +53,7 @@ export function useJupyter({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts

@ -52,6 +52,7 @@ export function useK8s({
Fields.useName(),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts

@ -51,6 +51,7 @@ export function useKubeflow({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts

@ -62,6 +62,7 @@ export function useLinkis({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts

@ -59,6 +59,7 @@ export function useMlflow({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts

@ -52,6 +52,7 @@ export function useMr({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts

@ -55,6 +55,7 @@ export function useOpenmldb({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts

@ -51,6 +51,7 @@ export function usePigeon({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts

@ -55,6 +55,7 @@ export function useProcedure({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts

@ -55,6 +55,7 @@ export function usePython({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts

@ -70,6 +70,7 @@ export function usePytorch({
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts

@ -51,6 +51,7 @@ export function userSagemaker({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts

@ -82,6 +82,7 @@ export function useSeaTunnel({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts

@ -54,6 +54,7 @@ export function useShell({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts

@ -59,6 +59,7 @@ export function useSpark({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts

@ -59,6 +59,7 @@ export function useSql({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts

@ -74,6 +74,7 @@ export function useSqoop({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

1
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts

@ -51,6 +51,7 @@ export function useZeppelin({
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useCache(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),

4
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -434,6 +434,7 @@ interface INodeData
cpuQuota?: number
memoryMax?: number
flag?: 'YES' | 'NO'
isCache?: boolean
taskGroupId?: number
taskGroupPriority?: number
taskPriority?: string
@ -465,10 +466,11 @@ interface INodeData
interface ITaskData
extends Omit<
INodeData,
'timeoutFlag' | 'taskPriority' | 'timeoutNotifyStrategy'
'isCache' | 'timeoutFlag' | 'taskPriority' | 'timeoutNotifyStrategy'
> {
name?: string
taskPriority?: string
isCache?: "YES" | "NO"
timeoutFlag?: 'OPEN' | 'CLOSE'
timeoutNotifyStrategy?: string | []
taskParams?: ITaskParams

31
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx

@ -63,7 +63,7 @@ const props = {
export default defineComponent({
name: 'dag-context-menu',
props,
emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask'],
emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask', 'removeTaskInstanceCache'],
setup(props, ctx) {
const graph = inject('graph', ref())
const route = useRoute()
@ -103,6 +103,12 @@ export default defineComponent({
}
}
const handleRemoveTaskInstanceCache = () => {
if (props.taskInstance) {
ctx.emit('removeTaskInstanceCache', props.taskInstance.id)
}
}
const handleCopy = () => {
const genNums = 1
const type = props.cell?.data.taskType
@ -138,7 +144,8 @@ export default defineComponent({
handleViewLog,
handleExecuteTaskOnly,
handleExecuteTaskPOST,
handleExecuteTaskPRE
handleExecuteTaskPRE,
handleRemoveTaskInstanceCache
}
},
render() {
@ -181,12 +188,20 @@ export default defineComponent({
</>
)}
{this.taskInstance && (
<NButton
class={`${styles['menu-item']}`}
onClick={this.handleViewLog}
>
{t('project.node.view_log')}
</NButton>
<>
<NButton
class={`${styles['menu-item']}`}
onClick={this.handleViewLog}
>
{t('project.node.view_log')}
</NButton>
<NButton
class={`${styles['menu-item']}`}
onClick={this.handleRemoveTaskInstanceCache}
>
{t('project.task.remove_task_cache')}
</NButton>
</>
)}
{this.executeTaskDisplay && (
<>

8
dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx

@ -56,6 +56,7 @@ import { useAsyncState } from '@vueuse/core'
import utils from '@/utils'
import { useUISettingStore } from '@/store/ui-setting/ui-setting'
import { executeTask } from '@/service/modules/executors'
import { removeTaskInstanceCache } from '@/service/modules/task-instances'
const props = {
// If this prop is passed, the view was opened from the workflow definition detail
@ -305,6 +306,12 @@ export default defineComponent({
})
}
const handleRemoveTaskInstanceCache = (taskId: number) => {
removeTaskInstanceCache(props.projectCode, taskId).then(() => {
window.$message.success(t('project.workflow.success'))
})
}
const downloadLogs = () => {
utils.downloadFile('log/download-log', {
taskInstanceId: nodeVariables.logTaskId
@ -418,6 +425,7 @@ export default defineComponent({
onRemoveTasks={removeTasks}
onViewLog={handleViewLog}
onExecuteTask={handleExecuteTask}
onRemoveTaskInstanceCache={handleRemoveTaskInstanceCache}
/>
{!!props.definition && (
<StartModal
