Browse Source

Merge remote-tracking branch 'upstream/dev' into dev-merge

# Conflicts:
#	dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
pull/3/MERGE
lenboo 4 years ago
parent
commit
fd76f172c1
  1. 8
      .github/workflows/ci_ut.yml
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  4. 40
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java
  5. 60
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  6. 27
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  7. 103
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  8. 6
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  9. 22
      dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml
  10. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java
  11. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  12. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java
  13. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
  14. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
  15. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
  16. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
  17. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
  18. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  19. 56
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  20. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  21. 24
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  22. 56
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  23. 4
      pom.xml
  24. 2
      sql/dolphinscheduler_mysql.sql

8
.github/workflows/ci_ut.yml

@ -67,10 +67,14 @@ jobs:
- name: Upload coverage report to codecov - name: Upload coverage report to codecov
run: | run: |
CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash) CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
# Set up JDK 11 for SonarCloud.
- name: Set up JDK 1.11
uses: actions/setup-java@v1
with:
java-version: 1.11
- name: Run SonarCloud Analysis - name: Run SonarCloud Analysis
run: > run: >
mvn verify --batch-mode mvn --batch-mode verify sonar:sonar
org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.1.1688:sonar
-Dsonar.coverage.jacoco.xmlReportPaths=target/site/jacoco/jacoco.xml -Dsonar.coverage.jacoco.xmlReportPaths=target/site/jacoco/jacoco.xml
-Dmaven.test.skip=true -Dmaven.test.skip=true
-Dsonar.host.url=https://sonarcloud.io -Dsonar.host.url=https://sonarcloud.io

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -63,7 +63,7 @@ public class ResourcesController extends BaseController {
private UdfFuncService udfFuncService; private UdfFuncService udfFuncService;
/** /**
* create resource * create directory
* *
* @param loginUser login user * @param loginUser login user
* @param alias alias * @param alias alias

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -115,7 +115,7 @@ public class CheckUtils {
* *
* @param parameter parameter * @param parameter parameter
* @param taskType task type * @param taskType task type
* @return true if taks node parameters are valid, otherwise return false * @return true if task node parameters are valid, otherwise return false
*/ */
public static boolean checkTaskNodeParameters(String parameter, String taskType) { public static boolean checkTaskNodeParameters(String parameter, String taskType) {
AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter); AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter);

40
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/shell/AbstractShell.java

@ -16,9 +16,6 @@
*/ */
package org.apache.dolphinscheduler.common.shell; package org.apache.dolphinscheduler.common.shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -30,6 +27,9 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A base class for running a Unix command. * A base class for running a Unix command.
@ -128,7 +128,7 @@ public abstract class AbstractShell {
/** /**
* Run a command actual work * Run a command actual work
*/ */
private void runCommand() throws IOException { private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString()); ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null; Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null; ShellTimeoutTimerTask timeoutTimerTask = null;
@ -153,11 +153,11 @@ public abstract class AbstractShell {
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval); timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
} }
final BufferedReader errReader = final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process new BufferedReader(
.getErrorStream())); new InputStreamReader(process.getErrorStream()));
BufferedReader inReader = BufferedReader inReader =
new BufferedReader(new InputStreamReader(process new BufferedReader(
.getInputStream())); new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder(); final StringBuilder errMsg = new StringBuilder();
// read error and input streams as this would free up the buffers // read error and input streams as this would free up the buffers
@ -177,23 +177,35 @@ public abstract class AbstractShell {
} }
} }
}; };
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
logger.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try { try {
errThread.start(); errThread.start();
inThread.start();
} catch (IllegalStateException ise) { } } catch (IllegalStateException ise) { }
try { try {
// parse the output // parse the output
parseExecResult(inReader); exitCode = process.waitFor();
exitCode = process.waitFor();
try { try {
// make sure that the error thread exits // make sure that the error and in thread exits
errThread.join(); errThread.join();
inThread.join();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
logger.warn("Interrupted while reading the error stream", ie); logger.warn("Interrupted while reading the error and in stream", ie);
} }
completed.set(true); completed.set(true);
//the timeout thread handling //the timeout thread handling
//taken care in finally block //taken care in finally block
if (exitCode != 0) { if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString()); throw new ExitCodeException(exitCode, errMsg.toString());
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

60
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -16,18 +16,32 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Optional;
import org.apache.commons.io.Charsets; import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* file utils * file utils
*/ */
@ -36,6 +50,8 @@ public class FileUtils {
public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler"); public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");
public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
/** /**
* get file suffix * get file suffix
* *
@ -118,7 +134,7 @@ public class FileUtils {
String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId), String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId)); Integer.toString(processDefineId), Integer.toString(processInstanceId));
File file = new File(fileName); File file = new File(fileName);
if (!file.getParentFile().exists()){ if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs(); file.getParentFile().mkdirs();
} }
@ -138,24 +154,40 @@ public class FileUtils {
* @param userName user name * @param userName user name
* @throws IOException errors * @throws IOException errors
*/ */
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{ public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
//if work dir exists, first delete //if work dir exists, first delete
File execLocalPathFile = new File(execLocalPath); File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()){ if (execLocalPathFile.exists()) {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile); org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
} }
//create work dir //create work dir
org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile); org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile);
logger.info("create dir success {}" , execLocalPath); String mkdirLog = "create dir success " + execLocalPath;
LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
//if not exists this user,then create //if not exists this user,then create
if (!OSUtils.getUserList().contains(userName)){ OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
OSUtils.createUser(userName); try {
if (!OSUtils.getUserList().contains(userName)) {
boolean isSuccessCreateUser = OSUtils.createUser(userName);
String infoLog;
if (isSuccessCreateUser) {
infoLog = String.format("create user name success %s", userName);
} else {
infoLog = String.format("create user name fail %s", userName);
}
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
}
} catch (Throwable e) {
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
} }
logger.info("create user name success {}", userName); OSUtils.taskLoggerThreadLocal.remove();
} }

