diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 41ac529318..2ba1ccd38f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -77,6 +77,7 @@ import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeComman import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -130,6 +131,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private ProcessService processService; + @Autowired + private CommandService commandService; + @Autowired private ProcessInstanceDao processInstanceDao; @@ -626,7 +630,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessDefinitionVersion(processVersion); command.setProcessInstanceId(instanceId); command.setTestFlag(testFlag); - if (!processService.verifyIsNeedCreateCommand(command)) { + if (!commandService.verifyIsNeedCreateCommand(command)) { logger.warn( "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", processDefinitionCode, processVersion, instanceId); @@ -635,7 +639,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } logger.info("Creating command, commandInfo:{}.", command); - int create = processService.createCommand(command); + int create = commandService.createCommand(command); if (create > 0) { logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", @@ -784,7 +788,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } else { command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); - return processService.createCommand(command); + return commandService.createCommand(command); } } @@ -824,26 +828,28 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); - createCount = processService.createCommand(command); - if (createCount > 0) + createCount = commandService.createCommand(command); + if (createCount > 0) { logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - else + } else { logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } } if (startDate != null && endDate != null) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); - createCount = processService.createCommand(command); - if (createCount > 0) + createCount = commandService.createCommand(command); + if (createCount > 0) { logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - else + } else { logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } // dependent process definition List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( command.getProcessDefinitionCode()); @@ -904,12 +910,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ DateUtils.dateToString(listDate.get(endDateIndex))); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); - if (processService.createCommand(command) > 0) + if (commandService.createCommand(command) > 0) { logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - else + } else { logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { logger.info( "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", @@ -937,12 +944,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); - if (processService.createCommand(command) > 0) + if (commandService.createCommand(command) > 0) { logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); - else + } else { logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + } } } } @@ -985,7 +993,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating complement dependent command, commandInfo:{}.", command); - dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand); + dependentProcessDefinitionCreateCount += commandService.createCommand(dependentCommand); } return dependentProcessDefinitionCreateCount; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 58889636c0..3cd27f7834 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; @@ -102,6 +103,9 @@ public class ExecutorServiceTest { @Mock private ProcessService processService; + @Mock + private CommandService commandService; + @Mock private ProcessDefinitionMapper processDefinitionMapper; @@ -194,8 +198,8 @@ public class ExecutorServiceTest { .thenReturn(checkProjectAndAuth()); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); - doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); - doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); + doReturn(1).when(commandService).createCommand(argThat(c -> c.getId() == null)); + doReturn(0).when(commandService).createCommand(argThat(c -> c.getId() != null)); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)) .thenReturn(Optional.ofNullable(processInstance)); @@ -230,7 +234,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); + verify(commandService, times(1)).createCommand(any(Command.class)); } @@ -253,7 +257,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); + verify(commandService, times(1)).createCommand(any(Command.class)); } @@ -320,7 +324,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); - verify(processService, times(0)).createCommand(any(Command.class)); + verify(commandService, times(0)).createCommand(any(Command.class)); } /** @@ -342,7 +346,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(1)).createCommand(any(Command.class)); + verify(commandService, times(1)).createCommand(any(Command.class)); } /** @@ -364,7 +368,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(31)).createCommand(any(Command.class)); + verify(commandService, times(31)).createCommand(any(Command.class)); } @@ -387,7 +391,7 @@ public class ExecutorServiceTest { Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); - verify(processService, times(15)).createCommand(any(Command.class)); + verify(commandService, times(15)).createCommand(any(Command.class)); } @@ -411,7 +415,7 @@ public class ExecutorServiceTest { @Test public void testExecuteRepeatRunning() { - Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) .thenReturn(checkProjectAndAuth()); Map result = @@ -421,7 +425,7 @@ public class ExecutorServiceTest { @Test public void testOfTestRun() { - Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) .thenReturn(checkProjectAndAuth()); Map result = executorService.execProcessInstance(loginUser, projectCode, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 377dff7191..80616048e2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.utils.LoggerUtils; @@ -66,6 +67,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl @Autowired private ProcessService processService; + @Autowired + private CommandService commandService; + @Autowired private ProcessInstanceDao processInstanceDao; @@ -172,6 +176,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl "The workflow instance is already been cached, this case shouldn't be happened"); } WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance, + commandService, processService, processInstanceDao, nettyExecutorManager, @@ -225,7 +230,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl } } catch (Exception e) { logger.error("Master handle command {} error ", command.getId(), e); - processService.moveToErrorCommand(command, e.toString()); + commandService.moveToErrorCommand(command, e.toString()); } finally { latch.countDown(); } @@ -254,7 +259,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl int pageNumber = 0; int pageSize = masterConfig.getFetchCommandNum(); final List result = - processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); if (CollectionUtils.isNotEmpty(result)) { logger.info( "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}", diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 1a896bf9c5..6b7925a97f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -76,6 +76,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; @@ -127,6 +128,8 @@ public class WorkflowExecuteRunnable implements Callable { private final ProcessService processService; + private final CommandService commandService; + private ProcessInstanceDao processInstanceDao; private final ProcessAlertManager processAlertManager; @@ -233,6 +236,7 @@ public class WorkflowExecuteRunnable implements Callable { */ public WorkflowExecuteRunnable( @NonNull ProcessInstance processInstance, + @NonNull CommandService commandService, @NonNull ProcessService processService, @NonNull ProcessInstanceDao processInstanceDao, @NonNull NettyExecutorManager nettyExecutorManager, @@ -241,6 +245,7 @@ public class WorkflowExecuteRunnable implements Callable { @NonNull StateWheelExecuteThread stateWheelExecuteThread, @NonNull CuringParamsService curingParamsService) { this.processService = processService; + this.commandService = commandService; this.processInstanceDao = processInstanceDao; this.processInstance = processInstance; this.nettyExecutorManager = nettyExecutorManager; @@ -657,7 +662,7 @@ public class WorkflowExecuteRunnable implements Callable { command.setProcessInstanceId(0); command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); command.setTestFlag(processInstance.getTestFlag()); - return processService.createCommand(command); + return commandService.createCommand(command); } private boolean needComplementProcess() { @@ -750,7 +755,7 @@ public class WorkflowExecuteRunnable implements Callable { command.setProcessDefinitionCode(processDefinition.getCode()); command.setProcessDefinitionVersion(processDefinition.getVersion()); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - processService.createCommand(command); + commandService.createCommand(command); } /** diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 755311b30f..6fb703511f 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -72,6 +73,8 @@ public class WorkflowExecuteRunnableTest { private ProcessService processService; + private CommandService commandService; + private ProcessInstanceDao processInstanceDao; private MasterConfig config; @@ -90,6 +93,7 @@ public class WorkflowExecuteRunnableTest { config = new MasterConfig(); processService = Mockito.mock(ProcessService.class); + commandService = Mockito.mock(CommandService.class); processInstanceDao = Mockito.mock(ProcessInstanceDao.class); processInstance = Mockito.mock(ProcessInstance.class); Map cmdParam = new HashMap<>(); @@ -105,7 +109,8 @@ public class WorkflowExecuteRunnableTest { NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class); ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class); workflowExecuteThread = Mockito.spy( - new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, + new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao, + nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java index 31251c108e..79deabf543 100644 --- a/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java +++ b/dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; @@ -49,6 +50,9 @@ public class ProcessScheduleTask extends QuartzJobBean { @Autowired private ProcessService processService; + @Autowired + private CommandService commandService; + @Counted(value = "ds.master.quartz.job.executed") @Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override @@ -100,7 +104,7 @@ public class ProcessScheduleTask extends QuartzJobBean { command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessDefinitionVersion(processDefinition.getVersion()); - processService.createCommand(command); + commandService.createCommand(command); } private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java new file mode 100644 index 0000000000..48d0a1bbae --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.command; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.List; + +/** + * Command Service + */ +public interface CommandService { + + /** + * Save error command, and delete original command. If the given command has already been moved into error command, + * will throw {@link java.sql.SQLIntegrityConstraintViolationException ). + * @param command command + * @param message message + */ + void moveToErrorCommand(Command command, String message); + + /** + * Create new command + * @param command command + * @return result + */ + int createCommand(Command command); + + /** + * Get command page + * @param pageSize page size + * @param pageNumber page number + * @param masterCount master count + * @param thisMasterSlot master slot + * @return command page + */ + List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot); + + /** + * check the input command exists in queue list + * + * @param command command + * @return create command result + */ + boolean verifyIsNeedCreateCommand(Command command); + + /** + * create recovery waiting thread command when thread pool is not enough for the process instance. + * sub work process instance need not create recovery command. + * create recovery waiting thread command and delete origin command at the same time. + * if the recovery command is exists, only update the field update_time + * + * @param originCommand originCommand + * @param processInstance processInstance + */ + void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance); + + /** + * create sub work process command + * @param parentProcessInstance parent process instance + * @param childInstance child process instance + * @param instanceMap process instance map + * @param task task instance + * @return command + */ + Command createSubProcessCommand(ProcessInstance parentProcessInstance, + ProcessInstance childInstance, + ProcessInstanceMap instanceMap, + TaskInstance task); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java new file mode 100644 index 0000000000..62ce978a84 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.command; + +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ErrorCommand; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.service.utils.ParamUtils; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.Date; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; + +/** + * Command Service implementation + */ +@Component +public class CommandServiceImpl implements CommandService { + + private final Logger logger = LoggerFactory.getLogger(CommandServiceImpl.class); + + @Autowired + private ErrorCommandMapper errorCommandMapper; + + @Autowired + private CommandMapper commandMapper; + + @Autowired + private ScheduleMapper scheduleMapper; + + @Autowired + private ProcessDefinitionMapper processDefineMapper; + + @Override + public void moveToErrorCommand(Command command, String message) { + ErrorCommand errorCommand = new ErrorCommand(command, message); + this.errorCommandMapper.insert(errorCommand); + this.commandMapper.deleteById(command.getId()); + } + + @Override + @Counted("ds.workflow.create.command.count") + public int createCommand(Command command) { + int result = 0; + if (command == null) { + return result; + } + // add command timezone + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode()); + if (schedule != null) { + Map commandParams = + StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam()) + : new HashMap<>(); + commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId()); + command.setCommandParam(JSONUtils.toJsonString(commandParams)); + } + command.setId(null); + result = commandMapper.insert(command); + return result; + } + + @Override + public List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) { + if (masterCount <= 0) { + return Lists.newArrayList(); + } + return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot); + } + + @Override + public boolean verifyIsNeedCreateCommand(Command command) { + boolean isNeedCreate = true; + EnumMap cmdTypeMap = new EnumMap<>(CommandType.class); + cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); + cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1); + cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1); + CommandType commandType = command.getCommandType(); + + if (!cmdTypeMap.containsKey(commandType)) { + return true; + } + + ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam()); + int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt(); + + List commands = commandMapper.selectList(null); + // for all commands + for (Command tmpCommand : commands) { + if (!cmdTypeMap.containsKey(tmpCommand.getCommandType())) { + continue; + } + ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam()); + if (tempObj != null + && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) { + isNeedCreate = false; + break; + } + } + return isNeedCreate; + } + + @Override + public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { + // sub process doesn't need to create wait command + if (processInstance.getIsSubProcess() == Flag.YES) { + if (originCommand != null) { + commandMapper.deleteById(originCommand.getId()); + } + return; + } + Map cmdParam = new HashMap<>(); + cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId())); + // process instance quit by "waiting thread" state + if (originCommand == null) { + Command command = new Command( + CommandType.RECOVER_WAITING_THREAD, + processInstance.getTaskDependType(), + processInstance.getFailureStrategy(), + processInstance.getExecutorId(), + processInstance.getProcessDefinition().getCode(), + JSONUtils.toJsonString(cmdParam), + processInstance.getWarningType(), + processInstance.getWarningGroupId(), + processInstance.getScheduleTime(), + processInstance.getWorkerGroup(), + processInstance.getEnvironmentCode(), + processInstance.getProcessInstancePriority(), + processInstance.getDryRun(), + processInstance.getId(), + processInstance.getProcessDefinitionVersion(), + processInstance.getTestFlag()); + upsertCommand(command); + return; + } + + // update the command time if current command is recover from waiting + if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) { + originCommand.setUpdateTime(new Date()); + upsertCommand(originCommand); + } else { + // delete old command and create new waiting thread command + commandMapper.deleteById(originCommand.getId()); + originCommand.setId(0); + originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD); + originCommand.setUpdateTime(new Date()); + originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam)); + originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority()); + upsertCommand(originCommand); + } + } + + private int upsertCommand(@NotNull Command command) { + if (command.getId() != null) { + return commandMapper.updateById(command); + } else { + return commandMapper.insert(command); + } + } + + @Override + public Command createSubProcessCommand(ProcessInstance parentProcessInstance, ProcessInstance childInstance, + ProcessInstanceMap instanceMap, TaskInstance task) { + CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); + Map subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class); + long childDefineCode = 0L; + if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) { + try { + childDefineCode = + Long.parseLong( + String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))); + } catch (NumberFormatException nfe) { + logger.error("processDefinitionCode is not a number", nfe); + return null; + } + } + ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode); + + Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + Map globalMap = ParamUtils.getGlobalParamMap(task.getVarPool()); + Map fatherParams = new HashMap<>(); + if (CollectionUtils.isNotEmpty(allParam)) { + for (Property info : allParam) { + if (Direct.OUT == info.getDirect()) { + continue; + } + fatherParams.put(info.getProp(), globalMap.get(info.getProp())); + } + } + String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); + int subProcessInstanceId = + childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId()); + return new Command( + commandType, + TaskDependType.TASK_POST, + parentProcessInstance.getFailureStrategy(), + parentProcessInstance.getExecutorId(), + subProcessDefinition.getCode(), + processParam, + parentProcessInstance.getWarningType(), + parentProcessInstance.getWarningGroupId(), + parentProcessInstance.getScheduleTime(), + task.getWorkerGroup(), + task.getEnvironmentCode(), + parentProcessInstance.getProcessInstancePriority(), + parentProcessInstance.getDryRun(), + subProcessInstanceId, + subProcessDefinition.getVersion(), + parentProcessInstance.getTestFlag()); + } + + /** + * get sub work flow command type + * child instance exist: child command = fatherCommand + * child instance not exists: child command = fatherCommand[0] + */ + private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { + CommandType commandType = parentProcessInstance.getCommandType(); + if (childInstance == null) { + String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); + commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); + } + return commandType; + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a18d751205..6f85ed3e0b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -68,16 +68,6 @@ public interface ProcessService { ProcessInstance handleCommand(String host, Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException; - void moveToErrorCommand(Command command, String message); - - int createCommand(Command command); - - List findCommandPage(int pageSize, int pageNumber); - - List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot); - - boolean verifyIsNeedCreateCommand(Command command); - Optional findProcessInstanceDetailById(int processId); List getTaskNodeListByDefinition(long defineCode); @@ -100,8 +90,6 @@ public interface ProcessService { void recurseFindSubProcess(long parentCode, List ids); - void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance); - Tenant getTenantForProcess(int tenantId, int userId); Environment findEnvironmentByCode(Long environmentCode); @@ -116,19 +104,10 @@ public interface ProcessService { void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task); - Map getGlobalParamMap(String globalParams); - - Command createSubProcessCommand(ProcessInstance parentProcessInstance, - ProcessInstance childInstance, - ProcessInstanceMap instanceMap, - TaskInstance task); - TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance); TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance); - int saveCommand(Command command); - boolean saveTaskInstance(TaskInstance taskInstance); boolean createTaskInstance(TaskInstance taskInstance); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 8277dc6d71..2ef495e942 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; @@ -61,7 +59,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql; import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry; import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue; import org.apache.dolphinscheduler.dao.entity.Environment; -import org.apache.dolphinscheduler.dao.entity.ErrorCommand; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -126,6 +123,7 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.ServiceException; @@ -138,12 +136,10 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -169,7 +165,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import io.micrometer.core.annotation.Counted; /** * process relative dao that some mappers in this. @@ -285,6 +280,9 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private LogClient logClient; + @Autowired + private CommandService commandService; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -300,7 +298,7 @@ public class ProcessServiceImpl implements ProcessService { // cannot construct process instance, return null if (processInstance == null) { logger.error("scan command, command parameter is error: {}", command); - moveToErrorCommand(command, "process instance is null"); + commandService.moveToErrorCommand(command, "process instance is null"); return null; } processInstance.setCommandType(command.getCommandType()); @@ -387,101 +385,6 @@ public class ProcessServiceImpl implements ProcessService { } } - /** - * Save error command, and delete original command. If the given command has already been moved into error command, - * will throw {@link java.sql.SQLIntegrityConstraintViolationException ). - * - * @param command command - * @param message message - */ - @Override - public void moveToErrorCommand(Command command, String message) { - ErrorCommand errorCommand = new ErrorCommand(command, message); - this.errorCommandMapper.insert(errorCommand); - this.commandMapper.deleteById(command.getId()); - } - - /** - * insert one command - * - * @param command command - * @return create result - */ - @Override - @Counted("ds.workflow.create.command.count") - public int createCommand(Command command) { - int result = 0; - if (command == null) { - return result; - } - // add command timezone - Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode()); - if (schedule != null) { - Map commandParams = - StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam()) - : new HashMap<>(); - commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId()); - command.setCommandParam(JSONUtils.toJsonString(commandParams)); - } - command.setId(null); - result = commandMapper.insert(command); - return result; - } - - /** - * get command page - */ - @Override - public List findCommandPage(int pageSize, int pageNumber) { - return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize); - } - - /** - * get command page - */ - @Override - public List findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) { - if (masterCount <= 0) { - return Lists.newArrayList(); - } - return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot); - } - - /** - * check the input command exists in queue list - * - * @param command command - * @return create command result - */ - @Override - public boolean verifyIsNeedCreateCommand(Command command) { - boolean isNeedCreate = true; - EnumMap cmdTypeMap = new EnumMap<>(CommandType.class); - cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1); - cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1); - cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1); - CommandType commandType = command.getCommandType(); - - if (cmdTypeMap.containsKey(commandType)) { - ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam()); - int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt(); - - List commands = commandMapper.selectList(null); - // for all commands - for (Command tmpCommand : commands) { - if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) { - ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam()); - if (tempObj != null - && processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) { - isNeedCreate = false; - break; - } - } - } - } - return isNeedCreate; - } - /** * find process instance detail by id * @@ -670,66 +573,6 @@ public class ProcessServiceImpl implements ProcessService { } } - /** - * create recovery waiting thread command when thread pool is not enough for the process instance. - * sub work process instance need not to create recovery command. - * create recovery waiting thread command and delete origin command at the same time. - * if the recovery command is exists, only update the field update_time - * - * @param originCommand originCommand - * @param processInstance processInstance - */ - @Override - public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { - - // sub process doesnot need to create wait command - if (processInstance.getIsSubProcess() == Flag.YES) { - if (originCommand != null) { - commandMapper.deleteById(originCommand.getId()); - } - return; - } - Map cmdParam = new HashMap<>(); - cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId())); - // process instance quit by "waiting thread" state - if (originCommand == null) { - Command command = new Command( - CommandType.RECOVER_WAITING_THREAD, - processInstance.getTaskDependType(), - processInstance.getFailureStrategy(), - processInstance.getExecutorId(), - processInstance.getProcessDefinition().getCode(), - JSONUtils.toJsonString(cmdParam), - processInstance.getWarningType(), - processInstance.getWarningGroupId(), - processInstance.getScheduleTime(), - processInstance.getWorkerGroup(), - processInstance.getEnvironmentCode(), - processInstance.getProcessInstancePriority(), - processInstance.getDryRun(), - processInstance.getId(), - processInstance.getProcessDefinitionVersion(), - processInstance.getTestFlag()); - saveCommand(command); - return; - } - - // update the command time if current command if recover from waiting - if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) { - originCommand.setUpdateTime(new Date()); - saveCommand(originCommand); - } else { - // delete old command and create new waiting thread command - commandMapper.deleteById(originCommand.getId()); - originCommand.setId(0); - originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD); - originCommand.setUpdateTime(new Date()); - originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam)); - originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - saveCommand(originCommand); - } - } - /** * get schedule time from command * @@ -1445,105 +1288,18 @@ public class ProcessServiceImpl implements ProcessService { logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); return; } - Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); + Command subProcessCommand = + commandService.createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); + if (subProcessCommand == null) { + logger.error("create sub process command failed, so skip creating command"); + return; + } updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); initSubInstanceState(childInstance); - createCommand(subProcessCommand); + commandService.createCommand(subProcessCommand); logger.info("sub process command created: {} ", subProcessCommand); } - /** - * complement data needs transform parent parameter to child. - */ - protected String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, - Map fatherParams) { - // set sub work process command - String processMapStr = JSONUtils.toJsonString(instanceMap); - Map cmdParam = JSONUtils.toMap(processMapStr); - if (parentProcessInstance.isComplementData()) { - Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); - String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); - String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); - String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); - if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); - } - if (StringUtils.isNotEmpty(scheduleTime)) { - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime); - } - processMapStr = JSONUtils.toJsonString(cmdParam); - } - if (MapUtils.isNotEmpty(fatherParams)) { - cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams)); - processMapStr = JSONUtils.toJsonString(cmdParam); - } - return processMapStr; - } - - @Override - public Map getGlobalParamMap(String globalParams) { - List propList; - Map globalParamMap = new HashMap<>(); - if (!Strings.isNullOrEmpty(globalParams)) { - propList = JSONUtils.toList(globalParams, Property.class); - globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - } - - return globalParamMap; - } - - /** - * create sub work process command - */ - @Override - public Command createSubProcessCommand(ProcessInstance parentProcessInstance, - ProcessInstance childInstance, - ProcessInstanceMap instanceMap, - TaskInstance task) { - CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); - Map subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class); - long childDefineCode = 0L; - if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) { - childDefineCode = - Long.parseLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE))); - } - ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode); - - Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); - List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); - Map globalMap = this.getGlobalParamMap(task.getVarPool()); - Map fatherParams = new HashMap<>(); - if (CollectionUtils.isNotEmpty(allParam)) { - for (Property info : allParam) { - if (Direct.OUT == info.getDirect()) { - continue; - } - fatherParams.put(info.getProp(), globalMap.get(info.getProp())); - } - } - String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); - int subProcessInstanceId = - childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId()); - return new Command( - commandType, - TaskDependType.TASK_POST, - parentProcessInstance.getFailureStrategy(), - parentProcessInstance.getExecutorId(), - subProcessDefinition.getCode(), - processParam, - parentProcessInstance.getWarningType(), - parentProcessInstance.getWarningGroupId(), - parentProcessInstance.getScheduleTime(), - task.getWorkerGroup(), - task.getEnvironmentCode(), - parentProcessInstance.getProcessInstancePriority(), - parentProcessInstance.getDryRun(), - subProcessInstanceId, - subProcessDefinition.getVersion(), - parentProcessInstance.getTestFlag()); - } - /** * initialize sub work flow state * child instance state would be initialized when 'recovery from pause/stop/failure' @@ -1681,21 +1437,6 @@ public class ProcessServiceImpl implements ProcessService { return true; } - /** - * insert or update command - * - * @param command command - * @return save command result - */ - @Override - public int saveCommand(Command command) { - if (command.getId() != null) { - return commandMapper.updateById(command); - } else { - return commandMapper.insert(command); - } - } - /** * insert or update task instance * @@ -2112,7 +1853,7 @@ public class ProcessServiceImpl implements ProcessService { cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority()); cmd.setTestFlag(processInstance.getTestFlag()); - createCommand(cmd); + commandService.createCommand(cmd); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java new file mode 100644 index 0000000000..b2bea95d8b --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.utils; + +import static org.apache.dolphinscheduler.common.Constants.*; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.google.common.base.Strings; + +/** + * Param Utility class + */ +public class ParamUtils { + + /** + * convert globalParams string to global parameter map + * @param globalParams globalParams + * @return parameter map + */ + public static Map getGlobalParamMap(String globalParams) { + List propList; + Map globalParamMap = new HashMap<>(); + if (!Strings.isNullOrEmpty(globalParams)) { + propList = JSONUtils.toList(globalParams, Property.class); + globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + } + return globalParamMap; + } + + /** + * Get sub workflow parameters + * @param instanceMap process instance map + * @param parentProcessInstance parent process instance + * @param fatherParams fatherParams + * @return sub workflow parameters + */ + public static String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, + Map fatherParams) { + // set sub work process command + String processMapStr = JSONUtils.toJsonString(instanceMap); + Map cmdParam = JSONUtils.toMap(processMapStr); + if (parentProcessInstance.isComplementData()) { + Map parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam()); + String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE); + String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE); + String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); + if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime); + } + if (StringUtils.isNotEmpty(scheduleTime)) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime); + } + processMapStr = JSONUtils.toJsonString(cmdParam); + } + if (MapUtils.isNotEmpty(fatherParams)) { + cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams)); + processMapStr = JSONUtils.toJsonString(cmdParam); + } + return processMapStr; + } + +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java new file mode 100644 index 0000000000..739f8a099f --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/CommandServiceImplTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.command; + +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.mockito.ArgumentMatchers.anyString; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.mapper.CommandMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import com.fasterxml.jackson.databind.JsonNode; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class CommandServiceImplTest { + + @InjectMocks + private CommandServiceImpl commandService; + + @Mock + private CommandMapper commandMapper; + + @Mock + private ProcessDefinitionMapper processDefineMapper; + + @Mock + private ScheduleMapper scheduleMapper; + + @Test + public void testCreateSubCommand() { + ProcessInstance parentInstance = new ProcessInstance(); + parentInstance.setWarningType(WarningType.SUCCESS); + parentInstance.setWarningGroupId(0); + + TaskInstance task = new TaskInstance(); + task.setTaskParams("{\"processDefinitionCode\":10}}"); + task.setId(10); + task.setTaskCode(1L); + task.setTaskDefinitionVersion(1); + + ProcessInstance childInstance = null; + ProcessInstanceMap instanceMap = new ProcessInstanceMap(); + instanceMap.setParentProcessInstanceId(1); + instanceMap.setParentTaskInstanceId(10); + Command command; + + // father history: start; child null == command type: start + parentInstance.setHistoryCmd("START_PROCESS"); + parentInstance.setCommandType(CommandType.START_PROCESS); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(10L); + Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition); + Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition); + command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); + Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + // father history: start,start failure; child null == command type: start + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); + Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); + + // father history: scheduler,start failure; child null == command type: scheduler + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); + command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); + Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType()); + + // father history: complement,start failure; child null == command type: complement + + String startString = "2020-01-01 00:00:00"; + String endString = "2020-01-10 00:00:00"; + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); + Map complementMap = new HashMap<>(); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString); + complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString); + parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); + command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); + Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); + + JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); + Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText()); + Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText()); + Assertions.assertEquals(startString, DateUtils.dateToString(start)); + Assertions.assertEquals(endString, DateUtils.dateToString(end)); + + // father history: start,failure,start failure; child not null == command type: start failure + childInstance = new ProcessInstance(); + parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); + parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); + command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); + Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); + } + + @Test + public void testVerifyIsNeedCreateCommand() { + + List commands = new ArrayList<>(); + + Command command = new Command(); + command.setCommandType(CommandType.REPEAT_RUNNING); + command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}"); + commands.add(command); + Mockito.when(commandMapper.selectList(null)).thenReturn(commands); + Assertions.assertFalse(commandService.verifyIsNeedCreateCommand(command)); + + Command command1 = new Command(); + command1.setCommandType(CommandType.REPEAT_RUNNING); + command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}"); + Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command1)); + + Command command2 = new Command(); + command2.setCommandType(CommandType.PAUSE); + Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command2)); + } + + @Test + public void testCreateRecoveryWaitingThreadCommand() { + int id = 123; + Mockito.when(commandMapper.deleteById(id)).thenReturn(1); + ProcessInstance subProcessInstance = new ProcessInstance(); + subProcessInstance.setIsSubProcess(Flag.YES); + Command originCommand = new Command(); + originCommand.setId(id); + commandService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(111); + commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance); + + Command recoverCommand = new Command(); + recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD); + commandService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance); + + Command repeatRunningCommand = new Command(); + recoverCommand.setCommandType(CommandType.REPEAT_RUNNING); + commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance); + + ProcessInstance subProcessInstance2 = new ProcessInstance(); + subProcessInstance2.setId(111); + subProcessInstance2.setIsSubProcess(Flag.NO); + commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2); + } + + @Test + public void giveNullOriginCommand_thenCreateRecoveryWaitingThreadCommand_expectNoDelete() { + ProcessInstance subProcessInstance = new ProcessInstance(); + subProcessInstance.setIsSubProcess(Flag.NO); + subProcessInstance.setId(111); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(111); + processDefinition.setCode(10L); + subProcessInstance.setProcessDefinition(processDefinition); + subProcessInstance.setWarningGroupId(1); + commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance); + Mockito.verify(commandMapper, Mockito.times(0)).deleteById(anyString()); + } + + @Test + public void testCreateCommand() { + Command command = new Command(); + command.setProcessDefinitionCode(123); + command.setCommandParam("{\"ProcessInstanceId\":222}"); + command.setCommandType(CommandType.START_PROCESS); + int mockResult = 1; + Mockito.when(commandMapper.insert(command)).thenReturn(mockResult); + int exeMethodResult = commandService.createCommand(command); + Assertions.assertEquals(mockResult, exeMethodResult); + Mockito.verify(commandMapper, Mockito.times(1)).insert(command); + } + + @Test + public void testFindCommandPageBySlot() { + int pageSize = 1; + int pageNumber = 0; + int masterCount = 0; + int thisMasterSlot = 2; + List commandList = + commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + Assertions.assertEquals(0, commandList.size()); + } + +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index d0ef8f8e18..b394d9605a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -24,15 +24,12 @@ import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; @@ -42,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; @@ -63,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; -import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; @@ -91,7 +86,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; -import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -105,8 +99,6 @@ import org.mockito.quality.Strictness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.JsonNode; - /** * process service test */ @@ -165,132 +157,12 @@ public class ProcessServiceTest { @Mock private DqComparisonTypeMapper dqComparisonTypeMapper; - @Mock - private ScheduleMapper scheduleMapper; - @Mock CuringParamsService curingGlobalParamsService; @Mock TaskPluginManager taskPluginManager; - @Test - public void testCreateSubCommand() { - ProcessInstance parentInstance = new ProcessInstance(); - parentInstance.setWarningType(WarningType.SUCCESS); - parentInstance.setWarningGroupId(0); - - TaskInstance task = new TaskInstance(); - task.setTaskParams("{\"processDefinitionCode\":10}}"); - task.setId(10); - task.setTaskCode(1L); - task.setTaskDefinitionVersion(1); - - ProcessInstance childInstance = null; - ProcessInstanceMap instanceMap = new ProcessInstanceMap(); - instanceMap.setParentProcessInstanceId(1); - instanceMap.setParentTaskInstanceId(10); - Command command; - - // father history: start; child null == command type: start - parentInstance.setHistoryCmd("START_PROCESS"); - parentInstance.setCommandType(CommandType.START_PROCESS); - ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setCode(10L); - Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition); - Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition); - command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); - - // father history: start,start failure; child null == command type: start - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); - command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); - - // father history: scheduler,start failure; child null == command type: scheduler - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); - command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType()); - - // father history: complement,start failure; child null == command type: complement - - String startString = "2020-01-01 00:00:00"; - String endString = "2020-01-10 00:00:00"; - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); - Map complementMap = new HashMap<>(); - complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString); - complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString); - parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); - command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); - - JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); - Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText()); - Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText()); - Assertions.assertEquals(startString, DateUtils.dateToString(start)); - Assertions.assertEquals(endString, DateUtils.dateToString(end)); - - // father history: start,failure,start failure; child not null == command type: start failure - childInstance = new ProcessInstance(); - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); - command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); - } - - @Test - public void testVerifyIsNeedCreateCommand() { - - List commands = new ArrayList<>(); - - Command command = new Command(); - command.setCommandType(CommandType.REPEAT_RUNNING); - command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}"); - commands.add(command); - Mockito.when(commandMapper.selectList(null)).thenReturn(commands); - Assertions.assertFalse(processService.verifyIsNeedCreateCommand(command)); - - Command command1 = new Command(); - command1.setCommandType(CommandType.REPEAT_RUNNING); - command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}"); - Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command1)); - - Command command2 = new Command(); - command2.setCommandType(CommandType.PAUSE); - Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command2)); - } - - @Test - public void testCreateRecoveryWaitingThreadCommand() { - int id = 123; - Mockito.when(commandMapper.deleteById(id)).thenReturn(1); - ProcessInstance subProcessInstance = new ProcessInstance(); - subProcessInstance.setIsSubProcess(Flag.YES); - Command originCommand = new Command(); - originCommand.setId(id); - processService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance); - - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(111); - processService.createRecoveryWaitingThreadCommand(null, subProcessInstance); - - Command recoverCommand = new Command(); - recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD); - processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance); - - Command repeatRunningCommand = new Command(); - recoverCommand.setCommandType(CommandType.REPEAT_RUNNING); - processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance); - - ProcessInstance subProcessInstance2 = new ProcessInstance(); - subProcessInstance2.setId(111); - subProcessInstance2.setIsSubProcess(Flag.NO); - processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2); - } - @Test public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { @@ -789,19 +661,6 @@ public class ProcessServiceTest { Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount()); } - @Test - public void testCreateCommand() { - Command command = new Command(); - command.setProcessDefinitionCode(123); - command.setCommandParam("{\"ProcessInstanceId\":222}"); - command.setCommandType(CommandType.START_PROCESS); - int mockResult = 1; - Mockito.when(commandMapper.insert(command)).thenReturn(mockResult); - int exeMethodResult = processService.createCommand(command); - Assertions.assertEquals(mockResult, exeMethodResult); - Mockito.verify(commandMapper, Mockito.times(1)).insert(command); - } - @Test public void testChangeOutParam() { TaskInstance taskInstance = new TaskInstance(); @@ -887,17 +746,6 @@ public class ProcessServiceTest { Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId()); } - @Test - public void testFindCommandPageBySlot() { - int pageSize = 1; - int pageNumber = 0; - int masterCount = 0; - int thisMasterSlot = 2; - List commandList = - processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); - Assertions.assertEquals(0, commandList.size()); - } - @Test public void testFindLastManualProcessInterval() { long definitionCode = 1L; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java new file mode 100644 index 0000000000..cdd0d761a4 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.utils; + +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ParamUtilsTest { + + @Test + public void testGetGlobalParamMap() { + String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"; + Map globalParamMap = ParamUtils.getGlobalParamMap(globalParam); + Assertions.assertEquals(globalParamMap.size(), 1); + Assertions.assertEquals(globalParamMap.get("startParam1"), ""); + + Map emptyParamMap = ParamUtils.getGlobalParamMap(null); + Assertions.assertEquals(emptyParamMap.size(), 0); + } +}