Browse Source

[Feature][Api] When use api to run a process we want get processInstanceId (#13184)

* add sql

* add mapper

* add dao

* add excutor


Co-authored-by: qianl4 <qianl4@cicso.com>
3.2.0-release
qianli2022 2 years ago committed by GitHub
parent
commit
8be32d4145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  3. 33
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  5. 20
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
  6. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  7. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  8. 47
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java
  9. 63
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java
  10. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  11. 72
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java
  12. 21
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  13. 55
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml
  14. 16
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  15. 12
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  16. 15
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  17. 15
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
  18. 15
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
  19. 138
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java
  20. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  21. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  22. 9
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  23. 38
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java
  24. 73
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java
  25. 7
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  26. 107
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java

18
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
@ -411,4 +413,20 @@ public class ProcessInstanceController extends BaseController {
} }
return returnDataList(result); return returnDataList(result);
} }
@Operation(summary = "queryProcessInstanceListByTrigger", description = "QUERY_PROCESS_INSTANCE_BY_TRIGGER_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "triggerCode", description = "TRIGGER_CODE", required = true, schema = @Schema(implementation = Long.class))
})
@GetMapping("/trigger")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR)
@AccessLogAnnotation()
public Result queryProcessInstancesByTriggerCode(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable long projectCode,
@RequestParam(value = "triggerCode") Long triggerCode) {
Map<String, Object> result = processInstanceService.queryByTriggerCode(loginUser, projectCode, triggerCode);
return returnDataList(result);
}
} }

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -222,6 +222,16 @@ public interface ProcessInstanceService {
List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode,
int size); int size);
/**
* query process instance list bt trigger code
*
* @param loginUser
* @param projectCode
* @param triggerCode
* @return
*/
Map<String, Object> queryByTriggerCode(User loginUser, long projectCode, Long triggerCode);
void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode); void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode);
void deleteProcessInstanceById(int workflowInstanceId); void deleteProcessInstanceById(int workflowInstanceId);

