Browse Source

[Feature-#6268][server-master] Serial execte process (#6185)

* add serial processInstance

* fix bug

* add test

* fix code style

* fix style code

* add sql

* fix sql error

* add api

* add test

* fix code style

* modify api

* delete column , fix mapper

* fix unimport

* fix test

* fix bug of missing param for Python

* fix code style

* fix test

* fix test

Co-authored-by: wangxj <wangxj31>
3.0.0/version-upgrade
wangxj3 3 years ago committed by GitHub
parent
commit
dc8b87e473
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 168
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 9
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  5. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  7. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
  8. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  9. 79
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java
  10. 18
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  11. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java
  12. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  13. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  14. 4
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  15. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
  16. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml
  17. 31
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  18. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  19. 8
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
  20. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  21. 35
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  22. 77
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  23. 83
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  24. 3
      sql/dolphinscheduler_h2.sql
  25. 3
      sql/dolphinscheduler_mysql.sql
  26. 3
      sql/dolphinscheduler_postgre.sql

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -126,9 +127,10 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) {
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType) {
Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
return returnDataList(result);
}
@ -244,10 +246,11 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType,
@RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.User;
@ -56,7 +57,8 @@ public interface ProcessDefinitionService {
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson);
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType);
/**
* query process definition list
@ -164,7 +166,8 @@ public interface ProcessDefinitionService {
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson);
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType);
/**
* verify process definition name unique

168
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -72,8 +73,6 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -163,15 +162,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@ -186,7 +185,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson) {
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -227,7 +227,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId);
globalParams, locations, timeout, loginUser.getId(), tenantId);
processDefinition.setExecutionType(executionType);
return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
}
@ -292,8 +294,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@ -301,7 +303,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
if (CollectionUtils.isNotEmpty(codes)) {
logger.error("the task code is not exit");
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA));
return result;
}
}
@ -330,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return definition list
*/
@ -352,12 +354,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list paging
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return process definition page
*/
@Override
@ -374,7 +376,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
@ -395,9 +397,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query detail of process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return process definition detail
*/
@Override
@ -447,16 +449,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@ -472,7 +474,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson) {
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -522,6 +525,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId);
processDefinition.setExecutionType(executionType);
return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs);
}
@ -551,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -565,9 +569,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param name name
* @param name name
* @return true if process definition name not exists, otherwise false
*/
@Override
@ -590,9 +594,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete process definition by code
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return delete result code
*/
@Override
@ -661,9 +665,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* release process definition: online / offline
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@ -689,7 +693,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
case ONLINE:
// To check resources whether they are already cancel authorized or deleted
String resourceIds = processDefinition.getResourceIds();
if (StringUtils.isNotBlank(resourceIds)) {
if (org.apache.commons.lang.StringUtils.isNotBlank(resourceIds)) {
Integer[] resourceIdArray = Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new);
PermissionCheck<Integer> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger);
try {
@ -708,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
new long[]{processDefinition.getCode()}
new long[]{processDefinition.getCode()}
);
if (updateProcess > 0 && scheduleList.size() == 1) {
Schedule schedule = scheduleList.get(0);
@ -737,7 +741,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
@Override
public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, HttpServletResponse response) {
if (StringUtils.isEmpty(codes)) {
if (org.apache.commons.lang.StringUtils.isEmpty(codes)) {
return;
}
Project project = projectMapper.queryByCode(projectCode);
@ -807,9 +811,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* import process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param file process metadata json file
* @param file process metadata json file
* @return import process
*/
@Override
@ -1017,9 +1021,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return task node list
*/
@Override
@ -1046,9 +1050,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details map based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param codes define codes
* @param codes define codes
* @return task node list
*/
@Override
@ -1083,7 +1087,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition all by project code
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @return process definitions in the project
*/
@ -1105,7 +1109,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* Encapsulates the TreeView structure
*
* @param code process definition code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
@ -1130,7 +1134,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
@ -1145,8 +1149,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(),
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
}
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@ -1184,11 +1188,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessId = Integer.parseInt(JSONUtils.parseObject(
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText());
}
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId));
}
}
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@ -1249,9 +1253,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch copy process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1272,9 +1276,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1307,7 +1311,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (StringUtils.isEmpty(processDefinitionCodes)) {
if (org.apache.commons.lang.StringUtils.isEmpty(processDefinitionCodes)) {
putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY, processDefinitionCodes);
return result;
}
@ -1337,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
@ -1373,10 +1377,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* switch the defined process definition version
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param version the version user want to switch
* @param code process definition code
* @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
@ -1412,11 +1416,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check batch operate result
*
* @param srcProjectCode srcProjectCode
* @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
* @param result result
* @param result result
* @param failedProcessList failedProcessList
* @param isCopy isCopy
* @param isCopy isCopy
*/
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@ -1434,11 +1438,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
@ -1468,10 +1472,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param code process definition code
* @param version version number
* @param code process definition code
* @param version version number
* @return delete result code
*/
@Override

