Browse Source

[cherry-pick][DS-6849] add delete command check (#7074)

* delete command with check

* add test

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
40e00585aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 125
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  2. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

125
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -17,13 +17,29 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import com.facebook.presto.jdbc.internal.guava.collect.Lists; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import com.fasterxml.jackson.core.type.TypeReference; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import com.fasterxml.jackson.databind.node.ObjectNode; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import org.apache.commons.collections.CollectionUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import org.apache.commons.lang.StringUtils; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -34,27 +50,85 @@ import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.*; import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import java.util.Map.Entry; import com.fasterxml.jackson.core.type.TypeReference;
import java.util.stream.Collectors; import com.fasterxml.jackson.databind.node.ObjectNode;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
@ -136,7 +210,6 @@ public class ProcessService {
* @param logger logger * @param logger logger
* @param host host * @param host host
* @param command found command * @param command found command
* @param processDefinitionCacheMaps
* @return process instance * @return process instance
*/ */
@Transactional @Transactional
@ -152,7 +225,7 @@ public class ProcessService {
processInstance.addHistoryCmd(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance); saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance); this.setSubProcessParam(processInstance);
this.commandMapper.deleteById(command.getId()); this.deleteCommandWithCheck(command.getId());
return processInstance; return processInstance;
} }
@ -202,10 +275,6 @@ public class ProcessService {
/** /**
* get command page * get command page
*
* @param pageSize
* @param pageNumber
* @return
*/ */
public List<Command> findCommandPage(int pageSize, int pageNumber) { public List<Command> findCommandPage(int pageSize, int pageNumber) {
return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize); return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
@ -947,11 +1016,6 @@ public class ProcessService {
/** /**
* retry submit task to db * retry submit task to db
*
* @param taskInstance
* @param commitRetryTimes
* @param commitInterval
* @return
*/ */
public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
@ -1672,8 +1736,6 @@ public class ProcessService {
/** /**
* for show in page of taskInstance * for show in page of taskInstance
*
* @param taskInstance
*/ */
public void changeOutParam(TaskInstance taskInstance) { public void changeOutParam(TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getVarPool())) { if (StringUtils.isEmpty(taskInstance.getVarPool())) {
@ -2401,4 +2463,11 @@ public class ProcessService {
} }
return processTaskMap; return processTaskMap;
} }
private void deleteCommandWithCheck(int commandId) {
int delete = this.commandMapper.deleteById(commandId);
if (delete != 1) {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
} }

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -253,10 +254,13 @@ public class ProcessServiceTest {
int processInstanceId = 222; int processInstanceId = 222;
//there is not enough thread for this command //there is not enough thread for this command
Command command1 = new Command(); Command command1 = new Command();
command1.setId(1);
command1.setProcessDefinitionCode(definitionCode); command1.setProcessDefinitionCode(definitionCode);
command1.setProcessDefinitionVersion(definitionVersion); command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}"); command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS); command1.setCommandType(CommandType.START_PROCESS);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123); processDefinition.setId(123);
processDefinition.setName("test"); processDefinition.setName("test");
@ -273,31 +277,40 @@ public class ProcessServiceTest {
Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Command command2 = new Command(); Command command2 = new Command();
command2.setId(2);
command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command2.setCommandParam("{\"ProcessInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command2.setProcessDefinitionCode(definitionCode); command2.setProcessDefinitionCode(definitionCode);
command2.setProcessDefinitionVersion(definitionVersion); command2.setProcessDefinitionVersion(definitionVersion);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId); command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps)); Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
Command command3 = new Command(); Command command3 = new Command();
command3.setId(3);
command3.setProcessDefinitionCode(definitionCode); command3.setProcessDefinitionCode(definitionCode);
command3.setProcessDefinitionVersion(definitionVersion); command3.setProcessDefinitionVersion(definitionVersion);
command3.setProcessInstanceId(processInstanceId); command3.setProcessInstanceId(processInstanceId);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps)); Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
Command command4 = new Command(); Command command4 = new Command();
command4.setId(4);
command4.setProcessDefinitionCode(definitionCode); command4.setProcessDefinitionCode(definitionCode);
command4.setProcessDefinitionVersion(definitionVersion); command4.setProcessDefinitionVersion(definitionVersion);
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId); command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps)); Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
Command command5 = new Command(); Command command5 = new Command();
command5.setId(5);
command5.setProcessDefinitionCode(definitionCode); command5.setProcessDefinitionCode(definitionCode);
command5.setProcessDefinitionVersion(definitionVersion); command5.setProcessDefinitionVersion(definitionVersion);
HashMap<String, String> startParams = new HashMap<>(); HashMap<String, String> startParams = new HashMap<>();
@ -307,6 +320,8 @@ public class ProcessServiceTest {
command5.setCommandParam(JSONUtils.toJsonString(commandParams)); command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS); command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO); command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps); ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
} }

Loading…
Cancel
Save