33
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.CycleEnum;
@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
@ -85,6 +87,7 @@ import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@ -106,6 +109,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -164,6 +168,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired @Autowired
private WorkerGroupService workerGroupService; private WorkerGroupService workerGroupService;
@Autowired
private TriggerRelationService triggerRelationService;
/** /**
* execute process instance * execute process instance
* *
@ -188,6 +194,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return execute process instance code * @return execute process instance code
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, public Map<String, Object> execProcessInstance(User loginUser, long projectCode, long processDefinitionCode,
String cronTime, CommandType commandType, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
@ -239,11 +246,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (!checkMasterExists(result)) { if (!checkMasterExists(result)) {
return result; return result;
} }
long triggerCode = CodeGenerateUtils.getInstance().genCode();
/** /**
* create command * create command
*/ */
int create = int create =
this.createCommand(commandType, processDefinition.getCode(), taskDependType, failureStrategy, this.createCommand(triggerCode, commandType, processDefinition.getCode(), taskDependType,
failureStrategy,
startNodeList, startNodeList,
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup, workerGroup,
@ -255,6 +266,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processDefinitionMapper.updateById(processDefinition); processDefinitionMapper.updateById(processDefinition);
logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.",
processDefinition.getCode(), create); processDefinition.getCode(), create);
result.put(Constants.DATA_LIST, triggerCode);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", logger.error("Start process instance failed because create command error, processDefinitionCode:{}.",
@ -878,7 +890,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param environmentCode environmentCode * @param environmentCode environmentCode
* @return command id * @return command id
*/ */
private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, private int createCommand(Long triggerCode, CommandType commandType, long processDefineCode, TaskDependType nodeDep,
FailureStrategy failureStrategy, String startNodeList, String schedule, FailureStrategy failureStrategy, String startNodeList, String schedule,
WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode, WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Priority processInstancePriority, String workerGroup, Long environmentCode,
@ -940,7 +952,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
try { try {
logger.info("Start to create {} command, processDefinitionCode:{}.", logger.info("Start to create {} command, processDefinitionCode:{}.",
command.getCommandType().getDescp(), processDefineCode); command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, return createComplementCommandList(triggerCode, schedule, runMode, command, expectedParallelismNumber,
complementDependentMode); complementDependentMode);
} catch (CronParseException cronParseException) { } catch (CronParseException cronParseException) {
// We catch the exception here just to make compiler happy, since we have already validated the schedule // We catch the exception here just to make compiler happy, since we have already validated the schedule
@ -949,8 +961,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} }
} else { } else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); int count = commandService.createCommand(command);
return commandService.createCommand(command); if (count > 0) {
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
}
return count;
} }
} }
@ -962,7 +977,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param runMode * @param runMode
* @return * @return
*/ */
protected int createComplementCommandList(String scheduleTimeParam, RunMode runMode, Command command, protected int createComplementCommandList(Long triggerCode, String scheduleTimeParam, RunMode runMode,
Command command,
Integer expectedParallelismNumber, Integer expectedParallelismNumber,
ComplementDependentMode complementDependentMode) throws CronParseException { ComplementDependentMode complementDependentMode) throws CronParseException {
int createCount = 0; int createCount = 0;
@ -1027,6 +1043,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
} }
} }
if (createCount > 0) {
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode, command.getId());
}
break; break;
} }
case RUN_MODE_PARALLEL: { case RUN_MODE_PARALLEL: {
@ -1075,6 +1094,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (commandService.createCommand(command) > 0) { if (commandService.createCommand(command) > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}", logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode,
command.getId());
} else { } else {
logger.error("Create {} command error, processDefinitionCode:{}", logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
@ -1021,6 +1022,32 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
} }
/**
* query process instance list bt trigger code
*
* @param loginUser
* @param projectCode
* @param triggerCode
* @return
*/
@Override
public Map<String, Object> queryByTriggerCode(User loginUser, long projectCode, Long triggerCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS || triggerCode == null) {
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper.queryByTriggerCode(
triggerCode);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
}
@Override @Override
public void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) { public void deleteProcessInstanceByWorkflowDefinitionCode(long workflowDefinitionCode) {
while (true) { while (true) {

20
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java

@ -248,4 +248,24 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
Assertions.assertNotNull(result); Assertions.assertNotNull(result);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
} }
@Test
public void queryProcessInstancesByTriggerCode() throws Exception {
Map<String, Object> mockResult = new HashMap<>();
mockResult.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(processInstanceService
.queryByTriggerCode(Mockito.any(), Mockito.anyLong(), Mockito.anyLong()))
.thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(get("/projects/1113/process-instances/trigger")
.header("sessionId", sessionId)
.param("triggerCode", "12051206"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assertions.assertNotNull(result);
Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
} }

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -66,6 +66,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -143,6 +144,9 @@ public class ExecutorServiceTest {
@Mock @Mock
private ProcessInstanceMapper processInstanceMapper; private ProcessInstanceMapper processInstanceMapper;
@Mock
private TriggerRelationService triggerRelationService;
private int processDefinitionId = 1; private int processDefinitionId = 1;
private int processDefinitionVersion = 1; private int processDefinitionVersion = 1;
@ -231,6 +235,8 @@ public class ExecutorServiceTest {
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(this.processDefinition); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(this.processDefinition);
Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue); Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance); Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);
Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
.thenReturn(1);
Mockito.when(processService.findRelationByCode(processDefinitionCode, processDefinitionVersion)) Mockito.when(processService.findRelationByCode(processDefinitionCode, processDefinitionVersion))
.thenReturn(processTaskRelations); .thenReturn(processTaskRelations);
} }

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -281,6 +281,28 @@ public class ProcessInstanceServiceTest {
} }
@Test
public void queryByTriggerCode() {
long projectCode = 666L;
User loginUser = getAdminUser();
Project project = getProject(projectCode);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
// project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
Map<String, Object> proejctAuthFailMap =
processInstanceService.queryByTriggerCode(loginUser, projectCode, 999L);
Assertions.assertEquals(Status.PROJECT_NOT_FOUND, proejctAuthFailMap.get(Constants.STATUS));
// project auth sucess
putMsg(result, Status.SUCCESS, projectCode);
when(processInstanceMapper.queryByTriggerCode(projectCode)).thenReturn(new ArrayList());
proejctAuthFailMap =
processInstanceService.queryByTriggerCode(loginUser, projectCode, 999L);
Assertions.assertEquals(Status.SUCCESS, proejctAuthFailMap.get(Constants.STATUS));
}
@Test @Test
public void testQueryTopNLongestRunningProcessInstance() { public void testQueryTopNLongestRunningProcessInstance() {
long projectCode = 1L; long projectCode = 1L;

47
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ApiTriggerType.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* trigger support type
*/
public enum ApiTriggerType {
PROCESS(0, "process instance"),
TASK(1, "task node"),
COMMAND(2, "command");
ApiTriggerType(int code, String desc) {
this.code = code;
this.desc = desc;
}
@EnumValue
private final int code;
private final String desc;
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
}

63
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TriggerRelation.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
import lombok.Data;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@TableName("t_ds_trigger_relation")
public class TriggerRelation {
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* trigger code
*/
private long triggerCode;
/**
* triggerType
*/
private int triggerType;
/**
* jobId
*/
private Integer jobId;
/**
* create time
*/
private Date createTime;
/**
* update time
*/
private Date updateTime;
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -310,4 +310,11 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("model") Integer model, @Param("model") Integer model,
@Param("projectIds") Set<Integer> projectIds); @Param("projectIds") Set<Integer> projectIds);
/**
* query process list by triggerCode
*
* @param triggerCode
* @return
*/
List<ProcessInstance> queryByTriggerCode(@Param("triggerCode") Long triggerCode);
} }

