From a8f2894b7916bde13bce5d2313f57ef968e9d24e Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Sun, 4 Apr 2021 11:26:42 +0800 Subject: [PATCH] [Feature][JsonSplit] Fix master/processInstance bug (#5206) * transform taskCode from long to string * fix process bug * code review * code review * code review * Fix master/processInstance bug Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../api/service/impl/ExecutorServiceImpl.java | 3 ++- .../service/impl/ProcessDefinitionServiceImpl.java | 1 - .../api/service/impl/ProcessInstanceServiceImpl.java | 3 ++- .../server/master/runner/MasterExecThread.java | 8 ++++++-- .../service/process/ProcessService.java | 11 +++++++---- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 8e75eb9c5c..be8b9ffdee 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -243,7 +243,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId()); if (result.get(Constants.STATUS) != Status.SUCCESS) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 13048c5b21..dedc23022b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -204,7 +204,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - // TODO relationName have ? int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc, locations, connects, processData, processDefinition); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 266528f330..c26ee4ce82 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -202,7 +202,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); result.put(DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 4f83958b22..0e5aa35b80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -383,7 +383,7 @@ public class MasterExecThread implements Runnable { List taskNodeList = processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>()); forbiddenTaskList.clear(); - taskNodeList.stream().forEach(taskNode -> { + taskNodeList.forEach(taskNode -> { if (taskNode.isForbidden()) { forbiddenTaskList.put(taskNode.getName(), taskNode); } @@ -478,6 +478,8 @@ public class MasterExecThread implements Runnable { TaskInstance taskInstance = findTaskIfExists(nodeName); if (taskInstance == null) { taskInstance = new TaskInstance(); + taskInstance.setTaskCode(Long.parseLong(taskNode.getCode())); + taskInstance.setTaskDefinitionVersion(taskNode.getVersion()); // task name taskInstance.setName(nodeName); // process instance define id @@ -934,7 +936,9 @@ public class MasterExecThread implements Runnable { // send warning email if process time out. if (!sendTimeWarning && checkProcessTimeOut(processInstance)) { alertManager.sendProcessTimeoutAlert(processInstance, - processService.findProcessDefineById(processInstance.getProcessDefinitionId())); + processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion())); + sendTimeWarning = true; } for (Map.Entry> entry : activeTaskNode.entrySet()) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 2c6349c1dd..28751bbe95 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -390,7 +390,7 @@ public class ProcessService { } /** - * find process define by id. + * find process define by code and version. * * @param processDefinitionCode processDefinitionCode * @return process definition @@ -593,6 +593,8 @@ public class ProcessService { Command command, Map cmdParam) { ProcessInstance processInstance = new ProcessInstance(processDefinition); + processInstance.setProcessDefinitionCode(processDefinition.getCode()); + processInstance.setProcessDefinitionVersion(processDefinition.getVersion()); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); @@ -718,7 +720,7 @@ public class ProcessService { */ private ProcessInstance constructProcessInstance(Command command, String host) { - ProcessInstance processInstance = null; + ProcessInstance processInstance; CommandType commandType = command.getCommandType(); Map cmdParam = JSONUtils.toMap(command.getCommandParam()); @@ -732,7 +734,7 @@ public class ProcessService { } if (cmdParam != null) { - Integer processInstanceId = 0; + int processInstanceId = 0; // recover from failure or pause tasks if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) { String processId = cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING); @@ -1226,7 +1228,8 @@ public class ProcessService { * @param childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { - ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); + ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(), + parentProcessInstance.getProcessDefinitionVersion()); ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId); if (childDefinition != null && fatherDefinition != null) { childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());