From dc8b87e473886723cba02b064bcde6d7947891d6 Mon Sep 17 00:00:00 2001 From: wangxj3 <857234426@qq.com> Date: Fri, 5 Nov 2021 17:25:45 +0800 Subject: [PATCH] [Feature-#6268][server-master] Serial execte process (#6185) * add serial processInstance * fix bug * add test * fix code style * fix style code * add sql * fix sql error * add api * add test * fix code style * modify api * delete column , fix mapper * fix unimport * fix test * fix bug of missing param for Python * fix code style * fix test * fix test Co-authored-by: wangxj --- .../ProcessDefinitionController.java | 9 +- .../api/service/ProcessDefinitionService.java | 7 +- .../impl/ProcessDefinitionServiceImpl.java | 168 +++++++++--------- .../ProcessDefinitionControllerTest.java | 9 +- .../service/ProcessDefinitionServiceTest.java | 3 +- .../dolphinscheduler/common/Constants.java | 6 + .../common/enums/CommandType.java | 4 +- .../common/enums/ExecutionStatus.java | 3 +- .../enums/ProcessExecutionTypeEnum.java | 79 ++++++++ .../dao/entity/ProcessDefinition.java | 18 +- .../dao/entity/ProcessDefinitionLog.java | 2 + .../dao/entity/ProcessInstance.java | 11 ++ .../dao/mapper/ProcessInstanceMapper.java | 7 + .../dao/mapper/ProcessDefinitionLogMapper.xml | 4 +- .../dao/mapper/ProcessDefinitionMapper.xml | 10 +- .../dao/mapper/ProcessInstanceMapMapper.xml | 2 +- .../dao/mapper/ProcessInstanceMapper.xml | 31 +++- .../core/process_definition.py | 1 + .../server/PythonGatewayServer.java | 8 +- .../master/runner/WorkflowExecuteThread.java | 29 +++ .../master/WorkflowExecuteThreadTest.java | 35 +++- .../service/process/ProcessService.java | 77 +++++++- .../service/process/ProcessServiceTest.java | 83 ++++++++- sql/dolphinscheduler_h2.sql | 3 + sql/dolphinscheduler_mysql.sql | 3 + sql/dolphinscheduler_postgre.sql | 3 + 26 files changed, 501 insertions(+), 114 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index d6cb1bacb8..62579186d6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -126,9 +127,10 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, @RequestParam(value = "tenantCode", required = true) String tenantCode, @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, - @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) { + @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson, + @RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType) { Map result = processDefinitionService.createProcessDefinition(loginUser, projectCode, name, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); return returnDataList(result); } @@ -244,10 +246,11 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "tenantCode", required = true) String tenantCode, @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson, + @RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType, @RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) { Map result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); // If the update fails, the result will be returned directly if (result.get(Constants.STATUS) != Status.SUCCESS) { return returnDataList(result); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 42fce02bad..723717d1fc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.User; @@ -56,7 +57,8 @@ public interface ProcessDefinitionService { int timeout, String tenantCode, String taskRelationJson, - String taskDefinitionJson); + String taskDefinitionJson, + ProcessExecutionTypeEnum executionType); /** * query process definition list @@ -164,7 +166,8 @@ public interface ProcessDefinitionService { int timeout, String tenantCode, String taskRelationJson, - String taskDefinitionJson); + String taskDefinitionJson, + ProcessExecutionTypeEnum executionType); /** * verify process definition name unique diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index d4aa93ec92..2fdec30284 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -72,8 +73,6 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.commons.lang.StringUtils; - import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -163,15 +162,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * create process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return create result code */ @@ -186,7 +185,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int timeout, String tenantCode, String taskRelationJson, - String taskDefinitionJson) { + String taskDefinitionJson, + ProcessExecutionTypeEnum executionType) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -227,7 +227,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, locations, timeout, loginUser.getId(), tenantId); + globalParams, locations, timeout, loginUser.getId(), tenantId); + processDefinition.setExecutionType(executionType); + return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs); } @@ -292,8 +294,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } List processTaskRelations = taskRelationList.stream() - .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) - .collect(Collectors.toList()); + .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class)) + .collect(Collectors.toList()); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); if (taskNodeList.size() != taskRelationList.size()) { Set postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet()); @@ -301,7 +303,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Collection codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes); if (CollectionUtils.isNotEmpty(codes)) { logger.error("the task code is not exit"); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA)); + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, org.apache.commons.lang.StringUtils.join(codes, Constants.COMMA)); return result; } } @@ -330,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return definition list */ @@ -352,12 +354,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list paging * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param searchVal search value - * @param userId user id - * @param pageNo page number - * @param pageSize page size + * @param searchVal search value + * @param userId user id + * @param pageNo page number + * @param pageSize page size * @return process definition page */ @Override @@ -374,7 +376,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Page page = new Page<>(pageNo, pageSize); IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( - page, searchVal, userId, project.getCode(), isAdmin(loginUser)); + page, searchVal, userId, project.getCode(), isAdmin(loginUser)); List records = processDefinitionIPage.getRecords(); for (ProcessDefinition pd : records) { @@ -395,9 +397,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query detail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return process definition detail */ @Override @@ -447,16 +449,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * update process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param code process definition code + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return update result code */ @@ -472,7 +474,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int timeout, String tenantCode, String taskRelationJson, - String taskDefinitionJson) { + String taskDefinitionJson, + ProcessExecutionTypeEnum executionType) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -522,6 +525,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId); + processDefinition.setExecutionType(executionType); return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs); } @@ -551,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -565,9 +569,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * verify process definition name unique * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param name name + * @param name name * @return true if process definition name not exists, otherwise false */ @Override @@ -590,9 +594,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete process definition by code * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return delete result code */ @Override @@ -661,9 +665,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * release process definition: online / offline * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState release state * @return release result code */ @@ -689,7 +693,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro case ONLINE: // To check resources whether they are already cancel authorized or deleted String resourceIds = processDefinition.getResourceIds(); - if (StringUtils.isNotBlank(resourceIds)) { + if (org.apache.commons.lang.StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(Constants.COMMA)).map(Integer::parseInt).toArray(Integer[]::new); PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { @@ -708,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setReleaseState(releaseState); int updateProcess = processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new long[]{processDefinition.getCode()} + new long[]{processDefinition.getCode()} ); if (updateProcess > 0 && scheduleList.size() == 1) { Schedule schedule = scheduleList.get(0); @@ -737,7 +741,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro */ @Override public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, HttpServletResponse response) { - if (StringUtils.isEmpty(codes)) { + if (org.apache.commons.lang.StringUtils.isEmpty(codes)) { return; } Project project = projectMapper.queryByCode(projectCode); @@ -807,9 +811,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * import process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param file process metadata json file + * @param file process metadata json file * @return import process */ @Override @@ -1017,9 +1021,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return task node list */ @Override @@ -1046,9 +1050,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details map based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param codes define codes + * @param codes define codes * @return task node list */ @Override @@ -1083,7 +1087,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition all by project code * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code * @return process definitions in the project */ @@ -1105,7 +1109,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * Encapsulates the TreeView structure * - * @param code process definition code + * @param code process definition code * @param limit limit * @return tree view json data */ @@ -1130,7 +1134,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); List taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); Map taskDefinitionMap = taskDefinitionList.stream() - .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); if (limit > processInstanceList.size()) { limit = processInstanceList.size(); @@ -1145,8 +1149,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessInstance processInstance = processInstanceList.get(i); Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(), - "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), - DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + "", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), + DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); } List parentTreeViewDtoList = new ArrayList<>(); @@ -1184,11 +1188,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (taskInstance.isSubProcess()) { TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(), - taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), - taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); + taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), + taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); } } for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { @@ -1249,9 +1253,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch copy process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1272,9 +1276,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch move process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1307,7 +1311,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - if (StringUtils.isEmpty(processDefinitionCodes)) { + if (org.apache.commons.lang.StringUtils.isEmpty(processDefinitionCodes)) { putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY, processDefinitionCodes); return result; } @@ -1337,7 +1341,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro diffCode.forEach(code -> failedProcessList.add(code + "[null]")); for (ProcessDefinition processDefinition : processDefinitionList) { List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); if (isCopy) { @@ -1373,10 +1377,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * switch the defined process definition version * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code - * @param version the version user want to switch + * @param code process definition code + * @param version the version user want to switch * @return switch process definition version result code */ @Override @@ -1412,11 +1416,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * check batch operate result * - * @param srcProjectCode srcProjectCode + * @param srcProjectCode srcProjectCode * @param targetProjectCode targetProjectCode - * @param result result + * @param result result * @param failedProcessList failedProcessList - * @param isCopy isCopy + * @param isCopy isCopy */ private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode, Map result, List failedProcessList, boolean isCopy) { @@ -1434,11 +1438,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query the pagination versions info by one certain process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param pageNo page number - * @param pageSize page size - * @param code process definition code + * @param pageNo page number + * @param pageSize page size + * @param code process definition code * @return the pagination process definition versions info of the certain process definition */ @Override @@ -1468,10 +1472,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete one certain process definition by version number and process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param code process definition code - * @param version version number + * @param code process definition code + * @param version version number * @return delete result code */ @Override diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 4737c2f8f8..2ac632f235 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -94,10 +95,10 @@ public class ProcessDefinitionControllerTest { result.put(Constants.DATA_LIST, 1); Mockito.when(processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result); + locations, timeout, tenantCode, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result); Result response = processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, relationJson, taskDefinitionJson); + locations, timeout, tenantCode, relationJson, taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } @@ -157,10 +158,10 @@ public class ProcessDefinitionControllerTest { result.put("processDefinitionId", 1); Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result); + locations, timeout, tenantCode, relationJson, taskDefinitionJson,ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result); Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, relationJson, taskDefinitionJson, ReleaseState.OFFLINE); + locations, timeout, tenantCode, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 506bba3f92..3714de9870 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; @@ -610,7 +611,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1, - "", "", "", 0, "root", null, null); + "", "", "", 0, "root", null, null, ProcessExecutionTypeEnum.PARALLEL); Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index fc9e36fd9f..fa9a490827 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -876,6 +876,12 @@ public final class Constants { ExecutionStatus.WAITING_DEPEND.ordinal() }; + public static final int[] RUNNING_PROCESS_STATE = new int[] { + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.SERIAL_WAIT.ordinal() + }; + /** * status */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 4f24977c5f..9bff968b73 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -50,7 +50,9 @@ public enum CommandType { REPEAT_RUNNING(7, "repeat running a process"), PAUSE(8, "pause a process"), STOP(9, "stop a process"), - RECOVER_WAITING_THREAD(10, "recover waiting thread"); + RECOVER_WAITING_THREAD(10, "recover waiting thread"), + RECOVER_SERIAL_WAIT(11, "recover serial wait"), + ; CommandType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index 637eab2a4c..99ba93e6e9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -56,7 +56,8 @@ public enum ExecutionStatus { WAITING_THREAD(10, "waiting thread"), WAITING_DEPEND(11, "waiting depend node complete"), DELAY_EXECUTION(12, "delay execution"), - FORCED_SUCCESS(13, "forced success"); + FORCED_SUCCESS(13, "forced success"), + SERIAL_WAIT(14, "serial wait"); ExecutionStatus(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java new file mode 100644 index 0000000000..50c46fd45e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProcessExecutionTypeEnum.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +import java.util.HashMap; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +public enum ProcessExecutionTypeEnum { + + PARALLEL(0, "parallel"), + SERIAL_WAIT(1, "serial wait"), + SERIAL_DISCARD(2, "serial discard"), + SERIAL_PRIORITY(3, "serial priority"); + + ProcessExecutionTypeEnum(int code, String descp) { + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + private static HashMap EXECUTION_STATUS_MAP = new HashMap<>(); + + static { + for (ProcessExecutionTypeEnum executionType : ProcessExecutionTypeEnum.values()) { + EXECUTION_STATUS_MAP.put(executionType.code, executionType); + } + } + + public boolean typeIsSerial() { + return this != PARALLEL; + } + + public boolean typeIsSerialWait() { + return this == SERIAL_WAIT; + } + + public boolean typeIsSerialDiscard() { + return this == SERIAL_DISCARD; + } + + public boolean typeIsSerialPriority() { + return this == SERIAL_PRIORITY; + } + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } + + public static ProcessExecutionTypeEnum of(int executionType) { + if (EXECUTION_STATUS_MAP.containsKey(executionType)) { + return EXECUTION_STATUS_MAP.get(executionType); + } + throw new IllegalArgumentException("invalid status : " + executionType); + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index 8e50ce8a68..2fbcac14a7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.entity; import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -174,8 +175,12 @@ public class ProcessDefinition { @TableField(exist = false) private int warningGroupId; - public ProcessDefinition() { - } + /** + * execution type + */ + private ProcessExecutionTypeEnum executionType; + + public ProcessDefinition() { } public ProcessDefinition(long projectCode, String name, @@ -412,6 +417,14 @@ public class ProcessDefinition { this.warningGroupId = warningGroupId; } + public ProcessExecutionTypeEnum getExecutionType() { + return executionType; + } + + public void setExecutionType(ProcessExecutionTypeEnum executionType) { + this.executionType = executionType; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -430,6 +443,7 @@ public class ProcessDefinition { && Objects.equals(description, that.description) && Objects.equals(globalParams, that.globalParams) && flag == that.flag + && executionType == that.executionType && Objects.equals(locations, that.locations); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java index 30840e8602..ee11ba791c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java @@ -67,6 +67,7 @@ public class ProcessDefinitionLog extends ProcessDefinition { this.setModifyBy(processDefinition.getModifyBy()); this.setResourceIds(processDefinition.getResourceIds()); this.setWarningGroupId(processDefinition.getWarningGroupId()); + this.setExecutionType(processDefinition.getExecutionType()); } public int getOperator() { @@ -89,4 +90,5 @@ public class ProcessDefinitionLog extends ProcessDefinition { public boolean equals(Object o) { return super.equals(o); } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 18c386b854..0b7ee2fd72 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -238,6 +238,10 @@ public class ProcessInstance { * varPool string */ private String varPool; + /** + * serial queue next processInstanceId + */ + private int nextProcessInstanceId; /** * dry run flag @@ -706,4 +710,11 @@ public class ProcessInstance { return Objects.hash(id); } + public int getNextProcessInstanceId() { + return nextProcessInstanceId; + } + + public void setNextProcessInstanceId(int nextProcessInstanceId) { + this.nextProcessInstanceId = nextProcessInstanceId; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 7be58a7422..6e85163e81 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -233,6 +233,13 @@ public interface ProcessInstanceMapper extends BaseMapper { List queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode, @Param("states") int[] states); + List queryByProcessDefineCodeAndStatusAndNextId(@Param("processDefinitionCode") Long processDefinitionCode, + @Param("states") int[] states, @Param("id") int id); + int updateGlobalParamsById(@Param("globalParams") String globalParams, @Param("id") int id); + + boolean updateNextProcessIdById(@Param("thisInstanceId") int thisInstanceId, @Param("runningInstanceId")int runningInstanceId); + + ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,@Param("state") int state); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml index 40afa04628..ee9f7c06ba 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml @@ -22,14 +22,14 @@ id, code, name, version, description, project_code, release_state, user_id,global_params, flag, locations, - warning_group_id, timeout, tenant_id,operator, operate_time, create_time, + warning_group_id, timeout, tenant_id,operator,execution_type, operate_time, create_time, update_time select pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description, - pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, pd.tenant_id, pd.update_time + pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, pd.tenant_id, pd.update_time,pd.execution_type from t_ds_process_definition pd WHERE pd.project_code = #{projectCode} and pd.name = #{processDefinitionName} @@ -58,7 +58,7 @@ SELECT td.id, td.code, td.name, td.version, td.release_state, td.project_code, td.user_id, td.description, td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time, - sc.schedule_release_state, tu.user_name + sc.schedule_release_state, tu.user_name ,td.execution_type FROM t_ds_process_definition td left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by process_definition_code,release_state) sc on sc.process_definition_code = td.code @@ -127,7 +127,7 @@ SELECT pd.id, pd.code, pd.name, pd.version, pd.release_state, pd.project_code, pd.user_id, pd.description, pd.global_params, pd.flag, pd.locations, pd.warning_group_id, pd.create_time, pd.timeout, - pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name + pd.tenant_id, pd.update_time, u.user_name,p.name AS project_name ,pd.execution_type FROM t_ds_process_definition pd, t_ds_user u, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml index 249fb8669f..1dc61686e7 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.xml @@ -19,7 +19,7 @@ - id, parent_process_instance_id, parent_task_instance_id, process_instance_id + id, parent_process_instance_id, parent_task_instance_id, process_instance_id,next_process_instance_id delete diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 08db3af58c..05a56a4dd0 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -23,7 +23,7 @@ command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, - process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run + process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run,next_process_instance_id select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version, instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time, - instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run + instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id from t_ds_process_instance instance join t_ds_process_definition define ON instance.process_definition_code = define.code where instance.is_sub_process=0 @@ -218,9 +218,36 @@ order by id asc + + update t_ds_process_instance set global_params = #{globalParams} where id = #{id} + + update t_ds_process_instance + set next_process_instance_id = #{thisInstanceId} + where id = #{runningInstanceId} and next_process_instance_id=0 + diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index 500f2d2380..f4527b8ccf 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -281,6 +281,7 @@ class ProcessDefinition(Base): # TODO add serialization function json.dumps(self.task_relation_json), json.dumps(self.task_definition_json), + None, ) return self._process_definition_code diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 08390e946a..4e15fbb1a9 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; @@ -175,7 +176,8 @@ public class PythonGatewayServer extends SpringBootServletInitializer { int timeout, String tenantCode, String taskRelationJson, - String taskDefinitionJson) { + String taskDefinitionJson, + ProcessExecutionTypeEnum executionType) { User user = usersService.queryUser(userName); Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); long projectCode = project.getCode(); @@ -189,12 +191,12 @@ public class PythonGatewayServer extends SpringBootServletInitializer { // make sure process definition offline which could edit processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); Map result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); return processDefinitionCode; } else if (verifyStatus == Status.SUCCESS) { // create process definition Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); return processDefinition.getCode(); } else { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index c5012623b5..6ed8912e15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; @@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -510,6 +512,10 @@ public class WorkflowExecuteThread implements Runnable { private void endProcess() { this.stateEvents.clear(); processInstance.setEndTime(new Date()); + ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion()); + if (processDefinition.getExecutionType().typeIsSerialWait()) { + checkSerialProcess(processDefinition); + } processService.updateProcessInstance(processInstance); if (processInstance.getState().typeIsWaitingThread()) { processService.createRecoveryWaitingThreadCommand(null, processInstance); @@ -519,6 +525,29 @@ public class WorkflowExecuteThread implements Runnable { processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser); } + public void checkSerialProcess(ProcessDefinition processDefinition) { + this.processInstance = processService.findProcessInstanceById(processInstance.getId()); + int nextInstanceId = processInstance.getNextProcessInstanceId(); + if (nextInstanceId == 0) { + ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode()); + if (nextProcessInstance == null) { + return; + } + nextInstanceId = nextProcessInstance.getId(); + } + ProcessInstance nextProcessInstance = this.processService.findProcessInstanceById(nextInstanceId); + if (nextProcessInstance.getState().typeIsFinished() || nextProcessInstance.getState().typeIsRunning()) { + return; + } + Map cmdParam = new HashMap<>(); + cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, nextInstanceId); + Command command = new Command(); + command.setCommandType(CommandType.RECOVER_SERIAL_WAIT); + command.setProcessDefinitionCode(processDefinition.getCode()); + command.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processService.createCommand(command); + } + /** * generate process dag * diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 1b4d3bfd4b..32b2b5e1e7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -111,9 +112,7 @@ public class WorkflowExecuteThreadTest { Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true); dag.set(workflowExecuteThread, new DAG()); - PowerMockito.doNothing().when(workflowExecuteThread, "executeProcess"); PowerMockito.doNothing().when(workflowExecuteThread, "prepareProcess"); - PowerMockito.doNothing().when(workflowExecuteThread, "runProcess"); PowerMockito.doNothing().when(workflowExecuteThread, "endProcess"); } @@ -256,6 +255,36 @@ public class WorkflowExecuteThreadTest { } } + @Test + public void testCheckSerialProcess() { + try { + ProcessDefinition processDefinition1 = new ProcessDefinition(); + processDefinition1.setId(123); + processDefinition1.setName("test"); + processDefinition1.setVersion(1); + processDefinition1.setCode(11L); + processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT); + Mockito.when(processInstance.getId()).thenReturn(225); + Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance); + workflowExecuteThread.checkSerialProcess(processDefinition1); + + Mockito.when(processInstance.getId()).thenReturn(225); + Mockito.when(processInstance.getNextProcessInstanceId()).thenReturn(222); + + ProcessInstance processInstance9 = new ProcessInstance(); + processInstance9.setId(222); + processInstance9.setProcessDefinitionCode(11L); + processInstance9.setProcessDefinitionVersion(1); + processInstance9.setState(ExecutionStatus.SERIAL_WAIT); + + Mockito.when(processService.findProcessInstanceById(225)).thenReturn(processInstance); + Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9); + workflowExecuteThread.checkSerialProcess(processDefinition1); + } catch (Exception e) { + Assert.fail(); + } + } + private List zeroSchedulerList() { return Collections.emptyList(); } @@ -268,4 +297,4 @@ public class WorkflowExecuteThreadTest { return schedulerList; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 1c8e72c90e..c13fbda8b6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -100,6 +100,8 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -200,6 +202,10 @@ public class ProcessService { @Autowired private ProcessTaskRelationLogMapper processTaskRelationLogMapper; + + @Autowired + StateEventCallbackService stateEventCallbackService; + @Autowired private EnvironmentMapper environmentMapper; @@ -222,12 +228,77 @@ public class ProcessService { } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); - saveProcessInstance(processInstance); + //if the processDefination is serial + ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + if (processDefinition.getExecutionType().typeIsSerial()) { + saveSerialProcess(processInstance,processDefinition); + if (processInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { + this.setSubProcessParam(processInstance); + this.commandMapper.deleteById(command.getId()); + return null; + } + } else { + saveProcessInstance(processInstance); + } this.setSubProcessParam(processInstance); this.commandMapper.deleteById(command.getId()); return processInstance; } + private void saveSerialProcess(ProcessInstance processInstance,ProcessDefinition processDefinition) { + processInstance.setState(ExecutionStatus.SERIAL_WAIT); + saveProcessInstance(processInstance); + //serial wait + //when we get the running instance(or waiting instance) only get the priority instance(by id) + if (processDefinition.getExecutionType().typeIsSerialWait()) { + while (true) { + List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), + Constants.RUNNING_PROCESS_STATE,processInstance.getId()); + if (CollectionUtils.isEmpty(runningProcessInstances)) { + processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + saveProcessInstance(processInstance); + return; + } + ProcessInstance runningProcess = runningProcessInstances.get(0); + if (this.processInstanceMapper.updateNextProcessIdById(processInstance.getId(), runningProcess.getId())) { + return; + } + } + } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) { + List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), + Constants.RUNNING_PROCESS_STATE,processInstance.getId()); + if (CollectionUtils.isEmpty(runningProcessInstances)) { + processInstance.setState(ExecutionStatus.STOP); + saveProcessInstance(processInstance); + } + } else if (processDefinition.getExecutionType().typeIsSerialPriority()) { + List runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(processInstance.getProcessDefinitionCode(), + Constants.RUNNING_PROCESS_STATE,processInstance.getId()); + if (CollectionUtils.isNotEmpty(runningProcessInstances)) { + for (ProcessInstance info : runningProcessInstances) { + info.setCommandType(CommandType.STOP); + info.addHistoryCmd(CommandType.STOP); + info.setState(ExecutionStatus.READY_STOP); + int update = updateProcessInstance(info); + // determine whether the process is normal + if (update > 0) { + String host = info.getHost(); + String address = host.split(":")[0]; + int port = Integer.parseInt(host.split(":")[1]); + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + info.getId(), 0, info.getState(), info.getId(), 0 + ); + try { + stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + } catch (Exception e) { + logger.error("sendResultError"); + } + } + } + } + } + } + /** * save error command, and delete original command * @@ -2495,4 +2566,8 @@ public class ProcessService { } return processTaskMap; } + + public ProcessInstance loadNextProcess4Serial(long code, int state) { + return this.processInstanceMapper.loadNextProcess4Serial(code,state); + } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 21125639e8..b55aee5f32 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; @@ -86,7 +87,7 @@ import com.fasterxml.jackson.databind.JsonNode; /** * process service test */ -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.Silent.class) public class ProcessServiceTest { private static final Logger logger = LoggerFactory.getLogger(CronUtilsTest.class); @@ -257,16 +258,25 @@ public class ProcessServiceTest { command1.setProcessDefinitionVersion(definitionVersion); command1.setCommandParam("{\"ProcessInstanceId\":222}"); command1.setCommandType(CommandType.START_PROCESS); + ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(123); processDefinition.setName("test"); processDefinition.setVersion(definitionVersion); processDefinition.setCode(definitionCode); processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); + processDefinition.setExecutionType(ProcessExecutionTypeEnum.PARALLEL); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(222); + processInstance.setProcessDefinitionCode(11L); + processInstance.setHost("127.0.0.1:5678"); + processInstance.setProcessDefinitionVersion(1); processInstance.setId(processInstanceId); processInstance.setProcessDefinitionCode(definitionCode); processInstance.setProcessDefinitionVersion(definitionVersion); + + Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition); Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); @@ -309,6 +319,77 @@ public class ProcessServiceTest { command5.setDryRun(Constants.DRY_RUN_FLAG_NO); ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); + + ProcessDefinition processDefinition1 = new ProcessDefinition(); + processDefinition1.setId(123); + processDefinition1.setName("test"); + processDefinition1.setVersion(1); + processDefinition1.setCode(11L); + processDefinition1.setVersion(1); + processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_WAIT); + List lists = new ArrayList<>(); + ProcessInstance processInstance11 = new ProcessInstance(); + processInstance11.setId(222); + processInstance11.setProcessDefinitionCode(11L); + processInstance11.setProcessDefinitionVersion(1); + processInstance11.setHost("127.0.0.1:5678"); + lists.add(processInstance11); + + ProcessInstance processInstance2 = new ProcessInstance(); + processInstance2.setId(223); + processInstance2.setProcessDefinitionCode(11L); + processInstance2.setProcessDefinitionVersion(1); + Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2); + Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1); + Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); + Command command6 = new Command(); + command6.setProcessDefinitionCode(11L); + command6.setCommandParam("{\"ProcessInstanceId\":223}"); + command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT); + command6.setProcessDefinitionVersion(1); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,223)).thenReturn(lists); + Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true); + ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps); + Assert.assertTrue(processInstance6 != null); + + processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD); + Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1); + ProcessInstance processInstance7 = new ProcessInstance(); + processInstance7.setId(224); + processInstance7.setProcessDefinitionCode(11L); + processInstance7.setProcessDefinitionVersion(1); + Mockito.when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7); + + Command command7 = new Command(); + command7.setProcessDefinitionCode(11L); + command7.setCommandParam("{\"ProcessInstanceId\":224}"); + command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT); + command7.setProcessDefinitionVersion(1); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,224)).thenReturn(null); + ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps); + Assert.assertTrue(processInstance8 == null); + + ProcessDefinition processDefinition2 = new ProcessDefinition(); + processDefinition2.setId(123); + processDefinition2.setName("test"); + processDefinition2.setVersion(1); + processDefinition2.setCode(12L); + processDefinition2.setExecutionType(ProcessExecutionTypeEnum.SERIAL_PRIORITY); + Mockito.when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2); + ProcessInstance processInstance9 = new ProcessInstance(); + processInstance9.setId(225); + processInstance9.setProcessDefinitionCode(11L); + processInstance9.setProcessDefinitionVersion(1); + Command command9 = new Command(); + command9.setProcessDefinitionCode(12L); + command9.setCommandParam("{\"ProcessInstanceId\":225}"); + command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT); + command9.setProcessDefinitionVersion(1); + Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,Constants.RUNNING_PROCESS_STATE,0)).thenReturn(lists); + Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1); + ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps); + Assert.assertTrue(processInstance10 == null); } @Test diff --git a/sql/dolphinscheduler_h2.sql b/sql/dolphinscheduler_h2.sql index 172686f2f9..7c4f3015af 100644 --- a/sql/dolphinscheduler_h2.sql +++ b/sql/dolphinscheduler_h2.sql @@ -412,6 +412,7 @@ CREATE TABLE t_ds_process_definition warning_group_id int(11) DEFAULT NULL, timeout int(11) DEFAULT '0', tenant_id int(11) NOT NULL DEFAULT '-1', + execution_type tinyint(4) DEFAULT '0', create_time datetime NOT NULL, update_time datetime DEFAULT NULL, PRIMARY KEY (id), @@ -443,6 +444,7 @@ CREATE TABLE t_ds_process_definition_log warning_group_id int(11) DEFAULT NULL, timeout int(11) DEFAULT '0', tenant_id int(11) NOT NULL DEFAULT '-1', + execution_type tinyint(4) DEFAULT '0', operator int(11) DEFAULT NULL, operate_time datetime DEFAULT NULL, create_time datetime NOT NULL, @@ -595,6 +597,7 @@ CREATE TABLE t_ds_process_instance worker_group varchar(64) DEFAULT NULL, environment_code bigint(20) DEFAULT '-1', timeout int(11) DEFAULT '0', + next_process_instance_id int(11) DEFAULT '0', tenant_id int(11) NOT NULL DEFAULT '-1', var_pool longtext, dry_run int NULL DEFAULT 0, diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 5a27912168..cd4c84b449 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -414,6 +414,7 @@ CREATE TABLE `t_ds_process_definition` ( `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out, unit: minute', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', + `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`), @@ -443,6 +444,7 @@ CREATE TABLE `t_ds_process_definition_log` ( `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out,unit: minute', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', + `execution_type` tinyint(4) DEFAULT '0' COMMENT 'execution_type 0:parallel,1:serial wait,2:serial discard,3:serial priority', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', @@ -593,6 +595,7 @@ CREATE TABLE `t_ds_process_instance` ( `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', `var_pool` longtext COMMENT 'var_pool', `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flagļ¼š0 normal, 1 dry run', + `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId', PRIMARY KEY (`id`), KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE, KEY `start_time_index` (`start_time`) USING BTREE diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index 826e377b49..8c39bb1285 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -331,6 +331,7 @@ CREATE TABLE t_ds_process_definition ( flag int DEFAULT NULL , timeout int DEFAULT '0' , tenant_id int DEFAULT '-1' , + execution_type int DEFAULT '0', create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , PRIMARY KEY (id) , @@ -355,6 +356,7 @@ CREATE TABLE t_ds_process_definition_log ( flag int DEFAULT NULL , timeout int DEFAULT '0' , tenant_id int DEFAULT '-1' , + execution_type int DEFAULT '0', operator int DEFAULT NULL , operate_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL , @@ -498,6 +500,7 @@ CREATE TABLE t_ds_process_instance ( tenant_id int NOT NULL DEFAULT '-1' , var_pool text , dry_run int DEFAULT '0' , + next_process_instance_id int DEFAULT '0', PRIMARY KEY (id) ) ;