From 66e20271ad04c571dbd05c114ccbc050ee128dfb Mon Sep 17 00:00:00 2001 From: JieguangZhou Date: Sun, 18 Dec 2022 18:17:09 +0800 Subject: [PATCH] [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194) * Supports task instance cache operation * add task plugin cache * use SHA-256 to generate key * Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql Co-authored-by: Jay Chung * Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql Co-authored-by: Jay Chung * Optimizing database Scripts * Optimize clear cache operation Co-authored-by: Jay Chung --- docs/docs/en/faq.md | 15 ++ docs/docs/en/guide/task/appendix.md | 1 + docs/docs/zh/faq.md | 15 ++ docs/docs/zh/guide/task/appendix.md | 1 + .../controller/TaskInstanceController.java | 25 +++ .../TaskInstanceRemoveCacheResponse.java | 44 +++++ .../dolphinscheduler/api/enums/Status.java | 3 + .../api/service/TaskInstanceService.java | 10 + .../service/impl/TaskInstanceServiceImpl.java | 36 ++++ .../api/service/TaskInstanceServiceTest.java | 31 ++++ .../dolphinscheduler/common/enums/Flag.java | 1 + .../common/enums/TaskEventType.java | 3 +- .../dao/entity/TaskDefinition.java | 6 + .../dao/entity/TaskDefinitionLog.java | 1 + .../dao/entity/TaskInstance.java | 13 ++ .../dao/mapper/TaskInstanceMapper.java | 4 + .../dao/repository/TaskInstanceDao.java | 14 ++ .../repository/impl/TaskInstanceDaoImpl.java | 20 ++ .../dao/utils/TaskCacheUtils.java | 161 ++++++++++++++++ .../dao/mapper/TaskDefinitionLogMapper.xml | 6 +- .../dao/mapper/TaskDefinitionMapper.xml | 8 +- .../dao/mapper/TaskInstanceMapper.xml | 16 +- .../resources/sql/dolphinscheduler_h2.sql | 4 + .../resources/sql/dolphinscheduler_mysql.sql | 7 +- .../sql/dolphinscheduler_postgresql.sql | 5 + .../mysql/dolphinscheduler_ddl.sql | 98 ++++++++++ .../postgresql/dolphinscheduler_ddl.sql | 14 ++ .../dao/utils/TaskCacheUtilsTest.java | 172 ++++++++++++++++++ 
.../consumer/TaskPriorityQueueConsumer.java | 49 +++++ .../master/event/TaskCacheEventHandler.java | 109 +++++++++++ .../master/processor/queue/TaskEvent.java | 11 ++ .../runner/WorkflowExecuteRunnable.java | 21 +++ .../event/TaskCacheEventHandlerTest.java | 112 ++++++++++++ .../service/model/TaskNode.java | 10 + .../service/process/ProcessServiceImpl.java | 1 + .../service/process/ProcessServiceTest.java | 2 + .../src/locales/en_US/project.ts | 4 +- .../src/locales/zh_CN/project.ts | 4 +- .../service/modules/task-instances/index.ts | 7 + .../task/components/node/fields/index.ts | 1 + .../task/components/node/fields/use-cache.ts | 29 +++ .../components/node/fields/use-run-flag.ts | 3 +- .../task/components/node/format-data.ts | 2 + .../task/components/node/tasks/use-chunjun.ts | 1 + .../components/node/tasks/use-data-quality.ts | 1 + .../components/node/tasks/use-datasync.ts | 3 +- .../task/components/node/tasks/use-datax.ts | 1 + .../task/components/node/tasks/use-dinky.ts | 1 + .../task/components/node/tasks/use-dms.ts | 1 + .../task/components/node/tasks/use-dvc.ts | 1 + .../task/components/node/tasks/use-emr.ts | 1 + .../components/node/tasks/use-flink-stream.ts | 1 + .../task/components/node/tasks/use-flink.ts | 1 + .../components/node/tasks/use-hive-cli.ts | 1 + .../task/components/node/tasks/use-http.ts | 1 + .../task/components/node/tasks/use-java.ts | 1 + .../task/components/node/tasks/use-jupyter.ts | 1 + .../task/components/node/tasks/use-k8s.ts | 1 + .../components/node/tasks/use-kubeflow.ts | 1 + .../task/components/node/tasks/use-linkis.ts | 1 + .../task/components/node/tasks/use-mlflow.ts | 1 + .../task/components/node/tasks/use-mr.ts | 1 + .../components/node/tasks/use-openmldb.ts | 1 + .../task/components/node/tasks/use-pigeon.ts | 1 + .../components/node/tasks/use-procedure.ts | 1 + .../task/components/node/tasks/use-python.ts | 1 + .../task/components/node/tasks/use-pytorch.ts | 1 + .../components/node/tasks/use-sagemaker.ts | 1 + 
.../components/node/tasks/use-sea-tunnel.ts | 1 + .../task/components/node/tasks/use-shell.ts | 1 + .../task/components/node/tasks/use-spark.ts | 1 + .../task/components/node/tasks/use-sql.ts | 1 + .../task/components/node/tasks/use-sqoop.ts | 1 + .../components/node/tasks/use-zeppelin.ts | 1 + .../projects/task/components/node/types.ts | 4 +- .../components/dag/dag-context-menu.tsx | 31 +++- .../workflow/components/dag/index.tsx | 8 + 77 files changed, 1151 insertions(+), 24 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java create mode 100644 dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts diff --git a/docs/docs/en/faq.md b/docs/docs/en/faq.md index 93d0710156..042b3d7fa2 100644 --- a/docs/docs/en/faq.md +++ b/docs/docs/en/faq.md @@ -752,4 +752,19 @@ start API server. If you want disabled when Python gateway service you could cha --- +## Q:How to determine whether a task has been cached when the cache is executed, that is, how to determine whether a task can use the running result of another task? 
+ +A: For a task marked as `Cache Execution`, a cache key is generated when the task starts. The key is a hash of the following fields: + +- task definition: the id of the task definition corresponding to the task instance +- task version: the version of the task definition corresponding to the task instance +- task input parameters: of the parameters passed in by upstream nodes and the global parameters, only those referenced by the parameter list of the task definition or referenced in the task definition with `${}` +- environment configuration: the actual content of the environment configuration selected by the environment name, that is, the configuration content under `security` - `environment management` + +When a task marked for cache execution runs, it checks whether the database already contains data with the same cache key: +- If there is, the cached task instance is copied and the corresponding data is updated +- If not, the task runs as usual, and the task instance data is stored in the cache when the task completes + +If you do not want to use the cache, right-click the node in the workflow instance and run `Clear cache`, which clears the cached data for the current input parameters under this version. + We will collect more FAQ later diff --git a/docs/docs/en/guide/task/appendix.md b/docs/docs/en/guide/task/appendix.md index 57cb5dbc31..4569216617 100644 --- a/docs/docs/en/guide/task/appendix.md +++ b/docs/docs/en/guide/task/appendix.md @@ -8,6 +8,7 @@ DolphinScheduler task plugins share some common default parameters. Each type of |--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | Node Name | The name of the task. 
Node names within the same workflow must be unique. | | Run Flag | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch. | +| Cache Execution | Indicating whether this node should be cached. If enabled, tasks with the same identifier (same task definition, same task version, same input parameters) share a cached result: once such a task has been cached, it is not executed again and its result is reused directly. | | Description | Describing the function of this node. | | Task Priority | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion. | | Worker Group | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker. | diff --git a/docs/docs/zh/faq.md b/docs/docs/zh/faq.md index 9461f275cf..294762877d 100644 --- a/docs/docs/zh/faq.md +++ b/docs/docs/zh/faq.md @@ -720,4 +720,19 @@ A:在 3.0.0-alpha 版本之后,Python gateway server 集成到 api server --- +## Q: 缓存执行时怎么判断任务已经存在缓存过的任务,即如何判断一个任务可以使用另外一个任务的运行结果? 
+ +A: 对于标识为`缓存执行`的任务, 当任务启动时会生成一个缓存key, 该key由以下字段组合哈希得到: + +- 任务定义:任务实例对应的任务定义的id +- 任务的版本:任务实例对应的任务定义的版本 +- 任务输入的参数:包括上游节点和全局参数传入的参数中,被任务定义的参数列表所引用和任务定义中使用`${}`引用的参数 +- 环境配置: 环境名称下具体的环境配置内容,具体为安全中心环境管理中的实际配置内容 + +当缓存标识的任务运行时,会查找数据库中是否有相同缓存key的数据, +- 若有则复制该任务实例并进行相应数据的更新 +- 若无,则任务照常运行,并在任务完成时将任务实例的数据存入缓存 + +若不需要缓存时,可以在工作流实例中右键运行清除缓存,则会清除该版本下当前输入的参数的缓存数据。 + 我们会持续收集更多的 FAQ。 diff --git a/docs/docs/zh/guide/task/appendix.md b/docs/docs/zh/guide/task/appendix.md index 3966e79ea7..6ba27ad38b 100644 --- a/docs/docs/zh/guide/task/appendix.md +++ b/docs/docs/zh/guide/task/appendix.md @@ -8,6 +8,7 @@ |----------|--------------------------------------------------------------------------------------------------------------------------------------| | 任务名称 | 任务的名称,同一个工作流定义中的节点名称不能重复。 | | 运行标志 | 标识这个节点是否需要调度执行,如果不需要执行,可以打开禁止执行开关。 | +| 缓存执行 | 标识这个节点是否需要进行缓存,如果缓存,则对于相同标识(相同任务版本,相同任务定义,相同参数传入)的任务进行缓存,运行时若已经存在缓存过的任务时,不再重复执行,直接复用结果。 | | 描述 | 当前节点的功能描述。 | | 任务优先级 | worker线程数不足时,根据优先级从高到低依次执行任务,优先级一样时根据先到先得原则执行。 | | Worker分组 | 设置分组后,任务会被分配给worker组的机器机执行。若选择Default,则会随机选择一个worker执行。 | diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index b9fabb0f32..af23a4ce0b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR; import 
static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskInstanceService; import org.apache.dolphinscheduler.api.utils.Result; @@ -34,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -188,4 +191,26 @@ public class TaskInstanceController extends BaseController { @PathVariable(value = "id") Integer id) { return taskInstanceService.stopTask(loginUser, projectCode, id); } + + /** + * remove task instance cache + * + * @param loginUser login user + * @param projectCode project code + * @param id task instance id + * @return the result code and msg + */ + @Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE") + @Parameters({ + @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12")) + }) + @DeleteMapping(value = "/{id}/remove-cache") + @ResponseStatus(HttpStatus.OK) + @ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "id") Integer id) { + return 
taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java new file mode 100644 index 0000000000..34a8218304 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.api.dto.taskInstance; + +import org.apache.dolphinscheduler.api.utils.Result; + +import lombok.Data; + +/** + * task instance success response + */ +@Data +public class TaskInstanceRemoveCacheResponse extends Result { + + private String cacheKey; + + public TaskInstanceRemoveCacheResponse(Result result) { + super(); + this.setCode(result.getCode()); + this.setMsg(result.getMsg()); + } + + public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) { + super(); + this.setCode(result.getCode()); + this.setMsg(result.getMsg()); + this.cacheKey = cacheKey; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 0f1cb95b47..cb1941b2a0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -278,6 +278,7 @@ public enum Status { UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"), RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"), PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"), + RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016, "resource not exist or no permission,please view the task node and remove error resource", "请检查任务节点并移除无权限或者已删除的资源"), @@ -285,6 +286,8 @@ public enum Status { "资源文件已授权其他用户[{0}],后缀不允许修改"), RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"), + REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"), + USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"), USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"), USER_NO_WRITE_PROJECT_PERM(30003, "user 
[{0}] does not have write permission for project [{1}]", diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 9fa259eb2b..ed23d3695e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -100,4 +101,13 @@ public interface TaskInstanceService { * @return the result code and msg */ TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId); + + /** + * remove task instance cache + * @param loginUser + * @param projectCode + * @param taskInstanceId + * @return + */ + TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index a212cb8299..1d4f9b64de 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -18,8 +18,10 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS; +import static 
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE; +import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -38,6 +40,8 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand; @@ -45,6 +49,9 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; + import java.util.Date; import java.util.HashSet; import java.util.List; @@ -81,6 +88,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired TaskInstanceMapper taskInstanceMapper; + @Autowired + TaskInstanceDao taskInstanceDao; + @Autowired ProcessInstanceService processInstanceService; @@ -319,4 +329,30 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst } return taskInstance; } + + @Override + public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, + Integer taskInstanceId) 
{ + Result result = new Result(); + + Project project = projectMapper.queryByCode(projectCode); + projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE); + + TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId); + if (taskInstance == null) { + logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, + taskInstanceId); + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return new TaskInstanceRemoveCacheResponse(result); + } + String tagCacheKey = taskInstance.getCacheKey(); + Pair taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey); + String cacheKey = taskIdAndCacheKey.getRight(); + if (StringUtils.isNotEmpty(cacheKey)) { + taskInstanceDao.clearCacheByCacheKey(cacheKey); + } + putMsg(result, Status.SUCCESS); + return new TaskInstanceRemoveCacheResponse(result, cacheKey); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 0482ab7ef4..89de9b6643 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.ApiApplicationServer; +import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl; @@ -40,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import 
org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -93,6 +95,9 @@ public class TaskInstanceServiceTest { @Mock TaskDefinitionMapper taskDefinitionMapper; + @Mock + TaskInstanceDao taskInstanceDao; + @Test public void queryTaskListPaging() { long projectCode = 1L; @@ -341,4 +346,30 @@ public class TaskInstanceServiceTest { Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue()); } + + @Test + public void testRemoveTaskInstanceCache() { + User user = getAdminUser(); + long projectCode = 1L; + Project project = getProject(projectCode); + int taskId = 1; + TaskInstance task = getTaskInstance(); + String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750"; + task.setCacheKey(cacheKey); + + when(projectMapper.queryByCode(projectCode)).thenReturn(project); + when(taskInstanceMapper.selectById(1)).thenReturn(task); + when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null); + when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true); + + TaskInstanceRemoveCacheResponse response = + taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); + Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode()); + + when(taskInstanceMapper.selectById(1)).thenReturn(null); + TaskInstanceRemoveCacheResponse responseNotFoundTask = + taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId); + Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode()); + + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java index 
80c9cae999..a080a6c826 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java @@ -26,6 +26,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue; * have_arr_variables * have_map_variables * have_alert + * is_cache */ public enum Flag { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java index 09f85d3f17..f24f168679 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java @@ -22,5 +22,6 @@ public enum TaskEventType { DELAY, RUNNING, RESULT, - WORKER_REJECT + WORKER_REJECT, + CACHE, } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 3da375dd0b..18c3a405a7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -115,6 +115,11 @@ public class TaskDefinition { */ private Flag flag; + /** + * task is cache: yes/no + */ + private Flag isCache; + /** * task priority */ @@ -281,6 +286,7 @@ public class TaskDefinition { && Objects.equals(taskType, that.taskType) && Objects.equals(taskParams, that.taskParams) && flag == that.flag + && isCache == that.isCache && taskPriority == that.taskPriority && Objects.equals(workerGroup, that.workerGroup) && timeoutFlag == that.timeoutFlag diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index 3701956612..a42f593a9f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -68,6 +68,7 @@ public class TaskDefinitionLog extends TaskDefinition { this.setFailRetryInterval(taskDefinition.getFailRetryInterval()); this.setFailRetryTimes(taskDefinition.getFailRetryTimes()); this.setFlag(taskDefinition.getFlag()); + this.setIsCache(taskDefinition.getIsCache()); this.setModifyBy(taskDefinition.getModifyBy()); this.setCpuQuota(taskDefinition.getCpuQuota()); this.setMemoryMax(taskDefinition.getMemoryMax()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 7f9c0b7e5f..fbef6cc9fe 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -40,6 +40,7 @@ import java.util.Map; import lombok.Data; +import com.baomidou.mybatisplus.annotation.FieldStrategy; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; @@ -187,6 +188,17 @@ public class TaskInstance implements Serializable { */ private Flag flag; + /** + * task is cache: yes/no + */ + private Flag isCache; + + /** + * cache_key + */ + @TableField(updateStrategy = FieldStrategy.IGNORED) + private String cacheKey; + /** * dependency */ @@ -409,4 +421,5 @@ public class TaskInstance implements Serializable { // task retry does not over time, return false return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval; } + } diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index faa481e294..14f354f11e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -57,6 +57,10 @@ public interface TaskInstanceMapper extends BaseMapper { TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId, @Param("taskCode") Long taskCode); + TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey); + + Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey); + List queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List processInstanceIds, @Param("taskCodes") List taskCodes); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index 11b53bbeeb..8537e86ba1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -88,6 +88,20 @@ public interface TaskInstanceDao { */ TaskInstance findTaskInstanceById(Integer taskId); + /** + * find task instance by cache_key + * @param cacheKey cache key + * @return task instance + */ + TaskInstance findTaskInstanceByCacheKey(String cacheKey); + + /** + * clear task instance cache by cache_key + * @param cacheKey cache key + * @return true if the cache was cleared, false if clearing failed + */ + Boolean clearCacheByCacheKey(String cacheKey); + /** * find task instance list by id list * @param idList task id list diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index a68a0953fd..7566e17783 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Date; @@ -164,6 +165,25 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { return taskInstanceMapper.selectById(taskId); } + @Override + public TaskInstance findTaskInstanceByCacheKey(String cacheKey) { + if (StringUtils.isEmpty(cacheKey)) { + return null; + } + return taskInstanceMapper.queryByCacheKey(cacheKey); + } + + @Override + public Boolean clearCacheByCacheKey(String cacheKey) { + try { + taskInstanceMapper.clearCacheByCacheKey(cacheKey); + return true; + } catch (Exception e) { + logger.error("clear cache by cacheKey failed", e); + return false; + } + } + @Override public List findTaskInstanceByIdList(List idList) { if (CollectionUtils.isEmpty(idList)) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java new file mode 100644 index 0000000000..207756af77 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Static helpers that build and parse the cache keys of task instances.
+ */
+public class TaskCacheUtils {
+
+    private TaskCacheUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static final String MERGE_TAG = "-";
+
+    /**
+     * Matches {@code ${name}} placeholders; group 1 is the text between the braces.
+     * Compiled once instead of on every call of {@link #getScriptVarInSet(TaskInstance)}.
+     */
+    private static final Pattern SCRIPT_VAR_PATTERN = Pattern.compile("\\$\\{(.+?)\\}");
+
+    /**
+     * generate cache key for task instance
+     * the following elements are joined with "_" and hashed with SHA-256 to build the cache key
+     * 1. task code
+     * 2. task version
+     * 3. task is cache
+     * 4. environment config
+     * 5. input VarPool, from upstream task and workflow global parameters
+     * @param taskInstance task instance
+     * @param taskExecutionContext taskExecutionContext
+     * @return cache key
+     */
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
+        // NOTE: the elements, their order and the separator define the key; changing any of them
+        // changes the key produced for the same task
+        List<String> keyElements = new ArrayList<>();
+        keyElements.add(String.valueOf(taskInstance.getTaskCode()));
+        keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
+        keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
+        keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig()));
+        keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+        String data = StringUtils.join(keyElements, "_");
+        return DigestUtils.sha256Hex(data);
+    }
+
+    /**
+     * generate cache key for task instance which is cache execute
+     * this key will record which cache task instance will be copied, and cache key will be used
+     * tagCacheKey = sourceTaskId + "-" + cacheKey
+     * @param sourceTaskId source task id
+     * @param cacheKey cache key
+     * @return tagCacheKey
+     */
+    public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) {
+        return sourceTaskId + MERGE_TAG + cacheKey;
+    }
+
+    /**
+     * revert cache key tag to source task id and cache key
+     * <ul>
+     * <li>{@code null} returns (-1, "")</li>
+     * <li>a key without {@link #MERGE_TAG} returns (-1, tagCacheKey)</li>
+     * <li>"id-key" with a numeric id returns (id, key)</li>
+     * <li>anything else (more than two parts, empty part, non-numeric id) returns (-1, "")</li>
+     * </ul>
+     * @param tagCacheKey cache key
+     * @return Pair, first is source task id, second is cache key
+     */
+    public static Pair<Integer, String> revertCacheKey(String tagCacheKey) {
+        if (tagCacheKey == null) {
+            return Pair.of(-1, "");
+        }
+        if (!tagCacheKey.contains(MERGE_TAG)) {
+            // plain cache key that carries no source task id
+            return Pair.of(-1, tagCacheKey);
+        }
+        String[] split = tagCacheKey.split(MERGE_TAG);
+        if (split.length != 2) {
+            return Pair.of(-1, "");
+        }
+        try {
+            return Pair.of(Integer.parseInt(split[0]), split[1]);
+        } catch (NumberFormatException ignored) {
+            // the id part is not a number: handled like every other malformed tag key
+            return Pair.of(-1, "");
+        }
+    }
+
+    /**
+     * get hash data of task input var pool
+     * there are two parts of task input var pool: from upstream task and workflow global parameters
+     * only IN properties that the task declares in localParams or references as ${var} are kept,
+     * sorted by property name and serialized to JSON
+     * @param taskInstance task instance
+     * @param context taskExecutionContext
+     * @return JSON string of the input var pool entries that take part in the cache key
+     */
+    public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
+        JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
+
+        // The set of input values considered from localParams in the taskParams.
+        // JsonNode.get returns null for an absent field; such a task simply declares no inputs.
+        JsonNode localParamsNode = taskParams.get("localParams");
+        List<Property> localParams = localParamsNode == null || localParamsNode.isNull()
+                ? new ArrayList<>()
+                : JSONUtils.toList(localParamsNode.toString(), Property.class);
+        Set<String> propertyInSet = localParams.stream()
+                .filter(property -> Direct.IN.equals(property.getDirect()))
+                .map(Property::getProp)
+                .collect(Collectors.toSet());
+
+        // The set of input values considered from `${var}` form task definition
+        propertyInSet.addAll(getScriptVarInSet(taskInstance));
+
+        // var pool value from upstream task; copied because it is extended below and the list
+        // handed back by JSONUtils.toList is not guaranteed to be modifiable
+        List<Property> varPool = new ArrayList<>(JSONUtils.toList(taskInstance.getVarPool(), Property.class));
+
+        // var pool value from workflow global parameters; an upstream value wins over a global one
+        if (context.getPrepareParamsMap() != null) {
+            Set<String> taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
+            List<Property> globalContextVarPool = context.getPrepareParamsMap().entrySet().stream()
+                    .filter(entry -> !taskVarPoolSet.contains(entry.getKey()))
+                    .map(Map.Entry::getValue)
+                    .collect(Collectors.toList());
+            varPool.addAll(globalContextVarPool);
+        }
+
+        // only consider var pool value which is in propertyInSet; sorted so the result is stable
+        varPool = varPool.stream()
+                .filter(property -> Direct.IN.equals(property.getDirect()))
+                .filter(property -> propertyInSet.contains(property.getProp()))
+                .sorted(Comparator.comparing(Property::getProp))
+                .collect(Collectors.toList());
+        return JSONUtils.toJsonString(varPool);
+    }
+
+    /**
+     * get var in set from task definition
+     * every ${var} placeholder found in the task params contributes its name, in order of
+     * appearance; duplicates are kept
+     * @param taskInstance task instance
+     * @return var in set
+     */
+    public static List<String> getScriptVarInSet(TaskInstance taskInstance) {
+        Matcher matcher = SCRIPT_VAR_PATTERN.matcher(taskInstance.getTaskParams());
+
+        List<String> varInSet = new ArrayList<>();
+        while (matcher.find()) {
+            varInSet.add(matcher.group(1));
+        }
+        return varInSet;
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index d914f9db08..756213a336 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -19,7 +19,7 @@ - id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, + id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type @@ -50,14 +50,14 @@ insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id, - task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, + task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type) values (#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description}, #{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams}, -
#{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode}, + #{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode}, #{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy}, #{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime}, #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority}, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index a4040bd698..fbc9c793df 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -19,13 +19,13 @@ - id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, + id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id, - ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, + ${alias}.task_type, 
${alias}.task_params, ${alias}.flag, ${alias}.is_cache, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout, ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id, ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max, ${alias}.task_execute_type @@ -96,12 +96,12 @@ insert into t_ds_task_definition (code, name, version, description, project_code, user_id, - task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, + task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time,task_group_id, task_execute_type) values (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description}, - #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag}, + #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},#{taskDefinition.isCache}, #{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes}, #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout}, #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime}, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index fc454a6841..306bb7f49a 
100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -21,13 +21,13 @@ id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, - flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, + flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time, ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, - ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, + ${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type @@ -200,6 +200,18 @@ and flag = 1 limit 1 + + + update t_ds_task_instance + set cache_key = null + where cache_key = #{cacheKey} +