9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -94,10 +95,10 @@ public class ProcessDefinitionControllerTest {
result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result);
locations, timeout, tenantCode, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson);
locations, timeout, tenantCode, relationJson, taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -157,10 +158,10 @@ public class ProcessDefinitionControllerTest {
result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result);
locations, timeout, tenantCode, relationJson, taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, ReleaseState.OFFLINE);
locations, timeout, tenantCode, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
@ -610,7 +611,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", null, null);
"", "", "", 0, "root", null, null, ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
}

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -876,6 +876,12 @@ public final class Constants {
ExecutionStatus.WAITING_DEPEND.ordinal()
};
public static final int[] RUNNING_PROCESS_STATE = new int[] {
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.SERIAL_WAIT.ordinal()
};
/**
* status
*/

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

@ -50,7 +50,9 @@ public enum CommandType {
REPEAT_RUNNING(7, "repeat running a process"),
PAUSE(8, "pause a process"),
STOP(9, "stop a process"),
RECOVER_WAITING_THREAD(10, "recover waiting thread");
RECOVER_WAITING_THREAD(10, "recover waiting thread"),
RECOVER_SERIAL_WAIT(11, "recover serial wait"),
;
CommandType(int code, String descp){
this.code = code;

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -56,7 +56,8 @@ public enum ExecutionStatus {
WAITING_THREAD(10, "waiting thread"),
WAITING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
FORCED_SUCCESS(13, "forced success"),
SERIAL_WAIT(14, "serial wait");
ExecutionStatus(int code, String descp) {
this.code = code;

79
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
public enum ProcessExecutionTypeEnum {
PARALLEL(0, "parallel"),
SERIAL_WAIT(1, "serial wait"),
SERIAL_DISCARD(2, "serial discard"),
SERIAL_PRIORITY(3, "serial priority");
ProcessExecutionTypeEnum(int code, String descp) {
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
private static HashMap<Integer, ProcessExecutionTypeEnum> EXECUTION_STATUS_MAP = new HashMap<>();
static {
for (ProcessExecutionTypeEnum executionType : ProcessExecutionTypeEnum.values()) {
EXECUTION_STATUS_MAP.put(executionType.code, executionType);
}
}
public boolean typeIsSerial() {
return this != PARALLEL;
}
public boolean typeIsSerialWait() {
return this == SERIAL_WAIT;
}
public boolean typeIsSerialDiscard() {
return this == SERIAL_DISCARD;
}
public boolean typeIsSerialPriority() {
return this == SERIAL_PRIORITY;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
public static ProcessExecutionTypeEnum of(int executionType) {
if (EXECUTION_STATUS_MAP.containsKey(executionType)) {
return EXECUTION_STATUS_MAP.get(executionType);
}
throw new IllegalArgumentException("invalid status : " + executionType);
}
}

18
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -174,8 +175,12 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;
public ProcessDefinition() {
}
/**
* execution type
*/
private ProcessExecutionTypeEnum executionType;
public ProcessDefinition() { }
public ProcessDefinition(long projectCode,
String name,
@ -412,6 +417,14 @@ public class ProcessDefinition {
this.warningGroupId = warningGroupId;
}
public ProcessExecutionTypeEnum getExecutionType() {
return executionType;
}
public void setExecutionType(ProcessExecutionTypeEnum executionType) {
this.executionType = executionType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -430,6 +443,7 @@ public class ProcessDefinition {
&& Objects.equals(description, that.description)
&& Objects.equals(globalParams, that.globalParams)
&& flag == that.flag
&& executionType == that.executionType
&& Objects.equals(locations, that.locations);
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java

@ -67,6 +67,7 @@ public class ProcessDefinitionLog extends ProcessDefinition {
this.setModifyBy(processDefinition.getModifyBy());
this.setResourceIds(processDefinition.getResourceIds());
this.setWarningGroupId(processDefinition.getWarningGroupId());
this.setExecutionType(processDefinition.getExecutionType());
}
public int getOperator() {
@ -89,4 +90,5 @@ public class ProcessDefinitionLog extends ProcessDefinition {
public boolean equals(Object o) {
return super.equals(o);
}
}

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -238,6 +238,10 @@ public class ProcessInstance {
* varPool string
*/
private String varPool;
/**
* serial queue next processInstanceId
*/
private int nextProcessInstanceId;
/**
* dry run flag
@ -706,4 +710,11 @@ public class ProcessInstance {
return Objects.hash(id);
}
public int getNextProcessInstanceId() {
return nextProcessInstanceId;
}
public void setNextProcessInstanceId(int nextProcessInstanceId) {
this.nextProcessInstanceId = nextProcessInstanceId;
}
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -233,6 +233,13 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states, @Param("id") int id);
int updateGlobalParamsById(@Param("globalParams") String globalParams,
@Param("id") int id);
boolean updateNextProcessIdById(@Param("thisInstanceId") int thisInstanceId, @Param("runningInstanceId")int runningInstanceId);
ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,@Param("state") int state);
}

4
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -22,14 +22,14 @@
<sql id="baseSql">
id, code, name, version, description, project_code,
release_state, user_id,global_params, flag, locations,
warning_group_id, timeout, tenant_id,operator, operate_time, create_time,
warning_group_id, timeout, tenant_id,operator,execution_type, operate_time, create_time,
update_time
</sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code,
pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations,
pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time,
pd.update_time, u.user_name,p.name as project_name
pd.update_time, u.user_name,p.name as project_name ,pd.execution_type
from t_ds_process_definition_log pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -20,12 +20,12 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper">
<sql id="baseSql">
id, code, name, version, release_state, project_code, user_id, description,
global_params, flag, locations, warning_group_id, create_time, timeout, tenant_id, update_time
global_params, flag, locations, warning_group_id, create_time, timeout, tenant_id, update_time,execution_type
</sql>
<select id="verifyByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, pd.tenant_id, pd.update_time
pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,pd.execution_type
from t_ds_process_definition pd
WHERE pd.project_code = #{projectCode}
and pd.name = #{processDefinitionName}
@ -58,7 +58,7 @@
<select id="queryByDefineName" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, p.id as project_id, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,
u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name
u.user_name,p.name as project_name,t.tenant_code,q.queue,q.queue_name,pd.execution_type
from t_ds_process_definition pd
JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code
@ -70,7 +70,7 @@
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT td.id, td.code, td.name, td.version, td.release_state, td.project_code, td.user_id, td.description,
td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time,
sc.schedule_release_state, tu.user_name
sc.schedule_release_state, tu.user_name ,td.execution_type
FROM t_ds_process_definition td
left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by
process_definition_code,release_state) sc on sc.process_definition_code = td.code
@ -127,7 +127,7 @@
SELECT
pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description,
pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout,
pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name
pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name ,pd.execution_type
FROM
t_ds_process_definition pd,
t_ds_user u,

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml

@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper">
<sql id="baseSql">
id, parent_process_instance_id, parent_task_instance_id, process_instance_id
id, parent_process_instance_id, parent_task_instance_id, process_instance_id,next_process_instance_id
</sql>
<delete id="deleteByParentProcessId">
delete

31
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -23,7 +23,7 @@
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run,next_process_instance_id
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@ -90,7 +90,7 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
@ -218,9 +218,36 @@
</foreach>
order by id asc
</select>
<select id="queryByProcessDefineCodeAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
and next_process_instance_id=0
and id <![CDATA[ < ]]> #{id}
order by id desc
</select>
<select id="loadNextProcess4Serial" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
and state = #{state}
and id <![CDATA[ < ]]> #{id}
order by id desc limit 1
</select>
<update id="updateGlobalParamsById">
update t_ds_process_instance
set global_params = #{globalParams}
where id = #{id}
</update>
<update id="updateNextProcessIdById">
update t_ds_process_instance
set next_process_instance_id = #{thisInstanceId}
where id = #{runningInstanceId} and next_process_instance_id=0
</update>
</mapper>

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -281,6 +281,7 @@ class ProcessDefinition(Base):
# TODO add serialization function
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
None,
)
return self._process_definition_code

8
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
@ -175,7 +176,8 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
int timeout,
String tenantCode,
String taskRelationJson,
String taskDefinitionJson) {
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
long projectCode = project.getCode();
@ -189,12 +191,12 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
// make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
return processDefinitionCode;
} else if (verifyStatus == Status.SUCCESS) {
// create process definition
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType);
ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
return processDefinition.getCode();
} else {

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -510,6 +512,10 @@ public class WorkflowExecuteThread implements Runnable {
private void endProcess() {
this.stateEvents.clear();
processInstance.setEndTime(new Date());
ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
}
processService.updateProcessInstance(processInstance);
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
@ -519,6 +525,29 @@ public class WorkflowExecuteThread implements Runnable {
processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
this.processInstance = processService.findProcessInstanceById(processInstance.getId());
int nextInstanceId = processInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode());
if (nextProcessInstance == null) {
return;
}
nextInstanceId = nextProcessInstance.getId();
}
ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId);
if (nextProcessInstance.getState().typeIsFinished() || nextProcessInstance.getState().typeIsRunning()) {
return;
}
Map<String, Object> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId);
Command command = new Command();
command.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command.setProcessDefinitionCode(processDefinition.getCode());
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
}
/**
* generate process dag
*

35
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -111,9 +112,7 @@ public class WorkflowExecuteThreadTest {
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
PowerMockito.doNothing().when(workflowExecuteThread, "executeProcess");
PowerMockito.doNothing().when(workflowExecuteThread, "prepareProcess");
PowerMockito.doNothing().when(workflowExecuteThread, "runProcess");
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
}
@ -256,6 +255,36 @@ public class WorkflowExecuteThreadTest {
}
}
@Test
public void testCheckSerialProcess() {
try {
ProcessDefinition processDefinition1 = new ProcessDefinition();
processDefinition1.setId(123);
processDefinition1.setName("test");
processDefinition1.setVersion(1);
processDefinition1.setCode(11L);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
Mockito.when(processInstance.getId()).thenReturn(225);
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance);
workflowExecuteThread.checkSerialProcess(processDefinition1);
Mockito.when(processInstance.getId()).thenReturn(225);
Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222);
ProcessInstance processInstance9 = new ProcessInstance();
processInstance9.setId(222);
processInstance9.setProcessDefinitionCode(11L);
processInstance9.setProcessDefinitionVersion(1);
processInstance9.setState(ExecutionStatus.SERIAL_WAIT);
Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance);
Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9);
workflowExecuteThread.checkSerialProcess(processDefinition1);
} catch (Exception e) {
Assert.fail();
}
}
private List<Schedule> zeroSchedulerList() {
return Collections.emptyList();
}
@ -268,4 +297,4 @@ public class WorkflowExecuteThreadTest {
return schedulerList;
}
}
}

77
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -100,6 +100,8 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -200,6 +202,10 @@ public class ProcessService {
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
StateEventCallbackService stateEventCallbackService;
@Autowired
private EnvironmentMapper environmentMapper;
@ -222,12 +228,77 @@ public class ProcessService {
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);
//if the processDefination is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance,processDefinition);
if (processInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
this.setSubProcessParam(processInstance);
this.commandMapper.deleteById(command.getId());
return null;
}
} else {
saveProcessInstance(processInstance);
}
this.setSubProcessParam(processInstance);
this.commandMapper.deleteById(command.getId());
return processInstance;
}
private void saveSerialProcess(ProcessInstance processInstance,ProcessDefinition processDefinition) {
processInstance.setState(ExecutionStatus.SERIAL_WAIT);
saveProcessInstance(processInstance);
//serial wait
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
while (true) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE,processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
return;
}
ProcessInstance runningProcess = runningProcessInstances.get(0);
if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) {
return;
}
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE,processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
Constants.RUNNING_PROCESS_STATE,processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
for (ProcessInstance info : runningProcessInstances) {
info.setCommandType(CommandType.STOP);
info.addHistoryCmd(CommandType.STOP);
info.setState(ExecutionStatus.READY_STOP);
int update = updateProcessInstance(info);
// determine whether the process is normal
if (update > 0) {
String host = info.getHost();
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
);
try {
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
} catch (Exception e) {
logger.error("sendResultError");
}
}
}
}
}
}
/**
* save error command, and delete original command
*
@ -2495,4 +2566,8 @@ public class ProcessService {
}
return processTaskMap;
}
public ProcessInstance loadNextProcess4Serial(long code, int state) {
return this.processInstanceMapper.loadNextProcess4Serial(code,state);
}
}

83
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
@ -86,7 +87,7 @@ import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class ProcessServiceTest {
private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class);
@ -257,16 +258,25 @@ public class ProcessServiceTest {
command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123);
processDefinition.setName("test");
processDefinition.setVersion(definitionVersion);
processDefinition.setCode(definitionCode);
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
processDefinition.setExecutionType(ProcessExecutionTypeEnum.PARALLEL);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222);
processInstance.setProcessDefinitionCode(11L);
processInstance.setHost("127.0.0.1:5678");
processInstance.setProcessDefinitionVersion(1);
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
@ -309,6 +319,77 @@ public class ProcessServiceTest {
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
ProcessDefinition processDefinition1 = new ProcessDefinition();
processDefinition1.setId(123);
processDefinition1.setName("test");
processDefinition1.setVersion(1);
processDefinition1.setCode(11L);
processDefinition1.setVersion(1);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT);
List<ProcessInstance> lists = new ArrayList<>();
ProcessInstance processInstance11 = new ProcessInstance();
processInstance11.setId(222);
processInstance11.setProcessDefinitionCode(11L);
processInstance11.setProcessDefinitionVersion(1);
processInstance11.setHost("127.0.0.1:5678");
lists.add(processInstance11);
ProcessInstance processInstance2 = new ProcessInstance();
processInstance2.setId(223);
processInstance2.setProcessDefinitionCode(11L);
processInstance2.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Command command6 = new Command();
command6.setProcessDefinitionCode(11L);
command6.setCommandParam("{\"ProcessInstanceId\":223}");
command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command6.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps);
Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
ProcessInstance processInstance7 = new ProcessInstance();
processInstance7.setId(224);
processInstance7.setProcessDefinitionCode(11L);
processInstance7.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7);
Command command7 = new Command();
command7.setProcessDefinitionCode(11L);
command7.setCommandParam("{\"ProcessInstanceId\":224}");
command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command7.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps);
Assert.assertTrue(processInstance8 == null);
ProcessDefinition processDefinition2 = new ProcessDefinition();
processDefinition2.setId(123);
processDefinition2.setName("test");
processDefinition2.setVersion(1);
processDefinition2.setCode(12L);
processDefinition2.setExecutionType(ProcessExecutionTypeEnum.SERIAL_PRIORITY);
Mockito.when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2);
ProcessInstance processInstance9 = new ProcessInstance();
processInstance9.setId(225);
processInstance9.setProcessDefinitionCode(11L);
processInstance9.setProcessDefinitionVersion(1);
Command command9 = new Command();
command9.setProcessDefinitionCode(12L);
command9.setCommandParam("{\"ProcessInstanceId\":225}");
command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command9.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,Constants.RUNNING_PROCESS_STATE,0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps);
Assert.assertTrue(processInstance10 == null);
}
@Test

3
sql/dolphinscheduler_h2.sql

@ -412,6 +412,7 @@ CREATE TABLE t_ds_process_definition
warning_group_id int(11) DEFAULT NULL,
timeout int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
execution_type tinyint(4) DEFAULT '0',
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
PRIMARY KEY (id),
@ -443,6 +444,7 @@ CREATE TABLE t_ds_process_definition_log
warning_group_id int(11) DEFAULT NULL,
timeout int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
execution_type tinyint(4) DEFAULT '0',
operator int(11) DEFAULT NULL,
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
@ -595,6 +597,7 @@ CREATE TABLE t_ds_process_instance
worker_group varchar(64) DEFAULT NULL,
environment_code bigint(20) DEFAULT '-1',
timeout int(11) DEFAULT '0',
next_process_instance_id int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
var_pool longtext,
dry_run int NULL DEFAULT 0,

3
sql/dolphinscheduler_mysql.sql

@ -414,6 +414,7 @@ CREATE TABLE `t_ds_process_definition` (
`warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`),
@ -443,6 +444,7 @@ CREATE TABLE `t_ds_process_definition_log` (
`warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
@ -593,6 +595,7 @@ CREATE TABLE `t_ds_process_instance` (
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE

3
sql/dolphinscheduler_postgre.sql

@ -331,6 +331,7 @@ CREATE TABLE t_ds_process_definition (
flag int DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int DEFAULT '-1' ,
execution_type int DEFAULT '0',
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id) ,
@ -355,6 +356,7 @@ CREATE TABLE t_ds_process_definition_log (
flag int DEFAULT NULL ,
timeout int DEFAULT '0' ,
tenant_id int DEFAULT '-1' ,
execution_type int DEFAULT '0',
operator int DEFAULT NULL ,
operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
@ -498,6 +500,7 @@ CREATE TABLE t_ds_process_instance (
tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
PRIMARY KEY (id)
) ;

Loading…
Cancel
Save