Browse Source

[Feature][JsonSplit] Fix master/processInstance bug (#5206)

* transform taskCode from long to string

* fix process bug

* code review

* code review

* code review

* Fix master/processInstance bug

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 3 years ago committed by GitHub
parent
commit
a8f2894b79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 11
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -243,7 +243,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
if (result.get(Constants.STATUS) != Status.SUCCESS) {

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -204,7 +204,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// TODO relationName have ?
int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc,
locations, connects, processData, processDefinition);

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -202,7 +202,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -383,7 +383,7 @@ public class MasterExecThread implements Runnable {
List<TaskNode> taskNodeList =
processService.genTaskNodeList(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), new HashMap<>());
forbiddenTaskList.clear();
taskNodeList.stream().forEach(taskNode -> {
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskList.put(taskNode.getName(), taskNode);
}
@ -478,6 +478,8 @@ public class MasterExecThread implements Runnable {
TaskInstance taskInstance = findTaskIfExists(nodeName);
if (taskInstance == null) {
taskInstance = new TaskInstance();
taskInstance.setTaskCode(Long.parseLong(taskNode.getCode()));
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
// task name
taskInstance.setName(nodeName);
// process instance define id
@ -934,7 +936,9 @@ public class MasterExecThread implements Runnable {
// send warning email if process time out.
if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
alertManager.sendProcessTimeoutAlert(processInstance,
processService.findProcessDefineById(processInstance.getProcessDefinitionId()));
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()));
sendTimeWarning = true;
}
for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) {

11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -390,7 +390,7 @@ public class ProcessService {
}
/**
* find process define by id.
* find process define by code and version.
*
* @param processDefinitionCode processDefinitionCode
* @return process definition
@ -593,6 +593,8 @@ public class ProcessService {
Command command,
Map<String, String> cmdParam) {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
@ -718,7 +720,7 @@ public class ProcessService {
*/
private ProcessInstance constructProcessInstance(Command command, String host) {
ProcessInstance processInstance = null;
ProcessInstance processInstance;
CommandType commandType = command.getCommandType();
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
@ -732,7 +734,7 @@ public class ProcessService {
}
if (cmdParam != null) {
Integer processInstanceId = 0;
int processInstanceId = 0;
// recover from failure or pause tasks
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
String processId = cmdParam.get(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING);
@ -1226,7 +1228,8 @@ public class ProcessService {
* @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
parentProcessInstance.getProcessDefinitionVersion());
ProcessDefinition childDefinition = this.findProcessDefineById(childDefinitionId);
if (childDefinition != null && fatherDefinition != null) {
childDefinition.setWarningGroupId(fatherDefinition.getWarningGroupId());

Loading…
Cancel
Save