72
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.TriggerRelation;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* triggerRelation mapper interface
*/
public interface TriggerRelationMapper extends BaseMapper<TriggerRelation> {
/**
* query by code and id
* @param triggerType
* @param jobId
* @return
*/
TriggerRelation queryByTypeAndJobId(@Param("triggerType") Integer triggerType, @Param("jobId") int jobId);
/**
* query triggerRelation by code
*
* @param triggerCode triggerCode
* @return triggerRelation
*/
List<TriggerRelation> queryByTriggerRelationCode(@Param("triggerCode") Long triggerCode);
/**
* query triggerRelation by code
*
* @param triggerCode triggerCode
* @return triggerRelation
*/
List<TriggerRelation> queryByTriggerRelationCodeAndType(@Param("triggerCode") Long triggerCode,
@Param("triggerType") Integer triggerType);
/**
* delete triggerRelation by code
*
* @param triggerCode triggerCode
* @return int
*/
int deleteByCode(@Param("triggerCode") Long triggerCode);
/**
* if exist update else insert
*
* @param triggerRelation
*/
void upsert(@Param("triggerRelation") TriggerRelation triggerRelation);
}

21
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -27,6 +27,17 @@
process_instance_priority, worker_group,environment_code, timeout, tenant_id, tenant_code, var_pool, process_instance_priority, worker_group,environment_code, timeout, tenant_id, tenant_code, var_pool,
dry_run, test_flag, next_process_instance_id, restart_time, state_history dry_run, test_flag, next_process_instance_id, restart_time, state_history
</sql> </sql>
<sql id="baseSqlV2">
${alias}.id
, ${alias}.name, ${alias}.process_definition_version, ${alias}.process_definition_code, ${alias}.project_code, ${alias}.state, ${alias}.recovery, ${alias}.start_time, ${alias}.end_time, ${alias}.run_times,host,
command_type, ${alias}.command_param, ${alias}.task_depend_type, ${alias}.max_try_times, ${alias}.failure_strategy, ${alias}.warning_type,
warning_group_id, ${alias}.schedule_time, ${alias}.command_start_time, ${alias}.global_params, ${alias}.flag,
update_time, ${alias}.is_sub_process, ${alias}.executor_id, ${alias}.history_cmd,
process_instance_priority, ${alias}.worker_group,environment_code, ${alias}.timeout, ${alias}.tenant_id, ${alias}.tenant_code, ${alias}.var_pool,
dry_run, ${alias}.test_flag, ${alias}.next_process_instance_id, ${alias}.restart_time, ${alias}.state_history
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance"> <select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
@ -335,4 +346,14 @@
where id = #{runningInstanceId} where id = #{runningInstanceId}
and next_process_instance_id = 0 and next_process_instance_id = 0
</update> </update>
<select id="queryByTriggerCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="b"/>
</include>
from t_ds_trigger_relation a
join t_ds_process_instance b on a.job_id = b.id
where a.trigger_type = 0 and a.trigger_code = #{triggerCode}
</select>
</mapper> </mapper>