27
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -16,14 +16,15 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
/** /**
* logger utils * logger utils
*/ */
@ -93,4 +94,24 @@ public class LoggerUtils {
} }
return appIds; return appIds;
} }
public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
}
public static void logError(Optional<Logger> optionalLogger
, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
}
public static void logError(Optional<Logger> optionalLogger
, String error, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
}
public static void logInfo(Optional<Logger> optionalLogger
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
} }

103
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -16,16 +16,6 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
@ -40,8 +30,21 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.configuration.Configuration;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
/** /**
* os utils * os utils
* *
@ -50,6 +53,8 @@ public class OSUtils {
private static final Logger logger = LoggerFactory.getLogger(OSUtils.class); private static final Logger logger = LoggerFactory.getLogger(OSUtils.class);
public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();
private static final SystemInfo SI = new SystemInfo(); private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00"; public static final String TWO_DECIMAL = "0.00";
@ -251,7 +256,9 @@ public class OSUtils {
try { try {
String userGroup = OSUtils.getGroup(); String userGroup = OSUtils.getGroup();
if (StringUtils.isEmpty(userGroup)) { if (StringUtils.isEmpty(userGroup)) {
logger.error("{} group does not exist for this operating system.", userGroup); String errorLog = String.format("%s group does not exist for this operating system.", userGroup);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog);
return false; return false;
} }
if (isMacOS()) { if (isMacOS()) {
@ -263,7 +270,8 @@ public class OSUtils {
} }
return true; return true;
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
} }
return false; return false;
@ -276,10 +284,14 @@ public class OSUtils {
* @throws IOException in case of an I/O error * @throws IOException in case of an I/O error
*/ */
private static void createLinuxUser(String userName, String userGroup) throws IOException { private static void createLinuxUser(String userName, String userGroup) throws IOException {
logger.info("create linux os user : {}", userName); String infoLog1 = String.format("create linux os user : %s", userName);
String cmd = String.format("sudo useradd -g %s %s", userGroup, userName); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
logger.info("execute cmd : {}", cmd); String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
String infoLog2 = String.format("execute cmd : %s", cmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(cmd); OSUtils.exeCmd(cmd);
} }
@ -290,13 +302,24 @@ public class OSUtils {
* @throws IOException in case of an I/O error * @throws IOException in case of an I/O error
*/ */
private static void createMacUser(String userName, String userGroup) throws IOException { private static void createMacUser(String userName, String userGroup) throws IOException {
logger.info("create mac os user : {}", userName);
String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
logger.info("create user command : {}", userCreateCmd); Optional<Logger> optionalLogger = Optional.ofNullable(logger);
OSUtils.exeCmd(userCreateCmd); Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get());
logger.info("append user to group : {}", appendGroupCmd);
String infoLog1 = String.format("create mac os user : %s", userName);
LoggerUtils.logInfo(optionalLogger, infoLog1);
LoggerUtils.logInfo(optionalTaskLogger, infoLog1);
String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String infoLog2 = String.format("create user command : %s", createUserCmd);
LoggerUtils.logInfo(optionalLogger, infoLog2);
LoggerUtils.logInfo(optionalTaskLogger, infoLog2);
OSUtils.exeCmd(createUserCmd);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
String infoLog3 = String.format("append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(optionalLogger, infoLog3);
LoggerUtils.logInfo(optionalTaskLogger, infoLog3);
OSUtils.exeCmd(appendGroupCmd); OSUtils.exeCmd(appendGroupCmd);
} }
@ -307,14 +330,20 @@ public class OSUtils {
* @throws IOException in case of an I/O error * @throws IOException in case of an I/O error
*/ */
private static void createWindowsUser(String userName, String userGroup) throws IOException { private static void createWindowsUser(String userName, String userGroup) throws IOException {
logger.info("create windows os user : {}", userName); String infoLog1 = String.format("create windows os user : %s", userName);
String userCreateCmd = String.format("net user \"%s\" /add", userName); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);
logger.info("execute create user command : {}", userCreateCmd); String userCreateCmd = String.format("net user \"%s\" /add", userName);
String infoLog2 = String.format("execute create user command : %s", userCreateCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(userCreateCmd); OSUtils.exeCmd(userCreateCmd);
logger.info("execute append user to group : {}", appendGroupCmd); String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3);
OSUtils.exeCmd(appendGroupCmd); OSUtils.exeCmd(appendGroupCmd);
} }
@ -353,22 +382,12 @@ public class OSUtils {
* @throws IOException errors * @throws IOException errors
*/ */
public static String exeCmd(String command) throws IOException { public static String exeCmd(String command) throws IOException {
BufferedReader br = null; StringTokenizer st = new StringTokenizer(command);
String[] cmdArray = new String[st.countTokens()];
try { for (int i = 0; st.hasMoreTokens(); i++) {
Process p = Runtime.getRuntime().exec(command); cmdArray[i] = st.nextToken();
br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
StringBuilder sb = new StringBuilder();
while ((line = br.readLine()) != null) {
sb.append(line + "\n");
}
return sb.toString();
} finally {
IOUtils.closeQuietly(br);
} }
return exeShell(cmdArray);
} }
/** /**
@ -377,7 +396,7 @@ public class OSUtils {
* @return result of execute the shell * @return result of execute the shell
* @throws IOException errors * @throws IOException errors
*/ */
public static String exeShell(String command) throws IOException { public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command); return ShellExecutor.execCommand(command);
} }

6
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -68,7 +68,11 @@ public class OSUtilsTest {
@Test @Test
public void createUser() { public void createUser() {
boolean result = OSUtils.createUser("test123"); boolean result = OSUtils.createUser("test123");
Assert.assertTrue(result); if (result) {
Assert.assertTrue("create user test123 success", true);
} else {
Assert.assertTrue("create user test123 fail", true);
}
} }
@Test @Test

22
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml vendored

@ -98,25 +98,24 @@
</includes> </includes>
<outputDirectory>conf</outputDirectory> <outputDirectory>conf</outputDirectory>
</fileSet> </fileSet>
<fileSet> <fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources</directory> <directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes> <includes>
<include>**/*.properties</include> <include>**/*.properties</include>
<include>**/*.xml</include> <include>**/*.xml</include>
<include>**/*.json</include> <include>**/*.json</include>
<include>**/*.yml</include>
</includes> </includes>
<outputDirectory>conf</outputDirectory> <outputDirectory>conf</outputDirectory>
</fileSet> </fileSet>
<!--server end-->
<!--service end-->
<fileSet> <fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources/bin</directory> <directory>${basedir}/../dolphinscheduler-service/src/main/resources</directory>
<includes>
<include>*.*</include>
</includes>
<directoryMode>755</directoryMode>
<outputDirectory>bin</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes> <includes>
<include>**/*.properties</include> <include>**/*.properties</include>
<include>**/*.xml</include> <include>**/*.xml</include>
@ -125,7 +124,8 @@
</includes> </includes>
<outputDirectory>conf</outputDirectory> <outputDirectory>conf</outputDirectory>
</fileSet> </fileSet>
<!--server end--> <!--service end-->
<fileSet> <fileSet>
<directory>${basedir}/../dolphinscheduler-server/target/dolphinscheduler-server-${project.version}</directory> <directory>${basedir}/../dolphinscheduler-server/target/dolphinscheduler-server-${project.version}</directory>

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java

@ -34,7 +34,7 @@ public class TaskInfo implements Serializable{
/** /**
* taks name * task name
*/ */
private String taskName; private String taskName;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -39,7 +39,7 @@ public class TaskExecutionContext implements Serializable{
/** /**
* taks name * task name
*/ */
private String taskName; private String taskName;

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java

@ -16,11 +16,14 @@
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply; import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
/** /**
* task log filter * task log filter
@ -43,7 +46,9 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
*/ */
@Override @Override
public FilterReply decide(ILoggingEvent event) { public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) || event.getLevel().isGreaterOrEqual(level)) { if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)
|| event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT)
|| event.getLevel().isGreaterOrEqual(level)) {
return FilterReply.ACCEPT; return FilterReply.ACCEPT;
} }
return FilterReply.DENY; return FilterReply.DENY;

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collection;
/**
* AbstractSelector
*/
public abstract class AbstractSelector<T> implements Selector<T>{
@Override
public T select(Collection<T> source) {
if (CollectionUtils.isEmpty(source)) {
throw new IllegalArgumentException("Empty source.");
}
/**
* if only one , return directly
*/
if (source.size() == 1) {
return (T)source.toArray()[0];
}
return doSelect(source);
}
protected abstract T doSelect(Collection<T> source);
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java

@ -22,7 +22,7 @@ import java.util.Collection;
/** /**
* lower weight round robin * lower weight round robin
*/ */
public class LowerWeightRoundRobin implements Selector<HostWeight>{ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight>{
/** /**
* select * select
@ -30,7 +30,7 @@ public class LowerWeightRoundRobin implements Selector<HostWeight>{
* @return HostWeight * @return HostWeight
*/ */
@Override @Override
public HostWeight select(Collection<HostWeight> sources){ public HostWeight doSelect(Collection<HostWeight> sources){
int totalWeight = 0; int totalWeight = 0;
int lowWeight = 0; int lowWeight = 0;
HostWeight lowerNode = null; HostWeight lowerNode = null;

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java

@ -24,23 +24,12 @@ import java.util.Random;
* random selector * random selector
* @param <T> T * @param <T> T
*/ */
public class RandomSelector<T> implements Selector<T> { public class RandomSelector<T> extends AbstractSelector<T> {
private final Random random = new Random(); private final Random random = new Random();
@Override @Override
public T select(final Collection<T> source) { public T doSelect(final Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
/**
* if only one , return directly
*/
if (source.size() == 1) {
return (T) source.toArray()[0];
}
int size = source.size(); int size = source.size();
/** /**

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java

@ -26,22 +26,12 @@ import java.util.concurrent.atomic.AtomicInteger;
* @param <T> T * @param <T> T
*/ */
@Service @Service
public class RoundRobinSelector<T> implements Selector<T> { public class RoundRobinSelector<T> extends AbstractSelector<T> {
private final AtomicInteger index = new AtomicInteger(0); private final AtomicInteger index = new AtomicInteger(0);
@Override @Override
public T select(Collection<T> source) { public T doSelect(Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
/**
* if only one , return directly
*/
if (source.size() == 1) {
return (T)source.toArray()[0];
}
int size = source.size(); int size = source.size();
/** /**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java

@ -34,7 +34,7 @@ public class NettyRemoteChannel {
private final Channel channel; private final Channel channel;
/** /**
* equest unique identification * request unique identification
*/ */
private final long opaque; private final long opaque;

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -33,14 +32,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
/** /**
* taks callback service * task callback service
*/ */
@Service @Service
public class TaskCallbackService { public class TaskCallbackService {

56
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,15 +17,22 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender; import java.util.Date;
import com.github.rholder.retry.RetryException; import java.util.Optional;
import io.netty.channel.Channel; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -40,9 +47,11 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date; import com.github.rholder.retry.RetryException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
/** /**
* worker request processor * worker request processor
@ -51,7 +60,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/** /**
* thread executor service * thread executor service
*/ */
@ -83,33 +91,53 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("received command : {}", taskRequestCommand); logger.info("received command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext(); if(taskRequestCommand == null){
logger.error("task execute request command is null");
return;
}
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){
logger.error("task execution context is null");
return;
}
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort()); taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext); String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath); logger.info("task instance local execute path : {} ", execLocalPath);
FileUtils.taskLoggerThreadLocal.set(taskLogger);
try { try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
} catch (Exception ex){ } catch (Throwable ex) {
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
} }
FileUtils.taskLoggerThreadLocal.remove();
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque())); new NettyRemoteChannel(channel, command.getOpaque()));
// tell master that task is in executing // tell master that task is in executing
final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command(); final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
try { try {
RetryerUtils.retryCall(() -> { RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand); taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(),ackCommand);
return Boolean.TRUE; return Boolean.TRUE;
}); });
// submit task // submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService)); workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
} catch (ExecutionException | RetryException e) { } catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -16,7 +16,12 @@
*/ */
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.utils.*; import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -24,6 +29,10 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@ -35,10 +44,6 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
/** /**
* task scheduler thread * task scheduler thread
@ -70,15 +75,23 @@ public class TaskExecuteThread implements Runnable {
*/ */
private TaskExecutionContextCacheManager taskExecutionContextCacheManager; private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/**
* task logger
*/
private Logger taskLogger;
/** /**
* constructor * constructor
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService * @param taskCallbackService taskCallbackService
*/ */
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){ public TaskExecuteThread(TaskExecutionContext taskExecutionContext
, TaskCallbackService taskCallbackService
, Logger taskLogger) {
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService; this.taskCallbackService = taskCallbackService;
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.taskLogger = taskLogger;
} }
@Override @Override
@ -108,16 +121,7 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())); taskExecutionContext.getTaskInstanceId()));
// custom logger task = TaskManager.newTask(taskExecutionContext, taskLogger);
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext,
taskLogger);
// task init // task init
task.init(); task.init();

