Browse Source

Refactor worker execute task process (#11540)

* Refactor worker execute task process
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
1b120e3a59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
  2. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  3. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
  4. 23
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  5. 48
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  6. 95
      dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
  7. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  8. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
  9. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
  10. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
  11. 24
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  12. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  13. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
  14. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  15. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  16. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  17. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  18. 21
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  19. 19
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  20. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
  21. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  22. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
  23. 128
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  24. 27
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  25. 19
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
  26. 60
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
  27. 53
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
  28. 364
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  29. 61
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
  30. 59
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
  31. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
  32. 60
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  33. 275
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
  34. 23
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
  35. 50
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
  36. 129
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
  37. 171
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java
  38. 73
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
  39. 87
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

10
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java

@ -28,9 +28,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -103,12 +101,8 @@ public abstract class AbstractTask {
return null;
}
/**
* task handle
*
* @throws Exception exception
*/
public abstract void handle() throws Exception;
public abstract void handle() throws TaskException;
/**
* cancel application

11
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@ -51,7 +49,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
@ -59,10 +57,15 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.info("The current yarn task has been interrupted", ex);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new TaskException("The current yarn task has been interrupted", ex);
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
throw e;
throw new TaskException("Execute task failed", e);
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java

@ -39,7 +39,7 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());

23
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.chunjun;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -48,6 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
/**
* chunjun task
*/
@ -101,10 +101,10 @@ public class ChunJunTask extends AbstractTaskExecutor {
/**
* run chunjun process
*
* @throws Exception exception
* @throws TaskException exception
*/
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@ -115,10 +115,15 @@ public class ChunJunTask extends AbstractTaskExecutor {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current ChunJun Task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current ChunJun Task has been interrupted", e);
} catch (Exception e) {
logger.error("chunjun task failed.", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute chunjun task failed", e);
}
}

48
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -17,32 +17,38 @@
package org.apache.dolphinscheduler.plugin.task.datax;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -58,7 +64,6 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -66,17 +71,9 @@ import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
public class DataxTask extends AbstractTaskExecutor {
/**
@ -150,7 +147,7 @@ public class DataxTask extends AbstractTaskExecutor {
* @throws TaskException if the DataX task is interrupted or fails to execute
*/
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// replace placeholders, and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@ -163,10 +160,15 @@ public class DataxTask extends AbstractTaskExecutor {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current DataX task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current DataX task has been interrupted", e);
} catch (Exception e) {
logger.error("datax task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute DataX task failed", e);
}
}

95
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@ -17,13 +17,17 @@
package org.apache.dolphinscheduler.plugin.task.dinky;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
@ -39,10 +43,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
public class DinkyTask extends AbstractTaskExecutor {
@ -77,47 +78,55 @@ public class DinkyTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
if (isOnline) {
// Online dinky task, and only one job is allowed to execute
result = onlineTask(address, taskId);
} else {
// Submit dinky task
result = submitTask(address, taskId);
}
if (checkResult(result)) {
boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResult(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
finishFlag = true;
public void handle() throws TaskException {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
if (isOnline) {
// Online dinky task, and only one job is allowed to execute
result = onlineTask(address, taskId);
} else {
// Submit dinky task
result = submitTask(address, taskId);
}
if (checkResult(result)) {
boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResult(jobInstanceInfoResult)) {
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error("Execute dinkyTask failed", ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute dinkyTask failed", ex);
}
}

14
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -17,11 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.dvc;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@ -30,6 +29,8 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
/**
* dvc task
*/
@ -74,7 +75,7 @@ public class DvcTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@ -83,10 +84,15 @@ public class DvcTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current DvcTask has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current DvcTask has been interrupted", e);
} catch (Exception e) {
logger.error("dvc task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute dvc task failed", e);
}
}

17
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java

@ -17,12 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@ -36,6 +30,12 @@ import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* AddJobFlowSteps task executor
@ -62,7 +62,7 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
public void handle() throws InterruptedException {
public void handle() throws TaskException {
StepStatus stepStatus = null;
try {
AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
@ -84,6 +84,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Execute emr task failed", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(stepStatus);
setExitStatusCode(exitStatusCode);

6
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.HashSet;
@ -55,7 +56,7 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
public void handle() throws InterruptedException {
public void handle() throws TaskException {
ClusterStatus clusterStatus = null;
try {
RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest();
@ -76,6 +77,9 @@ public class EmrJobFlowTask extends AbstractEmrTask {
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Execute emr task failed", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(clusterStatus);
setExitStatusCode(exitStatusCode);

5
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.http;
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@ -89,7 +90,7 @@ public class HttpTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null;
@ -108,7 +109,7 @@ public class HttpTask extends AbstractTaskExecutor {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
throw e;
throw new TaskException("Execute http task failed", e);
}
}

24
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -17,28 +17,27 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JupyterTask extends AbstractTaskExecutor {
/**
@ -78,17 +77,22 @@ public class JupyterTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Jupyter task has been interrupted", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new TaskException("The current Jupyter task has been interrupted", e);
} catch (Exception e) {
logger.error("jupyter task execution failure", e);
exitStatusCode = -1;
throw e;
throw new TaskException("Execute jupyter task failed", e);
}
}

16
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.mlflow;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@ -36,6 +35,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
/**
* mlflow task
*/
@ -80,7 +81,7 @@ public class MlflowTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@ -95,10 +96,15 @@ public class MlflowTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Mlflow task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Mlflow task has been interrupted", e);
} catch (Exception e) {
logger.error("shell task error", e);
logger.error("Mlflow task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute Mlflow task failed", e);
}
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java

@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -33,6 +33,8 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.HttpURLConnection;
import java.net.URI;
@ -42,9 +44,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
@ -74,7 +73,7 @@ public class PigeonTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
// Trigger PIGEON DataX pipeline
logger.info("start execute PIGEON task");
long startTime = System.currentTimeMillis();
@ -150,6 +149,7 @@ public class PigeonTask extends AbstractTaskExecutor {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new TaskException("Execute pigeon task failed", e);
}
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -17,19 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -44,6 +41,9 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
/**
* procedure task
*/
@ -84,7 +84,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(),
procedureParameters.getDatasource(),
@ -123,7 +123,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
logger.error("procedure task error", e);
throw e;
throw new TaskException("Execute procedure task failed", e);
} finally {
close(stmt, connection);
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -100,7 +100,7 @@ public class PythonTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();

9
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -65,7 +65,7 @@ public class PytorchTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
String command = buildPythonExecuteCommand();
TaskResponse taskResponse = shellCommandExecutor.run(command);
@ -73,9 +73,14 @@ public class PytorchTask extends AbstractTaskExecutor {
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Pytorch task has been interrupted", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new TaskException("The current Pytorch task has been interrupted", e);
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Pytorch task execute failed", e);
}
}

5
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
@ -80,13 +81,13 @@ public class SagemakerTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws SagemakerTaskException {
public void handle() throws TaskException {
try {
int exitStatusCode = handleStartPipeline();
setExitStatusCode(exitStatusCode);
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
throw new SagemakerTaskException("SageMaker task error", e);
throw new TaskException("SageMaker task error", e);
}
}

21
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@ -30,9 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -42,6 +39,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
/**
* seatunnel task
*/
@ -85,7 +85,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@ -94,10 +94,15 @@ public class SeatunnelTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current SeaTunnel task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current SeaTunnel task has been interrupted", e);
} catch (Exception e) {
logger.error("SeaTunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute Seatunnel task failed", e);
}
}

19
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.shell;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@ -31,8 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
@ -44,6 +41,9 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
/**
* shell task
*/
@ -90,7 +90,7 @@ public class ShellTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
@ -99,10 +99,15 @@ public class ShellTask extends AbstractTaskExecutor {
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Shell task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Shell task has been interrupted", e);
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute shell task error", e);
}
}

15
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.sql;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
@ -39,8 +42,7 @@ import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -58,11 +60,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class SqlTask extends AbstractTaskExecutor {
/**
@ -117,7 +114,7 @@ public class SqlTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
@ -163,7 +160,7 @@ public class SqlTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("sql task error", e);
throw e;
throw new TaskException("Execute sql task failed", e);
}
}

11
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -17,13 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
import kong.unirest.Unirest;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@ -34,10 +36,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kong.unirest.Unirest;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ZeppelinTask extends AbstractTaskExecutor {
/**
@ -77,7 +75,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
@ -142,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
} catch (Exception e) {
// Record the failure exit code before surfacing the error to the caller.
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("zeppelin task submit failed with error", e);
// Chain the original exception as the cause so the root stack trace is not lost.
throw new TaskException("Execute ZeppelinTask exception", e);
}
}

5
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java

@ -28,6 +28,7 @@ import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@ -127,10 +128,10 @@ public class ZeppelinTaskTest {
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
}
@Test
@Test(expected = TaskException.class)
public void testHandleWithParagraphExecutionException() throws Exception {
when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
thenThrow(new Exception("Something wrong happens from zeppelin side"));
thenThrow(new TaskException("Something wrong happens from zeppelin side"));
// when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,

128
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -36,26 +36,16 @@ import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang.SystemUtils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
/**
* Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
@ -104,7 +94,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
logger.error("task execute request command content is null");
return;
}
final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
@ -114,111 +104,39 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
boolean osUserExistFlag;
// if Using distributed is true and Currently supported systems are linux,Should not let it
// automatically
// create tenants,so TenantAutoCreate has no effect
if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
// use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
} else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
} else {
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!osUserExistFlag) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId(),
ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
}
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
}
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate)
.createWorkerTaskExecuteRunnable();
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
masterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate));
boolean offer = workerManager.offer(workerTaskExecuteRunnable);
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(),
taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}

27
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -34,25 +39,17 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* task kill processor
@ -161,12 +158,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param taskInstanceId
*/
protected void cancelApplication(int taskInstanceId) {
TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
if (taskExecuteThread == null) {
WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
if (workerTaskExecuteRunnable == null) {
logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
AbstractTask task = taskExecuteThread.getTask();
AbstractTask task = workerTaskExecuteRunnable.getTask();
if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;

19
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java

@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -27,20 +31,13 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskSavePointResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
/**
* task save point processor
*/
@ -98,12 +95,12 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
}
protected void doSavePoint(int taskInstanceId) {
TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
if (taskExecuteThread == null) {
WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
if (workerTaskExecuteRunnable == null) {
logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
AbstractTask task = taskExecuteThread.getTask();
AbstractTask task = workerTaskExecuteRunnable.getTask();
if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;

60
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
/**
 * Default concrete {@link WorkerDelayTaskExecuteRunnable}.
 *
 * <p>Runs the task by invoking {@code handle()} on the task instance and delegates the
 * after-execute and after-throwing hooks to the superclass without adding any behavior.
 */
public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable {

    /**
     * Creates the runnable by forwarding every argument, unchanged and in the same order,
     * to the superclass constructor.
     *
     * @param taskExecutionContext annotated {@code @NonNull}
     * @param workerConfig         annotated {@code @NonNull}
     * @param workflowMaster       annotated {@code @NonNull}
     * @param workerMessageSender  annotated {@code @NonNull}
     * @param alertClientService   annotated {@code @NonNull}
     * @param taskPluginManager    annotated {@code @NonNull}
     * @param storageOperate       annotated {@code @Nullable}; may be {@code null}
     */
    public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
                                                 @NonNull WorkerConfig workerConfig,
                                                 @NonNull String workflowMaster,
                                                 @NonNull WorkerMessageSender workerMessageSender,
                                                 @NonNull AlertClientService alertClientService,
                                                 @NonNull TaskPluginManager taskPluginManager,
                                                 @Nullable StorageOperate storageOperate) {
        super(
                taskExecutionContext,
                workerConfig,
                workflowMaster,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);
    }

    /**
     * Executes the task by calling {@code task.handle()}.
     *
     * <p>{@code task} is not declared in this class; it is presumably a field provided by the
     * superclass hierarchy (confirm against {@link WorkerDelayTaskExecuteRunnable}).
     *
     * @throws TaskException if {@code task} is {@code null}, or if {@code task.handle()} throws it
     */
    @Override
    public void executeTask() throws TaskException {
        // Fail fast with a descriptive error instead of a NullPointerException.
        if (task == null) {
            throw new TaskException("The task plugin instance is not initialized");
        }

        task.handle();
    }

    /**
     * Delegates to the superclass implementation; no extra work is done here.
     */
    @Override
    protected void afterExecute() {
        super.afterExecute();
    }

    /**
     * Delegates to the superclass implementation; no extra work is done here.
     *
     * @param throwable passed through unchanged to the superclass hook
     * @throws TaskException if the superclass hook throws it
     */
    @Override
    protected void afterThrowing(Throwable throwable) throws TaskException {
        super.afterThrowing(throwable);
    }
}

53
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
/**
 * Factory producing {@link DefaultWorkerDelayTaskExecuteRunnable} instances from the
 * collaborators captured at construction time.
 */
public class DefaultWorkerDelayTaskExecuteRunnableFactory
        extends WorkerDelayTaskExecuteRunnableFactory<DefaultWorkerDelayTaskExecuteRunnable> {

    /**
     * Hands all collaborators to the parent factory, which stores them in its fields.
     *
     * @param storageOperate may be {@code null}; all other arguments are required
     */
    protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
                                                           @NonNull WorkerConfig workerConfig,
                                                           @NonNull String workflowMasterAddress,
                                                           @NonNull WorkerMessageSender workerMessageSender,
                                                           @NonNull AlertClientService alertClientService,
                                                           @NonNull TaskPluginManager taskPluginManager,
                                                           @Nullable StorageOperate storageOperate) {
        super(
                taskExecutionContext,
                workerConfig,
                workflowMasterAddress,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);
    }

    /**
     * Builds a new runnable from the stored collaborators.
     *
     * @return a freshly constructed {@link DefaultWorkerDelayTaskExecuteRunnable}
     */
    @Override
    public DefaultWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() {
        return new DefaultWorkerDelayTaskExecuteRunnable(
                taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender,
                alertClientService, taskPluginManager, storageOperate);
    }
}

364
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -1,364 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
/**
* task scheduler thread
*/
public class TaskExecuteThread implements Runnable, Delayed {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/**
* task instance
*/
private final TaskExecutionContext taskExecutionContext;
private final String masterAddress;
private final StorageOperate storageOperate;
/**
* abstract task
*/
private AbstractTask task;
/**
* task callback service
*/
private final WorkerMessageSender workerMessageSender;
/**
* alert client server
*/
private final AlertClientService alertClientService;
private TaskPluginManager taskPluginManager;
/**
 * Creates a task execute thread without a task plugin manager.
 *
 * <p>NOTE: this overload does not assign {@code taskPluginManager}; {@link #run()} reads that
 * field when resolving the task channel, so a non-dry-run execution created through this
 * constructor cannot resolve a plugin.
 *
 * @param taskExecutionContext context of the task instance to execute
 * @param masterAddress address handed to the message sender when reporting to the master
 * @param workerMessageSender used for worker send message to master
 * @param alertClientService client used to send task alerts
 * @param storageOperate storage used to download task resources (not null-checked here)
 */
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
                         @NonNull String masterAddress,
                         @NonNull WorkerMessageSender workerMessageSender,
                         @NonNull AlertClientService alertClientService,
                         StorageOperate storageOperate) {
    this.storageOperate = storageOperate;
    this.alertClientService = alertClientService;
    this.workerMessageSender = workerMessageSender;
    this.masterAddress = masterAddress;
    this.taskExecutionContext = taskExecutionContext;
}
/**
 * Creates a task execute thread with every collaborator, including the plugin manager used
 * by {@link #run()} to look up the task channel.
 *
 * @param taskExecutionContext context of the task instance to execute
 * @param masterAddress address handed to the message sender when reporting to the master
 * @param workerMessageSender used for worker send message to master
 * @param alertClientService client used to send task alerts
 * @param taskPluginManager source of the task channel map
 * @param storageOperate storage used to download task resources (not null-checked here)
 */
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
                         @NonNull String masterAddress,
                         @NonNull WorkerMessageSender workerMessageSender,
                         @NonNull AlertClientService alertClientService,
                         @NonNull TaskPluginManager taskPluginManager,
                         StorageOperate storageOperate) {
    this.storageOperate = storageOperate;
    this.taskPluginManager = taskPluginManager;
    this.alertClientService = alertClientService;
    this.workerMessageSender = workerMessageSender;
    this.masterAddress = masterAddress;
    this.taskExecutionContext = taskExecutionContext;
}
/**
 * Executes the task instance described by the task execution context.
 *
 * <p>A dry run is reported to the master as an immediate success and nothing else happens.
 * Otherwise the method reports the running state, downloads missing resources, creates the
 * task through its plugin channel, runs it and records the outcome on the context. The final
 * result is always sent to the master and the execute path is cleared, whether the task
 * succeeded or failed.
 */
@Override
public void run() {
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
            taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
            taskExecutionContext.setStartTime(new Date());
            taskExecutionContext.setEndTime(new Date());
            TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
            workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                    masterAddress,
                    CommandType.TASK_EXECUTE_RESULT);
            logger.info("Task dry run success");
            return;
        }
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
    try {
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        logger.info("script path : {}", taskExecutionContext.getExecutePath());
        if (taskExecutionContext.getStartTime() == null) {
            taskExecutionContext.setStartTime(new Date());
        }
        logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

        // callback task execute running
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
        workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                masterAddress,
                CommandType.TASK_EXECUTE_RUNNING);

        // copy hdfs/minio file to local
        List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
                taskExecutionContext.getResources());
        if (!fileDownloads.isEmpty()) {
            downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
        }

        taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
        taskExecutionContext.setTaskAppId(String.format("%s_%s",
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId()));

        TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
        if (null == taskChannel) {
            throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.",
                    taskExecutionContext.getTaskType()));
        }
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);

        // set the name of the current thread
        Thread.currentThread().setName(taskLogName);

        task = taskChannel.createTask(taskExecutionContext);

        // task init
        this.task.init();

        // init varPool
        this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

        // task handle
        this.task.handle();

        // task result process
        if (this.task.getNeedAlert()) {
            sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus());
        }

        taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        taskExecutionContext.setProcessId(this.task.getProcessId());
        taskExecutionContext.setAppIds(this.task.getAppIds());
        taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
        logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(),
                this.task.getExitStatus());
    } catch (Throwable e) {
        logger.error("task scheduler failure", e);
        kill();
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
        taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
        // The failure may have happened before the task instance was created (resource
        // download error, plugin not found, ...). Dereferencing a null task here would throw
        // a NullPointerException out of this handler and out of run().
        if (this.task != null) {
            taskExecutionContext.setProcessId(this.task.getProcessId());
            taskExecutionContext.setAppIds(this.task.getAppIds());
        }
    } finally {
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                masterAddress,
                CommandType.TASK_EXECUTE_RESULT);
        clearTaskExecPath();
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
/**
 * Sends the task's alert through the alert client.
 *
 * @param taskAlertInfo supplies the alert group id, title and content
 * @param status final task status; {@code SUCCESS} selects the success warning code, any
 *               other status selects the failure warning code
 */
private void sendAlert(TaskAlertInfo taskAlertInfo, TaskExecutionStatus status) {
    final int strategy;
    if (status == TaskExecutionStatus.SUCCESS) {
        strategy = WarningType.SUCCESS.getCode();
    } else {
        strategy = WarningType.FAILURE.getCode();
    }
    alertClientService.sendAlert(
            taskAlertInfo.getAlertGroupId(),
            taskAlertInfo.getTitle(),
            taskAlertInfo.getContent(),
            strategy);
}
/**
 * Deletes the task's local execute directory once the task is finished.
 *
 * <p>Nothing is deleted in develop mode, when the execute path is empty, or when the path is
 * the single slash (root) path.
 */
private void clearTaskExecPath() {
    logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
    if (CommonUtils.isDevelopMode()) {
        return;
    }

    // get exec dir
    final String execDir = taskExecutionContext.getExecutePath();
    if (Strings.isNullOrEmpty(execDir)) {
        logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
        return;
    }
    if (SINGLE_SLASH.equals(execDir)) {
        logger.warn("task: {} exec local path is '/', direct deletion is not allowed",
                taskExecutionContext.getTaskName());
        return;
    }

    try {
        org.apache.commons.io.FileUtils.deleteDirectory(new File(execDir));
        logger.info("exec local path: {} cleared.", execDir);
    } catch (NoSuchFileException ignored) {
        // this is expected
    } catch (IOException e) {
        logger.error("Delete exec dir failed.", e);
    }
}
/**
 * Kills the running task, if one has been created.
 *
 * <p>Cancels the task application and then asks {@code ProcessUtils} to kill the related
 * yarn job. Any exception is logged and not rethrown.
 */
public void kill() {
    if (task == null) {
        return;
    }
    try {
        task.cancelApplication(true);
        ProcessUtils.killYarnJob(taskExecutionContext);
    } catch (Exception e) {
        logger.error("Kill task failed", e);
    }
}
/**
 * Downloads each listed resource from the storage service into the execute directory and
 * records download metrics.
 *
 * @param execLocalPath local directory the resources are written into
 * @param logger logger used for progress and error messages
 * @param fileDownloads pairs of (resource full name, tenant code) to download
 * @throws ServiceException carrying the original message when any download step fails
 */
public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
    for (Pair<String, String> fileDownload : fileDownloads) {
        try {
            // left = resource full name, right = tenant code that owns the resource
            final String fullName = fileDownload.getLeft();
            final String tenantCode = fileDownload.getRight();

            final String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
            logger.info("get resource file from path:{}", resPath);

            final long resourceDownloadStartTime = System.currentTimeMillis();
            final String localTarget = execLocalPath + File.separator + fullName;
            storageOperate.download(tenantCode, resPath, localTarget, false, true);

            final long elapsedMillis = System.currentTimeMillis() - resourceDownloadStartTime;
            WorkerServerMetrics.recordWorkerResourceDownloadTime(elapsedMillis);

            final long downloadedBytes = Files.size(Paths.get(execLocalPath, fullName));
            WorkerServerMetrics.recordWorkerResourceDownloadSize(downloadedBytes);

            WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
        } catch (Exception e) {
            WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
            logger.error(e.getMessage(), e);
            throw new ServiceException(e.getMessage());
        }
    }
}
/**
 * Works out which project resources still have to be downloaded.
 *
 * @param execLocalPath local directory checked for already-present files
 * @param projectRes resources keyed by name; the value is carried through unchanged as the
 *                   right element of each returned pair
 * @return the (key, value) pairs whose file does not exist under {@code execLocalPath};
 *         an empty list when {@code projectRes} is null or empty
 * @throws StorageOperateNoConfiguredException if something must be downloaded but the
 *         resource upload startup state is off
 */