55
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapper.xml

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper">
<sql id="baseSql">
id, trigger_code, trigger_type, job_id, create_time, update_time
</sql>
<select id="queryByTypeAndJobId" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation">
select
<include refid="baseSql"/>
from t_ds_trigger_relation
WHERE trigger_type = #{triggerType} and job_id = #{jobId}
</select>
<select id="queryByTriggerRelationCode" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation">
select
<include refid="baseSql"/>
from t_ds_trigger_relation
WHERE trigger_code = #{triggerCode}
</select>
<select id="queryByTriggerRelationCodeAndType" resultType="org.apache.dolphinscheduler.dao.entity.TriggerRelation">
select
<include refid="baseSql"/>
from t_ds_trigger_relation
WHERE trigger_code = #{triggerCode} and trigger_type = #{triggerType}
</select>
<delete id="deleteByCode">
delete from t_ds_trigger_relation where triggerCode = #{triggerCode}
</delete>
<insert id="upsert">
INSERT INTO t_ds_trigger_relation (trigger_code, trigger_type, job_id, create_time, update_time) VALUES(
#{triggerRelation.triggerCode},#{triggerRelation.triggerType},#{triggerRelation.jobId},#{triggerRelation.createTime},#{triggerRelation.updateTime})
ON DUPLICATE KEY UPDATE update_time = #{triggerRelation.updateTime};
</insert>
</mapper>

16
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -2042,3 +2042,19 @@ CREATE TABLE t_ds_fav_task
user_id int NOT NULL, user_id int NOT NULL,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
--
-- Table structure for t_ds_trigger_relation
--
DROP TABLE IF EXISTS `t_ds_trigger_relation`;
CREATE TABLE t_ds_trigger_relation
(
id bigint(20) NOT NULL AUTO_INCREMENT,
trigger_type int NOT NULL,
job_id int NOT NULL,
trigger_code bigint(20) NOT NULL,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY t_ds_trigger_relation_UN(trigger_type,job_id,trigger_code)
);

12
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -2018,3 +2018,15 @@ CREATE TABLE `t_ds_fav_task`
) ENGINE = InnoDB ) ENGINE = InnoDB
AUTO_INCREMENT = 1 AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8; DEFAULT CHARSET = utf8;
CREATE TABLE `t_ds_trigger_relation` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`trigger_type` int(11) NOT NULL DEFAULT '0' COMMENT '0 process 1 task',
`trigger_code` bigint(20) NOT NULL,
`job_id` bigint(20) NOT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`),
UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

15
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -1994,3 +1994,18 @@ CREATE TABLE t_ds_fav_task
user_id int NOT NULL, user_id int NOT NULL,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
-- ----------------------------
-- Table structure for t_ds_trigger_relation
-- ----------------------------
DROP TABLE IF EXISTS t_ds_trigger_relation;
CREATE TABLE t_ds_trigger_relation (
id serial NOT NULL,
trigger_type int NOT NULL,
trigger_code bigint NOT NULL,
job_id bigint NOT NULL,
create_time timestamp DEFAULT NULL,
update_time timestamp DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code)
);

15
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql

@ -121,6 +121,21 @@ delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_test_flag; CALL uc_dolphin_T_t_ds_task_instance_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag; DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag;
delimiter d//
CREATE TABLE `t_ds_trigger_relation` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`trigger_type` int(11) NOT NULL DEFAULT '0' COMMENT '0 process 1 task',
`trigger_code` bigint(20) NOT NULL,
`job_id` bigint(20) NOT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `t_ds_trigger_relation_trigger_code_IDX` (`trigger_code`),
UNIQUE KEY `t_ds_trigger_relation_UN` (`trigger_type`,`job_id`,`trigger_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
d//
delimiter ;
-- uc_dolphin_T_t_ds_task_definition_R_is_cache -- uc_dolphin_T_t_ds_task_definition_R_is_cache
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_R_is_cache; drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_R_is_cache;
delimiter d// delimiter d//

