diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java index dc6aa27e25..df1cbcc28e 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java @@ -36,6 +36,6 @@ public class ScriptAlertChannel implements AlertChannel { if (null == paramsMap) { return new AlertResult("false", "ding talk params is null"); } - return new ScriptSender(paramsMap).sendScriptAlert(alertData.getTitle()); + return new ScriptSender(paramsMap).sendScriptAlert(alertData.getTitle(),alertData.getContent()); } } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java index 638a500299..8e33b792d8 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java @@ -37,28 +37,34 @@ public class ScriptSender { private String userParams; + private static final String ALERT_TITLE_OPTION = " -t "; + + private static final String ALERT_CONTENT_OPTION = " -c "; + + private static final String ALERT_USER_PARAMS_OPTION = " -p "; + ScriptSender(Map config) { scriptPath = config.get(ScriptParamsConstants.NAME_SCRIPT_PATH); scriptType = config.get(ScriptParamsConstants.NAME_SCRIPT_TYPE); userParams = config.get(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS); } - AlertResult sendScriptAlert(String msg) { + AlertResult sendScriptAlert(String title, String content) { AlertResult alertResult = new AlertResult(); if (ScriptType.SHELL.getDescp().equals(scriptType)) { - return executeShellScript(msg); + return executeShellScript(title, content); } return alertResult; } - private AlertResult executeShellScript(String msg) { + private AlertResult executeShellScript(String title, String content) { AlertResult alertResult = new AlertResult(); alertResult.setStatus("false"); if (Boolean.TRUE.equals(OSUtils.isWindows())) { alertResult.setMessage("shell script not support windows os"); return alertResult; } - String[] cmd = {"/bin/sh", "-c", scriptPath + " " + msg + " " + userParams}; + String[] cmd = {"/bin/sh", "-c", scriptPath + ALERT_TITLE_OPTION + "'" + title + "'" + ALERT_CONTENT_OPTION + "'" + content + "'" + ALERT_USER_PARAMS_OPTION + "'" + userParams + "'"}; int exitCode = ProcessUtils.executeScript(cmd); if (exitCode == 0) { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java index 813e91a729..41aabfe13d 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java @@ -55,6 +55,13 @@ public class StreamGobbler extends Thread { } } catch (IOException e) { logger.error("I/O error occurs {}", e.getMessage()); + } finally { + try { + inputBufferReader.close(); + inputStreamReader.close(); + } catch (IOException e) { + logger.error("I/O error occurs {}", e.getMessage()); + } } } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java index 1bf98d2019..1d847a0635 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java @@ -26,9 +26,9 @@ public class ProcessUtilsTest { private static final String rootPath = System.getProperty("user.dir"); - private static final String shellFilPath = rootPath + "/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh"; + private static final String shellFilPath = rootPath + "/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh"; - private String[] cmd = {"/bin/sh", "-c", shellFilPath + " " + "testMsg" + " " + "userParams"}; + private String[] cmd = {"/bin/sh", "-c", shellFilPath + " -t 1"}; @Test public void testExecuteScript() { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 7f15ed8a7b..e022b9ebf7 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -35,7 +35,7 @@ public class ScriptSenderTest { private static final String rootPath = System.getProperty("user.dir"); - private static final String shellFilPath = rootPath + "/src/test/script/shell/scriptTest.sh"; + private static final String shellFilPath = rootPath + "/src/test/script/shell/scriptExample.sh"; @Before public void initScriptConfig() { @@ -49,9 +49,9 @@ public class ScriptSenderTest { public void testScriptSenderTest() { ScriptSender scriptSender = new ScriptSender(scriptConfig); AlertResult alertResult; - alertResult = scriptSender.sendScriptAlert("success"); + alertResult = scriptSender.sendScriptAlert("test title Kris", "test content"); Assert.assertEquals("true", alertResult.getStatus()); - alertResult = scriptSender.sendScriptAlert("errorMsg"); + alertResult = scriptSender.sendScriptAlert("error msg title", "test content"); Assert.assertEquals("false", alertResult.getStatus()); } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh similarity index 80% rename from dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh rename to dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh index 708dcd004b..aca9866df0 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -15,11 +16,24 @@ # limitations under the License. # +while getopts t:c:p: opts; do + case $opts in + t) t=$OPTARG ;; + c) c=$OPTARG ;; + p) p=$OPTARG ;; + ?) ;; + esac +done -msg=$1 -content=$2 # Write your specific logic here # Set the exit code according to your execution result, and alert needs to use it to judge the status of this alarm result + + +if [ "$t" = "error msg title" ] + then + exit 12 +fi exit 0 +exit 0 \ No newline at end of file diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh similarity index 87% rename from dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh rename to dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh index 02eba48a81..7c9d163a9e 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh @@ -15,11 +15,13 @@ # limitations under the License. # -msg=$1 -content=$2 +while getopts t: opts; do + case $opts in + t) t=$OPTARG ;; + ?) ;; + esac +done -if [ $msg = errorMsg ] - then - exit 12 -fi -exit 0 \ No newline at end of file +echo "$t" + +exit 0 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 1f138f6acd..a267b5bf32 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -84,6 +85,9 @@ public class WorkerServer { @Autowired private RetryReportTaskStatusThread retryReportTaskStatusThread; + @Autowired + private WorkerManagerThread workerManagerThread; + /** * worker server startup * @@ -119,6 +123,9 @@ public class WorkerServer { // worker registry this.workerRegistry.registry(); + // task execute manager + this.workerManagerThread.start(); + // retry report task status this.retryReportTaskStatusThread.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index fc8b33a488..3088080849 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.Optional; -import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +57,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); - /** - * thread executor service - */ - private final ExecutorService workerExecService; - /** * worker config */ @@ -73,20 +68,25 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final TaskCallbackService taskCallbackService; /** - * alert client service + * alert client service */ private AlertClientService alertClientService; /** * taskExecutionContextCacheManager */ - private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + private final TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } public TaskExecuteProcessor(AlertClientService alertClientService) { - this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); - this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); - this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); - + this(); this.alertClientService = alertClientService; } @@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getTaskInstanceId())); taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); - taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); @@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); + // delay task process + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); + if (remainTime > 0) { + logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + } else { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskExecutionContext.setStartTime(new Date()); + } + this.doAck(taskExecutionContext); - // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService)); + // submit task to manager + if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) { + logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize()); + } } private void doAck(TaskExecutionContext taskExecutionContext) { @@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { /** * build ack command + * * @param taskExecutionContext taskExecutionContext * @return TaskExecuteAckCommand */ @@ -209,4 +217,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index b41c189eda..a3665b33e0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; + public TaskKillProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { + workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); return Pair.of(true, appIds); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 721656730d..05bd8065e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -51,7 +50,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException; /** * task scheduler thread */ -public class TaskExecuteThread implements Runnable { +public class TaskExecuteThread implements Runnable, Delayed { /** * logger @@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable { // task node TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); - delayExecutionIfNeeded(); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } @@ -289,24 +289,6 @@ public class TaskExecuteThread implements Runnable { } } - /** - * delay execution if needed. - */ - private void delayExecutionIfNeeded() { - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L); - logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime); - if (remainTime > 0) { - try { - Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS); - } catch (Exception e) { - logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}", - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - } - } - } - /** * send an ack to change the status of the task. */ @@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable { } return ackCommand; } -} \ No newline at end of file + + /** + * get current TaskExecutionContext + * @return TaskExecutionContext + */ + public TaskExecutionContext getTaskExecutionContext() { + return this.taskExecutionContext; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java new file mode 100644 index 0000000000..073c9488ae --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Manage tasks + */ +@Component +public class WorkerManagerThread implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class); + + /** + * task queue + */ + private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + + /** + * worker config + */ + private final WorkerConfig workerConfig; + + /** + * thread executor service + */ + private final ExecutorService workerExecService; + + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; + + public WorkerManagerThread() { + this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()); + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + } + + /** + * get queue size + * + * @return queue size + */ + public int getQueueSize() { + return workerExecuteQueue.size(); + } + + /** + * Kill tasks that have not been executed, like delay task + * then send Response to Master, update the execution status of task instance + */ + public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { + workerExecuteQueue.stream() + .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId) + .forEach(workerExecuteQueue::remove); + sendTaskKillResponse(taskInstanceId); + } + + /** + * kill task before execute , like delay task + */ + private void sendTaskKillResponse(Integer taskInstanceId) { + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + if (taskExecutionContext == null) { + return; + } + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); + responseCommand.setStatus(ExecutionStatus.KILL.getCode()); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + } + + /** + * submit task + * + * @param taskExecuteThread taskExecuteThread + * @return submit result + */ + public boolean offer(TaskExecuteThread taskExecuteThread) { + return workerExecuteQueue.offer(taskExecuteThread); + } + + public void start() { + Thread thread = new Thread(this, this.getClass().getName()); + thread.start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Worker-Execute-Manager-Thread"); + TaskExecuteThread taskExecuteThread; + while (Stopper.isRunning()) { + try { + taskExecuteThread = workerExecuteQueue.take(); + workerExecService.submit(taskExecuteThread); + } catch (Exception e) { + logger.error("An unexpected interrupt is happened, " + + "the exception will be ignored and this thread will continue to run", e); + } + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5a5561d1bd..bdd723a4cd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; @@ -79,7 +80,8 @@ import io.netty.channel.Channel; TaskResponseProcessor.class, TaskExecuteProcessor.class, CuratorZookeeperClient.class, - TaskExecutionContextCacheManagerImpl.class}) + TaskExecutionContextCacheManagerImpl.class, + WorkerManagerThread.class}) public class TaskCallbackServiceTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java new file mode 100644 index 0000000000..fef21519d3 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.utils.ChannelUtils; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Date; +import java.util.concurrent.ExecutorService; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test task execute processor + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class, + JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) +public class TaskExecuteProcessorTest { + + private TaskExecutionContext taskExecutionContext; + + private TaskCallbackService taskCallbackService; + + private ExecutorService workerExecService; + + private WorkerConfig workerConfig; + + private Command command; + + private Command ackCommand; + + private TaskExecuteRequestCommand taskRequestCommand; + + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + private AlertClientService alertClientService; + + private WorkerManagerThread workerManager; + + @Before + public void before() throws Exception { + // init task execution context + taskExecutionContext = getTaskExecutionContext(); + workerConfig = new WorkerConfig(); + workerConfig.setWorkerExecThreads(1); + workerConfig.setListenPort(1234); + command = new Command(); + command.setType(CommandType.TASK_EXECUTE_REQUEST); + ackCommand = new TaskExecuteAckCommand().convert2Command(); + taskRequestCommand = new TaskExecuteRequestCommand(); + alertClientService = PowerMockito.mock(AlertClientService.class); + workerExecService = PowerMockito.mock(ExecutorService.class); + PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))) + .thenReturn(null); + + PowerMockito.mockStatic(ChannelUtils.class); + PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null); + + taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(null); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); + + workerManager = PowerMockito.mock(WorkerManagerThread.class); + PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE); + + PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)) + .thenReturn(workerManager); + + PowerMockito.mockStatic(ThreadUtils.class); + PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads())) + .thenReturn(workerExecService); + + PowerMockito.mockStatic(JsonSerializer.class); + PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class)) + .thenReturn(taskExecutionContext); + + PowerMockito.mockStatic(FileUtils.class); + PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())) + .thenReturn(taskExecutionContext.getExecutePath()); + PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath()); + + SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService); + PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments() + .thenReturn(simpleTaskExecuteThread); + } + + @Test + public void testNormalExecution() { + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + @Test + public void testDelayExecution() { + taskExecutionContext.setDelayTime(1); + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + public TaskExecutionContext getTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskType("sql"); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + return taskExecutionContext; + } + + private static class SimpleTaskExecuteThread extends TaskExecuteThread { + + public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) { + super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + } + + @Override + public void run() { + // + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java new file mode 100644 index 0000000000..c6b0493e1f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test worker manager thread. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + Stopper.class, + TaskManager.class, + JSONUtils.class, + CommonUtils.class, + SpringApplicationContext.class, + OSUtils.class}) +public class WorkerManagerThreadTest { + + private TaskCallbackService taskCallbackService; + + private WorkerManagerThread workerManager; + + private TaskExecutionContext taskExecutionContext; + + private AlertClientService alertClientService; + + private Logger taskLogger; + + @Before + public void before() { + // init task execution context, logger + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTenantCode("test"); + taskExecutionContext.setTaskType(""); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + + Command ackCommand = new TaskExecuteAckCommand().convert2Command(); + Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command(); + + taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId( + LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId() + )); + + TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + + alertClientService = PowerMockito.mock(AlertClientService.class); + WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5); + workerManager = new WorkerManagerThread(); + + PowerMockito.mockStatic(TaskManager.class); + PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService)) + .thenReturn(new SimpleTask(taskExecutionContext, taskLogger)); + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class)) + .thenReturn(new TaskNode()); + PowerMockito.mockStatic(CommonUtils.class); + PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); + List osUserList = Collections.singletonList("test"); + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList); + PowerMockito.mockStatic(Stopper.class); + PowerMockito.when(Stopper.isRunning()).thenReturn(true, false); + } + + @Test + public void testSendTaskKillResponse() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.killTaskBeforeExecuteByInstanceId(1); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + @Test + public void testRun() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.run(); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + private static class SimpleTask extends AbstractTask { + + protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + // pid + this.processId = taskExecutionContext.getProcessId(); + } + + @Override + public AbstractParameters getParameters() { + return null; + } + + @Override + public void init() { + + } + + @Override + public void handle() { + + } + + @Override + public void after() { + + } + + @Override + public ExecutionStatus getExitStatus() { + return ExecutionStatus.SUCCESS; + } + } +} diff --git a/pom.xml b/pom.xml index c0974a53de..c6880c8b8d 100644 --- a/pom.xml +++ b/pom.xml @@ -913,6 +913,7 @@ **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java **/server/worker/processor/TaskCallbackServiceTest.java + **/server/worker/processor/TaskExecuteProcessorTest.java **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java @@ -926,6 +927,7 @@ **/server/worker/task/TaskManagerTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java + **/server/worker/runner/WorkerManagerThreadTest.java **/service/quartz/cron/CronUtilsTest.java **/service/process/ProcessServiceTest.java **/service/zk/DefaultEnsembleProviderTest.java