Browse Source

Merge branch 'json_split' of https://github.com/apache/incubator-dolphinscheduler into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
eeaa495661
  1. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  2. 63
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 34
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  5. 79
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 28
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  7. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
  8. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  9. 74
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  10. 39
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  11. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
  12. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  13. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java
  14. 75
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  15. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  16. 35
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  17. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  18. 17
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -1,3 +1,4 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -149,21 +150,21 @@ public interface ProcessInstanceService {
Map<String, Object> viewGantt(Integer processInstanceId) throws Exception;
/**
* query process instance by processDefinitionId and stateArray
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states);
/**
* query process instance by processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size);
List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode,int size);
}

63
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -484,7 +484,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES);
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size());
return result;
@ -621,17 +621,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* get process definition list by ids
*/
private List<ProcessMeta> getProcessDefinitionList(String processDefinitionIds) {
List<ProcessMeta> processDefinitionList = new ArrayList<>();
String[] processDefinitionIdArray = processDefinitionIds.split(",");
List<ProcessMeta> processDefinitionList = new ArrayList<>();
for (String strProcessDefinitionId : processDefinitionIdArray) {
//get workflow info
int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition));
processDefinition.setProcessDefinitionJson(processDefinitionJson);
processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition));
processDefinitionList.add(exportProcessMetaData(processDefinition));
}
return processDefinitionList;
}
@ -671,39 +669,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
*/
public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) {
//create workflow json file
return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition));
}
/**
* get export process metadata string
*
* @param processDefinitionId process definition id
* @param processDefinition process definition
* @return export process metadata string
*/
public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) {
String processDefinitionJson = processDefinition.getProcessDefinitionJson();
public ProcessMeta exportProcessMetaData(ProcessDefinition processDefinition) {
ProcessData processData = processService.genProcessData(processDefinition);
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson);
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
addExportTaskNodeSpecialParam(processData);
//export process metadata
ProcessMeta exportProcessMeta = new ProcessMeta();
exportProcessMeta.setProjectName(processDefinition.getProjectName());
exportProcessMeta.setProcessDefinitionName(processDefinition.getName());
exportProcessMeta.setProcessDefinitionJson(processDefinitionJson);
exportProcessMeta.setProcessDefinitionJson(JSONUtils.toJsonString(processService.genProcessData(processDefinition)));
exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription());
exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations());
exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects());
//schedule info
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId());
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
@ -723,26 +707,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* correct task param which has datasource or dependent
*
* @param processDefinitionJson processDefinitionJson
* @param processData process data
* @return correct processDefinitionJson
*/
private String addExportTaskNodeSpecialParam(String processDefinitionJson) {
ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson);
ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS);
for (int i = 0; i < jsonArray.size(); i++) {
JsonNode taskNode = jsonArray.path(i);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
if (null != addTaskParam) {
addTaskParam.addExportSpecialParam(taskNode);
}
private void addExportTaskNodeSpecialParam(ProcessData processData) {
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> tmpNodeList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType());
JsonNode jsonNode = JSONUtils.toJsonNode(taskNode);
if (null != addTaskParam) {
addTaskParam.addExportSpecialParam(jsonNode);
}
tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class));
}
jsonObject.set(TASKS, jsonArray);
return jsonObject.toString();
processData.setTasks(tmpNodeList);
}
/**
@ -1259,7 +1238,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* List of process instances
*/
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit);
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit);
for (ProcessInstance processInstance : processInstanceList) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));

