From ffcb1c22e16107e387ceb21dc5a0faaef0783ae7 Mon Sep 17 00:00:00 2001 From: guohaozhang Date: Tue, 23 Feb 2021 10:10:11 +0800 Subject: [PATCH] [Improvement-#3735] Make task delayed execution more efficient (#4812) * [Improvement-3735] improve implementation of delay task execution * [Improvement][worker] delay task compatible with dev branch and fix test Co-authored-by: vanilla111 <1115690319@qq.com> --- .../server/worker/WorkerServer.java | 7 + .../processor/TaskExecuteProcessor.java | 48 +++-- .../worker/processor/TaskKillProcessor.java | 8 + .../worker/runner/TaskExecuteThread.java | 48 +++-- .../worker/runner/WorkerManagerThread.java | 143 +++++++++++++ .../processor/TaskCallbackServiceTest.java | 4 +- .../processor/TaskExecuteProcessorTest.java | 195 ++++++++++++++++++ .../runner/WorkerManagerThreadTest.java | 187 +++++++++++++++++ pom.xml | 2 + 9 files changed, 599 insertions(+), 43 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 1f138f6acd..a267b5bf32 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -84,6 +85,9 @@ public class WorkerServer { @Autowired private RetryReportTaskStatusThread retryReportTaskStatusThread; + @Autowired + private WorkerManagerThread workerManagerThread; + /** * worker server startup * @@ -119,6 +123,9 @@ public class WorkerServer { // worker registry this.workerRegistry.registry(); + // task execute manager + this.workerManagerThread.start(); + // retry report task status this.retryReportTaskStatusThread.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index fc8b33a488..3088080849 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.Optional; -import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +57,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); - /** - * thread executor service - */ - private final ExecutorService workerExecService; - /** * worker config */ @@ -73,20 +68,25 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final TaskCallbackService taskCallbackService; /** - * alert client service + * alert client service */ private AlertClientService alertClientService; /** * taskExecutionContextCacheManager */ - private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + private final TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } public TaskExecuteProcessor(AlertClientService alertClientService) { - this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); - this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); - this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); - + this(); this.alertClientService = alertClientService; } @@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getTaskInstanceId())); taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); - taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); @@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); + // delay task process + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); + if (remainTime > 0) { + logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + } else { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskExecutionContext.setStartTime(new Date()); + } + this.doAck(taskExecutionContext); - // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService)); + // submit task to manager + if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) { + logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize()); + } } private void doAck(TaskExecutionContext taskExecutionContext) { @@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { /** * build ack command + * * @param taskExecutionContext taskExecutionContext * @return TaskExecuteAckCommand */ @@ -209,4 +217,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index b41c189eda..a3665b33e0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; + public TaskKillProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { + workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); return Pair.of(true, appIds); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 721656730d..05bd8065e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -51,7 +50,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException; /** * task scheduler thread */ -public class TaskExecuteThread implements Runnable { +public class TaskExecuteThread implements Runnable, Delayed { /** * logger @@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable { // task node TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); - delayExecutionIfNeeded(); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } @@ -289,24 +289,6 @@ public class TaskExecuteThread implements Runnable { } } - /** - * delay execution if needed. - */ - private void delayExecutionIfNeeded() { - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L); - logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime); - if (remainTime > 0) { - try { - Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS); - } catch (Exception e) { - logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}", - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - } - } - } - /** * send an ack to change the status of the task. */ @@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable { } return ackCommand; } -} \ No newline at end of file + + /** + * get current TaskExecutionContext + * @return TaskExecutionContext + */ + public TaskExecutionContext getTaskExecutionContext() { + return this.taskExecutionContext; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java new file mode 100644 index 0000000000..073c9488ae --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Manage tasks + */ +@Component +public class WorkerManagerThread implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class); + + /** + * task queue + */ + private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + + /** + * worker config + */ + private final WorkerConfig workerConfig; + + /** + * thread executor service + */ + private final ExecutorService workerExecService; + + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; + + public WorkerManagerThread() { + this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()); + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + } + + /** + * get queue size + * + * @return queue size + */ + public int getQueueSize() { + return workerExecuteQueue.size(); + } + + /** + * Kill tasks that have not been executed, like delay task + * then send Response to Master, update the execution status of task instance + */ + public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { + workerExecuteQueue.stream() + .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId) + .forEach(workerExecuteQueue::remove); + sendTaskKillResponse(taskInstanceId); + } + + /** + * kill task before execute , like delay task + */ + private void sendTaskKillResponse(Integer taskInstanceId) { + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + if (taskExecutionContext == null) { + return; + } + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); + responseCommand.setStatus(ExecutionStatus.KILL.getCode()); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + } + + /** + * submit task + * + * @param taskExecuteThread taskExecuteThread + * @return submit result + */ + public boolean offer(TaskExecuteThread taskExecuteThread) { + return workerExecuteQueue.offer(taskExecuteThread); + } + + public void start() { + Thread thread = new Thread(this, this.getClass().getName()); + thread.start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Worker-Execute-Manager-Thread"); + TaskExecuteThread taskExecuteThread; + while (Stopper.isRunning()) { + try { + taskExecuteThread = workerExecuteQueue.take(); + workerExecService.submit(taskExecuteThread); + } catch (Exception e) { + logger.error("An unexpected interrupt is happened, " + + "the exception will be ignored and this thread will continue to run", e); + } + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5a5561d1bd..bdd723a4cd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; @@ -79,7 +80,8 @@ import io.netty.channel.Channel; TaskResponseProcessor.class, TaskExecuteProcessor.class, CuratorZookeeperClient.class, - TaskExecutionContextCacheManagerImpl.class}) + TaskExecutionContextCacheManagerImpl.class, + WorkerManagerThread.class}) public class TaskCallbackServiceTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java new file mode 100644 index 0000000000..fef21519d3 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.utils.ChannelUtils; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Date; +import java.util.concurrent.ExecutorService; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test task execute processor + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class, + JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) +public class TaskExecuteProcessorTest { + + private TaskExecutionContext taskExecutionContext; + + private TaskCallbackService taskCallbackService; + + private ExecutorService workerExecService; + + private WorkerConfig workerConfig; + + private Command command; + + private Command ackCommand; + + private TaskExecuteRequestCommand taskRequestCommand; + + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + private AlertClientService alertClientService; + + private WorkerManagerThread workerManager; + + @Before + public void before() throws Exception { + // init task execution context + taskExecutionContext = getTaskExecutionContext(); + workerConfig = new WorkerConfig(); + workerConfig.setWorkerExecThreads(1); + workerConfig.setListenPort(1234); + command = new Command(); + command.setType(CommandType.TASK_EXECUTE_REQUEST); + ackCommand = new TaskExecuteAckCommand().convert2Command(); + taskRequestCommand = new TaskExecuteRequestCommand(); + alertClientService = PowerMockito.mock(AlertClientService.class); + workerExecService = PowerMockito.mock(ExecutorService.class); + PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))) + .thenReturn(null); + + PowerMockito.mockStatic(ChannelUtils.class); + PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null); + + taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(null); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); + + workerManager = PowerMockito.mock(WorkerManagerThread.class); + PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE); + + PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)) + .thenReturn(workerManager); + + PowerMockito.mockStatic(ThreadUtils.class); + PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads())) + .thenReturn(workerExecService); + + PowerMockito.mockStatic(JsonSerializer.class); + PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class)) + .thenReturn(taskExecutionContext); + + PowerMockito.mockStatic(FileUtils.class); + PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())) + .thenReturn(taskExecutionContext.getExecutePath()); + PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath()); + + SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService); + PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments() + .thenReturn(simpleTaskExecuteThread); + } + + @Test + public void testNormalExecution() { + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + @Test + public void testDelayExecution() { + taskExecutionContext.setDelayTime(1); + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + public TaskExecutionContext getTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskType("sql"); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + return taskExecutionContext; + } + + private static class SimpleTaskExecuteThread extends TaskExecuteThread { + + public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) { + super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + } + + @Override + public void run() { + // + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java new file mode 100644 index 0000000000..c6b0493e1f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test worker manager thread. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + Stopper.class, + TaskManager.class, + JSONUtils.class, + CommonUtils.class, + SpringApplicationContext.class, + OSUtils.class}) +public class WorkerManagerThreadTest { + + private TaskCallbackService taskCallbackService; + + private WorkerManagerThread workerManager; + + private TaskExecutionContext taskExecutionContext; + + private AlertClientService alertClientService; + + private Logger taskLogger; + + @Before + public void before() { + // init task execution context, logger + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTenantCode("test"); + taskExecutionContext.setTaskType(""); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + + Command ackCommand = new TaskExecuteAckCommand().convert2Command(); + Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command(); + + taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId( + LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId() + )); + + TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + + alertClientService = PowerMockito.mock(AlertClientService.class); + WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5); + workerManager = new WorkerManagerThread(); + + PowerMockito.mockStatic(TaskManager.class); + PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService)) + .thenReturn(new SimpleTask(taskExecutionContext, taskLogger)); + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class)) + .thenReturn(new TaskNode()); + PowerMockito.mockStatic(CommonUtils.class); + PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); + List osUserList = Collections.singletonList("test"); + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList); + PowerMockito.mockStatic(Stopper.class); + PowerMockito.when(Stopper.isRunning()).thenReturn(true, false); + } + + @Test + public void testSendTaskKillResponse() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.killTaskBeforeExecuteByInstanceId(1); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + @Test + public void testRun() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.run(); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + private static class SimpleTask extends AbstractTask { + + protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + // pid + this.processId = taskExecutionContext.getProcessId(); + } + + @Override + public AbstractParameters getParameters() { + return null; + } + + @Override + public void init() { + + } + + @Override + public void handle() { + + } + + @Override + public void after() { + + } + + @Override + public ExecutionStatus getExitStatus() { + return ExecutionStatus.SUCCESS; + } + } +} diff --git a/pom.xml b/pom.xml index c0974a53de..c6880c8b8d 100644 --- a/pom.xml +++ b/pom.xml @@ -913,6 +913,7 @@ **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java **/server/worker/processor/TaskCallbackServiceTest.java + **/server/worker/processor/TaskExecuteProcessorTest.java **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java @@ -926,6 +927,7 @@ **/server/worker/task/TaskManagerTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java + **/server/worker/runner/WorkerManagerThreadTest.java **/service/quartz/cron/CronUtilsTest.java **/service/process/ProcessServiceTest.java **/service/zk/DefaultEnsembleProviderTest.java