From c4131d9fb10418d36553389c84e79f215cf4f2c1 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 8 Feb 2021 19:34:45 +0800 Subject: [PATCH 1/8] [alert-script]alert msg should contains content --- .../dolphinscheduler/plugin/alert/script/ScriptSender.java | 1 + .../plugin/alert/script/StreamGobbler.java | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java index 377c318b8e..476b9be784 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java @@ -43,6 +43,7 @@ public class ScriptSender { userParams = config.get(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS); } + //fixme AlertResult sendScriptAlert(String msg) { AlertResult alertResult = new AlertResult(); if (ScriptType.of(scriptType).equals(ScriptType.SHELL)) { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java index 813e91a729..41aabfe13d 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/StreamGobbler.java @@ -55,6 +55,13 @@ public class StreamGobbler extends Thread { } } catch (IOException e) { logger.error("I/O error occurs {}", e.getMessage()); + } finally { + try { + inputBufferReader.close(); + inputStreamReader.close(); + } catch (IOException e) { + logger.error("I/O error occurs {}", e.getMessage()); + } } } From 6b56a78909c20d7edcfea285911ff00d1516c459 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 9 Feb 2021 11:01:26 +0800 Subject: [PATCH 2/8] [alert-script]alert msg should contains content --- .../alert/script/ScriptAlertChannel.java | 2 +- .../plugin/alert/script/ScriptSender.java | 9 ++++----- .../plugin/alert/script/ScriptSenderTest.java | 6 +++--- .../src/test/script/shell/example.sh | 9 ++++++++- .../src/test/script/shell/scriptTest.sh | 18 ++++++++++++++++-- 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java index dc6aa27e25..df1cbcc28e 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannel.java @@ -36,6 +36,6 @@ public class ScriptAlertChannel implements AlertChannel { if (null == paramsMap) { return new AlertResult("false", "ding talk params is null"); } - return new ScriptSender(paramsMap).sendScriptAlert(alertData.getTitle()); + return new ScriptSender(paramsMap).sendScriptAlert(alertData.getTitle(),alertData.getContent()); } } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java index 476b9be784..119abd5326 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java @@ -43,23 +43,22 @@ public class ScriptSender { userParams = config.get(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS); } - //fixme - AlertResult sendScriptAlert(String msg) { + AlertResult sendScriptAlert(String title, String content) { AlertResult alertResult = new AlertResult(); if (ScriptType.of(scriptType).equals(ScriptType.SHELL)) { - return executeShellScript(msg); + return executeShellScript(title, content); } return alertResult; } - private AlertResult executeShellScript(String msg) { + private AlertResult executeShellScript(String title, String content) { AlertResult alertResult = new AlertResult(); alertResult.setStatus("false"); if (Boolean.TRUE.equals(OSUtils.isWindows())) { alertResult.setMessage("shell script not support windows os"); return alertResult; } - String[] cmd = {"/bin/sh", "-c", scriptPath + " " + msg + " " + userParams}; + String[] cmd = {"/bin/sh", "-c", scriptPath + " -alertTitle " + title+" -alertContent " + content + " alertUserParams " + userParams}; int exitCode = ProcessUtils.executeScript(cmd); if (exitCode == 0) { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 1cd74cfaba..8457079417 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -41,7 +41,7 @@ public class ScriptSenderTest { public void initScriptConfig() { scriptConfig.put(ScriptParamsConstants.NAME_SCRIPT_TYPE, String.valueOf(ScriptType.SHELL.getCode())); - scriptConfig.put(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS, "userParams"); + scriptConfig.put(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS, "test user params"); scriptConfig.put(ScriptParamsConstants.NAME_SCRIPT_PATH, shellFilPath); } @@ -49,9 +49,9 @@ public class ScriptSenderTest { public void testScriptSenderTest() { ScriptSender scriptSender = new ScriptSender(scriptConfig); AlertResult alertResult; - alertResult = scriptSender.sendScriptAlert("success"); + alertResult = scriptSender.sendScriptAlert("testtitleKrisKi", "testcontent"); Assert.assertEquals("true", alertResult.getStatus()); - alertResult = scriptSender.sendScriptAlert("errorMsg"); + alertResult = scriptSender.sendScriptAlert("errorMsgtitle ", "testcontent"); Assert.assertEquals("false", alertResult.getStatus()); } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh index 708dcd004b..b41f22f34a 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh @@ -16,8 +16,15 @@ # -msg=$1 +title=$1 content=$2 +userParams=$3 + +echo "$title" + +echo "$content" + +echo "$userParams" # Write your specific logic here diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh index 02eba48a81..91d71da75a 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh @@ -15,10 +15,24 @@ # limitations under the License. # -msg=$1 +title=$1 content=$2 +userParams=$3 -if [ $msg = errorMsg ] +echo title + +echo "last" + +echo $title + +echo $content + +echo $userParams + + +done + +if [ $title = errorMsg ] then exit 12 fi From d83719f4333b56772fac21fe50bef0383b2018d2 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 15 Feb 2021 12:50:24 +0800 Subject: [PATCH 3/8] test --- .../src/test/script/shell/scriptTest.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh index 91d71da75a..1d4305d2b1 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh @@ -18,8 +18,8 @@ title=$1 content=$2 userParams=$3 - -echo title +echo "last" +echo $alertTitle echo "last" From 4275bde92702f16ccb8ed75f79500cbcf03a58cf Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Fri, 19 Feb 2021 22:31:41 +0800 Subject: [PATCH 4/8] [Improvement][spi-alert]script plugin should contain alert content fix alert params contain space not parse fix stream not close --- .../plugin/alert/script/ScriptSender.java | 8 +++++- .../plugin/alert/script/ScriptSenderTest.java | 4 +-- .../src/test/script/shell/scriptTest.sh | 26 +++++++------------ 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java index 119abd5326..4269b401fb 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java @@ -37,6 +37,12 @@ public class ScriptSender { private String userParams; + private static final String ALERT_TITLE_OPTION = " -t "; + + private static final String ALERT_CONTENT_OPTION = " -c "; + + private static final String ALERT_USER_PARAMS_OPTION = " -p "; + ScriptSender(Map config) { scriptPath = config.get(ScriptParamsConstants.NAME_SCRIPT_PATH); scriptType = Integer.parseInt(config.get(ScriptParamsConstants.NAME_SCRIPT_TYPE)); @@ -58,7 +64,7 @@ public class ScriptSender { alertResult.setMessage("shell script not support windows os"); return alertResult; } - String[] cmd = {"/bin/sh", "-c", scriptPath + " -alertTitle " + title+" -alertContent " + content + " alertUserParams " + userParams}; + String[] cmd = {"/bin/sh", "-c", scriptPath + ALERT_TITLE_OPTION + "'" + title + "'" + ALERT_CONTENT_OPTION + "'" + content + "'" + ALERT_USER_PARAMS_OPTION + "'" + userParams + "'"}; int exitCode = ProcessUtils.executeScript(cmd); if (exitCode == 0) { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 8457079417..4b41d23493 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -49,9 +49,9 @@ public class ScriptSenderTest { public void testScriptSenderTest() { ScriptSender scriptSender = new ScriptSender(scriptConfig); AlertResult alertResult; - alertResult = scriptSender.sendScriptAlert("testtitleKrisKi", "testcontent"); + alertResult = scriptSender.sendScriptAlert("test title Kris", "test content"); Assert.assertEquals("true", alertResult.getStatus()); - alertResult = scriptSender.sendScriptAlert("errorMsgtitle ", "testcontent"); + alertResult = scriptSender.sendScriptAlert("error msg title", "test content"); Assert.assertEquals("false", alertResult.getStatus()); } diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh index 1d4305d2b1..153b6debfb 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -15,25 +16,18 @@ # limitations under the License. # -title=$1 -content=$2 -userParams=$3 -echo "last" -echo $alertTitle - -echo "last" - -echo $title - -echo $content - -echo $userParams - - +while getopts t:c:p: opts; do + case $opts in + t) t=$OPTARG ;; + c) c=$OPTARG ;; + p) p=$OPTARG ;; + ?) ;; + esac done -if [ $title = errorMsg ] +if [ "$t" = "error msg title" ] then exit 12 fi +exit 0 exit 0 \ No newline at end of file From af00de357a87e53175c0ee1ec7dfb02a29f4adcb Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Fri, 19 Feb 2021 22:36:43 +0800 Subject: [PATCH 5/8] [Improvement][spi-alert]script plugin should contain alert content fix alert params contain space not parse fix stream not close --- .../dolphinscheduler/plugin/alert/script/ScriptSenderTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 4b41d23493..6e1aee8c26 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -47,6 +47,7 @@ public class ScriptSenderTest { @Test public void testScriptSenderTest() { + ScriptSender scriptSender = new ScriptSender(scriptConfig); AlertResult alertResult; alertResult = scriptSender.sendScriptAlert("test title Kris", "test content"); From 5e1be068c450fd64ccf7899dbe706b73d5eacae0 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Fri, 19 Feb 2021 22:49:21 +0800 Subject: [PATCH 6/8] [Improvement][spi-alert]script plugin should contain alert content fix alert params contain space not parse fix stream not close --- .../dolphinscheduler/plugin/alert/script/ScriptSenderTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 0ab8de04a2..9b12ad2fdb 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -47,7 +47,6 @@ public class ScriptSenderTest { @Test public void testScriptSenderTest() { - ScriptSender scriptSender = new ScriptSender(scriptConfig); AlertResult alertResult; alertResult = scriptSender.sendScriptAlert("test title Kris", "test content"); From cf042ccd6ce97d3c50fda9aa448b0d135f27e7bb Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Fri, 19 Feb 2021 23:01:20 +0800 Subject: [PATCH 7/8] rename example.sh --- .../plugin/alert/script/ProcessUtilsTest.java | 4 ++-- .../plugin/alert/script/ScriptSenderTest.java | 2 +- .../shell/{scriptTest.sh => scriptExample.sh} | 6 ++++++ .../test/script/shell/{example.sh => test.sh} | 19 +++++++------------ 4 files changed, 16 insertions(+), 15 deletions(-) rename dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/{scriptTest.sh => scriptExample.sh} (85%) rename dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/{example.sh => test.sh} (76%) diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java index 1bf98d2019..1d847a0635 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ProcessUtilsTest.java @@ -26,9 +26,9 @@ public class ProcessUtilsTest { private static final String rootPath = System.getProperty("user.dir"); - private static final String shellFilPath = rootPath + "/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh"; + private static final String shellFilPath = rootPath + "/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh"; - private String[] cmd = {"/bin/sh", "-c", shellFilPath + " " + "testMsg" + " " + "userParams"}; + private String[] cmd = {"/bin/sh", "-c", shellFilPath + " -t 1"}; @Test public void testExecuteScript() { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java index 9b12ad2fdb..e022b9ebf7 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSenderTest.java @@ -35,7 +35,7 @@ public class ScriptSenderTest { private static final String rootPath = System.getProperty("user.dir"); - private static final String shellFilPath = rootPath + "/src/test/script/shell/scriptTest.sh"; + private static final String shellFilPath = rootPath + "/src/test/script/shell/scriptExample.sh"; @Before public void initScriptConfig() { diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh similarity index 85% rename from dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh rename to dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh index 153b6debfb..aca9866df0 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptTest.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/scriptExample.sh @@ -25,6 +25,12 @@ while getopts t:c:p: opts; do esac done + +# Write your specific logic here + +# Set the exit code according to your execution result, and alert needs to use it to judge the status of this alarm result + + if [ "$t" = "error msg title" ] then exit 12 diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh similarity index 76% rename from dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh rename to dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh index b41f22f34a..7c9d163a9e 100755 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/example.sh +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/test/script/shell/test.sh @@ -15,18 +15,13 @@ # limitations under the License. # +while getopts t: opts; do + case $opts in + t) t=$OPTARG ;; + ?) ;; + esac +done -title=$1 -content=$2 -userParams=$3 +echo "$t" -echo "$title" - -echo "$content" - -echo "$userParams" - -# Write your specific logic here - -# Set the exit code according to your execution result, and alert needs to use it to judge the status of this alarm result exit 0 From ffcb1c22e16107e387ceb21dc5a0faaef0783ae7 Mon Sep 17 00:00:00 2001 From: guohaozhang Date: Tue, 23 Feb 2021 10:10:11 +0800 Subject: [PATCH 8/8] [Improvement-#3735] Make task delayed execution more efficient (#4812) * [Improvement-3735] improve implementation of delay task execution * [Improvement][worker] delay task compatible with dev branch and fix test Co-authored-by: vanilla111 <1115690319@qq.com> --- .../server/worker/WorkerServer.java | 7 + .../processor/TaskExecuteProcessor.java | 48 +++-- .../worker/processor/TaskKillProcessor.java | 8 + .../worker/runner/TaskExecuteThread.java | 48 +++-- .../worker/runner/WorkerManagerThread.java | 143 +++++++++++++ .../processor/TaskCallbackServiceTest.java | 4 +- .../processor/TaskExecuteProcessorTest.java | 195 ++++++++++++++++++ .../runner/WorkerManagerThreadTest.java | 187 +++++++++++++++++ pom.xml | 2 + 9 files changed, 599 insertions(+), 43 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 1f138f6acd..a267b5bf32 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -84,6 +85,9 @@ public class WorkerServer { @Autowired private RetryReportTaskStatusThread retryReportTaskStatusThread; + @Autowired + private WorkerManagerThread workerManagerThread; + /** * worker server startup * @@ -119,6 +123,9 @@ public class WorkerServer { // worker registry this.workerRegistry.registry(); + // task execute manager + this.workerManagerThread.start(); + // retry report task status this.retryReportTaskStatusThread.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index fc8b33a488..3088080849 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.Optional; -import java.util.concurrent.ExecutorService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +57,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); - /** - * thread executor service - */ - private final ExecutorService workerExecService; - /** * worker config */ @@ -73,20 +68,25 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final TaskCallbackService taskCallbackService; /** - * alert client service + * alert client service */ private AlertClientService alertClientService; /** * taskExecutionContextCacheManager */ - private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + private final TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } public TaskExecuteProcessor(AlertClientService alertClientService) { - this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); - this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); - this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); - + this(); this.alertClientService = alertClientService; } @@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getTaskInstanceId())); taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); - taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); // local execute path String execLocalPath = getExecLocalPath(taskExecutionContext); @@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); + // delay task process + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); + if (remainTime > 0) { + logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + } else { + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskExecutionContext.setStartTime(new Date()); + } + this.doAck(taskExecutionContext); - // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService)); + // submit task to manager + if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) { + logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize()); + } } private void doAck(TaskExecutionContext taskExecutionContext) { @@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { /** * build ack command + * * @param taskExecutionContext taskExecutionContext * @return TaskExecuteAckCommand */ @@ -209,4 +217,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index b41c189eda..a3665b33e0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + /* + * task execute manager + */ + private final WorkerManagerThread workerManager; + public TaskKillProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class); } /** @@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { + workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); return Pair.of(true, appIds); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 721656730d..05bd8065e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -51,7 +50,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException; /** * task scheduler thread */ -public class TaskExecuteThread implements Runnable { +public class TaskExecuteThread implements Runnable, Delayed { /** * logger @@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable { // task node TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); - delayExecutionIfNeeded(); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } @@ -289,24 +289,6 @@ public class TaskExecuteThread implements Runnable { } } - /** - * delay execution if needed. - */ - private void delayExecutionIfNeeded() { - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L); - logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime); - if (remainTime > 0) { - try { - Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS); - } catch (Exception e) { - logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}", - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - } - } - } - /** * send an ack to change the status of the task. */ @@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable { } return ackCommand; } -} \ No newline at end of file + + /** + * get current TaskExecutionContext + * @return TaskExecutionContext + */ + public TaskExecutionContext getTaskExecutionContext() { + return this.taskExecutionContext; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (o == null) { + return 1; + } + return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java new file mode 100644 index 0000000000..073c9488ae --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * Manage tasks + */ +@Component +public class WorkerManagerThread implements Runnable { + + private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class); + + /** + * task queue + */ + private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + + /** + * worker config + */ + private final WorkerConfig workerConfig; + + /** + * thread executor service + */ + private final ExecutorService workerExecService; + + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; + + public WorkerManagerThread() { + this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()); + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + } + + /** + * get queue size + * + * @return queue size + */ + public int getQueueSize() { + return workerExecuteQueue.size(); + } + + /** + * Kill tasks that have not been executed, like delay task + * then send Response to Master, update the execution status of task instance + */ + public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { + workerExecuteQueue.stream() + .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId) + .forEach(workerExecuteQueue::remove); + sendTaskKillResponse(taskInstanceId); + } + + /** + * kill task before execute , like delay task + */ + private void sendTaskKillResponse(Integer taskInstanceId) { + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + if (taskExecutionContext == null) { + return; + } + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); + responseCommand.setStatus(ExecutionStatus.KILL.getCode()); + ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); + } + + /** + * submit task + * + * @param taskExecuteThread taskExecuteThread + * @return submit result + */ + public boolean offer(TaskExecuteThread taskExecuteThread) { + return workerExecuteQueue.offer(taskExecuteThread); + } + + public void start() { + Thread thread = new Thread(this, this.getClass().getName()); + thread.start(); + } + + @Override + public void run() { + Thread.currentThread().setName("Worker-Execute-Manager-Thread"); + TaskExecuteThread taskExecuteThread; + while (Stopper.isRunning()) { + try { + taskExecuteThread = workerExecuteQueue.take(); + workerExecService.submit(taskExecuteThread); + } catch (Exception e) { + logger.error("An unexpected interrupt is happened, " + + "the exception will be ignored and this thread will continue to run", e); + } + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 5a5561d1bd..bdd723a4cd 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; @@ -79,7 +80,8 @@ import io.netty.channel.Channel; TaskResponseProcessor.class, TaskExecuteProcessor.class, CuratorZookeeperClient.class, - TaskExecutionContextCacheManagerImpl.class}) + TaskExecutionContextCacheManagerImpl.class, + WorkerManagerThread.class}) public class TaskCallbackServiceTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java new file mode 100644 index 0000000000..fef21519d3 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.utils.ChannelUtils; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Date; +import java.util.concurrent.ExecutorService; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test task execute processor + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class, + JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) +public class TaskExecuteProcessorTest { + + private TaskExecutionContext taskExecutionContext; + + private TaskCallbackService taskCallbackService; + + private ExecutorService workerExecService; + + private WorkerConfig workerConfig; + + private Command command; + + private Command ackCommand; + + private TaskExecuteRequestCommand taskRequestCommand; + + private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + + private AlertClientService alertClientService; + + private WorkerManagerThread workerManager; + + @Before + public void before() throws Exception { + // init task execution context + taskExecutionContext = getTaskExecutionContext(); + workerConfig = new WorkerConfig(); + workerConfig.setWorkerExecThreads(1); + workerConfig.setListenPort(1234); + command = new Command(); + command.setType(CommandType.TASK_EXECUTE_REQUEST); + ackCommand = new TaskExecuteAckCommand().convert2Command(); + taskRequestCommand = new TaskExecuteRequestCommand(); + alertClientService = PowerMockito.mock(AlertClientService.class); + workerExecService = PowerMockito.mock(ExecutorService.class); + PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))) + .thenReturn(null); + + PowerMockito.mockStatic(ChannelUtils.class); + PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null); + + taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(null); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())); + + workerManager = PowerMockito.mock(WorkerManagerThread.class); + PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE); + + PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)) + .thenReturn(workerManager); + + PowerMockito.mockStatic(ThreadUtils.class); + PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads())) + .thenReturn(workerExecService); + + PowerMockito.mockStatic(JsonSerializer.class); + PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class)) + .thenReturn(taskRequestCommand); + PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class)) + .thenReturn(taskExecutionContext); + + PowerMockito.mockStatic(FileUtils.class); + PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())) + .thenReturn(taskExecutionContext.getExecutePath()); + PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath()); + + SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService); + PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments() + .thenReturn(simpleTaskExecuteThread); + } + + @Test + public void testNormalExecution() { + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + @Test + public void testDelayExecution() { + taskExecutionContext.setDelayTime(1); + TaskExecuteProcessor processor = new TaskExecuteProcessor(); + processor.process(null, command); + + Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus()); + } + + public TaskExecutionContext getTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskType("sql"); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + return taskExecutionContext; + } + + private static class SimpleTaskExecuteThread extends TaskExecuteThread { + + public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) { + super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + } + + @Override + public void run() { + // + } + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java new file mode 100644 index 0000000000..c6b0493e1f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CommonUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * test worker manager thread. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + Stopper.class, + TaskManager.class, + JSONUtils.class, + CommonUtils.class, + SpringApplicationContext.class, + OSUtils.class}) +public class WorkerManagerThreadTest { + + private TaskCallbackService taskCallbackService; + + private WorkerManagerThread workerManager; + + private TaskExecutionContext taskExecutionContext; + + private AlertClientService alertClientService; + + private Logger taskLogger; + + @Before + public void before() { + // init task execution context, logger + taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setProcessId(12345); + taskExecutionContext.setProcessDefineId(1); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTenantCode("test"); + taskExecutionContext.setTaskType(""); + taskExecutionContext.setFirstSubmitTime(new Date()); + taskExecutionContext.setDelayTime(0); + taskExecutionContext.setLogPath("/tmp/test.log"); + taskExecutionContext.setHost("localhost"); + taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); + + Command ackCommand = new TaskExecuteAckCommand().convert2Command(); + Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command(); + + taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId( + LoggerUtils.TASK_LOGGER_INFO_PREFIX, + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId() + )); + + TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + + alertClientService = PowerMockito.mock(AlertClientService.class); + WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class); + taskCallbackService = PowerMockito.mock(TaskCallbackService.class); + PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) + .thenReturn(taskExecutionContextCacheManager); + PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) + .thenReturn(workerConfig); + PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) + .thenReturn(taskCallbackService); + PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5); + workerManager = new WorkerManagerThread(); + + PowerMockito.mockStatic(TaskManager.class); + PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService)) + .thenReturn(new SimpleTask(taskExecutionContext, taskLogger)); + PowerMockito.mockStatic(JSONUtils.class); + PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class)) + .thenReturn(new TaskNode()); + PowerMockito.mockStatic(CommonUtils.class); + PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); + List osUserList = Collections.singletonList("test"); + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList); + PowerMockito.mockStatic(Stopper.class); + PowerMockito.when(Stopper.isRunning()).thenReturn(true, false); + } + + @Test + public void testSendTaskKillResponse() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.killTaskBeforeExecuteByInstanceId(1); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + @Test + public void testRun() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); + workerManager.offer(taskExecuteThread); + Assert.assertEquals(1, workerManager.getQueueSize()); + workerManager.run(); + Assert.assertEquals(0, workerManager.getQueueSize()); + } + + private static class SimpleTask extends AbstractTask { + + protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + // pid + this.processId = taskExecutionContext.getProcessId(); + } + + @Override + public AbstractParameters getParameters() { + return null; + } + + @Override + public void init() { + + } + + @Override + public void handle() { + + } + + @Override + public void after() { + + } + + @Override + public ExecutionStatus getExitStatus() { + return ExecutionStatus.SUCCESS; + } + } +} diff --git a/pom.xml b/pom.xml index c0974a53de..c6880c8b8d 100644 --- a/pom.xml +++ b/pom.xml @@ -913,6 +913,7 @@ **/server/utils/ProcessUtilsTest.java **/server/utils/SparkArgsUtilsTest.java **/server/worker/processor/TaskCallbackServiceTest.java + **/server/worker/processor/TaskExecuteProcessorTest.java **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java @@ -926,6 +927,7 @@ **/server/worker/task/TaskManagerTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java + **/server/worker/runner/WorkerManagerThreadTest.java **/service/quartz/cron/CronUtilsTest.java **/service/process/ProcessServiceTest.java **/service/zk/DefaultEnsembleProviderTest.java