15
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -120,6 +120,21 @@ delimiter ;
select uc_dolphin_T_t_ds_task_instance_R_test_flag(); select uc_dolphin_T_t_ds_task_instance_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag(); DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag();
delimiter d//
DROP TABLE IF EXISTS t_ds_trigger_relation;
CREATE TABLE t_ds_trigger_relation (
id serial NOT NULL,
trigger_type int NOT NULL,
trigger_code bigint NOT NULL,
job_id bigint NOT NULL,
create_time timestamp DEFAULT NULL,
update_time timestamp DEFAULT NULL,
PRIMARY KEY (id),
CONSTRAINT t_ds_trigger_relation_unique UNIQUE (trigger_type,job_id,trigger_code)
);
d//
delimiter ;
ALTER TABLE t_ds_task_definition DROP COLUMN IF EXISTS is_cache; ALTER TABLE t_ds_task_definition DROP COLUMN IF EXISTS is_cache;
ALTER TABLE t_ds_task_definition ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0'; ALTER TABLE t_ds_task_definition ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';

138
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TriggerRelationMapperTest.java

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.TriggerRelation;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
* trigger mapper test
*/
public class TriggerRelationMapperTest extends BaseDaoTest {
@Autowired
TriggerRelationMapper triggerRelationMapper;
/**
* test insert
*
* @return
*/
@Test
public void testInsert() {
TriggerRelation expectedObj = createTriggerRelation();
Assertions.assertTrue(expectedObj.getId() > 0);
}
/**
* test select by id
*
* @return
*/
@Test
public void testSelectById() {
TriggerRelation expectRelation = createTriggerRelation();
TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId());
Assertions.assertEquals(expectRelation, actualRelation);
}
/**
* test select by type and job id
*
* @return
*/
@Test
public void testQueryByTypeAndJobId() {
TriggerRelation expectRelation = createTriggerRelation();
TriggerRelation actualRelation = triggerRelationMapper.queryByTypeAndJobId(
expectRelation.getTriggerType(), expectRelation.getJobId());
Assertions.assertEquals(expectRelation, actualRelation);
}
/**
* test select by trigger code
*
* @return
*/
@Test
public void testQueryByTriggerRelationCode() {
TriggerRelation expectRelation = createTriggerRelation();
List<TriggerRelation> actualRelations = triggerRelationMapper.queryByTriggerRelationCode(
expectRelation.getTriggerCode());
Assertions.assertEquals(actualRelations.size(), 1);
}
/**
* test select by type and trigger code
*
* @return
*/
@Test
public void testQueryByTriggerRelationCodeAndType() {
TriggerRelation expectRelation = createTriggerRelation();
List<TriggerRelation> actualRelations = triggerRelationMapper.queryByTriggerRelationCodeAndType(
expectRelation.getTriggerCode(), expectRelation.getTriggerType());
Assertions.assertEquals(actualRelations.size(), 1);
}
@Test
public void testUpsert() {
TriggerRelation expectRelation = createTriggerRelation();
triggerRelationMapper.upsert(expectRelation);
TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId());
Assertions.assertEquals(expectRelation, actualRelation);
}
/**
* test delete
*/
@Test
public void testDelete() {
TriggerRelation expectRelation = createTriggerRelation();
triggerRelationMapper.deleteById(expectRelation.getId());
TriggerRelation actualRelation = triggerRelationMapper.selectById(expectRelation.getId());
Assertions.assertNull(actualRelation);
}
/**
* create TriggerRelation and insert
*
* @return TriggerRelation
* @throws Exception
*/
private TriggerRelation createTriggerRelation() {
TriggerRelation triggerRelation = new TriggerRelation();
triggerRelation.setTriggerCode(4567890);
triggerRelation.setTriggerType(ApiTriggerType.COMMAND.getCode());
triggerRelation.setJobId(99);
triggerRelation.setCreateTime(DateUtils.getCurrentDate());
triggerRelation.setUpdateTime(DateUtils.getCurrentDate());
triggerRelationMapper.insert(triggerRelation);
return triggerRelation;
}
}

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -704,6 +704,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setProcessInstanceId(0); command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
command.setTestFlag(processInstance.getTestFlag()); command.setTestFlag(processInstance.getTestFlag());
processService.saveCommandTrigger(command.getId(), processInstance.getId());
return commandService.createCommand(command); return commandService.createCommand(command);
} }

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -241,4 +241,6 @@ public interface ProcessService {
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId); void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
Integer queryTestDataSourceId(Integer onlineDataSourceId); Integer queryTestDataSourceId(Integer onlineDataSourceId);
void saveCommandTrigger(Integer commandId, Integer processInstanceId);
} }