34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -257,9 +257,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefineId);
IPage<ProcessInstance> processInstanceList =
processInstanceMapper.queryProcessInstanceListPaging(page,
project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
project.getCode(), processDefinition.getCode(), searchVal, executorId, statusArray, host, start, end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
@ -451,7 +453,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects,
updateDefine = syncDefinition(loginUser, project, locations, connects,
processInstance, processDefinition, processData);
}
if (update > 0 && updateDefine > 0) {
@ -465,12 +467,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* sync definition according process instance
*/
private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects,
private int syncDefinition(User loginUser, Project project, String locations, String connects,
ProcessInstance processInstance, ProcessDefinition processDefinition,
ProcessData processData) {
String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
@ -511,9 +512,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
/**
* query parent process instance detail info by sub process instance id
*
@ -645,10 +646,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* get local params
*
* @param processInstance
* @param timeParams
* @return
*/
private Map<String, Map<String, Object>> getLocalParams(ProcessInstance processInstance, Map<String, String> timeParams) {
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
@ -674,6 +671,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
return localUserDefParams;
}
/**
* encapsulation gantt structure
*
@ -732,25 +730,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
@Override
public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
public List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states) {
return processInstanceMapper.queryByProcessDefineCodeAndStatus(processDefinitionCode, states);
}
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
@Override
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId, int size) {
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
}
}

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -184,7 +184,6 @@ public class ProcessDefinitionControllerTest {
processDefinition.setId(id);
processDefinition.setLocations(locations);
processDefinition.setName(name);
processDefinition.setProcessDefinitionJson(json);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
@ -267,7 +266,6 @@ public class ProcessDefinitionControllerTest {
processDefinition.setId(id);
processDefinition.setLocations(locations);
processDefinition.setName(name);
processDefinition.setProcessDefinitionJson(json);
String name2 = "dag_test";
int id2 = 2;
@ -279,7 +277,6 @@ public class ProcessDefinitionControllerTest {
processDefinition2.setId(id2);
processDefinition2.setLocations(locations);
processDefinition2.setName(name2);
processDefinition2.setProcessDefinitionJson(json);
resourceList.add(processDefinition);
resourceList.add(processDefinition2);

79
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -391,14 +392,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def");
@ -416,12 +410,6 @@ public class ProcessDefinitionServiceTest {
String projectName = "project_test1";
Project project = getProject(projectName);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\","
+ "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}";
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
@ -463,7 +451,6 @@ public class ProcessDefinitionServiceTest {
// instance exit
ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson(processDefinitionJson);
definition.setConnects("[]");
Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition);
@ -478,8 +465,7 @@ public class ProcessDefinitionServiceTest {
, Mockito.any(ProcessDefinition.class)))
.thenReturn(1);
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData);
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData());
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectName, "46", 1);
@ -518,11 +504,6 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition definition = getProcessDefinition();
definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}");
definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\""
+ ",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234"
+ "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},"
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}");
definition.setConnects("[]");
// check target project result == null
@ -656,16 +637,6 @@ public class ProcessDefinitionServiceTest {
loginUser, "project_test1", 46, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
//FIXME has function exit code 1 when exception
//process definition offline
// List<Schedule> schedules = new ArrayList<>();
// Schedule schedule = getSchedule();
// schedules.add(schedule);
// Mockito.when(scheduleMapper.selectAllByProcessDefineArray(new int[]{46})).thenReturn(schedules);
// Mockito.when(scheduleMapper.updateById(schedule)).thenReturn(1);
// Map<String, Object> offlineRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1",
// 46, ReleaseState.OFFLINE.getCode());
// Assert.assertEquals(Status.SUCCESS, offlineRes.get(Constants.STATUS));
}
@Test
@ -746,7 +717,6 @@ public class ProcessDefinitionServiceTest {
//success
Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData());
processDefinition.setProcessDefinitionJson(SHELL_JSON);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
@ -763,29 +733,37 @@ public class ProcessDefinitionServiceTest {
//process definition exist
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList);
String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
ProcessData processData = getProcessData();
Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData);
Map<String, Object> successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
private ProcessData getProcessData() {
ProcessData processData = new ProcessData();
List<TaskNode> taskNodeList = new ArrayList<>();
processData.setTasks(taskNodeList);
List<Property> properties = new ArrayList<>();
processData.setGlobalParams(properties);
processData.setTenantId(10);
processData.setTimeout(100);
return processData;
}
@Test
public void testQueryProcessDefinitionAllByProjectId() {
int projectId = 1;
Long projectCode = 2L;
Project project = new Project();
project.setId(projectId);
project.setCode(projectCode);
Mockito.when(projectMapper.selectById(projectId)).thenReturn(project);
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProcessDefinitionJson(SHELL_JSON);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Project test = getProject("test");
@ -825,7 +803,7 @@ public class ProcessDefinitionServiceTest {
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@ -893,7 +871,7 @@ public class ProcessDefinitionServiceTest {
//task instance exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
@ -1188,6 +1166,7 @@ public class ProcessDefinitionServiceTest {
processDefinition.setProjectId(2);
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(9999L);
return processDefinition;
}
@ -1284,15 +1263,7 @@ public class ProcessDefinitionServiceTest {
Integer processDefinitionId = 111;
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(processDefinitionId);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":"
+ "[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\","
+ "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]},"
+ "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\","
+ "\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId, processDefinition));
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition));
}
@Test

28
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -130,11 +130,24 @@ public class ProcessInstanceServiceTest {
"192.168.xx.xx", 1, 10);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>();
Page<ProcessInstance> pageReturn = new Page<>(1, 10);
processInstanceList.add(processInstance);
pageReturn.setRecords(processInstanceList);
// data parameter check
putMsg(result, Status.SUCCESS, projectName);
Project project = getProject(projectName);
when(projectMapper.queryByName(projectName)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(processDefineMapper.selectById(Mockito.anyInt())).thenReturn(getProcessDefinition());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class)
, Mockito.any(), Mockito.any(), Mockito.any(),Mockito.any(), Mockito.any(),
eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn);
Map<String, Object> dataParameterRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "20200101 00:00:00",
"20200102 00:00:00", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", 1, 10);
@ -142,18 +155,12 @@ public class ProcessInstanceServiceTest {
//project auth success
putMsg(result, Status.SUCCESS, projectName);
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>();
Page<ProcessInstance> pageReturn = new Page<>(1, 10);
processInstanceList.add(processInstance);
pageReturn.setRecords(processInstanceList);
when(projectMapper.queryByName(projectName)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
@ -162,7 +169,7 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
// data parameter empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(),
eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn);
successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "",
"", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS,
@ -178,7 +185,7 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.SUCCESS, executorExistRes.get(Constants.STATUS));
//executor name empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(0), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(0), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Map<String, Object> executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "", ExecutionStatus.SUBMITTED_SUCCESS,
@ -532,6 +539,7 @@ public class ProcessInstanceServiceTest {
*/
private Project getProject(String projectName) {
Project project = new Project();
project.setCode(1L);
project.setId(1);
project.setName(projectName);
project.setUserId(1);

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
*/
public class DependentItem {
private int definitionId;
private Long definitionCode;
private String depTasks;
private String cycle;
private String dateValue;
@ -34,18 +34,18 @@ public class DependentItem {
public String getKey(){
return String.format("%d-%s-%s-%s",
getDefinitionId(),
getDefinitionCode(),
getDepTasks(),
getCycle(),
getDateValue());
}
public int getDefinitionId() {
return definitionId;
public Long getDefinitionCode() {
return definitionCode;
}
public void setDefinitionId(int definitionId) {
this.definitionId = definitionId;
public void setDefinitionCode(Long definitionCode) {
this.definitionCode = definitionCode;
}
public String getDepTasks() {

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -52,6 +52,7 @@ public class ProcessInstance {
* process definition id
* TODO delete
*/
@TableField(exist = false)
private int processDefinitionId;
/**
@ -160,6 +161,7 @@ public class ProcessInstance {
* process instance json
* TODO delete
*/
@TableField(exist = false)
private String processInstanceJson;
/**
@ -193,11 +195,13 @@ public class ProcessInstance {
/**
* task locations for web
*/
@TableField(exist = false)
private String locations;
/**
* task connects for web
*/
@TableField(exist = false)
private String connects;
/**
@ -208,6 +212,7 @@ public class ProcessInstance {
/**
* depend processes schedule time
*/
@TableField(exist = false)
private String dependenceScheduleTimes;
/**

74
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -37,6 +37,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance detail info by id
*
* @param processId processId
* @return process instance
*/
@ -44,6 +45,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance by host and stateArray
*
* @param host host
* @param stateArray stateArray
* @return process instance list
@ -53,21 +55,23 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance by tenantId and stateArray
*
* @param tenantId tenantId
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByTenantIdAndStatus(@Param("tenantId") int tenantId,
@Param("states") int[] states);
@Param("states") int[] states);
/**
* query process instance by worker group and stateArray
*
* @param workerGroupId workerGroupId
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId,
@Param("states") int[] states);
@Param("states") int[] states);
/**
* process instance page
@ -85,9 +89,10 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* process instance page
*
* @param page page
* @param projectId projectId
* @param processDefinitionId processDefinitionId
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
* @param executorId executorId
* @param statusArray statusArray
@ -97,8 +102,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @return process instance page
*/
IPage<ProcessInstance> queryProcessInstanceListPaging(Page<ProcessInstance> page,
@Param("projectId") int projectId,
@Param("processDefinitionId") Integer processDefinitionId,
@Param("projectCode") Long projectCode,
@Param("processDefinitionCode") Long processDefinitionCode,
@Param("searchVal") String searchVal,
@Param("executorId") Integer executorId,
@Param("states") int[] statusArray,
@ -108,6 +113,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* set failover by host and state array
*
* @param host host
* @param stateArray stateArray
* @return set result
@ -117,7 +123,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* update process instance by state
* @param originState originState
*
* @param originState originState
* @param destState destState
* @return update result
*/
@ -125,7 +132,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("destState") ExecutionStatus destState);
/**
* update process instance by tenantId
* update process instance by tenantId
*
* @param originTenantId originTenantId
* @param destTenantId destTenantId
* @return update result
@ -135,6 +143,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* update process instance by worker groupId
*
* @param originWorkerGroupId originWorkerGroupId
* @param destWorkerGroupId destWorkerGroupId
* @return update result
@ -143,6 +152,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* count process instance state by user
*
* @param startTime startTime
* @param endTime endTime
* @param projectCodes projectCodes
@ -154,74 +164,76 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("projectCodes") Long[] projectCodes);
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineId(
@Param("processDefinitionId") int processDefinitionId,
@Param("size") int size);
List<ProcessInstance> queryByProcessDefineCode(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("size") int size);
/**
* query last scheduler process instance
* @param definitionId processDefinitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @return process instance
*/
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* query last running process instance
* @param definitionId definitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param stateArray stateArray
* @return process instance
*/
ProcessInstance queryLastRunningProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("states") int[] stateArray);
/**
* query last manual process instance
* @param definitionId definitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @return process instance
*/
ProcessInstance queryLastManualProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* query top n process instance order by running duration
* @param size
*
* @param status process instance status
* @param startTime
* @param endTime
* @return ProcessInstance list
*/
List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("status")ExecutionStatus status);
@Param("status") ExecutionStatus status);
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineIdAndStatus(
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
int updateGlobalParamsById(
@Param("globalParams") String globalParams,
@Param("id") int id);
int updateGlobalParamsById(@Param("globalParams") String globalParams,
@Param("id") int id);
}

39
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -19,10 +19,10 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag,
update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group, timeout, tenant_id, var_pool
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
@ -88,15 +88,15 @@
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.name, instance.state, instance.schedule_time, instance.start_time, instance.end_time,
instance.run_times, instance.recovery, instance.host
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_id = define.id
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
and define.project_id = #{projectId}
<if test="processDefinitionId != 0">
and instance.process_definition_id = #{processDefinitionId}
and define.project_code = #{projectCode}
<if test="processDefinitionCode != 0">
and instance.process_definition_code = #{processDefinitionCode}
</if>
<if test="searchVal != null and searchVal != ''">
and instance.name like concat('%', #{searchVal}, '%')
@ -147,8 +147,8 @@
<select id="countInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count
from t_ds_process_instance t
join t_ds_process_definition d on d.id=t.process_definition_id
join t_ds_project p on p.id=d.project_id
join t_ds_process_definition d on d.code=t.process_definition_code
join t_ds_project p on p.code=d.project_code
where 1 = 1
and t.is_sub_process = 0
<if test="startTime != null and endTime != null">
@ -162,18 +162,18 @@
</if>
group by t.state
</select>
<select id="queryByProcessDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
order by start_time desc limit #{size}
</select>
<select id="queryLastSchedulerProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
<if test="startTime!=null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>
@ -183,7 +183,7 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
<if test="states !=null and states.length != 0">
and state in
<foreach collection="states" item="i" index="index" open="(" separator="," close=")">
@ -200,19 +200,18 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
and schedule_time is null
<if test="startTime!=null and endTime != null ">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1
</select>
<select id="queryByProcessDefineIdAndStatus"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml

@ -75,9 +75,10 @@
</include>
,
u.user_name as user_name,
(SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_id = p.id) AS def_count,
(SELECT COUNT(*) FROM t_ds_process_definition def, t_ds_process_instance inst WHERE def.id =
inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count
(SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_code = p.code) AS def_count,
(SELECT COUNT(*) FROM t_ds_process_definition_log def, t_ds_process_instance inst WHERE def.code =
inst.process_definition_code and def.version = inst.process_definition_version AND def.project_code = p.code
AND inst.state=1 ) as inst_running_count
from t_ds_project p
join t_ds_user u on u.id=p.user_id
where 1=1

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -73,7 +73,7 @@
select state, count(0) as count
from t_ds_task_instance t
left join t_ds_process_definition d on d.code=t.process_definition_code
left join t_ds_project p on p.id=d.project_id
left join t_ds_project p on p.code=d.project_code
where 1=1
<if test="projectCodes != null and projectCodes.length != 0">
and d.project_code in

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java

@ -73,6 +73,7 @@ public class ErrorCommandMapperTest {
ErrorCommand errorCommand = insertOne();
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setName("def 1");
processDefinition.setProjectCode(1010L);
processDefinition.setUserId(101);

75
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -20,9 +20,14 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,8 +37,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(SpringRunner.class)
@SpringBootTest
@ -54,15 +59,12 @@ public class ProcessInstanceMapperTest {
/**
* insert process instance with specified start time and end time,set state to SUCCESS
*
* @param startTime
* @param endTime
* @return
*/
private ProcessInstance insertOne(Date startTime, Date endTime) {
ProcessInstance processInstance = new ProcessInstance();
Date start = startTime;
Date end = endTime;
processInstance.setProcessDefinitionCode(1L);
processInstance.setStartTime(start);
processInstance.setEndTime(end);
processInstance.setState(ExecutionStatus.SUCCESS);
@ -73,13 +75,15 @@ public class ProcessInstanceMapperTest {
/**
* insert
*
* @return ProcessInstance
*/
private ProcessInstance insertOne(){
private ProcessInstance insertOne() {
//insertOne
ProcessInstance processInstance = new ProcessInstance();
Date start = new Date(2019-1900, 1-1, 1, 0, 10,0);
Date end = new Date(2019-1900, 1-1, 1, 1, 0,0);
Date start = new Date(2019 - 1900, 1 - 1, 1, 0, 10, 0);
Date end = new Date(2019 - 1900, 1 - 1, 1, 1, 0, 0);
processInstance.setProcessDefinitionCode(1L);
processInstance.setStartTime(start);
processInstance.setEndTime(end);
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
@ -92,7 +96,7 @@ public class ProcessInstanceMapperTest {
* test update
*/
@Test
public void testUpdate(){
public void testUpdate() {
//insertOne
ProcessInstance processInstanceMap = insertOne();
//update
@ -105,7 +109,7 @@ public class ProcessInstanceMapperTest {
* test delete
*/
@Test
public void testDelete(){
public void testDelete() {
ProcessInstance processInstanceMap = insertOne();
int delete = processInstanceMapper.deleteById(processInstanceMap.getId());
Assert.assertEquals(1, delete);
@ -168,12 +172,16 @@ public class ProcessInstanceMapperTest {
ExecutionStatus.SUCCESS.ordinal()};
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setProjectId(1010);
processDefinition.setProjectCode(1L);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setUpdateTime(new Date());
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
ProcessInstance processInstance = insertOne();
processInstance.setProcessDefinitionId(processDefinition.getId());
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setIsSubProcess(Flag.NO);
processInstance.setStartTime(new Date());
@ -185,8 +193,8 @@ public class ProcessInstanceMapperTest {
IPage<ProcessInstance> processInstanceIPage = processInstanceMapper.queryProcessInstanceListPaging(
page,
processDefinition.getProjectId(),
processInstance.getProcessDefinitionId(),
processDefinition.getProjectCode(),
processInstance.getProcessDefinitionCode(),
processInstance.getName(),
0,
stateArray,
@ -252,10 +260,18 @@ public class ProcessInstanceMapperTest {
Project project = new Project();
project.setName("testProject");
project.setCode(1L);
project.setCreateTime(new Date());
project.setUpdateTime(new Date());
projectMapper.insert(project);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectId(project.getId());
processDefinition.setCode(1L);
processDefinition.setProjectId(1010);
processDefinition.setProjectCode(1L);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setUpdateTime(new Date());
processDefinition.setCreateTime(new Date());
processDefinitionMapper.insert(processDefinition);
ProcessInstance processInstance = insertOne();
@ -283,10 +299,10 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance1 = insertOne();
List<ProcessInstance> processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 1);
List<ProcessInstance> processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 1);
Assert.assertEquals(1, processInstances.size());
processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 2);
processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 2);
Assert.assertEquals(2, processInstances.size());
processInstanceMapper.deleteById(processInstance.getId());
@ -302,7 +318,7 @@ public class ProcessInstanceMapperTest {
processInstance.setScheduleTime(new Date());
processInstanceMapper.updateById(processInstance);
ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionId(), null, null );
ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null);
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
}
@ -320,7 +336,7 @@ public class ProcessInstanceMapperTest {
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal()};
ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionId(), null, null , stateArray);
ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, stateArray);
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
@ -334,14 +350,14 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance = insertOne();
processInstanceMapper.updateById(processInstance);
Date start = new Date(2019-1900, 1-1, 01, 0, 0, 0);
Date end = new Date(2019-1900, 1-1, 01, 5, 0, 0);
ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end
);
Assert.assertEquals(processInstance1.getId(), processInstance.getId());
start = new Date(2019-1900, 1-1, 01, 1, 0, 0);
processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end
start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end
);
Assert.assertNull(processInstance1);
@ -352,9 +368,6 @@ public class ProcessInstanceMapperTest {
/**
* test whether it is in descending order by running duration
*
* @param processInstances
* @return
*/
private boolean isSortedByDuration(List<ProcessInstance> processInstances) {
for (int i = 1; i < processInstances.size(); i++) {
@ -383,7 +396,7 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance3 = insertOne(startTime3, endTime3);
Date start = new Date(2020, 1, 1, 1, 1, 1);
Date end = new Date(2021, 1, 1, 1, 1, 1);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end,ExecutionStatus.SUCCESS);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS);
Assert.assertEquals(2, processInstances.size());
Assert.assertTrue(isSortedByDuration(processInstances));
for (ProcessInstance processInstance : processInstances) {

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -101,7 +101,7 @@ public class DependentExecute {
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if(processInstance == null){
return DependResult.WAITING;
@ -170,24 +170,20 @@ public class DependentExecute {
* find the last one process instance that :
* 1. manual run and finish between the interval
* 2. schedule run and schedule time between the interval
* @param definitionId definition id
* @param definitionCode definition code
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime());
if(runningProcess != null){
return runningProcess;
}
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(
definitionId, dateInterval
);
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(definitionCode, dateInterval);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(
definitionId, dateInterval
);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval);
if(lastManualProcess ==null){
return lastSchedulerProcess;

35
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -120,7 +120,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -140,7 +140,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
@ -163,7 +163,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
@ -184,15 +184,15 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "B", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "B", "today", "day")
).collect(Collectors.toList()));
DependentTaskModel dependentTaskModel2 = new DependentTaskModel();
dependentTaskModel2.setRelation(DependentRelation.OR);
dependentTaskModel2.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "C", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "C", "today", "day")
).collect(Collectors.toList()));
/*
@ -217,10 +217,10 @@ public class DependentTaskTest {
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(processInstance200);
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any()))
.thenReturn(processInstance300);
// for DependentExecute.getDependTaskResult
@ -249,7 +249,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day")
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -267,7 +267,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -280,7 +280,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE));
DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
@ -302,7 +302,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -318,7 +318,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -373,12 +373,9 @@ public class DependentTaskTest {
/**
* DependentItem defines the condition for the dependent
*/
private DependentItem getDependentItemFromTaskNode(
int processDefinitionId, String taskName,
String date, String cycle
) {
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) {
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionId(processDefinitionId);
dependentItem.setDefinitionCode(processDefinitionCode);
dependentItem.setDepTasks(taskName);
dependentItem.setDateValue(date);
dependentItem.setCycle(cycle);

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1582,7 +1582,6 @@ public class ProcessService {
String locations, String connects) {
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance != null) {
processInstance.setProcessInstanceJson(processJson);
processInstance.setGlobalParams(globalParams);
processInstance.setScheduleTime(scheduleTime);
processInstance.setLocations(locations);
@ -1911,12 +1910,12 @@ public class ProcessService {
/**
* find last scheduler process instance in the date interval
*
* @param definitionId definitionId
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionId,
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
@ -1924,12 +1923,12 @@ public class ProcessService {
/**
* find last manual process instance interval
*
* @param definitionId process definition id
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionId,
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
@ -1937,13 +1936,13 @@ public class ProcessService {
/**
* find last running process instance
*
* @param definitionId process definition id
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionId,
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
stateArray);

17
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -315,28 +315,11 @@ public class ProcessServiceTest {
public void testRecurseFindSubProcessId() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\""
+ ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\""
+ ",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[],"
+ "\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\""
+ ":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":"
+ "null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],"
+ "\"tenantId\":4,\"timeout\":0}");
int parentId = 111;
List<Integer> ids = new ArrayList<>();
ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setCode(11L);
processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\""
+ ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{},"
+ "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\","
+ "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]},"
+ "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":"
+ "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":"
+ "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition);
List<ProcessTaskRelationLog> relationLogList = new ArrayList<>();
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong()
, Mockito.anyInt()))

Loading…
Cancel
Save