Browse Source

[cherry-pick-7930][Improvement]support instance to definition (#7939)

* Update ProcessDefinitionMapper.xml (#7931)

Resolves  process definition list issue(data duplication )

* pick 7930

* Update WorkflowExecuteThread.java

* fix pg_ddl

* fix pg_ddl

Co-authored-by: mazhong <316422240@qq.com>
Co-authored-by: BaoLiang <29528966+lenboo@users.noreply.github.com>
2.0.7-release
JinYong Li 2 years ago committed by GitHub
parent
commit
ceae42721f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  2. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 127
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  5. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  7. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  8. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
  9. 12
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  10. 8
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  11. 21
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/mysql/dolphinscheduler_ddl.sql
  12. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/mysql/dolphinscheduler_dml.sql
  13. 48
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/postgresql/dolphinscheduler_ddl.sql
  14. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/postgresql/dolphinscheduler_dml.sql
  15. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  16. 82
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  17. 8
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -177,13 +177,13 @@ public class ProcessInstanceController extends BaseController {
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"), @ApiImplicitParam(name = "taskRelationJson", value = "TASK_RELATION_JSON", type = "String"),
@ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", type = "String"), @ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", type = "String"),
@ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "id", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"), @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", type = "String"),
@ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean"), @ApiImplicitParam(name = "syncDefine", value = "SYNC_DEFINE", required = true, type = "Boolean", example = "false"),
@ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String"), @ApiImplicitParam(name = "globalParams", value = "PROCESS_GLOBAL_PARAMS", type = "String", example = "[]"),
@ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"), @ApiImplicitParam(name = "locations", value = "PROCESS_INSTANCE_LOCATIONS", type = "String"),
@ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "String"), @ApiImplicitParam(name = "timeout", value = "PROCESS_TIMEOUT", type = "Int", example = "0"),
@ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "Int", example = "0") @ApiImplicitParam(name = "tenantCode", value = "TENANT_CODE", type = "String", example = "default")
}) })
@PutMapping(value = "/{id}") @PutMapping(value = "/{id}")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ -199,8 +199,7 @@ public class ProcessInstanceController extends BaseController {
@RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "locations", required = false) String locations, @RequestParam(value = "locations", required = false) String locations,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode, @RequestParam(value = "tenantCode", required = true) String tenantCode) {
@RequestParam(value = "flag", required = false) Flag flag) {
Map<String, Object> result = processInstanceService.updateProcessInstance(loginUser, projectCode, id, Map<String, Object> result = processInstanceService.updateProcessInstance(loginUser, projectCode, id,
taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode); taskRelationJson, taskDefinitionJson, scheduleTime, syncDefine, globalParams, locations, timeout, tenantCode);
return returnDataList(result); return returnDataList(result);

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -242,7 +242,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition, ProcessDefinition processDefinition,
List<TaskDefinitionLog> taskDefinitionLogs) { List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("The task has not changed, so skip"); logger.info("The task has not changed, so skip");
} }
@ -250,12 +250,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
} }
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) { if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
} }
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) { if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
@ -567,7 +568,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinitionDeepCopy, ProcessDefinition processDefinitionDeepCopy,
List<TaskDefinitionLog> taskDefinitionLogs) { List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("The task has not changed, so skip"); logger.info("The task has not changed, so skip");
} }
@ -580,14 +581,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
insertVersion = processDefinitionDeepCopy.getVersion(); insertVersion = processDefinitionDeepCopy.getVersion();
} else { } else {
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
} }
if (insertVersion == 0) { if (insertVersion == 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) { if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
@ -725,7 +726,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
switch (releaseState) { switch (releaseState) {
case ONLINE: case ONLINE:
List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code); List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) { if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY); putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result; return result;
@ -1672,7 +1673,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) { private Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) { if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
@ -1873,7 +1874,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
switch (releaseState) { switch (releaseState) {
case ONLINE: case ONLINE:
List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code); List<ProcessTaskRelation> relationList = processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) { if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY); putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result; return result;

127
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -429,7 +429,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
} }
ProcessInstance subWorkflowInstance = processService.findSubProcessInstance( ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
taskInstance.getProcessInstanceId(), taskInstance.getId()); taskInstance.getProcessInstanceId(), taskInstance.getId());
if (subWorkflowInstance == null) { if (subWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
return result; return result;
@ -444,17 +444,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/** /**
* update process instance * update process instance
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param taskRelationJson process task relation json * @param taskRelationJson process task relation json
* @param taskDefinitionJson taskDefinitionJson * @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id * @param processInstanceId process instance id
* @param scheduleTime schedule time * @param scheduleTime schedule time
* @param syncDefine sync define * @param syncDefine sync define
* @param globalParams global params * @param globalParams global params
* @param locations locations for nodes * @param locations locations for nodes
* @param timeout timeout * @param timeout timeout
* @param tenantCode tenantCode * @param tenantCode tenantCode
* @return update result code * @return update result code
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
@ -483,68 +483,59 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
//check process instance status //check process instance status
if (!processInstance.getState().typeIsFinished()) { if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update"); processInstance.getName(), processInstance.getState().toString(), "update");
return result; return result;
} }
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout); setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout);
if (Boolean.TRUE.equals(syncDefine)) { List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); if (taskDefinitionLogs.isEmpty()) {
if (taskDefinitionLogs.isEmpty()) { putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); return result;
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result; return result;
} }
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { }
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine);
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); if (saveTaskResult == Constants.DEFINITION_FAILURE) {
return result; putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
} throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (saveTaskResult == Constants.DEFINITION_FAILURE) { List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); //check workflow json is valid
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); result = processDefinitionService.checkProcessNodeList(taskRelationJson);
} if (result.get(Constants.STATUS) != Status.SUCCESS) {
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); return result;
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); }
//check workflow json is valid int tenantId = -1;
result = processDefinitionService.checkProcessNodeList(taskRelationJson); if (!Constants.DEFAULT.equals(tenantCode)) {
if (result.get(Constants.STATUS) != Status.SUCCESS) { Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result; return result;
} }
int tenantId = -1; tenantId = tenant.getId();
if (!Constants.DEFAULT.equals(tenantCode)) { }
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
if (tenant == null) { processDefinition.setUpdateTime(new Date());
putMsg(result, Status.TENANT_NOT_EXIST); int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, syncDefine, Boolean.FALSE);
return result; if (insertVersion == 0) {
} putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
tenantId = tenant.getId(); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId); processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
processDefinition.setUpdateTime(new Date()); if (insertResult == Constants.EXIT_CODE_SUCCESS) {
int insertVersion; putMsg(result, Status.SUCCESS);
if (processDefinition.equals(processDefinitionDeepCopy)) { result.put(Constants.DATA_LIST, processDefinition);
insertVersion = processDefinitionDeepCopy.getVersion(); } else {
} else { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
processDefinition.setUpdateTime(new Date()); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
}
if (insertVersion == 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
processInstance.setProcessDefinitionVersion(insertVersion);
} }
processInstance.setProcessDefinitionVersion(insertVersion);
int update = processService.updateProcessInstance(processInstance); int update = processService.updateProcessInstance(processInstance);
if (update == 0) { if (update == 0) {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
@ -759,10 +750,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
} }
ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion() processInstance.getProcessDefinitionVersion()
); );
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result; return result;
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -132,7 +132,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result; return result;
} }
} }
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) { if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -289,7 +289,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition); processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition( Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, "46", 1L); loginUser, projectCode, "46", 1L);
Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
@ -321,7 +321,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition); processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode)); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode));
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -432,7 +432,7 @@ public class ProcessDefinitionServiceTest {
processTaskRelation.setProcessDefinitionCode(46L); processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setPostTaskCode(123L); processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation); processTaskRelationList.add(processTaskRelation);
Mockito.when(processService.findRelationByCode(projectCode, 46L)).thenReturn(processTaskRelationList); Mockito.when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList);
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition( Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE); loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -424,7 +424,7 @@ public class ProcessInstanceServiceTest {
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1); when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processService.saveProcessDefine(loginUser, processDefinition, false)).thenReturn(1); when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.FALSE)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result); when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
@ -435,8 +435,9 @@ public class ProcessInstanceServiceTest {
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.FALSE, Boolean.FALSE)).thenReturn(1);
Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, Map<String, Object> successRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson,"2020-02-21 00:00:00", false, "", "", 0, "root"); shellJson, taskJson,"2020-02-21 00:00:00", Boolean.FALSE, "", "", 0, "root");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
} }

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -97,7 +97,7 @@ public class TaskDefinitionServiceImplTest {
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(1); Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions, Boolean.TRUE)).thenReturn(1);
Map<String, Object> relation = taskDefinitionService Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -84,7 +84,7 @@
<if test=" userId != 0"> <if test=" userId != 0">
and td.user_id = #{userId} and td.user_id = #{userId}
</if> </if>
order by sc.schedule_release_state desc,td.update_time desc order by sc.schedule_release_state desc,td.update_time desc,td.id asc
</select> </select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition"> <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">

12
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -511,7 +511,8 @@ CREATE TABLE `t_ds_task_definition_log` (
`operate_time` datetime DEFAULT NULL COMMENT 'operate time', `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time', `update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`) PRIMARY KEY (`id`),
KEY `idx_code_version` (`code`,`version`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------
@ -533,7 +534,7 @@ CREATE TABLE `t_ds_process_task_relation` (
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time', `update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `project_code_process_definition_code_index` (`project_code`,`process_definition_code`) USING BTREE KEY `idx_code` (`project_code`,`process_definition_code`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------
@ -556,7 +557,8 @@ CREATE TABLE `t_ds_process_task_relation_log` (
`operate_time` datetime DEFAULT NULL COMMENT 'operate time', `operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time', `update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`) PRIMARY KEY (`id`),
KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------
@ -821,8 +823,8 @@ CREATE TABLE `t_ds_task_instance` (
`alert_flag` tinyint(4) DEFAULT NULL COMMENT 'whether alert', `alert_flag` tinyint(4) DEFAULT NULL COMMENT 'whether alert',
`retry_times` int(4) DEFAULT '0' COMMENT 'task retry times', `retry_times` int(4) DEFAULT '0' COMMENT 'task retry times',
`pid` int(4) DEFAULT NULL COMMENT 'pid of task', `pid` int(4) DEFAULT NULL COMMENT 'pid of task',
`app_link` text COMMENT 'yarn app id', `app_link` longtext COMMENT 'yarn app id',
`task_params` text COMMENT 'job custom parameters', `task_params` longtext COMMENT 'job custom parameters',
`flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available', `flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ', `retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ',
`max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times', `max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',

8
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -391,7 +391,7 @@ CREATE TABLE t_ds_task_definition (
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
create index task_definition_index on t_ds_task_definition (project_code,id); create index task_definition_index on t_ds_task_definition (code,id);
DROP TABLE IF EXISTS t_ds_task_definition_log; DROP TABLE IF EXISTS t_ds_task_definition_log;
CREATE TABLE t_ds_task_definition_log ( CREATE TABLE t_ds_task_definition_log (
@ -422,6 +422,8 @@ CREATE TABLE t_ds_task_definition_log (
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
create index idx_code_version on t_ds_task_definition_log (code,version);
DROP TABLE IF EXISTS t_ds_process_task_relation; DROP TABLE IF EXISTS t_ds_process_task_relation;
CREATE TABLE t_ds_process_task_relation ( CREATE TABLE t_ds_process_task_relation (
id serial NOT NULL , id serial NOT NULL ,
@ -440,7 +442,7 @@ CREATE TABLE t_ds_process_task_relation (
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
create index project_code_process_definition_code_index on t_ds_process_task_relation (project_code,process_definition_code); create index idx_code on t_ds_process_task_relation (project_code,process_definition_code);
DROP TABLE IF EXISTS t_ds_process_task_relation_log; DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log ( CREATE TABLE t_ds_process_task_relation_log (
@ -461,7 +463,7 @@ CREATE TABLE t_ds_process_task_relation_log (
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
create index idx_process_code_version on t_ds_process_task_relation_log (process_definition_code,process_definition_version);
-- --
-- Table structure for table t_ds_process_instance -- Table structure for table t_ds_process_instance
-- --

21
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_params` longtext COMMENT 'job custom parameters' AFTER `app_link`;
ALTER TABLE `t_ds_process_task_relation` ADD KEY `idx_code` (`project_code`, `process_definition_code`) USING BTREE;
ALTER TABLE `t_ds_process_task_relation_log` ADD KEY `idx_process_code_version` (`process_definition_code`,`process_definition_version`) USING BTREE;
ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

48
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema varchar;
BEGIN
---get schema name
v_schema =current_schema();
EXECUTE 'DROP INDEX IF EXISTS "idx_code_relation"';
EXECUTE 'DROP INDEX IF EXISTS "idx_process_code_version_relation_log"';
EXECUTE 'DROP INDEX IF EXISTS "idx_code_version_task_log"';
EXECUTE 'CREATE INDEX IF NOT EXISTS idx_code_relation ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING Btree("project_code","process_definition_code")';
EXECUTE 'CREATE INDEX IF NOT EXISTS idx_process_code_version_relation_log ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation_log USING Btree("process_definition_code","process_definition_version")';
EXECUTE 'CREATE INDEX IF NOT EXISTS idx_code_version_task_log ON ' || quote_ident(v_schema) ||'.t_ds_task_definition_log USING Btree("code","version")';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_update_metadata();
d//

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.3_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -550,7 +550,7 @@ public class WorkflowExecuteThread implements Runnable {
} }
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
List<ProcessTaskRelation> processTaskRelationList = processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelationList = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogList = processService.getTaskDefineLogListByRelation(processTaskRelationList); List<TaskDefinitionLog> taskDefinitionLogList = processService.getTaskDefineLogListByRelation(processTaskRelationList);
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelationList, taskDefinitionLogList); List<TaskNode> taskNodeList = processService.transformTask(processTaskRelationList, taskDefinitionLogList);
forbiddenTaskList.clear(); forbiddenTaskList.clear();

82
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2221,7 +2221,7 @@ public class ProcessService {
return StringUtils.join(resourceIds, ","); return StringUtils.join(resourceIds, ",");
} }
public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) { public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
Date now = new Date(); Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>(); List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>(); List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
@ -2266,13 +2266,21 @@ public class ProcessService {
newTaskDefinitionLogs.add(taskDefinitionToUpdate); newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else { } else {
insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate); insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
taskDefinitionToUpdate.setId(task.getId()); if (Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate); taskDefinitionToUpdate.setId(task.getId());
updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
} else {
updateResult++;
}
} }
} }
if (!newTaskDefinitionLogs.isEmpty()) { if (!newTaskDefinitionLogs.isEmpty()) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
if (Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
} else {
updateResult += newTaskDefinitionLogs.size();
}
} }
return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
} }
@ -2280,7 +2288,7 @@ public class ProcessService {
/** /**
* save processDefinition (including create or update processDefinition) * save processDefinition (including create or update processDefinition)
*/ */
public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean isFromProcessDefine) { public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
@ -2289,12 +2297,14 @@ public class ProcessService {
processDefinitionLog.setOperator(operator.getId()); processDefinitionLog.setOperator(operator.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
int insertLog = processDefineLogMapper.insert(processDefinitionLog); int insertLog = processDefineLogMapper.insert(processDefinitionLog);
int result; int result = 1;
if (0 == processDefinition.getId()) { if (Boolean.TRUE.equals(syncDefine)) {
result = processDefineMapper.insert(processDefinitionLog); if (0 == processDefinition.getId()) {
} else { result = processDefineMapper.insert(processDefinitionLog);
processDefinitionLog.setId(processDefinition.getId()); } else {
result = processDefineMapper.updateById(processDefinitionLog); processDefinitionLog.setId(processDefinition.getId());
result = processDefineMapper.updateById(processDefinitionLog);
}
} }
return (insertLog & result) > 0 ? insertVersion : 0; return (insertLog & result) > 0 ? insertVersion : 0;
} }
@ -2303,7 +2313,8 @@ public class ProcessService {
* save task relations * save task relations
*/ */
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion, public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) { List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs,
Boolean syncDefine) {
if (taskRelationList.isEmpty()) { if (taskRelationList.isEmpty()) {
return Constants.EXIT_CODE_SUCCESS; return Constants.EXIT_CODE_SUCCESS;
} }
@ -2332,19 +2343,22 @@ public class ProcessService {
processTaskRelationLog.setOperator(operator.getId()); processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setOperateTime(now);
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); int insert = taskRelationList.size();
if (!processTaskRelationList.isEmpty()) { if (Boolean.TRUE.equals(syncDefine)) {
Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); if (!processTaskRelationList.isEmpty()) {
boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet); Set<Integer> processTaskRelationSet = processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
if (result) { Set<Integer> taskRelationSet = taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
return Constants.EXIT_CODE_SUCCESS; boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
if (result) {
return Constants.EXIT_CODE_SUCCESS;
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
} }
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); insert = processTaskRelationMapper.batchInsert(taskRelationList);
} }
int result = processTaskRelationMapper.batchInsert(taskRelationList);
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return (result & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
} }
public boolean isTaskOnline(long taskCode) { public boolean isTaskOnline(long taskCode) {
@ -2367,14 +2381,15 @@ public class ProcessService {
/** /**
* Generate the DAG Graph based on the process definition id * Generate the DAG Graph based on the process definition id
* Use temporarily before refactoring taskNode
* *
* @param processDefinition process definition * @param processDefinition process definition
* @return dag graph * @return dag graph
*/ */
public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) { public DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskNode> taskNodeList = transformTask(processTaskRelations, Lists.newArrayList()); List<TaskNode> taskNodeList = transformTask(taskRelations, Lists.newArrayList());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(processTaskRelations)); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList, new ArrayList<>(taskRelations));
// Generate concrete Dag to be executed // Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag); return DagHelper.buildDagGraph(processDag);
} }
@ -2383,12 +2398,10 @@ public class ProcessService {
* generate DagData * generate DagData
*/ */
public DagData genDagData(ProcessDefinition processDefinition) { public DagData genDagData(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List<ProcessTaskRelation> taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(processTaskRelations); List<TaskDefinitionLog> taskDefinitionLogList = genTaskDefineList(taskRelations);
List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream() List<TaskDefinition> taskDefinitions = taskDefinitionLogList.stream().map(t -> (TaskDefinition) t).collect(Collectors.toList());
.map(taskDefinitionLog -> JSONUtils.parseObject(JSONUtils.toJsonString(taskDefinitionLog), TaskDefinition.class)) return new DagData(processDefinition, taskRelations, taskDefinitions);
.collect(Collectors.toList());
return new DagData(processDefinition, processTaskRelations, taskDefinitions);
} }
public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) { public List<TaskDefinitionLog> genTaskDefineList(List<ProcessTaskRelation> processTaskRelations) {
@ -2432,10 +2445,11 @@ public class ProcessService {
} }
/** /**
* find process task relation list by projectCode and processDefinitionCode * find process task relation list by process
*/ */
public List<ProcessTaskRelation> findRelationByCode(long projectCode, long processDefinitionCode) { public List<ProcessTaskRelation> findRelationByCode(long processDefinitionCode, int processDefinitionVersion) {
return processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinitionCode, processDefinitionVersion);
return processTaskRelationLogList.stream().map(r -> (ProcessTaskRelation) r).collect(Collectors.toList());
} }
/** /**

8
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -423,7 +423,7 @@ public class ProcessServiceTest {
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition); Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1); Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition);
int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs); int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs, Boolean.TRUE);
Assert.assertEquals(0, result); Assert.assertEquals(0, result);
} }
@ -436,7 +436,7 @@ public class ProcessServiceTest {
processDefinition.setVersion(1); processDefinition.setVersion(1);
processDefinition.setCode(11L); processDefinition.setCode(11L);
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); ProcessTaskRelationLog processTaskRelation = new ProcessTaskRelationLog();
processTaskRelation.setName("def 1"); processTaskRelation.setName("def 1");
processTaskRelation.setProcessDefinitionVersion(1); processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelation.setProjectCode(1L); processTaskRelation.setProjectCode(1L);
@ -445,7 +445,7 @@ public class ProcessServiceTest {
processTaskRelation.setPreTaskCode(2L); processTaskRelation.setPreTaskCode(2L);
processTaskRelation.setUpdateTime(new Date()); processTaskRelation.setUpdateTime(new Date());
processTaskRelation.setCreateTime(new Date()); processTaskRelation.setCreateTime(new Date());
List<ProcessTaskRelation> list = new ArrayList<>(); List<ProcessTaskRelationLog> list = new ArrayList<>();
list.add(processTaskRelation); list.add(processTaskRelation);
TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
@ -473,7 +473,7 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(td2); taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs); Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationMapper.queryByProcessCode(Mockito.anyLong(), Mockito.anyLong())).thenReturn(list); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())).thenReturn(list);
DAG<String, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition); DAG<String, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition);
Assert.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount()); Assert.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());

Loading…
Cancel
Save