public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
    if (MapUtils.isEmpty(projectRes)) {
        return Collections.emptyList();
    }

    List<Pair<String, String>> pending = new ArrayList<>();
    for (Map.Entry<String, String> resource : projectRes.entrySet()) {
        File resFile = new File(execLocalPath, resource.getKey());
        if (resFile.exists()) {
            logger.info("file : {} exists ", resFile.getName());
            continue;
        }
        pending.add(Pair.of(resource.getKey(), resource.getValue()));
    }

    boolean nothingToDownload = pending.isEmpty();
    if (!nothingToDownload && !PropertyUtils.getResUploadStartupState()) {
        throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
    }
    return pending;
}
/**
 * Returns the execution context this thread was created with.
 *
 * @return the current TaskExecutionContext
 */
public TaskExecutionContext getTaskExecutionContext() {
    return taskExecutionContext;
}
/**
 * Returns the remaining delay before this task may run, converted to {@code unit}.
 *
 * <p>The remaining time comes from {@code DateUtils.getRemainTime}, given the first submit
 * time and the context's delay time multiplied by 60, and is treated as seconds.
 */
@Override
public long getDelay(TimeUnit unit) {
    final long remainSeconds = DateUtils.getRemainTime(
            taskExecutionContext.getFirstSubmitTime(),
            taskExecutionContext.getDelayTime() * 60L);
    return unit.convert(remainSeconds, TimeUnit.SECONDS);
}
/**
 * Orders delayed elements by their remaining delay in milliseconds; a {@code null}
 * argument sorts before this element.
 */
