Browse Source

remove sub workflow finish notify (#15057)

Co-authored-by: xiangzihao <460888207@qq.com>
augit-log
caishunfeng 7 months ago committed by GitHub
parent
commit
c59b2d5b8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 103
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

103
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -17,30 +17,15 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -49,8 +34,6 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
/**
* Used to execute {@link WorkflowExecuteRunnable}.
*/
@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Autowired
private MasterConfig masterConfig;
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@ -122,8 +102,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
try {
log.error("Workflow instance events handle failed", ex);
notifyProcessChanged(
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
multiThreadFilterMap.remove(workflowInstanceId);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
@ -140,8 +118,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
.removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext()
.getWorkflowInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId);
notifyProcessChanged(
workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance());
log.info("Workflow instance is finished.");
}
} catch (Exception e) {
@ -155,83 +131,4 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
});
}
/**
* notify process change
*/
private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
return;
}
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
ProcessInstance processInstance = entry.getKey();
TaskInstance taskInstance = entry.getValue();
crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
String address = NetUtils.getAddr(masterConfig.getListenPort());
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId());
if (processInstance.getHost().equalsIgnoreCase(address)) {
log.info("Process host is local master, will notify it");
this.notifyMyself(processInstance, taskInstance);
} else {
log.info("Process host is remote master, will notify it");
this.notifyProcess(finishProcessInstance, processInstance, taskInstance);
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
try {
MasterTaskExecuteRunnable masterTaskExecuteRunnable =
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
.setVarPool(finishProcessInstance.getVarPool());
log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
finishProcessInstance.getId(), taskInstance.getId());
} catch (Exception ex) {
log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
finishProcessInstance.getId(), taskInstance.getId(), ex);
}
}
/**
* notify myself
*/
private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
log.warn("The execute cache manager doesn't contains this workflow instance");
return;
}
TaskStateEvent stateEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(taskInstance.getId())
.type(StateEventType.TASK_STATE_CHANGE)
.status(TaskExecutionStatus.RUNNING_EXECUTION)
.build();
this.submitStateEvent(stateEvent);
}
/**
* notify process's master
*/
private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance,
TaskInstance taskInstance) {
String processInstanceHost = processInstance.getHost();
if (Strings.isNullOrEmpty(processInstanceHost)) {
log.error("Process {} host is empty, cannot notify task {} now, taskId: {}", processInstance.getName(),
taskInstance.getName(), taskInstance.getId());
return;
}
ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener =
SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstanceHost, ITaskInstanceExecutionEventListener.class);
WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(),
taskInstance.getId());
iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent);
}
}

Loading…
Cancel
Save