9
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -308,6 +308,8 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired @Autowired
private CommandService commandService; private CommandService commandService;
@Autowired
private TriggerRelationService triggerRelationService;
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
* *
@ -336,12 +338,14 @@ public class ProcessServiceImpl implements ProcessService {
saveSerialProcess(processInstance, processDefinition); saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() != WorkflowExecutionStatus.SUBMITTED_SUCCESS) { if (processInstance.getState() != WorkflowExecutionStatus.SUBMITTED_SUCCESS) {
setSubProcessParam(processInstance); setSubProcessParam(processInstance);
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
deleteCommandWithCheck(command.getId()); deleteCommandWithCheck(command.getId());
return null; return null;
} }
} else { } else {
processInstanceDao.upsertProcessInstance(processInstance); processInstanceDao.upsertProcessInstance(processInstance);
} }
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
setSubProcessParam(processInstance); setSubProcessParam(processInstance);
deleteCommandWithCheck(command.getId()); deleteCommandWithCheck(command.getId());
return processInstance; return processInstance;
@ -2708,6 +2712,11 @@ public class ProcessServiceImpl implements ProcessService {
return null; return null;
} }
@Override
public void saveCommandTrigger(Integer commandId, Integer processInstanceId) {
triggerRelationService.saveCommandTrigger(commandId, processInstanceId);
}
private Set<String> getResourceFullNames(TaskDefinition taskDefinition) { private Set<String> getResourceFullNames(TaskDefinition taskDefinition) {
Set<String> resourceFullNames = null; Set<String> resourceFullNames = null;
AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder() AbstractParameters params = taskPluginManager.getParameters(ParametersNode.builder()

38
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationService.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.process;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.dao.entity.TriggerRelation;
import org.springframework.stereotype.Component;
/**
* Trigger relation operator to dbbecause operator command process instance
*/
@Component
public interface TriggerRelationService {
void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId);
TriggerRelation queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId);
int saveCommandTrigger(Integer commandId, Integer processInstanceId);
int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId);
}

73
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceImpl.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.process;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.dao.entity.TriggerRelation;
import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper;
import java.util.Date;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Trigger relation operator to db
*/
@Component
public class TriggerRelationServiceImpl implements TriggerRelationService {
@Autowired
private TriggerRelationMapper triggerRelationMapper;
@Override
public void saveTriggerToDb(ApiTriggerType type, Long triggerCode, Integer jobId) {
TriggerRelation triggerRelation = new TriggerRelation();
triggerRelation.setTriggerType(type.getCode());
triggerRelation.setJobId(jobId);
triggerRelation.setTriggerCode(triggerCode);
triggerRelation.setCreateTime(new Date());
triggerRelation.setUpdateTime(new Date());
triggerRelationMapper.upsert(triggerRelation);
}
@Override
public TriggerRelation queryByTypeAndJobId(ApiTriggerType apiTriggerType, int jobId) {
return triggerRelationMapper.queryByTypeAndJobId(apiTriggerType.getCode(), jobId);
}
@Override
public int saveCommandTrigger(Integer commandId, Integer processInstanceId) {
TriggerRelation exist = queryByTypeAndJobId(ApiTriggerType.PROCESS, processInstanceId);
if (exist == null) {
return 0;
}
saveTriggerToDb(ApiTriggerType.COMMAND, exist.getTriggerCode(), commandId);
return 1;
}
@Override
public int saveProcessInstanceTrigger(Integer commandId, Integer processInstanceId) {
TriggerRelation exist = queryByTypeAndJobId(ApiTriggerType.COMMAND, commandId);
if (exist == null) {
return 0;
}
saveTriggerToDb(ApiTriggerType.PROCESS, exist.getTriggerCode(), processInstanceId);
return 1;
}
}

