From c59b2d5b8cd6532b38c37f6284ca034c5b2321c3 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Mon, 23 Oct 2023 10:19:57 +0800 Subject: [PATCH] remove sub workflow finish notify (#15057) Co-authored-by: xiangzihao <460888207@qq.com> --- .../runner/WorkflowExecuteThreadPool.java | 103 ------------------ 1 file changed, 103 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index c0743b5a2c..1aa4232865 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -17,30 +17,15 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; -import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.StateEvent; -import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; -import org.apache.dolphinscheduler.service.process.ProcessService; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PostConstruct; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -49,8 +34,6 @@ import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import com.google.common.base.Strings; - /** * Used to execute {@link WorkflowExecuteRunnable}. */ @@ -61,9 +44,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Autowired private MasterConfig masterConfig; - @Autowired - private ProcessService processService; - @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @@ -122,8 +102,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); try { log.error("Workflow instance events handle failed", ex); - notifyProcessChanged( - workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance()); multiThreadFilterMap.remove(workflowInstanceId); } finally { LogUtils.removeWorkflowInstanceIdMDC(); @@ -140,8 +118,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { .removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext() .getWorkflowInstance().getId()); processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId); - notifyProcessChanged( - workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance()); log.info("Workflow instance is finished."); } } catch (Exception e) { @@ -155,83 +131,4 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { }); } - /** - * notify process change - */ - private void notifyProcessChanged(ProcessInstance finishProcessInstance) { - if (Flag.NO == finishProcessInstance.getIsSubProcess()) { - return; - } - Map fatherMaps = processService.notifyProcessList(finishProcessInstance.getId()); - for (Map.Entry entry : fatherMaps.entrySet()) { - ProcessInstance processInstance = entry.getKey(); - TaskInstance taskInstance = entry.getValue(); - crossWorkflowParameterPassing(finishProcessInstance, taskInstance); - String address = NetUtils.getAddr(masterConfig.getListenPort()); - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId()); - if (processInstance.getHost().equalsIgnoreCase(address)) { - log.info("Process host is local master, will notify it"); - this.notifyMyself(processInstance, taskInstance); - } else { - log.info("Process host is remote master, will notify it"); - this.notifyProcess(finishProcessInstance, processInstance, taskInstance); - } - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } - } - - private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) { - try { - MasterTaskExecuteRunnable masterTaskExecuteRunnable = - MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId()); - masterTaskExecuteRunnable.getILogicTask().getTaskParameters() - .setVarPool(finishProcessInstance.getVarPool()); - log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}", - finishProcessInstance.getId(), taskInstance.getId()); - } catch (Exception ex) { - log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}", - finishProcessInstance.getId(), taskInstance.getId(), ex); - } - } - - /** - * notify myself - */ - private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { - if (!processInstanceExecCacheManager.contains(processInstance.getId())) { - log.warn("The execute cache manager doesn't contains this workflow instance"); - return; - } - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) - .taskInstanceId(taskInstance.getId()) - .type(StateEventType.TASK_STATE_CHANGE) - .status(TaskExecutionStatus.RUNNING_EXECUTION) - .build(); - this.submitStateEvent(stateEvent); - } - - /** - * notify process's master - */ - private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, - TaskInstance taskInstance) { - String processInstanceHost = processInstance.getHost(); - if (Strings.isNullOrEmpty(processInstanceHost)) { - log.error("Process {} host is empty, cannot notify task {} now, taskId: {}", processInstance.getName(), - taskInstance.getName(), taskInstance.getId()); - return; - } - ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(processInstanceHost, ITaskInstanceExecutionEventListener.class); - - WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent( - finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), - taskInstance.getId()); - iTaskInstanceExecutionEventListener.onWorkflowInstanceInstanceStateChange(workflowInstanceStateChangeEvent); - } }