@Override
public int compareTo(Delayed o) {
    if (o != null) {
        final long ownDelay = this.getDelay(TimeUnit.MILLISECONDS);
        final long otherDelay = o.getDelay(TimeUnit.MILLISECONDS);
        return Long.compare(ownDelay, otherDelay);
    }
    return 1;
}
/**
 * Returns the task created by {@link #run()}, or {@code null} if none has been created yet.
 */
public AbstractTask getTask() {
    return this.task;
}
}

61
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * A {@link WorkerTaskExecuteRunnable} that can be placed in a delay queue: its delay is
 * derived from the first submit time and delay time of its task execution context.
 */
public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed {

    /**
     * Forwards every collaborator unchanged to the parent constructor.
     *
     * @param storageOperate may be {@code null}; all other arguments are required
     */
    protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
                                             @NonNull WorkerConfig workerConfig,
                                             @NonNull String masterAddress,
                                             @NonNull WorkerMessageSender workerMessageSender,
                                             @NonNull AlertClientService alertClientService,
                                             @NonNull TaskPluginManager taskPluginManager,
                                             @Nullable StorageOperate storageOperate) {
        super(
                taskExecutionContext,
                workerConfig,
                masterAddress,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);
    }

    /**
     * Returns the remaining delay converted to {@code unit}.
     *
     * <p>The remaining time comes from {@code DateUtils.getRemainTime}, given the first
     * submit time and the context's delay time multiplied by 60, and is treated as seconds.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        final TaskExecutionContext context = getTaskExecutionContext();
        final long remainSeconds =
                DateUtils.getRemainTime(context.getFirstSubmitTime(), context.getDelayTime() * 60L);
        return unit.convert(remainSeconds, TimeUnit.SECONDS);
    }

    /**
     * Orders delayed elements by their remaining delay in milliseconds; a {@code null}
     * argument sorts before this element.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o != null) {
            final long ownDelay = this.getDelay(TimeUnit.MILLISECONDS);
            final long otherDelay = o.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(ownDelay, otherDelay);
        }
        return 1;
    }
}

59
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
/**
 * Base factory for {@link WorkerDelayTaskExecuteRunnable} subclasses.
 *
 * <p>It only captures the collaborators a runnable needs; subclasses decide which concrete
 * runnable type to build from them.
 *
 * @param <T> concrete runnable type produced by the factory
 */
