From 8be32d4145c851a01d7300cd5a854be9b4a27055 Mon Sep 17 00:00:00 2001 From: qianli2022 <97489722+qianli2022@users.noreply.github.com> Date: Wed, 18 Jan 2023 17:58:32 +0800 Subject: [PATCH] [Feature][Api] When use api to run a process we want get processInstanceId (#13184) * add sql * add mapper * add dao * add excutor Co-authored-by: qianl4 --- .../controller/ProcessInstanceController.java | 18 +++ .../api/service/ProcessInstanceService.java | 10 ++ .../api/service/impl/ExecutorServiceImpl.java | 33 ++++- .../impl/ProcessInstanceServiceImpl.java | 27 ++++ .../ProcessInstanceControllerTest.java | 20 +++ .../api/service/ExecutorServiceTest.java | 6 + .../service/ProcessInstanceServiceTest.java | 22 +++ .../common/enums/ApiTriggerType.java | 47 ++++++ .../dao/entity/TriggerRelation.java | 63 ++++++++ .../dao/mapper/ProcessInstanceMapper.java | 7 + .../dao/mapper/TriggerRelationMapper.java | 72 +++++++++ .../dao/mapper/ProcessInstanceMapper.xml | 21 +++ .../dao/mapper/TriggerRelationMapper.xml | 55 +++++++ .../resources/sql/dolphinscheduler_h2.sql | 16 ++ .../resources/sql/dolphinscheduler_mysql.sql | 12 ++ .../sql/dolphinscheduler_postgresql.sql | 15 ++ .../mysql/dolphinscheduler_ddl.sql | 15 ++ .../postgresql/dolphinscheduler_ddl.sql | 15 ++ .../dao/mapper/TriggerRelationMapperTest.java | 138 ++++++++++++++++++ .../runner/WorkflowExecuteRunnable.java | 1 + .../service/process/ProcessService.java | 2 + .../service/process/ProcessServiceImpl.java | 9 ++ .../process/TriggerRelationService.java | 38 +++++ .../process/TriggerRelationServiceImpl.java | 73 +++++++++ .../service/process/ProcessServiceTest.java | 7 +- .../process/TriggerRelationServiceTest.java | 107 ++++++++++++++ 26 files changed, 842 insertions(+), 7 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java create mode 100644 dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml create mode 100644 dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index cf76fe646f..aba285d358 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR; + import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -411,4 +413,20 @@ public class ProcessInstanceController extends BaseController { } return returnDataList(result); } + + @Operation(summary = "queryProcessInstanceListByTrigger", description = "QUERY_PROCESS_INSTANCE_BY_TRIGGER_NOTES") + @Parameters({ + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = Long.class)), + @Parameter(name = "triggerCode", description = "TRIGGER_CODE", required = true, schema = @Schema(implementation = Long.class)) + }) + @GetMapping("/trigger") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR) + @AccessLogAnnotation() + public Result queryProcessInstancesByTriggerCode(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable long projectCode, + @RequestParam(value = "triggerCode") Long triggerCode) { + Map result = processInstanceService.queryByTriggerCode(loginUser, projectCode, triggerCode); + return returnDataList(result); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 5b8bab32c0..eb760722fa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -222,6 +222,16 @@ public interface ProcessInstanceService { List queryByProcessDefineCode(Long processDefinitionCode, int size); + /** + * query process instance list bt trigger code + * + * @param loginUser + * @param projectCode + * @param triggerCode + * @return + */ + Map queryByTriggerCode(User loginUser, long projectCode, Long triggerCode); + void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteProcessInstanceById(int workflowInstanceId); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 2761b67ca3..765c5c7a38 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ApiTriggerType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.CycleEnum; @@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; @@ -85,6 +87,7 @@ import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.process.TriggerRelationService; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections4.CollectionUtils; @@ -106,6 +109,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Lists; @@ -164,6 +168,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private WorkerGroupService workerGroupService; + @Autowired + private TriggerRelationService triggerRelationService; /** * execute process instance * @@ -188,6 +194,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return execute process instance code */ @Override + @Transactional(rollbackFor = Exception.class) public Map execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, @@ -239,11 +246,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (!checkMasterExists(result)) { return result; } + + long triggerCode = CodeGenerateUtils.getInstance().genCode(); + /** * create command */ int create = - this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, + this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType, + failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, @@ -255,6 +266,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ processDefinitionMapper.updateById(processDefinition); logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create); + result.put(Constants.DATA_LIST, triggerCode); putMsg(result, Status.SUCCESS); } else { logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", @@ -878,7 +890,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param environmentCode environmentCode * @return command id */ - private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, + private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, @@ -940,7 +952,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ try { logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode); - return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, + return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber, complementDependentMode); } catch (CronParseException cronParseException) { // We catch the exception here just to make compiler happy, since we have already validated the schedule @@ -949,8 +961,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - logger.info("Creating command, commandInfo:{}.", command); - return commandService.createCommand(command); + int count = commandService.createCommand(command); + if (count > 0) { + triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId()); + } + return count; } } @@ -962,7 +977,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param runMode * @return */ - protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command, + protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode, + Command command, Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) throws CronParseException { int createCount = 0; @@ -1027,6 +1043,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); } } + if (createCount > 0) { + triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId()); + } break; } case RUN_MODE_PARALLEL: { @@ -1075,6 +1094,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (commandService.createCommand(command) > 0) { logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, + command.getId()); } else { logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 0adf53cd89..af4d9ab96e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; @@ -1021,6 +1022,32 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } + /** + * query process instance list bt trigger code + * + * @param loginUser + * @param projectCode + * @param triggerCode + * @return + */ + @Override + public Map queryByTriggerCode(User loginUser, long projectCode, Long triggerCode) { + + Project project = projectMapper.queryByCode(projectCode); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE); + if (result.get(Constants.STATUS) != Status.SUCCESS || triggerCode == null) { + return result; + } + + List processInstances = processInstanceMapper.queryByTriggerCode( + triggerCode); + result.put(DATA_LIST, processInstances); + putMsg(result, Status.SUCCESS); + return result; + } + @Override public void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) { while (true) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java index 5fa88ea52a..b0f36c9c56 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java @@ -248,4 +248,24 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest { Assertions.assertNotNull(result); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); } + + @Test + public void queryProcessInstancesByTriggerCode() throws Exception { + Map mockResult = new HashMap<>(); + mockResult.put(Constants.STATUS, Status.SUCCESS); + + Mockito.when(processInstanceService + .queryByTriggerCode(Mockito.any(), Mockito.anyLong(), Mockito.anyLong())) + .thenReturn(mockResult); + + MvcResult mvcResult = mockMvc.perform(get("/projects/1113/process-instances/trigger") + .header("sessionId", sessionId) + .param("triggerCode", "12051206")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assertions.assertNotNull(result); + Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 3f6ad0e0ca..fe19b5b402 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -66,6 +66,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.process.TriggerRelationService; import java.util.ArrayList; import java.util.Collections; @@ -143,6 +144,9 @@ public class ExecutorServiceTest { @Mock private ProcessInstanceMapper processInstanceMapper; + @Mock + private TriggerRelationService triggerRelationService; + private int processDefinitionId = 1; private int processDefinitionVersion = 1; @@ -231,6 +235,8 @@ public class ExecutorServiceTest { Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(this.processDefinition); Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue); Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance); + Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any())) + .thenReturn(1); Mockito.when(processService.findRelationByCode(processDefinitionCode, processDefinitionVersion)) .thenReturn(processTaskRelations); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 61d94c5067..d748480eb8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -281,6 +281,28 @@ public class ProcessInstanceServiceTest { } + @Test + public void queryByTriggerCode() { + long projectCode = 666L; + User loginUser = getAdminUser(); + Project project = getProject(projectCode); + Map result = new HashMap<>(); + putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); + + // project auth fail + when(projectMapper.queryByCode(projectCode)).thenReturn(project); + when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); + Map proejctAuthFailMap = + processInstanceService.queryByTriggerCode(loginUser, projectCode, 999L); + Assertions.assertEquals(Status.PROJECT_NOT_FOUND, proejctAuthFailMap.get(Constants.STATUS)); + // project auth sucess + putMsg(result, Status.SUCCESS, projectCode); + when(processInstanceMapper.queryByTriggerCode(projectCode)).thenReturn(new ArrayList()); + proejctAuthFailMap = + processInstanceService.queryByTriggerCode(loginUser, projectCode, 999L); + Assertions.assertEquals(Status.SUCCESS, proejctAuthFailMap.get(Constants.STATUS)); + } + @Test public void testQueryTopNLongestRunningProcessInstance() { long projectCode = 1L; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java new file mode 100644 index 0000000000..92814e6191 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * trigger support type + */ +public enum ApiTriggerType { + + PROCESS(0, "process instance"), + TASK(1, "task node"), + COMMAND(2, "command"); + + ApiTriggerType(int code, String desc) { + this.code = code; + this.desc = desc; + } + + @EnumValue + private final int code; + private final String desc; + + public int getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java new file mode 100644 index 0000000000..0e83ff06ef --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.entity; + +import java.util.Date; + +import lombok.Data; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; + +@Data +@TableName("t_ds_trigger_relation") +public class TriggerRelation { + + /** + * id + */ + @TableId(value = "id", type = IdType.AUTO) + private Integer id; + + /** + * trigger code + */ + private long triggerCode; + + /** + * triggerType + */ + private int triggerType; + + /** + * jobId + */ + private Integer jobId; + + /** + * create time + */ + private Date createTime; + + /** + * update time + */ + private Date updateTime; + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index d6cc986a39..7807998b16 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -310,4 +310,11 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("model") Integer model, @Param("projectIds") Set projectIds); + /** + * query process list by triggerCode + * + * @param triggerCode + * @return + */ + List queryByTriggerCode(@Param("triggerCode") Long triggerCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java new file mode 100644 index 0000000000..10a0acf47f --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.TriggerRelation; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * triggerRelation mapper interface + */ +public interface TriggerRelationMapper extends BaseMapper { + + /** + * query by code and id + * @param triggerType + * @param jobId + * @return + */ + TriggerRelation queryByTypeAndJobId(@Param("triggerType") Integer triggerType, @Param("jobId") int jobId); + + /** + * query triggerRelation by code + * + * @param triggerCode triggerCode + * @return triggerRelation + */ + List queryByTriggerRelationCode(@Param("triggerCode") Long triggerCode); + + /** + * query triggerRelation by code + * + * @param triggerCode triggerCode + * @return triggerRelation + */ + List queryByTriggerRelationCodeAndType(@Param("triggerCode") Long triggerCode, + @Param("triggerType") Integer triggerType); + + /** + * delete triggerRelation by code + * + * @param triggerCode triggerCode + * @return int + */ + int deleteByCode(@Param("triggerCode") Long triggerCode); + + /** + * if exist update else insert + * + * @param triggerRelation + */ + void upsert(@Param("triggerRelation") TriggerRelation triggerRelation); +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 0bf5b29ea1..7c64c035c6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -27,6 +27,17 @@ process_instance_priority, worker_group,environment_code, timeout, tenant_id, tenant_code, var_pool, dry_run, test_flag, next_process_instance_id, restart_time, state_history + + + ${alias}.id + , ${alias}.name, ${alias}.process_definition_version, ${alias}.process_definition_code, ${alias}.project_code, ${alias}.state, ${alias}.recovery, ${alias}.start_time, ${alias}.end_time, ${alias}.run_times,host, + command_type, ${alias}.command_param, ${alias}.task_depend_type, ${alias}.max_try_times, ${alias}.failure_strategy, ${alias}.warning_type, + warning_group_id, ${alias}.schedule_time, ${alias}.command_start_time, ${alias}.global_params, ${alias}.flag, + update_time, ${alias}.is_sub_process, ${alias}.executor_id, ${alias}.history_cmd, + process_instance_priority, ${alias}.worker_group,environment_code, ${alias}.timeout, ${alias}.tenant_id, ${alias}.tenant_code, ${alias}.var_pool, + dry_run, ${alias}.test_flag, ${alias}.next_process_instance_id, ${alias}.restart_time, ${alias}.state_history + + + select + + + + from t_ds_trigger_relation a + join t_ds_process_instance b on a.job_id = b.id + where a.trigger_type = 0 and a.trigger_code = #{triggerCode} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml new file mode 100644 index 0000000000..8055a56018 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml @@ -0,0 +1,55 @@ + + + + + + + id, trigger_code, trigger_type, job_id, create_time, update_time + + + + + + + + + + delete from t_ds_trigger_relation where triggerCode = #{triggerCode} + + + + INSERT INTO t_ds_trigger_relation (trigger_code, trigger_type, job_id, create_time, update_time) VALUES( + #{triggerRelation.triggerCode},#{triggerRelation.triggerType},#{triggerRelation.jobId},#{triggerRelation.createTime},#{triggerRelation.updateTime}) + ON DUPLICATE KEY UPDATE update_time = #{triggerRelation.updateTime}; + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 3f94e21b61..07c6820683 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -2042,3 +2042,19 @@ CREATE TABLE t_ds_fav_task user_id int NOT NULL, PRIMARY KEY (id) ); + +-- +-- Table structure for t_ds_trigger_relation +-- +DROP TABLE IF EXISTS `t_ds_trigger_relation`; +CREATE TABLE t_ds_trigger_relation +( + id bigint(20) NOT NULL AUTO_INCREMENT, + trigger_type int NOT NULL, + job_id int NOT NULL, + trigger_code bigint(20) NOT NULL, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE KEY t_ds_trigger_relation_UN(trigger_type,job_id,trigger_code) +); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 227692f222..062db95c6b 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -2018,3 +2018,15 @@ CREATE TABLE `t_ds_fav_task` ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8; + +CREATE TABLE `t_ds_trigger_relation` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `trigger_type` int(11) NOT NULL DEFAULT '0' COMMENT '0 process 1 task', + `trigger_code` bigint(20) NOT NULL, + `job_id` bigint(20) NOT NULL, + `create_time` datetime DEFAULT NULL, + `update_time` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`), + UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index a41959b806..15ecf49bb6 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1994,3 +1994,18 @@ CREATE TABLE t_ds_fav_task user_id int NOT NULL, PRIMARY KEY (id) ); + +-- ---------------------------- +-- Table structure for t_ds_trigger_relation +-- ---------------------------- +DROP TABLE IF EXISTS t_ds_trigger_relation; +CREATE TABLE t_ds_trigger_relation ( + id serial NOT NULL, + trigger_type int NOT NULL, + trigger_code bigint NOT NULL, + job_id bigint NOT NULL, + create_time timestamp DEFAULT NULL, + update_time timestamp DEFAULT NULL, + PRIMARY KEY (id), + CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code) +); diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql index a2643503bd..cc6335af92 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql @@ -121,6 +121,21 @@ delimiter ; CALL uc_dolphin_T_t_ds_task_instance_R_test_flag; DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag; +delimiter d// +CREATE TABLE `t_ds_trigger_relation` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `trigger_type` int(11) NOT NULL DEFAULT '0' COMMENT '0 process 1 task', + `trigger_code` bigint(20) NOT NULL, + `job_id` bigint(20) NOT NULL, + `create_time` datetime DEFAULT NULL, + `update_time` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`), + UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`) +) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; +d// +delimiter ; + -- uc_dolphin_T_t_ds_task_definition_R_is_cache drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_R_is_cache; delimiter d// diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql index b5b4ab048e..fc0e8e1e04 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -120,6 +120,21 @@ delimiter ; select uc_dolphin_T_t_ds_task_instance_R_test_flag(); DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag(); +delimiter d// +DROP TABLE IF EXISTS t_ds_trigger_relation; +CREATE TABLE t_ds_trigger_relation ( + id serial NOT NULL, + trigger_type int NOT NULL, + trigger_code bigint NOT NULL, + job_id bigint NOT NULL, + create_time timestamp DEFAULT NULL, + update_time timestamp DEFAULT NULL, + PRIMARY KEY (id), + CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code) +); +d// +delimiter ; + ALTER TABLE t_ds_task_definition DROP COLUMN IF EXISTS is_cache; ALTER TABLE t_ds_task_definition ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0'; diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java new file mode 100644 index 0000000000..d3f4fcc666 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.common.enums.ApiTriggerType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.BaseDaoTest; +import org.apache.dolphinscheduler.dao.entity.TriggerRelation; + +import java.util.List; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * trigger mapper test + */ +public class TriggerRelationMapperTest extends BaseDaoTest { + + @Autowired + TriggerRelationMapper triggerRelationMapper; + + /** + * test insert + * + * @return + */ + @Test + public void testInsert() { + TriggerRelation expectedObj = createTriggerRelation(); + Assertions.assertTrue(expectedObj.getId() > 0); + } + + /** + * test select by id + * + * @return + */ + @Test + public void testSelectById() { + TriggerRelation expectRelation = createTriggerRelation(); + TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId()); + Assertions.assertEquals(expectRelation, actualRelation); + } + + /** + * test select by type and job id + * + * @return + */ + @Test + public void testQueryByTypeAndJobId() { + TriggerRelation expectRelation = createTriggerRelation(); + TriggerRelation actualRelation = triggerRelationMapper.queryByTypeAndJobId( + expectRelation.getTriggerType(), expectRelation.getJobId()); + Assertions.assertEquals(expectRelation, actualRelation); + } + + /** + * test select by trigger code + * + * @return + */ + @Test + public void testQueryByTriggerRelationCode() { + TriggerRelation expectRelation = createTriggerRelation(); + List actualRelations = triggerRelationMapper.queryByTriggerRelationCode( + expectRelation.getTriggerCode()); + Assertions.assertEquals(actualRelations.size(), 1); + } + + /** + * test select by type and trigger code + * + * @return + */ + @Test + public void testQueryByTriggerRelationCodeAndType() { + TriggerRelation expectRelation = createTriggerRelation(); + List actualRelations = triggerRelationMapper.queryByTriggerRelationCodeAndType( + expectRelation.getTriggerCode(), expectRelation.getTriggerType()); + Assertions.assertEquals(actualRelations.size(), 1); + } + + @Test + public void testUpsert() { + TriggerRelation expectRelation = createTriggerRelation(); + triggerRelationMapper.upsert(expectRelation); + TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId()); + Assertions.assertEquals(expectRelation, actualRelation); + } + + /** + * test delete + */ + @Test + public void testDelete() { + TriggerRelation expectRelation = createTriggerRelation(); + triggerRelationMapper.deleteById(expectRelation.getId()); + TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId()); + Assertions.assertNull(actualRelation); + } + + /** + * create TriggerRelation and insert + * + * @return TriggerRelation + * @throws Exception + */ + private TriggerRelation createTriggerRelation() { + TriggerRelation triggerRelation = new TriggerRelation(); + triggerRelation.setTriggerCode(4567890); + triggerRelation.setTriggerType(ApiTriggerType.COMMAND.getCode()); + triggerRelation.setJobId(99); + triggerRelation.setCreateTime(DateUtils.getCurrentDate()); + triggerRelation.setUpdateTime(DateUtils.getCurrentDate()); + + triggerRelationMapper.insert(triggerRelation); + return triggerRelation; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 62caae828e..5f981cf92e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -704,6 +704,7 @@ public class WorkflowExecuteRunnable implements Callable { command.setProcessInstanceId(0); command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); command.setTestFlag(processInstance.getTestFlag()); + processService.saveCommandTrigger(command.getId(), processInstance.getId()); return commandService.createCommand(command); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index e321ce5441..48689909db 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -241,4 +241,6 @@ public interface ProcessService { void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); Integer queryTestDataSourceId(Integer onlineDataSourceId); + + void saveCommandTrigger(Integer commandId, Integer processInstanceId); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index ce8360ad64..3591bec514 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -308,6 +308,8 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private CommandService commandService; + @Autowired + private TriggerRelationService triggerRelationService; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -336,12 +338,14 @@ public class ProcessServiceImpl implements ProcessService { saveSerialProcess(processInstance, processDefinition); if (processInstance.getState() != WorkflowExecutionStatus.SUBMITTED_SUCCESS) { setSubProcessParam(processInstance); + triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); deleteCommandWithCheck(command.getId()); return null; } } else { processInstanceDao.upsertProcessInstance(processInstance); } + triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId()); setSubProcessParam(processInstance); deleteCommandWithCheck(command.getId()); return processInstance; @@ -2708,6 +2712,11 @@ public class ProcessServiceImpl implements ProcessService { return null; } + @Override + public void saveCommandTrigger(Integer commandId, Integer processInstanceId) { + triggerRelationService.saveCommandTrigger(commandId, processInstanceId); + } + private Set getResourceFullNames(TaskDefinition taskDefinition) { Set resourceFullNames = null; AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder() diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java new file mode 100644 index 0000000000..b78f477931 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.enums.ApiTriggerType; +import org.apache.dolphinscheduler.dao.entity.TriggerRelation; + +import org.springframework.stereotype.Component; + +/** + * Trigger relation operator to db,because operator command process instance + */ +@Component +public interface TriggerRelationService { + + void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId); + + TriggerRelation queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId); + + int saveCommandTrigger(Integer commandId, Integer processInstanceId); + + int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java new file mode 100644 index 0000000000..df41c41eef --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.enums.ApiTriggerType; +import org.apache.dolphinscheduler.dao.entity.TriggerRelation; +import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper; + +import java.util.Date; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Trigger relation operator to db + */ +@Component +public class TriggerRelationServiceImpl implements TriggerRelationService { + + @Autowired + private TriggerRelationMapper triggerRelationMapper; + + @Override + public void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId) { + TriggerRelation triggerRelation = new TriggerRelation(); + triggerRelation.setTriggerType(type.getCode()); + triggerRelation.setJobId(jobId); + triggerRelation.setTriggerCode(triggerCode); + triggerRelation.setCreateTime(new Date()); + triggerRelation.setUpdateTime(new Date()); + triggerRelationMapper.upsert(triggerRelation); + } + @Override + public TriggerRelation queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId) { + return triggerRelationMapper.queryByTypeAndJobId(apiTriggerType.getCode(), jobId); + } + + @Override + public int saveCommandTrigger(Integer commandId, Integer processInstanceId) { + TriggerRelation exist = queryByTypeAndJobId(ApiTriggerType.PROCESS, processInstanceId); + if (exist == null) { + return 0; + } + saveTriggerToDb(ApiTriggerType.COMMAND, exist.getTriggerCode(), commandId); + return 1; + } + + @Override + public int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId) { + TriggerRelation exist = queryByTypeAndJobId(ApiTriggerType.COMMAND, commandId); + if (exist == null) { + return 0; + } + saveTriggerToDb(ApiTriggerType.PROCESS, exist.getTriggerCode(), processInstanceId); + return 1; + } + +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index ef08a76fcd..65b67c09fe 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -183,9 +183,10 @@ public class ProcessServiceTest { @Mock TaskPluginManager taskPluginManager; + @Mock + private TriggerRelationService triggerRelationService; @Test public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { - // cannot construct process instance, return null; String host = "127.0.0.1"; Command command = new Command(); @@ -238,6 +239,8 @@ public class ProcessServiceTest { Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); + Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any())) + .thenReturn(1); Assertions.assertNotNull(processService.handleCommand(host, command1)); Command command2 = new Command(); @@ -410,6 +413,8 @@ public class ProcessServiceTest { Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); + Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any())) + .thenReturn(1); Assertions.assertThrows(ServiceException.class, () -> { // will throw exception when command id is 0 and delete fail diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java new file mode 100644 index 0000000000..8f4790111c --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.process; + +import org.apache.dolphinscheduler.common.enums.ApiTriggerType; +import org.apache.dolphinscheduler.dao.entity.TriggerRelation; +import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper; +import org.apache.dolphinscheduler.service.cron.CronUtilsTest; + +import java.util.Date; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Trigger Relation Service Test + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class TriggerRelationServiceTest { + + private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class); + + @InjectMocks + private TriggerRelationServiceImpl triggerRelationService; + @Mock + private TriggerRelationMapper triggerRelationMapper; + + @Test + public void saveTriggerToDb() { + Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any()); + triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, 1234567890L, 100); + } + + @Test + public void queryByTypeAndJobId() { + Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any()); + Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100)) + .thenReturn(getTriggerTdoDb()); + + TriggerRelation triggerRelation1 = triggerRelationService.queryByTypeAndJobId( + ApiTriggerType.PROCESS, 100); + Assertions.assertNotNull(triggerRelation1); + TriggerRelation triggerRelation2 = triggerRelationService.queryByTypeAndJobId( + ApiTriggerType.PROCESS, 200); + Assertions.assertNull(triggerRelation2); + } + + @Test + public void saveCommandTrigger() { + Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any()); + Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100)) + .thenReturn(getTriggerTdoDb()); + int result = -1; + result = triggerRelationService.saveCommandTrigger(1234567890, 100); + Assertions.assertTrue(result > 0); + result = triggerRelationService.saveCommandTrigger(1234567890, 200); + Assertions.assertTrue(result == 0); + + } + + @Test + public void saveProcessInstanceTrigger() { + Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any()); + Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.COMMAND.getCode(), 100)) + .thenReturn(getTriggerTdoDb()); + int result = -1; + result = triggerRelationService.saveProcessInstanceTrigger(100, 1234567890); + Assertions.assertTrue(result > 0); + result = triggerRelationService.saveProcessInstanceTrigger(200, 1234567890); + Assertions.assertTrue(result == 0); + } + + private TriggerRelation getTriggerTdoDb() { + TriggerRelation triggerRelation = new TriggerRelation(); + triggerRelation.setTriggerType(ApiTriggerType.PROCESS.getCode()); + triggerRelation.setJobId(100); + triggerRelation.setTriggerCode(1234567890L); + triggerRelation.setCreateTime(new Date()); + triggerRelation.setUpdateTime(new Date()); + return triggerRelation; + } +}