From bd0fa79e377853eb3d5e92af87dabd70130ec6a5 Mon Sep 17 00:00:00 2001 From: Stalary Date: Thu, 15 Sep 2022 10:00:38 +0800 Subject: [PATCH] [Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929) --- .../api/service/impl/ExecutorServiceImpl.java | 6 +- .../api/service/ExecutorServiceTest.java | 64 ++++++++++++++++++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 20b2d9604c..ce1d66683e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -887,7 +887,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * create complement dependent command */ - protected int createComplementDependentCommand(List schedules, Command command) { + public int createComplementDependentCommand(List schedules, Command command) { int dependentProcessDefinitionCreateCount = 0; Command dependentCommand; @@ -901,9 +901,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ List dependentProcessDefinitionList = getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(), CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup()); - dependentCommand.setTaskDependType(TaskDependType.TASK_POST); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { + // If the id is Integer, the auto-increment id will be obtained by mybatis-plus + // and causing duplicate when clone it. + dependentCommand.setId(null); dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode()); dependentCommand.setProcessDefinitionVersion(dependentProcessDefinition.getProcessDefinitionVersion()); dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 2a2409e06a..8cd5d681cc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; @@ -45,17 +57,18 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; -import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import org.assertj.core.util.Lists; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -178,7 +191,8 @@ public class ExecutorServiceTest { .thenReturn(checkProjectAndAuth()); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); - Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1); + doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); + doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance)); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); @@ -237,6 +251,50 @@ public class ExecutorServiceTest { } + @Test + public void testComplementWithDependentMode() { + Schedule schedule = new Schedule(); + schedule.setStartTime(new Date()); + schedule.setEndTime(new Date()); + schedule.setCrontab("0 0 7 * * ? *"); + schedule.setFailureStrategy(FailureStrategy.CONTINUE); + schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setWarningType(WarningType.NONE); + schedule.setCreateTime(new Date()); + schedule.setUpdateTime(new Date()); + List schedules = Lists.newArrayList(schedule); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode( + processDefinitionCode)) + .thenReturn(schedules); + + DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition(); + dependentProcessDefinition.setProcessDefinitionCode(2); + dependentProcessDefinition.setProcessDefinitionVersion(1); + dependentProcessDefinition.setTaskDefinitionCode(1); + dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + dependentProcessDefinition.setTaskParams( + "{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}"); + Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode)) + .thenReturn(Lists.newArrayList(dependentProcessDefinition)); + + Map processDefinitionWorkerGroupMap = new HashMap<>(); + processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP); + Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L))) + .thenReturn(processDefinitionWorkerGroupMap); + + Command command = new Command(); + command.setId(1); + command.setCommandType(CommandType.COMPLEMENT_DATA); + command.setCommandParam( + "{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}"); + command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + command.setProcessDefinitionCode(processDefinitionCode); + command.setExecutorId(1); + + int count = executorService.createComplementDependentCommand(schedules, command); + Assert.assertEquals(1, count); + } + /** * date error */