public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDelayTaskExecuteRunnable>
        implements WorkerTaskExecuteRunnableFactory<T> {

    protected final @NonNull TaskExecutionContext taskExecutionContext;

    protected final @NonNull WorkerConfig workerConfig;

    protected final @NonNull String workflowMasterAddress;

    protected final @NonNull WorkerMessageSender workerMessageSender;

    protected final @NonNull AlertClientService alertClientService;

    protected final @NonNull TaskPluginManager taskPluginManager;

    /** Optional: may be {@code null}. */
    protected final @Nullable StorageOperate storageOperate;

    /**
     * Stores the collaborators for later use by {@link #createWorkerTaskExecuteRunnable()}.
     *
     * @param storageOperate may be {@code null}; all other arguments are required
     */
    protected WorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
                                                    @NonNull WorkerConfig workerConfig,
                                                    @NonNull String workflowMasterAddress,
                                                    @NonNull WorkerMessageSender workerMessageSender,
                                                    @NonNull AlertClientService alertClientService,
                                                    @NonNull TaskPluginManager taskPluginManager,
                                                    @Nullable StorageOperate storageOperate) {
        this.storageOperate = storageOperate;
        this.taskPluginManager = taskPluginManager;
        this.alertClientService = alertClientService;
        this.workerMessageSender = workerMessageSender;
        this.workflowMasterAddress = workflowMasterAddress;
        this.workerConfig = workerConfig;
        this.taskExecutionContext = taskExecutionContext;
    }

    /**
     * Creates the runnable from the stored collaborators.
     *
     * @return a runnable of the factory's concrete type
     */
    public abstract T createWorkerTaskExecuteRunnable();
}

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java