7
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -183,9 +183,10 @@ public class ProcessServiceTest {
@Mock @Mock
TaskPluginManager taskPluginManager; TaskPluginManager taskPluginManager;
@Mock
private TriggerRelationService triggerRelationService;
@Test @Test
public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
// cannot construct process instance, return null; // cannot construct process instance, return null;
String host = "127.0.0.1"; String host = "127.0.0.1";
Command command = new Command(); Command command = new Command();
@ -238,6 +239,8 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
.thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command1)); Assertions.assertNotNull(processService.handleCommand(host, command1));
Command command2 = new Command(); Command command2 = new Command();
@ -410,6 +413,8 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
.thenReturn(1);
Assertions.assertThrows(ServiceException.class, () -> { Assertions.assertThrows(ServiceException.class, () -> {
// will throw exception when command id is 0 and delete fail // will throw exception when command id is 0 and delete fail

107
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/TriggerRelationServiceTest.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.process;
import org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.dao.entity.TriggerRelation;
import org.apache.dolphinscheduler.dao.mapper.TriggerRelationMapper;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import java.util.Date;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Trigger Relation Service Test
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class TriggerRelationServiceTest {
private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class);
@InjectMocks
private TriggerRelationServiceImpl triggerRelationService;
@Mock
private TriggerRelationMapper triggerRelationMapper;
@Test
public void saveTriggerToDb() {
Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any());
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, 1234567890L, 100);
}
@Test
public void queryByTypeAndJobId() {
Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any());
Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100))
.thenReturn(getTriggerTdoDb());
TriggerRelation triggerRelation1 = triggerRelationService.queryByTypeAndJobId(
ApiTriggerType.PROCESS, 100);
Assertions.assertNotNull(triggerRelation1);
TriggerRelation triggerRelation2 = triggerRelationService.queryByTypeAndJobId(
ApiTriggerType.PROCESS, 200);
Assertions.assertNull(triggerRelation2);
}
@Test
public void saveCommandTrigger() {
Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any());
Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.PROCESS.getCode(), 100))
.thenReturn(getTriggerTdoDb());
int result = -1;
result = triggerRelationService.saveCommandTrigger(1234567890, 100);
Assertions.assertTrue(result > 0);
result = triggerRelationService.saveCommandTrigger(1234567890, 200);
Assertions.assertTrue(result == 0);
}
@Test
public void saveProcessInstanceTrigger() {
Mockito.doNothing().when(triggerRelationMapper).upsert(Mockito.any());
Mockito.when(triggerRelationMapper.queryByTypeAndJobId(ApiTriggerType.COMMAND.getCode(), 100))
.thenReturn(getTriggerTdoDb());
int result = -1;
result = triggerRelationService.saveProcessInstanceTrigger(100, 1234567890);
Assertions.assertTrue(result > 0);
result = triggerRelationService.saveProcessInstanceTrigger(200, 1234567890);
Assertions.assertTrue(result == 0);
}
private TriggerRelation getTriggerTdoDb() {
TriggerRelation triggerRelation = new TriggerRelation();
triggerRelation.setTriggerType(ApiTriggerType.PROCESS.getCode());
triggerRelation.setJobId(100);
triggerRelation.setTriggerCode(1234567890L);
triggerRelation.setCreateTime(new Date());
triggerRelation.setUpdateTime(new Date());
return triggerRelation;
}
}
Loading…
Cancel
Save