24
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -76,9 +76,9 @@ public class TaskPriorityQueueConsumerTest {
tenant.setCreateTime(new Date()); tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date()); tenant.setUpdateTime(new Date());
Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant); Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,2);
Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default"); Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
} }
@ -105,7 +105,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default"); taskPriorityQueue.put("2_1_2_1_default");
Thread.sleep(10000); Thread.sleep(10000);
@ -134,8 +134,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2); processDefinition.setUserId(2);
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
taskPriorityQueue.put("2_1_2_1_default"); taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
@ -147,7 +146,7 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setCreateTime(new Date()); dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date()); dataSource.setUpdateTime(new Date());
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
Thread.sleep(10000); Thread.sleep(10000);
} }
@ -175,8 +174,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2); processDefinition.setUserId(2);
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
taskPriorityQueue.put("2_1_2_1_default"); taskPriorityQueue.put("2_1_2_1_default");
@ -189,9 +187,7 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date()); dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date()); dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource);
Thread.sleep(10000); Thread.sleep(10000);
} }
@ -218,12 +214,10 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2); processDefinition.setUserId(2);
processDefinition.setProjectId(1); processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition); taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
taskPriorityQueue.put("2_1_2_1_default"); taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setId(1); dataSource.setId(1);
dataSource.setName("datax"); dataSource.setName("datax");
@ -251,7 +245,7 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setExecutorId(2); taskInstance.setExecutorId(2);
Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueueConsumer.taskInstanceIsFinalState(1); taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
} }