@ -50,17 +50,17 @@ public class WorkerExecService {
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService,
ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
}
public void submit(TaskExecuteThread taskExecuteThread) {
public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@ -91,7 +91,7 @@ public class WorkerExecService {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
public Map<Integer, TaskExecuteThread> getTaskExecuteThreadMap() {
public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
return taskExecuteThreadMap;
}

60
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -19,17 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.BlockingQueue;
import javax.annotation.Nullable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
@ -41,31 +39,16 @@ public class WorkerManagerThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
/**
* task queue
*/
private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
@Autowired(required = false)
private StorageOperate storageOperate;
/**
* thread executor service
*/
private final WorkerExecService workerExecService;
/**
* task callback service
*/
@Autowired
private WorkerMessageSender workerMessageSender;
private volatile int workerExecThreads;
private final int workerExecThreads;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
workerExecThreads = workerConfig.getExecThreads();
@ -75,8 +58,8 @@ public class WorkerManagerThread implements Runnable {
taskExecuteThreadMap);
}
public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
return this.taskExecuteThreadMap.get(taskInstanceId);
public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
return taskExecuteThreadMap.get(taskInstanceId);
}
/**
@ -94,7 +77,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
return this.workerExecService.getThreadPoolQueueSize();
return workerExecService.getThreadPoolQueueSize();
}
/**
@ -108,13 +91,7 @@ public class WorkerManagerThread implements Runnable {
.forEach(waitSubmitQueue::remove);
}
/**
* submit task
*
* @param taskExecuteThread taskExecuteThread
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
if (waitSubmitQueue.size() > workerExecThreads) {
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
// if waitSubmitQueue is full, it will wait 1s, then try add
@ -123,7 +100,7 @@ public class WorkerManagerThread implements Runnable {
return false;
}
}
return waitSubmitQueue.offer(taskExecuteThread);
return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
public void start() {
@ -137,15 +114,14 @@ public class WorkerManagerThread implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
TaskExecuteThread taskExecuteThread;
while (!ServerLifeCycleManager.isStopped()) {
try {
if (!ServerLifeCycleManager.isRunning()) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
taskExecuteThread = waitSubmitQueue.take();
workerExecService.submit(taskExecuteThread);
final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
workerExecService.submit(workerDelayTaskExecuteRunnable);
} else {
WorkerServerMetrics.incWorkerOverloadCount();
logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
@ -161,7 +137,17 @@ public class WorkerManagerThread implements Runnable {
public void clearTask() {
waitSubmitQueue.clear();
workerExecService.getTaskExecuteThreadMap().values().forEach(TaskExecuteThread::kill);
workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> {
int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
try {
workerTaskExecuteRunnable.cancelTask();
logger.info("Cancel the taskInstance in worker {}", taskInstanceId);
} catch (Exception ex) {
logger.error("Cancel the taskInstance error {}", taskInstanceId, ex);
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
}
});
workerExecService.getTaskExecuteThreadMap().clear();
}
}

275
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
protected final TaskExecutionContext taskExecutionContext;
protected final WorkerConfig workerConfig;
protected final String masterAddress;
protected final WorkerMessageSender workerMessageSender;
protected final AlertClientService alertClientService;
protected final TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected @Nullable AbstractTask task;
/**
 * Stores the collaborators and assigns the task log name on the execution context.
 *
 * <p>The log name is built by {@code LoggerUtils.buildTaskId} from the first submit time,
 * process definition code and version, process instance id and task instance id.
 *
 * @param storageOperate may be {@code null}; all other arguments are required
 */
protected WorkerTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
                                    @NonNull WorkerConfig workerConfig,
                                    @NonNull String masterAddress,
                                    @NonNull WorkerMessageSender workerMessageSender,
                                    @NonNull AlertClientService alertClientService,
                                    @NonNull TaskPluginManager taskPluginManager,
                                    @Nullable StorageOperate storageOperate) {
    this.storageOperate = storageOperate;
    this.taskPluginManager = taskPluginManager;
    this.alertClientService = alertClientService;
    this.workerMessageSender = workerMessageSender;
    this.masterAddress = masterAddress;
    this.workerConfig = workerConfig;
    this.taskExecutionContext = taskExecutionContext;

    final String logName = LoggerUtils.buildTaskId(
            taskExecutionContext.getFirstSubmitTime(),
            taskExecutionContext.getProcessDefineCode(),
            taskExecutionContext.getProcessDefineVersion(),
            taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId());
    taskExecutionContext.setTaskLogName(logName);
    logger.info("Set task logger name: {}", logName);
}
/**
 * Executes the task. Called by {@link #run()} after {@code beforeExecute()} and before
 * {@code afterExecute()}; an exception thrown here is routed to {@code afterThrowing}.
 */
protected abstract void executeTask();
/**
 * Post-processing after the task has executed: sends the alert if needed, reports the task
 * result, drops the context from the worker cache and clears the execute path if needed.
 *
 * @throws TaskException if no task instance is set
 */
