Browse Source

[Bug][Dependent]: Id also clone due to duplicate when use dependent mode. (#11929)

3.1.0-release
Stalary 2 years ago committed by caishunfeng
parent
commit
714e258be6
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 64
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -889,7 +889,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create complement dependent command
*/
protected int createComplementDependentCommand(List<Schedule> schedules, Command command) {
public int createComplementDependentCommand(List<Schedule> schedules, Command command) {
int dependentProcessDefinitionCreateCount = 0;
Command dependentCommand;
@ -903,9 +903,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
List<DependentProcessDefinition> dependentProcessDefinitionList =
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
CronUtils.getMaxCycle(schedules.get(0).getCrontab()), dependentCommand.getWorkerGroup());
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
// If the id is Integer, the auto-increment id will be obtained by mybatis-plus
// and causing duplicate when clone it.
dependentCommand.setId(null);
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());

64
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -20,18 +20,30 @@ package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.RERUN;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -45,16 +57,17 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -177,7 +190,8 @@ public class ExecutorServiceTest {
.thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
@ -236,6 +250,50 @@ public class ExecutorServiceTest {
}
@Test
public void testComplementWithDependentMode() {
Schedule schedule = new Schedule();
schedule.setStartTime(new Date());
schedule.setEndTime(new Date());
schedule.setCrontab("0 0 7 * * ? *");
schedule.setFailureStrategy(FailureStrategy.CONTINUE);
schedule.setReleaseState(ReleaseState.OFFLINE);
schedule.setWarningType(WarningType.NONE);
schedule.setCreateTime(new Date());
schedule.setUpdateTime(new Date());
List<Schedule> schedules = Lists.newArrayList(schedule);
Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(
processDefinitionCode))
.thenReturn(schedules);
DependentProcessDefinition dependentProcessDefinition = new DependentProcessDefinition();
dependentProcessDefinition.setProcessDefinitionCode(2);
dependentProcessDefinition.setProcessDefinitionVersion(1);
dependentProcessDefinition.setTaskDefinitionCode(1);
dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
dependentProcessDefinition.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
.thenReturn(Lists.newArrayList(dependentProcessDefinition));
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);
Command command = new Command();
command.setId(1);
command.setCommandType(CommandType.COMPLEMENT_DATA);
command.setCommandParam(
"{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(1);
int count = executorService.createComplementDependentCommand(schedules, command);
Assert.assertEquals(1, count);
}
/**
* date error
*/

Loading…
Cancel
Save