diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index da48b6d9da..c888ff76fa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -177,13 +177,13 @@ public class ProcessInstanceController extends BaseController { @ApiImplicitParams({ @ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"), @ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", type = "String"), - @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"), - @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"), - @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String"), + @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean", example = "false"), + @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String", example = "[]"), @ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"), - @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "String"), - @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "Int", example = "0") + @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "Int", example = "0"), + @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "String", example = "default") }) @PutMapping(value = "/{id}") @ResponseStatus(HttpStatus.OK) @@ -199,8 +199,7 @@ public class ProcessInstanceController extends BaseController { @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, @RequestParam(value = "locations", required = false) String locations, @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, - @RequestParam(value = "tenantCode", required = true) String tenantCode, - @RequestParam(value = "flag", required = false) Flag flag) { + @RequestParam(value = "tenantCode", required = true) String tenantCode) { Map result = processInstanceService.updateProcessInstance(loginUser, projectCode, id, taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode); return returnDataList(result); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 957d3aca9b..ed8784433f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -263,7 +263,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinition, List taskDefinitionLogs) { Map result = new HashMap<>(); - int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); + int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE); if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { logger.info("The task has not changed, so skip"); } @@ -271,12 +271,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion == 0) { putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); } - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -590,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinitionDeepCopy, List taskDefinitionLogs) { Map result = new HashMap<>(); - int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); + int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE); if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { logger.info("The task has not changed, so skip"); } @@ -603,14 +604,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro insertVersion = processDefinitionDeepCopy.getVersion(); } else { processDefinition.setUpdateTime(new Date()); - insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); + insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); } if (insertVersion == 0) { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -748,7 +749,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } switch (releaseState) { case ONLINE: - List relationList = processService.findRelationByCode(projectCode, code); + List relationList = processService.findRelationByCode(code, processDefinition.getVersion()); if (CollectionUtils.isEmpty(relationList)) { putMsg(result, Status.PROCESS_DAG_IS_EMPTY); return result; @@ -1899,7 +1900,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro private Map createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) { Map result = new HashMap<>(); - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion == 0) { putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); @@ -2103,7 +2104,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } switch (releaseState) { case ONLINE: - List relationList = processService.findRelationByCode(projectCode, code); + List relationList = processService.findRelationByCode(code, processDefinition.getVersion()); if (CollectionUtils.isEmpty(relationList)) { putMsg(result, Status.PROCESS_DAG_IS_EMPTY); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 179f361e50..61af8f7183 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -470,64 +470,55 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout); - if (Boolean.TRUE.equals(syncDefine)) { - List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - if (taskDefinitionLogs.isEmpty()) { - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); + List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); + if (taskDefinitionLogs.isEmpty()) { + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); + return result; + } + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); return result; } - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); - return result; - } - } - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); - if (saveTaskResult == Constants.DEFINITION_FAILURE) { - putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); - } - ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); - List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); - //check workflow json is valid - result = processDefinitionService.checkProcessNodeList(taskRelationJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { + } + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine); + if (saveTaskResult == Constants.DEFINITION_FAILURE) { + putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); + } + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); + //check workflow json is valid + result = processDefinitionService.checkProcessNodeList(taskRelationJson); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + int tenantId = -1; + if (!Constants.DEFAULT.equals(tenantCode)) { + Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); + if (tenant == null) { + putMsg(result, Status.TENANT_NOT_EXIST); return result; } - int tenantId = -1; - if (!Constants.DEFAULT.equals(tenantCode)) { - Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); - if (tenant == null) { - putMsg(result, Status.TENANT_NOT_EXIST); - return result; - } - tenantId = tenant.getId(); - } - ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); - processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId); - processDefinition.setUpdateTime(new Date()); - int insertVersion; - if (processDefinition.equals(processDefinitionDeepCopy)) { - insertVersion = processDefinitionDeepCopy.getVersion(); - } else { - processDefinition.setUpdateTime(new Date()); - insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false); - } - if (insertVersion == 0) { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - processInstance.setProcessDefinitionVersion(insertVersion); + tenantId = tenant.getId(); + } + processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId); + processDefinition.setUpdateTime(new Date()); + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, syncDefine, Boolean.FALSE); + if (insertVersion == 0) { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } + processInstance.setProcessDefinitionVersion(insertVersion); int update = processService.updateProcessInstance(processInstance); if (update == 0) { putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); @@ -745,7 +736,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion() ); - if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 094ce9f775..fc06beb55f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -137,7 +137,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } } - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE); if (saveTaskResult == Constants.DEFINITION_FAILURE) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); @@ -230,13 +230,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList())); } int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), - processTaskRelationLogList, null); + processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); if (insertResult != Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } } - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition)); + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); if (saveTaskResult == Constants.DEFINITION_FAILURE) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8bdcc1db08..84c7f68db8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -298,7 +298,7 @@ public class ProcessDefinitionServiceTest { processDefinitionList.add(definition); Set definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); - Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); + Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2); Map map3 = processDefinitionService.batchCopyProcessDefinition( loginUser, projectCode, "46", 1L); Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS)); @@ -330,7 +330,7 @@ public class ProcessDefinitionServiceTest { processDefinitionList.add(definition); Set definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); - Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); + Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode)); putMsg(result, Status.SUCCESS); @@ -442,7 +442,7 @@ public class ProcessDefinitionServiceTest { processTaskRelation.setProcessDefinitionCode(46L); processTaskRelation.setPostTaskCode(123L); processTaskRelationList.add(processTaskRelation); - Mockito.when(processService.findRelationByCode(projectCode, 46L)).thenReturn(processTaskRelationList); + Mockito.when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList); Map onlineRes = processDefinitionService.releaseProcessDefinition( loginUser, projectCode, 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); @@ -692,10 +692,10 @@ public class ProcessDefinitionServiceTest { Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId, "mysql_1")).thenReturn(dataSource); long projectCode = 1001; - Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull())).thenReturn(2); - Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull())).thenReturn(1); + Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(2); + Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(1); Mockito.when(processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(), - Mockito.eq(1), Mockito.notNull(), Mockito.notNull())).thenReturn(0); + Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(0); Map result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, mockMultipartFile); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index c7c0996869..d2c669de73 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -424,7 +424,7 @@ public class ProcessInstanceServiceTest { when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.updateProcessInstance(processInstance)).thenReturn(1); - when(processService.saveProcessDefine(loginUser, processDefinition, false)).thenReturn(1); + when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1); when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result); putMsg(result, Status.SUCCESS, projectCode); Map processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, @@ -435,8 +435,9 @@ public class ProcessInstanceServiceTest { when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); putMsg(result, Status.SUCCESS, projectCode); + when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE)).thenReturn(1); Map successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, - shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0, "root"); + shellJson, taskJson,"2020-02-21 00:00:00", Boolean.FALSE, "", "", 0, "root"); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index d8852b4ce0..266f95efd0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -97,7 +97,7 @@ public class TaskDefinitionServiceImplTest { + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; List taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class); - Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(1); + Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions, Boolean.TRUE)).thenReturn(1); Map relation = taskDefinitionService .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 21556187f6..b7dfc38ccb 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -478,7 +478,7 @@ CREATE TABLE `t_ds_task_definition` ( `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` text COMMENT 'resource id, separated by comma', `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', - `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority', + `task_group_priority` tinyint(4) DEFAULT '0' COMMENT 'task group priority', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`) @@ -511,7 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` ( `resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', - `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority', + `task_group_priority` tinyint(4) DEFAULT 0 COMMENT 'task group priority', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', @@ -538,7 +538,7 @@ CREATE TABLE `t_ds_process_task_relation` ( `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`), - KEY `idx_project_code_project_code` (`project_code`,`process_definition_code`) + KEY `idx_code` (`project_code`,`process_definition_code`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- ---------------------------- @@ -562,7 +562,7 @@ CREATE TABLE `t_ds_process_task_relation_log` ( `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`), - KEY `idx_project_code_project_code` (`project_code`,`process_definition_code`) + KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- ---------------------------- diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql index 0a5c91a8f5..e0d6673ec9 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql @@ -16,11 +16,11 @@ */ ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`; -ALTER TABLE `t_ds_process_task_relation` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE; -ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE; +ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE; +ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE; ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE; alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`; alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`; alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`; -alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `task_group_id`; \ No newline at end of file +alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`; \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 5ca705db78..2d83f9a8e4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -716,7 +716,7 @@ public class WorkflowExecuteThread { List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); - List processTaskRelations = processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()); + List processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); List taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations); List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); forbiddenTaskMap.clear(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 4441bfbe89..ea09aeb50b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2254,7 +2254,7 @@ public class ProcessService { return StringUtils.join(resourceIds, ","); } - public int saveTaskDefine(User operator, long projectCode, List taskDefinitionLogs) { + public int saveTaskDefine(User operator, long projectCode, List taskDefinitionLogs, Boolean syncDefine) { Date now = new Date(); List newTaskDefinitionLogs = new ArrayList<>(); List updateTaskDefinitionLogs = new ArrayList<>(); @@ -2299,13 +2299,21 @@ public class ProcessService { newTaskDefinitionLogs.add(taskDefinitionToUpdate); } else { insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate); - taskDefinitionToUpdate.setId(task.getId()); - updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate); + if (Boolean.TRUE.equals(syncDefine)) { + taskDefinitionToUpdate.setId(task.getId()); + updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate); + } else { + updateResult++; + } } } if (!newTaskDefinitionLogs.isEmpty()) { - updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); + if (Boolean.TRUE.equals(syncDefine)) { + updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); + } else { + updateResult += newTaskDefinitionLogs.size(); + } } return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; } @@ -2313,7 +2321,7 @@ public class ProcessService { /** * save processDefinition (including create or update processDefinition) */ - public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean isFromProcessDefine) { + public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) { ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; @@ -2322,12 +2330,14 @@ public class ProcessService { processDefinitionLog.setOperator(operator.getId()); processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); int insertLog = processDefineLogMapper.insert(processDefinitionLog); - int result; - if (0 == processDefinition.getId()) { - result = processDefineMapper.insert(processDefinitionLog); - } else { - processDefinitionLog.setId(processDefinition.getId()); - result = processDefineMapper.updateById(processDefinitionLog); + int result = 1; + if (Boolean.TRUE.equals(syncDefine)) { + if (0 == processDefinition.getId()) { + result = processDefineMapper.insert(processDefinitionLog); + } else { + processDefinitionLog.setId(processDefinition.getId()); + result = processDefineMapper.updateById(processDefinitionLog); + } } return (insertLog & result) > 0 ? insertVersion : 0; } @@ -2336,7 +2346,8 @@ public class ProcessService { * save task relations */ public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion, - List taskRelationList, List taskDefinitionLogs) { + List taskRelationList, List taskDefinitionLogs, + Boolean syncDefine) { if (taskRelationList.isEmpty()) { return Constants.EXIT_CODE_SUCCESS; } @@ -2365,19 +2376,22 @@ public class ProcessService { processTaskRelationLog.setOperator(operator.getId()); processTaskRelationLog.setOperateTime(now); } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - if (!processTaskRelationList.isEmpty()) { - Set processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); - Set taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); - boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet); - if (result) { - return Constants.EXIT_CODE_SUCCESS; + int insert = taskRelationList.size(); + if (Boolean.TRUE.equals(syncDefine)) { + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + if (!processTaskRelationList.isEmpty()) { + Set processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); + Set taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); + boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet); + if (result) { + return Constants.EXIT_CODE_SUCCESS; + } + processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); } - processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); + insert = processTaskRelationMapper.batchInsert(taskRelationList); } - int result = processTaskRelationMapper.batchInsert(taskRelationList); int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); - return (result & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; + return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; } public boolean isTaskOnline(long taskCode) { @@ -2400,14 +2414,15 @@ public class ProcessService { /** * Generate the DAG Graph based on the process definition id + * Use temporarily before refactoring taskNode * * @param processDefinition process definition * @return dag graph */ public DAG genDagGraph(ProcessDefinition processDefinition) { - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); - List taskNodeList = transformTask(processTaskRelations, Lists.newArrayList()); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); + List taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); + List taskNodeList = transformTask(taskRelations, Lists.newArrayList()); + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(taskRelations)); // Generate concrete Dag to be executed return DagHelper.buildDagGraph(processDag); } @@ -2416,12 +2431,10 @@ public class ProcessService { * generate DagData */ public DagData genDagData(ProcessDefinition processDefinition) { - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); - List taskDefinitionLogList = genTaskDefineList(processTaskRelations); - List taskDefinitions = taskDefinitionLogList.stream() - .map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class)) - .collect(Collectors.toList()); - return new DagData(processDefinition, processTaskRelations, taskDefinitions); + List taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); + List taskDefinitionLogList = genTaskDefineList(taskRelations); + List taskDefinitions = taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList()); + return new DagData(processDefinition, taskRelations, taskDefinitions); } public List genTaskDefineList(List processTaskRelations) { @@ -2465,10 +2478,11 @@ public class ProcessService { } /** - * find process task relation list by projectCode and processDefinitionCode + * find process task relation list by process */ - public List findRelationByCode(long projectCode, long processDefinitionCode) { - return processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + public List findRelationByCode(long processDefinitionCode, int processDefinitionVersion) { + List processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode, processDefinitionVersion); + return processTaskRelationLogList.stream().map(r -> (ProcessTaskRelation) r).collect(Collectors.toList()); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 510e3a0098..76b5bad47d 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -566,7 +566,7 @@ public class ProcessServiceTest { Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition); Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1); Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition); - int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs); + int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs, Boolean.TRUE); Assert.assertEquals(0, result); } @@ -579,7 +579,7 @@ public class ProcessServiceTest { processDefinition.setVersion(1); processDefinition.setCode(11L); - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); + ProcessTaskRelationLog processTaskRelation = new ProcessTaskRelationLog(); processTaskRelation.setName("def 1"); processTaskRelation.setProcessDefinitionVersion(1); processTaskRelation.setProjectCode(1L); @@ -588,7 +588,7 @@ public class ProcessServiceTest { processTaskRelation.setPreTaskCode(2L); processTaskRelation.setUpdateTime(new Date()); processTaskRelation.setCreateTime(new Date()); - List list = new ArrayList<>(); + List list = new ArrayList<>(); list.add(processTaskRelation); TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); @@ -616,7 +616,7 @@ public class ProcessServiceTest { taskDefinitionLogs.add(td2); Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs); - Mockito.when(processTaskRelationMapper.queryByProcessCode(Mockito.anyLong(), Mockito.anyLong())).thenReturn(list); + Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(list); DAG stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition); Assert.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());