protected void afterExecute() throws TaskException {
    if (task == null) {
        throw new TaskException("The current task instance is null");
    }

    // notify: alert first, then the execute result
    sendAlertIfNeeded();
    sendTaskResult();

    // local cleanup
    final int taskInstanceId = taskExecutionContext.getTaskInstanceId();
    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
    logger.info("Remove the current task execute context from worker cache");
    clearTaskExecPathIfNeeded();
}
/**
 * Failure handling: cancels the task, drops the context from the worker cache, marks the
 * context as FAILURE with an end time of now, and sends the execute result to the master.
 *
 * @param throwable the failure that triggered this hook (not inspected here)
 */
protected void afterThrowing(Throwable throwable) throws TaskException {
    cancelTask();

    final int taskInstanceId = taskExecutionContext.getTaskInstanceId();
    TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);

    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
    taskExecutionContext.setEndTime(new Date());
    workerMessageSender.sendMessageWithRetry(
            taskExecutionContext,
            masterAddress,
            CommandType.TASK_EXECUTE_RESULT);
    logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE);
}
/**
 * Cancels the running task, if one has been created, and then kills its Yarn job.
 * Any failure is logged and swallowed so that the caller can continue.
 */
public void cancelTask() {
    if (task == null) {
        // nothing was started yet
        return;
    }
    try {
        task.cancelApplication(true);
        ProcessUtils.killYarnJob(taskExecutionContext);
    } catch (Exception e) {
        logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
    }
}
/**
 * Drives the whole task life cycle: initialize, then either short-circuit for a dry run or go
 * through {@code beforeExecute()}, {@code executeTask()} and {@code afterExecute()}.
 * Any throwable is logged and handed to {@code afterThrowing(Throwable)}; the logging MDC is
 * always cleared at the end.
 */
@Override
public void run() {
    try {
        // the thread name decides which task log file the output goes to
        Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
        LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
        logger.info("Begin to pulling task");
        initializeTask();

        final boolean dryRun = Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun();
        if (!dryRun) {
            beforeExecute();
            executeTask();
            afterExecute();
            return;
        }

        // dry run: report success right away without creating or running the task
        taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
        taskExecutionContext.setEndTime(new Date());
        TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
        workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
        logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
    } catch (Throwable ex) {
        logger.error("Task execute failed, due to meet an exception", ex);
        afterThrowing(ex);
    } finally {
        LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    }
}
/**
 * Fills in the fields of the execution context that are only known on the worker:
 * start time, environment file and application id.
 */
protected void initializeTask() {
    logger.info("Begin to initialize task");

    final Date startedAt = new Date();
    taskExecutionContext.setStartTime(startedAt);
    logger.info("Set task startTime: {}", startedAt);

    final String envFile = CommonUtils.getSystemEnvPath();
    taskExecutionContext.setEnvFile(envFile);
    logger.info("Set task envFile: {}", envFile);

    // application id is "<processInstanceId>_<taskInstanceId>"
    final String appId = String.format("%s_%s",
            taskExecutionContext.getProcessInstanceId(),
            taskExecutionContext.getTaskInstanceId());
    taskExecutionContext.setTaskAppId(appId);
    logger.info("Set task appId: {}", appId);

    logger.info("End initialize task");
}
/**
 * Prepares everything the task needs before it can run: reports the running status to the
 * master, checks the tenant, creates the local execute directory, downloads the resources and
 * finally creates and initializes the task plugin instance.
 *
 * @throws TaskPluginException if no plugin is registered for the task type or the plugin
 *                             returns no task instance
 */
protected void beforeExecute() {
    // 1. tell the master that the task is running
    taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
    workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
    logger.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);

    // 2. check and prepare the local environment
    TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
    logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());

    TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
    logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());

    TaskExecutionCheckerUtils.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
    logger.info("Resources:{} check success", taskExecutionContext.getResources());

    // 3. create the task through the plugin registered for its type
    final TaskChannel channel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
    if (channel == null) {
        throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
    }
    task = channel.createTask(taskExecutionContext);
    if (null == task) {
        throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
    }
    logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());

    // 4. initialize the task and hand over the variable pool of the context
    task.init();
    logger.info("Success initialized task plugin instance success");

    task.getParameters().setVarPool(taskExecutionContext.getVarPool());
    logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
}
/**
 * Sends an alert through the alert client when the task asks for one. The warning strategy is
 * {@code SUCCESS} when the task exit status is {@code SUCCESS} and {@code FAILURE} otherwise.
 */
protected void sendAlertIfNeeded() {
    if (task.getNeedAlert()) {
        logger.info("The current task need to send alert, begin to send alert");
        final TaskExecutionStatus exitStatus = task.getExitStatus();
        final TaskAlertInfo alertInfo = task.getTaskAlertInfo();

        final int strategy;
        if (exitStatus == TaskExecutionStatus.SUCCESS) {
            strategy = WarningType.SUCCESS.getCode();
        } else {
            strategy = WarningType.FAILURE.getCode();
        }

        alertClientService.sendAlert(
                alertInfo.getAlertGroupId(),
                alertInfo.getTitle(),
                alertInfo.getContent(),
                strategy);
        logger.info("Success send alert");
    }
}
/**
 * Copies the outcome of the finished task (exit status, end time, process id, application ids
 * and variable pool) into the execution context and sends it to the master as the task result.
 */
protected void sendTaskResult() {
    // outcome as reported by the task itself
    taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
    taskExecutionContext.setEndTime(new Date());

    // identifiers of what the task has started
    taskExecutionContext.setProcessId(task.getProcessId());
    taskExecutionContext.setAppIds(task.getAppIds());

    // the variable pool travels as JSON
    taskExecutionContext.setVarPool(
            JSONUtils.toJsonString(task.getParameters().getVarPool()));

    workerMessageSender.sendMessageWithRetry(
            taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
    logger.info("Send task execute result to master, the current task status: {}",
            taskExecutionContext.getCurrentExecutionStatus());
}
/**
 * Deletes the local execute directory of the task unless the worker runs in develop mode.
 * An empty path and the root path {@code /} are never deleted. A failed deletion is only
 * logged and does not affect the task status.
 */
protected void clearTaskExecPathIfNeeded() {
    final String execLocalPath = taskExecutionContext.getExecutePath();

    if (CommonUtils.isDevelopMode()) {
        logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
        return;
    }
    logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);

    // nothing to delete
    if (Strings.isNullOrEmpty(execLocalPath)) {
        logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
        return;
    }
    // never wipe the file system root
    if (SINGLE_SLASH.equals(execLocalPath)) {
        logger.warn("The task execute file is '/', direct deletion is not allowed");
        return;
    }

    try {
        org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
        logger.info("Success clear the task execute file: {}", execLocalPath);
    } catch (IOException e) {
        // a directory that is already gone is the expected outcome, everything else is reported
        if (!(e instanceof NoSuchFileException)) {
            logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
        }
    }
}
/**
 * Returns the task execution context held by this runnable.
 */
