From 49979c658e3b9b9468b1a8332ac6b81290b1fd3f Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Tue, 31 May 2022 11:49:54 +0800 Subject: [PATCH] [Fix-8828] [Master] Assign tasks to worker optimization (#9919) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix 9584 * master recall * fix ut * update logger * update delay queue * fix ut * remove sleep Co-authored-by: 进勇 Co-authored-by: JinyLeeChina --- .../dolphinscheduler/common/enums/Event.java | 2 +- .../common/enums/StateEvent.java | 2 +- .../common/utils/HeartBeat.java | 15 ++-- .../server/master/MasterServer.java | 5 ++ .../master/dispatch/ExecutorDispatcher.java | 25 +++--- .../executor/NettyExecutorManager.java | 32 +++---- .../dispatch/host/LowerWeightHostManager.java | 2 +- .../dispatch/host/assign/HostWeight.java | 10 ++- .../host/assign/LowerWeightRoundRobin.java | 22 ++++- .../master/processor/TaskRecallProcessor.java | 62 +++++++++++++ .../master/processor/queue/TaskEvent.java | 10 +++ .../processor/queue/TaskExecuteThread.java | 29 +++++- .../master/runner/WorkflowExecuteThread.java | 10 +++ .../master/runner/task/BaseTaskProcessor.java | 11 +++ .../runner/task/BlockingTaskProcessor.java | 5 ++ .../runner/task/CommonTaskProcessor.java | 11 ++- .../runner/task/ConditionTaskProcessor.java | 5 ++ .../runner/task/DependentTaskProcessor.java | 5 ++ .../master/runner/task/SubTaskProcessor.java | 5 ++ .../runner/task/SwitchTaskProcessor.java | 5 ++ .../server/master/runner/task/TaskAction.java | 3 +- .../assign/LowerWeightRoundRobinTest.java | 23 ++--- .../remote/command/CommandType.java | 10 +++ .../remote/command/TaskRecallAckCommand.java | 74 +++++++++++++++ .../remote/command/TaskRecallCommand.java | 90 +++++++++++++++++++ .../server/worker/WorkerServer.java | 6 +- .../server/worker/cache/ResponseCache.java | 21 ++++- .../worker/processor/TaskCallbackService.java | 18 ++++ .../processor/TaskExecuteProcessor.java | 
10 +-- .../processor/TaskRecallAckProcessor.java | 58 ++++++++++++ .../runner/RetryReportTaskStatusThread.java | 8 ++ .../worker/runner/WorkerManagerThread.java | 41 ++++++--- 32 files changed, 554 insertions(+), 81 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java index 78b036f037..26c3a3beab 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -22,5 +22,5 @@ public enum Event { DELAY, RUNNING, RESULT, - ; + WORKER_REJECT } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java index 54c7835f0b..405df09d3e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java @@ -97,7 +97,7 @@ public class StateEvent { public String toString() { return "State Event :" + "key: " + key - + " type: " + type.toString() + + " type: " + type + " executeStatus: " + executionStatus + " task instance id: " + taskInstanceId + " process instance id: " + processInstanceId diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java index d28cd3db08..b4df08f4b7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; public class HeartBeat { private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class); - public static final String COMMA = ","; private long startupTime; private long reportTime; @@ -205,18 +204,18 @@ public class HeartBeat { this.updateServerState(); StringBuilder builder = new StringBuilder(100); - builder.append(cpuUsage).append(COMMA); - builder.append(memoryUsage).append(COMMA); - builder.append(loadAverage).append(COMMA); + builder.append(cpuUsage).append(Constants.COMMA); + builder.append(memoryUsage).append(Constants.COMMA); + builder.append(loadAverage).append(Constants.COMMA); builder.append(availablePhysicalMemorySize).append(Constants.COMMA); builder.append(maxCpuloadAvg).append(Constants.COMMA); builder.append(reservedMemory).append(Constants.COMMA); builder.append(startupTime).append(Constants.COMMA); builder.append(reportTime).append(Constants.COMMA); - builder.append(serverStatus).append(COMMA); - builder.append(processId).append(COMMA); - builder.append(workerHostWeight).append(COMMA); - builder.append(workerExecThreadCount).append(COMMA); + builder.append(serverStatus).append(Constants.COMMA); + builder.append(processId).append(Constants.COMMA); + builder.append(workerHostWeight).append(Constants.COMMA); + builder.append(workerExecThreadCount).append(Constants.COMMA); builder.append(workerWaitingTaskCount); return builder.toString(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6ab0d4e51a..ad4d02e2e9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; @@ -96,6 +97,9 @@ public class MasterServer implements IStoppable { @Autowired private TaskKillResponseProcessor taskKillResponseProcessor; + @Autowired + private TaskRecallProcessor taskRecallProcessor; + @Autowired private EventExecuteService eventExecuteService; @@ -126,6 +130,7 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); // logger server this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 7c94144af8..2885c5a6be 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -29,6 +31,8 @@ import org.apache.commons.lang.StringUtils; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -39,6 +43,8 @@ import org.springframework.stereotype.Service; @Service public class ExecutorDispatcher implements InitializingBean { + private static final Logger logger = LoggerFactory.getLogger(ExecutorDispatcher.class); + /** * netty executor manager */ @@ -71,30 +77,23 @@ public class ExecutorDispatcher implements InitializingBean { * @throws ExecuteException if error throws ExecuteException */ public Boolean dispatch(final ExecutionContext context) throws ExecuteException { - /** - * get executor manager - */ + // get executor manager ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); if (executorManager == null) { throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); } - /** - * host select - */ - + // host select Host host = hostManager.select(context); if 
(StringUtils.isEmpty(host.getAddress())) { - throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, " - + "current task needs worker group %s to execute", - context.getCommand(),context.getWorkerGroup())); + logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute", + context.getCommand(), context.getWorkerGroup()); + return false; } context.setHost(host); executorManager.beforeExecute(context); try { - /** - * task execute - */ + // task execute return executorManager.execute(context); } finally { executorManager.afterExecute(context); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 0ba24e287d..82f3416569 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import 
org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.commons.collections.CollectionUtils; @@ -68,6 +70,9 @@ public class NettyExecutorManager extends AbstractExecutorManager { @Autowired private TaskExecuteResponseProcessor taskExecuteResponseProcessor; + @Autowired + private TaskRecallProcessor taskRecallProcessor; + /** * netty remote client */ @@ -86,6 +91,7 @@ public class NettyExecutorManager extends AbstractExecutorManager { this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); } /** @@ -97,25 +103,13 @@ public class NettyExecutorManager extends AbstractExecutorManager { */ @Override public Boolean execute(ExecutionContext context) throws ExecuteException { - - /** - * all nodes - */ + // all nodes Set allNodes = getAllNodes(context); - - /** - * fail nodes - */ + // fail nodes Set failNodeSet = new HashSet<>(); - - /** - * build command accord executeContext - */ + // build command accord executeContext Command command = context.getCommand(); - - /** - * execute task host - */ + // execute task host Host host = context.getHost(); boolean success = false; while (!success) { @@ -158,9 +152,7 @@ public class NettyExecutorManager extends AbstractExecutorManager { * @throws ExecuteException if error throws ExecuteException */ public void doExecute(final Host host, final Command command) throws ExecuteException { - /** - * retry count,default retry 3 - */ + // retry count,default retry 3 int retryCount = 3; boolean success = false; do { @@ -170,7 +162,7 @@ public class NettyExecutorManager extends AbstractExecutorManager { } catch (Exception ex) { logger.error(String.format("send 
command : %s to %s error", command, host), ex); retryCount--; - ThreadUtils.sleep(100); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } } while (retryCount >= 0 && !success); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 646d770b01..2b8fe7b93a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -178,7 +178,7 @@ public class LowerWeightHostManager extends CommonHostManager { return Optional.of( new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), - heartBeat.getStartupTime())); + heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime())); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index 9d7855f054..a441582235 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -37,10 +37,13 @@ public class HostWeight { private double currentWeight; - public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) { + private final int waitingTaskCount; + + public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, long startTime) { 
this.hostWorker = hostWorker; this.weight = calculateWeight(cpu, memory, loadAverage, startTime); this.currentWeight = this.weight; + this.waitingTaskCount = waitingTaskCount; } public double getWeight() { @@ -63,12 +66,17 @@ public class HostWeight { return (Host)hostWorker; } + public int getWaitingTaskCount() { + return waitingTaskCount; + } + @Override public String toString() { return "HostWeight{" + "hostWorker=" + hostWorker + ", weight=" + weight + ", currentWeight=" + currentWeight + + ", waitingTaskCount=" + waitingTaskCount + '}'; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java index ea55785182..f099d81473 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -18,6 +18,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; /** * lower weight round robin @@ -35,7 +40,8 @@ public class LowerWeightRoundRobin extends AbstractSelector { double totalWeight = 0; double lowWeight = 0; HostWeight lowerNode = null; - for (HostWeight hostWeight : sources) { + List weights = canAssignTaskHost(sources); + for (HostWeight hostWeight : weights) { totalWeight += hostWeight.getWeight(); hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { @@ -45,7 +51,21 @@ public class LowerWeightRoundRobin extends AbstractSelector { } 
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight); return lowerNode; + } + private List canAssignTaskHost(Collection sources) { + List zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList()); + if (!zeroWaitingTask.isEmpty()) { + return zeroWaitingTask; + } + HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get(); + List waitingTask = Lists.newArrayList(hostWeight); + List equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount()) + .collect(Collectors.toList()); + if (!equalWaitingTask.isEmpty()) { + waitingTask.addAll(equalWaitingTask); + } + return waitingTask; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java new file mode 100644 index 0000000000..2d94d026fa --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskRecallCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +/** + * task recall processor + */ +@Component +public class TaskRecallProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskRecallProcessor.class); + + @Autowired + private TaskEventService taskEventService; + + /** + * task recall process + * + * @param channel channel + * @param command command TaskRecallCommand + */ + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType())); + TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class); + logger.info("taskRecallCommand : {}", recallCommand); + TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel); + taskEventService.addEvent(taskEvent); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 865eee53a5..8227793c9f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; +import org.apache.dolphinscheduler.remote.command.TaskRecallCommand; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; @@ -135,6 +136,15 @@ public class TaskEvent { return event; } + public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) { + TaskEvent event = new TaskEvent(); + event.setTaskInstanceId(command.getTaskInstanceId()); + event.setProcessInstanceId(command.getProcessInstanceId()); + event.setChannel(channel); + event.setEvent(Event.WORKER_REJECT); + return event; + } + public String getVarPool() { return varPool; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java index 47b190e246..b2778ae542 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java @@ -24,9 +24,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import 
org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -133,6 +136,9 @@ public class TaskExecuteThread { case RESULT: handleResultEvent(taskEvent, taskInstance); break; + case WORKER_REJECT: + handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteThread); + break; default: throw new IllegalArgumentException("invalid event type : " + event); } @@ -185,7 +191,7 @@ public class TaskExecuteThread { TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); } catch (Exception e) { - logger.error("worker ack master error", e); + logger.error("handle worker ack master error", e); TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1); channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); } @@ -216,9 +222,28 @@ public class TaskExecuteThread { TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } catch 
(Exception e) { - logger.error("worker response master error", e); + logger.error("handle worker response master error", e); TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1); channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); } } + + /** + * handle worker reject event: resubmit the task and send a recall ack back over the channel + */ + private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) { + try { + if (executeThread != null) { + executeThread.resubmit(taskInstance.getTaskCode()); + } + if (channel != null) { + TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId()); + channel.writeAndFlush(taskRecallAckCommand.convert2Command()); + } + } catch (Exception e) { + logger.error("handle worker reject error", e); + TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId()); + channel.writeAndFlush(taskRecallAckCommand.convert2Command()); + } + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index a40b7e5b27..ed283b1a89 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; +import net.bytebuddy.implementation.bytecode.Throw; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; @@ -1974,6 +1975,15 @@ public class WorkflowExecuteThread { } } + public void resubmit(long taskCode) throws Exception { + ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskCode); + if (taskProcessor != null) { + taskProcessor.action(TaskAction.RESUBMIT); + logger.debug("RESUBMIT: task code:{}", taskCode); + } else { + throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode); + } + } private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { // get start params from command param Map startParamMap = new HashMap<>(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 2ca0d6cb19..329f72d1b1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -159,6 +159,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { */ protected abstract boolean submitTask(); + /* + * resubmit task + */ + protected abstract boolean resubmitTask(); + /** * run task */ @@ -188,6 +193,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return run(); case DISPATCH: return dispatch(); + case RESUBMIT: + return resubmit(); default: logger.error("unknown task action: {}", taskAction); } @@ -196,6 +203,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return false; } + protected boolean resubmit() { + return resubmitTask(); + } + protected boolean submit() { return submitTask(); } diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index 1fa6b28625..be7dbe103f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -126,6 +126,11 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + return true; + } + @Override protected boolean dispatchTask() { return false; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index ffeb89a0d2..b1bf05dcd9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -71,6 +71,15 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + if (this.taskInstance == null) { + return false; + } + setTaskExecutionLogger(); + return dispatchTask(); + } + @Override public boolean runTask() { return true; @@ -110,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName()); return true; } - logger.info("task ready to submit: {}", taskInstance); + logger.debug("task ready to submit: {}", taskInstance.getName()); TaskPriority taskPriority = new 
TaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 4749e20f0f..9f441df52d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -83,6 +83,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + return true; + } + @Override protected boolean dispatchTask() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index d17f4b6419..c2da0b1b71 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -99,6 +99,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + return true; + } + @Override protected boolean dispatchTask() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 747c3dd77a..da38a11a9e 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -92,6 +92,11 @@ public class SubTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + return true; + } + @Override protected boolean dispatchTask() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 444d41622c..047df02336 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -90,6 +90,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean resubmitTask() { + return true; + } + @Override protected boolean dispatchTask() { return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java index 9044945258..d292cb1d34 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java @@ -26,5 +26,6 @@ public enum TaskAction { TIMEOUT, SUBMIT, RUN, - DISPATCH + DISPATCH, + RESUBMIT } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java index f822f04d97..fcb8fbc541 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java @@ -28,33 +28,36 @@ public class LowerWeightRoundRobinTest { @Test public void testSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, System.currentTimeMillis() - 60 * 8 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, System.currentTimeMillis() - 60 * 5 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, System.currentTimeMillis() - 60 * 2 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); HostWeight result; result = roundRobin.select(sources); Assert.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); Assert.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - 
Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + Assert.assertEquals("192.158.2.3", result.getHost().getIp()); + Assert.assertEquals("192.158.2.3", result.getHost().getIp()); result = roundRobin.select(sources); Assert.assertEquals("192.158.2.1", result.getHost().getIp()); result = roundRobin.select(sources); - Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); } @Test public void testWarmUpSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000)); - sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 8 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 5 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 3 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 11 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); HostWeight result; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 5718872bf5..e540efddfc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -98,6 +98,16 @@ public enum CommandType { */ TASK_KILL_RESPONSE, + /** + * task recall + */ + TASK_RECALL, + + /** + * task recall ack + */ + TASK_RECALL_ACK, + /** * HEART_BEAT */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java new file mode 100644 index 0000000000..2221a6c09d --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +/** + * task recall ack command + */ +public class TaskRecallAckCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public TaskRecallAckCommand() { + super(); + } + + public TaskRecallAckCommand(int status, int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + /** + * package response command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.TASK_RECALL_ACK); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java new file mode 100644 index 0000000000..3d33d8c363 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +/** + * kill task recall command + */ +public class TaskRecallCommand implements Serializable { + + /** + * taskInstanceId + */ + private int taskInstanceId; + + /** + * host + */ + private String host; + + /** + * process instance id + */ + private int processInstanceId; + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.TASK_RECALL); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "TaskRecallCommand{" + + "taskInstanceId=" + taskInstanceId + + ", host='" + host + '\'' + + ", processInstanceId=" + processInstanceId 
+ + '}'; + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 8ac528dee4..009ab3996c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; @@ -110,6 +111,9 @@ public class WorkerServer implements IStoppable { @Autowired private TaskKillProcessor taskKillProcessor; + @Autowired + private TaskRecallAckProcessor taskRecallAckProcessor; + @Autowired private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; @@ -146,7 +150,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); - + this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor); // logger server 
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java index f28990b152..fb3c84da68 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java @@ -37,8 +37,9 @@ public class ResponseCache { return instance; } - private Map runningCache = new ConcurrentHashMap<>(); - private Map responseCache = new ConcurrentHashMap<>(); + private final Map runningCache = new ConcurrentHashMap<>(); + private final Map responseCache = new ConcurrentHashMap<>(); + private final Map recallCache = new ConcurrentHashMap<>(); /** * cache response @@ -55,11 +56,27 @@ public class ResponseCache { case RESULT: responseCache.put(taskInstanceId, command); break; + case WORKER_REJECT: + recallCache.put(taskInstanceId, command); + break; default: throw new IllegalArgumentException("invalid event type : " + event); } } + /** + * recall response cache + * + * @param taskInstanceId taskInstanceId + */ + public void removeRecallCache(Integer taskInstanceId) { + recallCache.remove(taskInstanceId); + } + + public Map getRecallCache() { + return recallCache; + } + /** * remove running cache * diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 3641de8453..1c3d21501a 100644 --- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskRecallCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; @@ -222,6 +223,14 @@ public class TaskCallbackService { return taskKillResponseCommand; } + private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) { + TaskRecallCommand taskRecallCommand = new TaskRecallCommand(); + taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); + taskRecallCommand.setHost(taskExecutionContext.getHost()); + return taskRecallCommand; + } + /** * send task execute running command * todo unified callback command @@ -257,4 +266,13 @@ public class TaskCallbackService { TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext); send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); } + + /** + * send task execute response command + */ + public void sendRecallCommand(TaskExecutionContext taskExecutionContext) { + TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), 
Event.WORKER_REJECT); + send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command()); + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index cd3d940f00..c7df9e7876 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -159,8 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } } - taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); // delay task process long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); @@ -174,10 +173,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { // submit task to manager boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager)); if (!offer) { - logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", - workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}", + workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId()); + 
taskCallbackService.sendRecallCommand(taskExecutionContext); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java new file mode 100644 index 0000000000..769024e0aa --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +@Component +public class TaskRecallAckProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class); + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(), + String.format("invalid command type : %s", command.getType())); + + TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class); + if (taskRecallAckCommand == null) { + return; + } + + if (taskRecallAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { + ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId()); + logger.debug("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId()); + TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId()); + logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId()); + } + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index fc737ca1de..6e7d879ded 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -82,6 +82,14 @@ public class RetryReportTaskStatusThread implements Runnable { taskCallbackService.send(taskInstanceId, responseCommand); } } + if (!instance.getRecallCache().isEmpty()) { + Map recallCache = instance.getRecallCache(); + for (Map.Entry entry : recallCache.entrySet()) { + Integer taskInstanceId = entry.getKey(); + Command responseCommand = entry.getValue(); + taskCallbackService.send(taskInstanceId, responseCommand); + } + } } catch (Exception e) { logger.warn("retry report task status error", e); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 60f752401d..d9531b58bf 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -26,10 +27,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import 
org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,7 @@ public class WorkerManagerThread implements Runnable { /** * task queue */ - private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + private final BlockingQueue waitSubmitQueue; @Autowired(required = false) private StorageOperate storageOperate; @@ -63,12 +63,16 @@ public class WorkerManagerThread implements Runnable { @Autowired private TaskCallbackService taskCallbackService; + private volatile int workerExecThreads; + /** * running task */ private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); public WorkerManagerThread(WorkerConfig workerConfig) { + workerExecThreads = workerConfig.getExecThreads(); + this.waitSubmitQueue = new DelayQueue<>(); workerExecService = new WorkerExecService( ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), taskExecuteThreadMap @@ -80,12 +84,12 @@ public class WorkerManagerThread implements Runnable { } /** - * get delay queue size + * get wait submit queue size * * @return queue size */ - public int getDelayQueueSize() { - return workerExecuteQueue.size(); + public int getWaitSubmitQueueSize() { + return waitSubmitQueue.size(); } /** @@ -102,9 +106,9 @@ public class WorkerManagerThread implements Runnable { * then send Response to Master, update the execution status of task instance */ public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { - workerExecuteQueue.stream() + waitSubmitQueue.stream() .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId) - .forEach(workerExecuteQueue::remove); + 
.forEach(waitSubmitQueue::remove); sendTaskKillResponse(taskInstanceId); } @@ -127,7 +131,14 @@ public class WorkerManagerThread implements Runnable { * @return submit result */ public boolean offer(TaskExecuteThread taskExecuteThread) { - return workerExecuteQueue.offer(taskExecuteThread); + if (waitSubmitQueue.size() > workerExecThreads) { + // if waitSubmitQueue is full, it will wait 1s, then try add + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + if (waitSubmitQueue.size() > workerExecThreads) { + return false; + } + } + return waitSubmitQueue.offer(taskExecuteThread); } public void start() { @@ -142,9 +153,15 @@ public class WorkerManagerThread implements Runnable { TaskExecuteThread taskExecuteThread; while (Stopper.isRunning()) { try { - taskExecuteThread = workerExecuteQueue.take(); - taskExecuteThread.setStorageOperate(storageOperate); - workerExecService.submit(taskExecuteThread); + if (this.getThreadPoolQueueSize() <= workerExecThreads) { + taskExecuteThread = waitSubmitQueue.take(); + taskExecuteThread.setStorageOperate(storageOperate); + workerExecService.submit(taskExecuteThread); + } else { + logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", + this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize()); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } } catch (Exception e) { logger.error("An unexpected interrupt is happened, " + "the exception will be ignored and this thread will continue to run", e);