56
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -18,14 +18,17 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
@ -54,10 +57,23 @@ import java.util.Date;
* test task call back service * test task call back service
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, @ContextConfiguration(classes={
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, TaskCallbackServiceTestConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, SpringZKServer.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) SpringApplicationContext.class,
MasterRegistry.class,
WorkerRegistry.class,
ZookeeperRegistryCenter.class,
MasterConfig.class,
WorkerConfig.class,
ZookeeperCachedOperator.class,
ZookeeperConfig.class,
ZookeeperNodeManager.class,
TaskCallbackService.class,
TaskResponseService.class,
TaskAckProcessor.class,
TaskResponseProcessor.class,
TaskExecuteProcessor.class})
public class TaskCallbackServiceTest { public class TaskCallbackServiceTest {
@Autowired @Autowired
@ -72,6 +88,9 @@ public class TaskCallbackServiceTest {
@Autowired @Autowired
private TaskResponseProcessor taskResponseProcessor; private TaskResponseProcessor taskResponseProcessor;
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
/** /**
* send ack test * send ack test
* @throws Exception * @throws Exception
@ -172,6 +191,35 @@ public class TaskCallbackServiceTest {
nettyRemotingClient.close(); nettyRemotingClient.close();
} }
@Test
public void testTaskExecuteProcessor() throws Exception{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
nettyRemotingServer.start();
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
TaskExecuteRequestCommand taskExecuteRequestCommand = new TaskExecuteRequestCommand();
nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
taskExecuteRequestCommand.setTaskExecutionContext(JSONUtils.toJsonString(new TaskExecutionContext()));
nettyRemotingClient.send(new Host("localhost",30000),taskExecuteRequestCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close();
nettyRemotingClient.close();
}
// @Test(expected = IllegalStateException.class) // @Test(expected = IllegalStateException.class)
// public void testSendAckWithIllegalStateException2(){ // public void testSendAckWithIllegalStateException2(){
// masterRegistry.registry(); // masterRegistry.registry();

4
pom.xml

@ -599,6 +599,10 @@
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId> <artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version> <version>${maven-javadoc-plugin.version}</version>
<configuration>
<source>8</source>
<failOnError>false</failOnError>
</configuration>
</plugin> </plugin>
<plugin> <plugin>

2
sql/dolphinscheduler_mysql.sql

@ -815,4 +815,4 @@ INSERT INTO `t_ds_relation_user_alertgroup` VALUES ('1', '1', '1', '2018-11-29 1
-- ---------------------------- -- ----------------------------
-- Records of t_ds_user -- Records of t_ds_user
-- ---------------------------- -- ----------------------------
INSERT INTO `t_ds_user` VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22', null); INSERT INTO `t_ds_user` VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', 'xx', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22', null,1);

Loading…
Cancel
Save