public @NonNull TaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}
/**
 * Returns the task plugin instance, or {@code null} while none has been created
 * (the instance is assigned in {@code beforeExecute()}).
 */
public @Nullable AbstractTask getTask() {
return task;
}
}

23
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
/**
 * Factory for worker task execute runnables.
 *
 * @param <T> type of the runnable created by this factory
 */
public interface WorkerTaskExecuteRunnableFactory<T> {
/**
 * Creates a new worker task execute runnable.
 *
 * @return the created runnable
 */
T createWorkerTaskExecuteRunnable();
}

50
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
/**
 * Static entry point for obtaining {@link WorkerDelayTaskExecuteRunnableFactory} instances.
 */
@UtilityClass
public class WorkerTaskExecuteRunnableFactoryBuilder {

    /**
     * Builds the default factory for delay task execute runnables.
     *
     * @param taskExecutionContext  context of the task instance to run, must not be null
     * @param workerConfig          worker configuration, must not be null
     * @param workflowMasterAddress address of the master owning the workflow, must not be null
     * @param workerMessageSender   sender used to report task status, must not be null
     * @param alertClientService    client used to send alerts, must not be null
     * @param taskPluginManager     registry used to look up the task plugin, must not be null
     * @param storageOperate        storage used to download task resources, may be null
     * @return a {@link DefaultWorkerDelayTaskExecuteRunnableFactory} wired with the given arguments
     */
    public static WorkerDelayTaskExecuteRunnableFactory<?> createWorkerDelayTaskExecuteRunnableFactory(
            @NonNull TaskExecutionContext taskExecutionContext,
            @NonNull WorkerConfig workerConfig,
            @NonNull String workflowMasterAddress,
            @NonNull WorkerMessageSender workerMessageSender,
            @NonNull AlertClientService alertClientService,
            @NonNull TaskPluginManager taskPluginManager,
            @Nullable StorageOperate storageOperate) {
        final WorkerDelayTaskExecuteRunnableFactory<?> factory = new DefaultWorkerDelayTaskExecuteRunnableFactory(
                taskExecutionContext,
                workerConfig,
                workflowMasterAddress,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);
        return factory;
    }
}

129
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Checks and preparation steps a worker performs before it runs a task: tenant check,
 * creation of the local execute directory and download of the task resources.
 */
public class TaskExecutionCheckerUtils {

    /**
     * Verifies that the tenant of the task exists as an operating system user.
     *
     * <p>With distributed users on Linux the tenant is looked up with the Linux specific check and
     * is never created. Otherwise the user is created first when sudo and tenant auto creation
     * are both enabled, and then looked up in the user list.
     *
     * @param workerConfig         worker configuration deciding how the tenant is checked
     * @param taskExecutionContext context providing the tenant code
     * @throws TaskException if the tenant does not exist or the check itself fails; in the latter
     *                       case the original exception is attached as cause
     */
    public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
        try {
            String tenantCode = taskExecutionContext.getTenantCode();
            boolean osUserExistFlag;
            // Distributed users on Linux must not be created automatically,
            // so tenantAutoCreate has no effect in that case.
            if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
                // use the id command to judge in linux
                osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
            } else {
                if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
                    // if not exists this user, then create
                    OSUtils.createUserIfAbsent(tenantCode);
                }
                osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
            }
            if (!osUserExistFlag) {
                throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
            }
        } catch (TaskException ex) {
            throw ex;
        } catch (Exception ex) {
            // keep the cause, otherwise a failing lookup is indistinguishable from a missing tenant
            throw new TaskException(
                    String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()), ex);
        }
    }

    /**
     * Computes the local execute directory of the task, stores it in the context and creates it
     * when it does not exist yet.
     *
     * @param taskExecutionContext context providing the ids the path is built from
     * @throws TaskException if the directory cannot be determined or created
     */
    public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException {
        try {
            // local execute path
            String execLocalPath = FileUtils.getProcessExecDir(
                    taskExecutionContext.getProjectCode(),
                    taskExecutionContext.getProcessDefineCode(),
                    taskExecutionContext.getProcessDefineVersion(),
                    taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());
            taskExecutionContext.setExecutePath(execLocalPath);
            FileUtils.createWorkDirIfAbsent(execLocalPath);
        } catch (Throwable ex) {
            throw new TaskException("Cannot create process execute dir", ex);
        }
    }

    /**
     * Downloads every resource of the task that is not yet present in the execute directory and
     * records download time, size and success/failure metrics.
     *
     * @param storageOperate       storage the resources are downloaded from; may be null when no
     *                             storage is configured
     * @param taskExecutionContext context providing the execute path and the resources
     *                             (resource name mapped to tenant code)
     * @param logger               logger the progress is written to
     * @throws StorageOperateNoConfiguredException if resources are missing locally but no storage
     *                                             is available
     * @throws TaskException                       if a download fails
     */
    public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext, Logger logger) {
        String execLocalPath = taskExecutionContext.getExecutePath();
        Map<String, String> projectRes = taskExecutionContext.getResources();
        if (MapUtils.isEmpty(projectRes)) {
            return;
        }

        // collect the resources that are not in the execute directory yet
        List<Pair<String, String>> downloadFiles = new ArrayList<>();
        for (Map.Entry<String, String> resource : projectRes.entrySet()) {
            File resFile = new File(execLocalPath, resource.getKey());
            if (resFile.exists()) {
                logger.info("file : {} exists ", resFile.getName());
            } else {
                downloadFiles.add(Pair.of(resource.getKey(), resource.getValue()));
            }
        }
        if (downloadFiles.isEmpty()) {
            return;
        }

        // fail with the dedicated error instead of a NullPointerException wrapped further down
        if (storageOperate == null || !PropertyUtils.getResUploadStartupState()) {
            throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
        }

        for (Pair<String, String> fileDownload : downloadFiles) {
            try {
                // left: resource name, right: tenant code owning the resource
                String fullName = fileDownload.getLeft();
                String tenantCode = fileDownload.getRight();
                String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
                logger.info("get resource file from path:{}", resPath);
                long resourceDownloadStartTime = System.currentTimeMillis();
                storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
                WorkerServerMetrics
                        .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
                WorkerServerMetrics.recordWorkerResourceDownloadSize(
                        Files.size(Paths.get(execLocalPath, fullName)));
                WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
            } catch (Exception e) {
                WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
                throw new TaskException(String.format("Download resource file: %s error", fileDownload), e);
            }
        }
    }
}

171
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java

