From 8dc529d35f19514123902daf1ad7b0e352988d87 Mon Sep 17 00:00:00 2001 From: zwZjut Date: Mon, 10 Jan 2022 18:47:26 +0800 Subject: [PATCH] [dolphinscheduler-server] process instance is always running (#7914) * #7698 * #7698 * #7698 * fix workflow is always running * fix workflow is always running * fix workflow is always running * #7700 #7698 * add license * fix ut * #7700 #7698 Co-authored-by: honghuo.zw --- .../dolphinscheduler/common/enums/Event.java | 3 +- .../command/TaskKillResponseCommand.java | 14 +++ .../server/master/MasterServer.java | 4 +- .../processor/TaskKillResponseProcessor.java | 26 +++++ .../processor/queue/TaskResponseEvent.java | 15 ++- .../queue/TaskResponsePersistThread.java | 12 +++ .../processor/queue/TaskResponseService.java | 12 ++- .../master/runner/MasterExecService.java | 102 ++++++++++++++++++ .../master/runner/MasterSchedulerService.java | 23 +++- .../runner/StateWheelExecuteThread.java | 36 +++++-- .../master/runner/WorkflowExecuteThread.java | 27 +++-- .../master/runner/task/BaseTaskProcessor.java | 17 +++ .../runner/task/CommonTaskProcessor.java | 19 +++- .../runner/task/ConditionTaskProcessor.java | 18 +++- .../runner/task/DependentTaskProcessor.java | 20 +++- .../master/runner/task/ITaskProcessor.java | 2 + .../master/runner/task/SubTaskProcessor.java | 11 ++ .../runner/task/SwitchTaskProcessor.java | 19 +++- .../worker/processor/TaskKillProcessor.java | 11 ++ .../worker/runner/TaskExecuteThread.java | 4 + .../worker/runner/WorkerExecService.java | 85 +++++++++++++++ .../worker/runner/WorkerManagerThread.java | 21 +++- .../master/WorkflowExecuteThreadTest.java | 2 +- .../TaskKillResponseProcessorTest.java | 13 +++ 24 files changed, 483 insertions(+), 33 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java index 9cec2766f1..5adfe63d63 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -19,5 +19,6 @@ package org.apache.dolphinscheduler.common.enums; public enum Event { ACK, - RESULT; + RESULT, + ACTION_STOP; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 03ad4dd694..98fbe0f8a5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -48,6 +48,11 @@ public class TaskKillResponseCommand implements Serializable { */ private int processId; + /** + * process instance id + */ + private int processInstanceId; + /** * other resource manager appId , for example : YARN etc */ @@ -85,6 +90,14 @@ public class TaskKillResponseCommand implements Serializable { this.processId = processId; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + public List getAppIds() { return appIds; } @@ -114,6 +127,7 @@ public class TaskKillResponseCommand implements Serializable { + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + + ", processInstanceId=" + processInstanceId + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index a5d7cf4bfb..a6642a1b58 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -138,11 +138,13 @@ public class MasterServer implements IStoppable { ackProcessor.init(processInstanceExecMaps); TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor(); taskResponseProcessor.init(processInstanceExecMaps); + TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor(); + taskKillResponseProcessor.init(processInstanceExecMaps); StateEventProcessor stateEventProcessor = new StateEventProcessor(); stateEventProcessor.init(processInstanceExecMaps); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 28f18fe961..36dde2982c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -17,11 +17,18 @@ package org.apache.dolphinscheduler.server.master.processor; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +44,19 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class); + /** + * process service + */ + private final TaskResponseService taskResponseService; + + public TaskKillResponseProcessor() { + this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); + } + + public void init(ConcurrentHashMap processInstanceExecMaps) { + this.taskResponseService.init(processInstanceExecMaps); + } + /** * task final result response * need master process , state persistence @@ -50,6 +70,12 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class); logger.info("received task kill response command : {}", responseCommand); + // TaskResponseEvent + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(ExecutionStatus.of(responseCommand.getStatus()), + responseCommand.getTaskInstanceId(), + responseCommand.getProcessInstanceId() + ); + taskResponseService.addResponse(taskResponseEvent); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 224a61753d..f2a080c607 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -17,13 +17,13 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import com.fasterxml.jackson.annotation.JsonFormat; - import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; + import io.netty.channel.Channel; /** @@ -94,6 +94,17 @@ public class TaskResponseEvent { private Channel channel; private int processInstanceId; + + public static TaskResponseEvent newActionStop(ExecutionStatus state, + int taskInstanceId, + int processInstanceId) { + TaskResponseEvent event = new TaskResponseEvent(); + event.setState(state); + event.setTaskInstanceId(taskInstanceId); + event.setEvent(Event.ACTION_STOP); + event.setProcessInstanceId(processInstanceId); + return event; + } public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java index 621dd79c74..ca8ad0ac93 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.concurrent.ConcurrentHashMap; @@ -146,6 +148,16 @@ public class TaskResponsePersistThread implements Runnable { channel.writeAndFlush(taskResponseCommand.convert2Command()); } break; + case ACTION_STOP: + WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); + if (workflowExecuteThread != null) { + ITaskProcessor taskProcessor = workflowExecuteThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId()); + if (taskProcessor != null) { + taskProcessor.persist(TaskAction.STOP); + logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId()); + } + } + break; default: throw new IllegalArgumentException("invalid event type : " + event); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 5ef235089e..b5e70eedc8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -115,10 +115,20 @@ public class TaskResponseService { try { this.taskResponseWorker.interrupt(); this.taskResponseEventHandler.interrupt(); - this.eventExecService.shutdown(); } catch (Exception e) { logger.error("stop error:", e); } + this.eventExecService.shutdown(); + long waitSec = 5; + boolean terminated = false; + try { + terminated = eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + if (!terminated) { + logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", waitSec); + } } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java new file mode 100644 index 0000000000..edb5e6611d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class MasterExecService { + + /** + * logger of MasterExecService + */ + private static final Logger logger = LoggerFactory.getLogger(MasterExecService.class); + + /** + * master exec service + */ + private final ThreadPoolExecutor execService; + + private final ListeningExecutorService listeningExecutorService; + + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap; + + private final ConcurrentHashMap filterMap = new ConcurrentHashMap<>(); + + public MasterExecService(ConcurrentHashMap startProcessFailedMap,ThreadPoolExecutor execService) { + this.startProcessFailedMap = startProcessFailedMap; + this.execService = execService; + this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); + } + + public void execute(WorkflowExecuteThread workflowExecuteThread) { + if (workflowExecuteThread == null + || workflowExecuteThread.getProcessInstance() == null + || workflowExecuteThread.isStart() + || filterMap.containsKey(workflowExecuteThread.getProcessInstance().getId())) { + return; + } + Integer processInstanceId = workflowExecuteThread.getProcessInstance().getId(); + filterMap.put(processInstanceId, workflowExecuteThread); + ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + if (!workflowExecuteThread.isStart()) { + startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread); + } else { + startProcessFailedMap.remove(processInstanceId); + } + filterMap.remove(processInstanceId); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("handle events {} failed", processInstanceId, throwable); + if (!workflowExecuteThread.isStart()) { + startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread); + } else { + startProcessFailedMap.remove(processInstanceId); + } + filterMap.remove(processInstanceId); + } + }; + Futures.addCallback(future, futureCallback, this.listeningExecutorService); + } + + public void shutdown() { + this.execService.shutdown(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.execService.awaitTermination(timeout, unit); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 33c84b38e8..79cce4f05e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -57,6 +58,12 @@ public class MasterSchedulerService extends Thread { */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); + /** + * handle task event + */ + @Autowired + private TaskResponseService taskResponseService; + /** * dolphinscheduler database interface */ @@ -92,7 +99,12 @@ public class MasterSchedulerService extends Thread { /** * master exec service */ - private ThreadPoolExecutor masterExecService; + private MasterExecService masterExecService; + + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap = new ConcurrentHashMap<>(); /** * process instance execution list @@ -126,11 +138,15 @@ public class MasterSchedulerService extends Thread { */ public void init(ConcurrentHashMap processInstanceExecMaps) { this.processInstanceExecMaps = processInstanceExecMaps; - this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); + this.masterExecService = new MasterExecService(this.startProcessFailedMap, + (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads())); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - stateWheelExecuteThread = new StateWheelExecuteThread(processService, + stateWheelExecuteThread = new StateWheelExecuteThread( + masterExecService, + processService, + startProcessFailedMap, processTimeoutCheckList, taskTimeoutCheckList, taskRetryCheckList, @@ -202,6 +218,7 @@ public class MasterSchedulerService extends Thread { if (processInstance != null) { WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread( processInstance + , taskResponseService , processService , nettyExecutorManager , processAlertManager diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index de6db1ddb2..e12be0ce97 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -49,15 +49,30 @@ public class StateWheelExecuteThread extends Thread { private ConcurrentHashMap taskInstanceRetryCheckList; private ConcurrentHashMap processInstanceExecMaps; + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap; + private int stateCheckIntervalSecs; - public StateWheelExecuteThread(ProcessService processService, - ConcurrentHashMap processInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceRetryCheckList, - ConcurrentHashMap processInstanceExecMaps, - int stateCheckIntervalSecs) { + /** + * master exec service + */ + private MasterExecService masterExecService; + + public StateWheelExecuteThread( + MasterExecService masterExecService, + ProcessService processService, + ConcurrentHashMap startProcessFailedMap, + ConcurrentHashMap processInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceRetryCheckList, + ConcurrentHashMap processInstanceExecMaps, + int stateCheckIntervalSecs) { + this.masterExecService = masterExecService; this.processService = processService; + this.startProcessFailedMap = startProcessFailedMap; this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; @@ -71,6 +86,7 @@ public class StateWheelExecuteThread extends Thread { logger.info("state wheel thread start"); while (Stopper.isRunning()) { try { + check4StartProcessFailed(); checkTask4Timeout(); checkTask4Retry(); checkProcess4Timeout(); @@ -176,4 +192,12 @@ public class StateWheelExecuteThread extends Thread { workflowExecuteThread.addStateEvent(stateEvent); } + private void check4StartProcessFailed() { + if (startProcessFailedMap.isEmpty()) { + return; + } + for (WorkflowExecuteThread workflowExecuteThread : this.startProcessFailedMap.values()) { + masterExecService.execute(workflowExecuteThread); + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 92862ae619..5988839a5c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; @@ -103,6 +105,11 @@ public class WorkflowExecuteThread implements Runnable { */ private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); + /** + * handle task event + */ + private TaskResponseService taskResponseService; + /** * process instance */ @@ -205,6 +212,7 @@ public class WorkflowExecuteThread implements Runnable { * @param nettyExecutorManager nettyExecutorManager */ public WorkflowExecuteThread(ProcessInstance processInstance + , TaskResponseService taskResponseService , ProcessService processService , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager @@ -212,7 +220,7 @@ public class WorkflowExecuteThread implements Runnable { , ConcurrentHashMap taskTimeoutCheckList , ConcurrentHashMap taskRetryCheckList) { this.processService = processService; - + this.taskResponseService = taskResponseService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; @@ -1249,13 +1257,12 @@ public class WorkflowExecuteThread implements Runnable { } ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId); taskProcessor.action(TaskAction.STOP); - if (taskProcessor.taskState().typeIsFinished()) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(this.processInstance.getId()); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskState()); - this.addStateEvent(stateEvent); + if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) { + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop( + taskProcessor.taskState(), + taskInstance.getId(), + this.processInstance.getId()); + taskResponseService.addResponse(taskResponseEvent); } } } @@ -1420,4 +1427,8 @@ public class WorkflowExecuteThread implements Runnable { TaskDependType depNodeType) throws Exception { return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); } + + public Map getActiveTaskProcessorMaps() { + return activeTaskProcessorMaps; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 5532477568..4446485b6c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -82,6 +82,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + /** + * persist task + * + * @return + */ + protected abstract boolean persistTask(TaskAction taskAction); + /** * pause task, common tasks donot need this. * @@ -102,6 +109,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { */ protected abstract boolean taskTimeout(); + /** + * persist + * + * @return + */ + @Override + public boolean persist(TaskAction taskAction) { + return persistTask(taskAction); + } + @Override public void run() { } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 23988f9930..14bb3afd56 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -87,6 +87,24 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + /** * common task cannot be paused */ @@ -154,7 +172,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor { if (StringUtils.isBlank(taskInstance.getHost())) { taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 7c593b0f30..584e484123 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -133,11 +133,27 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + } + return false; + } + @Override protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index b26e641118..0f84a5f663 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.Date; @@ -155,11 +154,28 @@ public class DependentTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + @Override protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index b68dc221a9..aa1e490a09 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; */ public interface ITaskProcessor { + boolean persist(TaskAction taskAction); + void run(); boolean action(TaskAction taskAction); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index e0cd3e8603..02f08a828c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -113,6 +113,17 @@ public class SubTaskProcessor extends BaseTaskProcessor { } } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + } + return false; + } + @Override protected boolean pauseTask() { pauseSubWorkFlow(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 8e9316fd2d..c48a711008 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -97,6 +97,24 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + @Override protected boolean pauseTask() { this.taskInstance.setState(ExecutionStatus.PAUSE); @@ -109,7 +127,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 4341b8c6f4..4f235eaf78 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; @@ -118,6 +120,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { try { Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { + TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId); + if (null != taskExecuteThread) { + AbstractTask task = taskExecuteThread.getTask(); + if (task != null) { + task.cancelApplication(true); + logger.info("kill task by cancelApplication, task id:{}", taskInstanceId); + } + } workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); @@ -165,6 +175,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + taskKillResponseCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); } return taskKillResponseCommand; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 0b18dcfce1..4b9f99ca39 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -389,4 +389,8 @@ public class TaskExecuteThread implements Runnable, Delayed { } taskExecutionContext.setParamsMap(paramsMap); } + + public AbstractTask getTask() { + return task; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java new file mode 100644 index 0000000000..b98024674b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class WorkerExecService { + /** + * logger of WorkerExecService + */ + private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class); + + private final ListeningExecutorService listeningExecutorService; + + /** + * thread executor service + */ + private final ExecutorService execService; + + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap; + + public WorkerExecService(ExecutorService execService, ConcurrentHashMap taskExecuteThreadMap) { + this.execService = execService; + this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); + this.taskExecuteThreadMap = taskExecuteThreadMap; + } + + public void submit(TaskExecuteThread taskExecuteThread) { + taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); + ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId() + , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable); + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + }; + Futures.addCallback(future, futureCallback, this.listeningExecutorService); + } + + /** + * get thread pool queue size + * + * @return queue size + */ + public int getThreadPoolQueueSize() { + return ((ThreadPoolExecutor) this.execService).getQueue().size(); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 4f68166ebd..0deab9e974 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -31,9 +31,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +51,11 @@ public class WorkerManagerThread implements Runnable { */ private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + /** * worker config */ @@ -60,7 +64,7 @@ public class WorkerManagerThread implements Runnable { /** * thread executor service */ - private final ExecutorService workerExecService; + private final WorkerExecService workerExecService; /** * task callback service @@ -69,10 +73,17 @@ public class WorkerManagerThread implements Runnable { public WorkerManagerThread() { this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()); + this.workerExecService = new WorkerExecService( + ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()), + taskExecuteThreadMap + ); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); } + public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) { + return this.taskExecuteThreadMap.get(taskInstanceId); + } + /** * get delay queue size * @@ -88,7 +99,7 @@ public class WorkerManagerThread implements Runnable { * @return queue size */ public int getThreadPoolQueueSize() { - return ((ThreadPoolExecutor) workerExecService).getQueue().size(); + return this.workerExecService.getThreadPoolQueueSize(); } /** diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index bf527d22b3..35520c5d99 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -102,7 +102,7 @@ public class WorkflowExecuteThreadTest { ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList)); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java index c7f047569e..8bef045f5a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java @@ -20,19 +20,26 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import io.netty.channel.Channel; /** * task response processor test */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) public class TaskKillResponseProcessorTest { private TaskKillResponseProcessor taskKillResponseProcessor; @@ -41,8 +48,14 @@ public class TaskKillResponseProcessorTest { private Channel channel; + private TaskResponseService taskResponseService; + @Before public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + + taskResponseService = PowerMockito.mock(TaskResponseService.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService); taskKillResponseProcessor = new TaskKillResponseProcessor(); channel = PowerMockito.mock(Channel.class); taskKillResponseCommand = new TaskKillResponseCommand();