@ -17,156 +17,76 @@
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskDispatchProcessorTest {
private TaskExecutionContext taskExecutionContext;
private WorkerMessageSender workerMessageSender;
private ExecutorService workerExecService;
private StorageOperate storageOperate;
@InjectMocks
private TaskDispatchProcessor taskDispatchProcessor;
@Mock
private WorkerConfig workerConfig;
private Command command;
@Mock
private WorkerMessageSender workerMessageSender;
private Command ackCommand;
@Mock
private AlertClientService alertClientService;
private TaskDispatchCommand taskRequestCommand;
@Mock
private TaskPluginManager taskPluginManager;
private AlertClientService alertClientService;
@Mock
private WorkerManagerThread workerManagerThread;
private WorkerManagerThread workerManager;
/**
 * Builds the fixtures used by the tests: the task execution context, a worker config, the
 * dispatch command and its deserialized form, and the PowerMock stubs for the static helpers
 * and Spring beans involved in dispatching.
 */
@Before
public void before() throws Exception {
// init task execution context
taskExecutionContext = getTaskExecutionContext();
workerConfig = new WorkerConfig();
workerConfig.setExecThreads(1);
workerConfig.setListenPort(1234);
// commands exchanged between master and worker
command = new Command();
command.setType(CommandType.TASK_DISPATCH_REQUEST);
ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
"127.0.0.1:5678",
System.currentTimeMillis()).convert2Command();
taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
"127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
alertClientService = PowerMockito.mock(AlertClientService.class);
// executor that accepts every TaskExecuteThread without running it
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
// beans looked up through the static Spring application context
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
storageOperate = PowerMockito.mock(StorageOperate.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
"127.0.0.1:5678",
workerMessageSender,
alertClientService,
storageOperate))).thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads())).thenReturn(
workerExecService);
// both deserializers return the prepared dispatch command for the (empty) command body
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
// file system helpers: resolve the execute path from the context and skip directory creation
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())).thenReturn(
taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
// every TaskExecuteThread constructed during the test is replaced by the no-op subclass
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
workerMessageSender,
"127.0.0.1:5678",
LoggerFactory.getLogger(
TaskDispatchProcessorTest.class),
alertClientService,
storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
}
@Mock
private StorageOperate storageOperate;
@Test
public void testNormalExecution() {
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
public void process() {
Channel channel = Mockito.mock(Channel.class);
TaskChannel taskChannel = Mockito.mock(TaskChannel.class);
Mockito.when(taskPluginManager.getTaskChannel(Mockito.anyString())).thenReturn(taskChannel);
Assert.assertEquals(TaskExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
Command dispatchCommand = createDispatchCommand(taskExecutionContext);
taskDispatchProcessor.process(channel, dispatchCommand);
Mockito.verify(workerManagerThread, Mockito.atMostOnce()).offer(Mockito.any());
Mockito.verify(workerMessageSender, Mockito.never()).sendMessageWithRetry(taskExecutionContext, "localhost:5678", CommandType.TASK_REJECT);
}
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(TaskExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
/**
 * Wraps the given execution context into a task dispatch command using fixed
 * {@code localhost} addresses and the current time, and converts it into a {@link Command}.
 *
 * @param taskExecutionContext context to dispatch
 * @return the converted dispatch command
 */
public Command createDispatchCommand(TaskExecutionContext taskExecutionContext) {
    final TaskDispatchCommand dispatchCommand = new TaskDispatchCommand(
            taskExecutionContext,
            "localhost:5678",
            "localhost:1234",
            System.currentTimeMillis());
    return dispatchCommand.convert2Command();
}
public TaskExecutionContext getTaskExecutionContext() {
@ -184,21 +104,4 @@ public class TaskDispatchProcessorTest {
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
return taskExecutionContext;
}
/**
 * {@link TaskExecuteThread} whose {@code run()} does nothing, so constructing or submitting it
 * never executes a real task.
 */
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
// taskLogger is accepted but not passed on to the super constructor
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
WorkerMessageSender workerMessageSender,
String masterAddress,
Logger taskLogger,
AlertClientService alertClientService,
StorageOperate storageOperate) {
super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
}
@Override
public void run() {
// intentionally empty
}
}
}

73
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import java.util.Date;
/**
 * Unit test for {@link DefaultWorkerDelayTaskExecuteRunnable}.
 */
public class DefaultWorkerDelayTaskExecuteRunnableTest {

    // Collaborators are plain mocks. The execution context is built per test, so there is no
    // mocked context field that a test-local variable would shadow.
    private final WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class);
    private final String masterAddress = "localhost:5678";
    private final WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class);
    private final AlertClientService alertClientService = Mockito.mock(AlertClientService.class);
    private final TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class);
    private final StorageOperate storageOperate = Mockito.mock(StorageOperate.class);

    /**
     * A dry-run task must complete without throwing and leave the context in status
     * {@code SUCCESS}.
     */
    @Test
    public void testDryRun() {
        TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
                .dryRun(Constants.DRY_RUN_FLAG_YES)
                .taskInstanceId(0)
                .processDefineId(0)
                .firstSubmitTime(new Date())
                .taskLogName("TestLogName")
                .build();
        WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
                taskExecutionContext,
                workerConfig,
                masterAddress,
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);

        Assertions.assertAll(workerTaskExecuteRunnable::run);
        Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
    }
}

87
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Smoke test for the resource download helpers of {@link TaskExecuteThread}.
 */
@RunWith(PowerMockRunner.class)
public class TaskExecuteThreadTest {

    // the logger belongs to this test class, not to WorkerRegistryClientTest
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecuteThreadTest.class);

    @Mock
    private TaskExecutionContext taskExecutionContext;

    @Mock
    private WorkerMessageSender workerMessageSender;

    @Mock
    private AlertClientService alertClientService;

    @Mock
    private StorageOperate storageOperate;

    @Mock
    private TaskPluginManager taskPluginManager;

    /**
     * Calls {@code downloadCheck} and {@code downloadResource} with a single resource; both
     * calls are allowed to fail, the test only requires that they can be invoked.
     */
    @Test
    public void checkTest() {
        TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
                "127.0.0.1:5678",
                workerMessageSender,
                alertClientService,
                taskPluginManager,
                storageOperate);

        String path = "/";
        Map<String, String> projectRes = new HashMap<>();
        projectRes.put("shell", "shell.sh");
        List<Pair<String, String>> downloads = new ArrayList<>();
        try {
            downloads = taskExecuteThread.downloadCheck(path, projectRes);
        } catch (Exception e) {
            Assert.assertNotNull(e);
        }
        downloads.add(Pair.of("shell", "shell.sh"));
        try {
            taskExecuteThread.downloadResource(path, LOGGER, downloads);
        } catch (Exception ignored) {
            // best effort: the mocked storage cannot serve the file, a failure is acceptable here
        }
    }
}
Loading…
Cancel
Save