Browse Source

[Feature][JsonSplit] Fix master/worker about shellTask (#5300)

* Fix quoted bug about processDefineId of processInstance

* Fix master/worker about shellTask

* codeStyle

* codeStyle

* update taskType

* codeStyle

* update timeoutNotifyStrategy

* codeStyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
f7a06a033d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  3. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
  6. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  7. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  8. 34
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  9. 62
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  10. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  11. 39
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  12. 24
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java
  13. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  14. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  15. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  16. 9
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
  17. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java
  18. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java
  19. 14
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  20. 19
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java
  21. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  22. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  23. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
  24. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  25. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  26. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  27. 19
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  28. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  29. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  30. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  31. 199
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
  32. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  33. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  34. 26
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  35. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  36. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  37. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  38. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  39. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
  40. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
  41. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
  42. 27
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  43. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  44. 22
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  45. 8
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  46. 6
      sql/dolphinscheduler_mysql.sql
  47. 6
      sql/dolphinscheduler_postgre.sql

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -744,7 +744,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return if task has sub process return true else false
*/
private boolean checkTaskHasSubProcess(String taskType) {
return taskType.equals(TaskType.SUB_PROCESS.name());
return taskType.equals(TaskType.SUB_PROCESS.getDesc());
}
/**

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -329,7 +329,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
*/
private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
for (TaskInstance taskInstance : taskInstanceList) {
if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
Result<String> logResult = loggerService.queryLog(
taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
if (logResult.getCode() == Status.SUCCESS.ordinal()) {

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -16,11 +16,6 @@
*/
package org.apache.dolphinscheduler.api.utils;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -30,6 +25,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* check utils
@ -128,10 +128,10 @@ public class CheckUtils {
public static boolean checkTaskNodeParameters(TaskNode taskNode) {
AbstractParameters abstractParameters;
if (TaskType.of(taskNode.getType()) == TaskType.DEPENDENT) {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getDependence());
if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getDependence());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getParams());
}
if (abstractParameters != null) {

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java

@ -78,7 +78,7 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.SQL.name(), this);
TaskNodeParamFactory.register(TaskType.PROCEDURE.name(), this);
TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this);
TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this);
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java

@ -109,6 +109,6 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean {
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.DEPENDENT.name(), this);
TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this);
}
}

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.api.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
@ -30,13 +29,12 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -61,11 +59,8 @@ import org.apache.http.entity.ContentType;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -82,7 +77,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -696,7 +690,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_NODE_HAS_CYCLE, taskCycleRes.get(Constants.STATUS));
//json abnormal
String abnormalJson = processDefinitionJson.replaceAll("SHELL", "");
String abnormalJson = processDefinitionJson.replaceAll(TaskType.SHELL.getDesc(), "");
processData = JSONUtils.parseObject(abnormalJson, ProcessData.class);
Map<String, Object> abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson);
Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS));
@ -795,7 +789,7 @@ public class ProcessDefinitionServiceTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
@ -832,7 +826,7 @@ public class ProcessDefinitionServiceTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskType("SUB_PROCESS");
taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc());
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
@ -1128,7 +1122,6 @@ public class ProcessDefinitionServiceTest {
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46);
processDefinition.setName("test_pdf");
@ -1229,7 +1222,7 @@ public class ProcessDefinitionServiceTest {
@Test
public void testExportProcessMetaData() {
Integer processDefinitionId = 111;
int processDefinitionId = 111;
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(processDefinitionId);
Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition));

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -281,7 +281,7 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance();
processInstance.setState(ExecutionStatus.SUCCESS);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType(TaskType.SHELL.getDescp());
taskInstance.setTaskType(TaskType.SHELL.getDesc());
List<TaskInstance> taskInstanceList = new ArrayList<>();
taskInstanceList.add(taskInstance);
Result res = new Result();
@ -332,7 +332,7 @@ public class ProcessInstanceServiceTest {
//task not sub process
TaskInstance taskInstance = getTaskInstance();
taskInstance.setTaskType(TaskType.HTTP.toString());
taskInstance.setTaskType(TaskType.HTTP.getDesc());
taskInstance.setProcessInstanceId(1);
when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Map<String, Object> notSubprocessRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectName, 1);
@ -340,7 +340,7 @@ public class ProcessInstanceServiceTest {
//sub process not exist
TaskInstance subTask = getTaskInstance();
subTask.setTaskType(TaskType.SUB_PROCESS.toString());
subTask.setTaskType(TaskType.SUB_PROCESS.getDesc());
subTask.setProcessInstanceId(1);
when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask);
when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null);

34
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@ -152,18 +152,18 @@ public class CheckUtilsTest {
// sub SubProcessParameters
SubProcessParameters subProcessParameters = new SubProcessParameters();
taskNode.setParams(JSONUtils.toJsonString(subProcessParameters));
taskNode.setType(TaskType.SUB_PROCESS.toString());
taskNode.setType(TaskType.SUB_PROCESS.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
subProcessParameters.setProcessDefinitionId(1234);
taskNode.setParams(JSONUtils.toJsonString(subProcessParameters));
taskNode.setType(TaskType.SUB_PROCESS.toString());
taskNode.setType(TaskType.SUB_PROCESS.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// ShellParameters
ShellParameters shellParameters = new ShellParameters();
taskNode.setParams(JSONUtils.toJsonString(shellParameters));
taskNode.setType(TaskType.SHELL.toString());
taskNode.setType(TaskType.SHELL.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
shellParameters.setRawScript("");
taskNode.setParams(JSONUtils.toJsonString(shellParameters));
@ -175,7 +175,7 @@ public class CheckUtilsTest {
// ProcedureParameters
ProcedureParameters procedureParameters = new ProcedureParameters();
taskNode.setParams(JSONUtils.toJsonString(procedureParameters));
taskNode.setType(TaskType.PROCEDURE.toString());
taskNode.setType(TaskType.PROCEDURE.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
procedureParameters.setDatasource(1);
procedureParameters.setType("xx");
@ -186,7 +186,7 @@ public class CheckUtilsTest {
// SqlParameters
SqlParameters sqlParameters = new SqlParameters();
taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
taskNode.setType(TaskType.SQL.toString());
taskNode.setType(TaskType.SQL.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
sqlParameters.setDatasource(1);
sqlParameters.setType("xx");
@ -197,7 +197,7 @@ public class CheckUtilsTest {
// MapReduceParameters
MapReduceParameters mapreduceParameters = new MapReduceParameters();
taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters));
taskNode.setType(TaskType.MR.toString());
taskNode.setType(TaskType.MR.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
ResourceInfo resourceInfoMapreduce = new ResourceInfo();
@ -206,13 +206,13 @@ public class CheckUtilsTest {
mapreduceParameters.setMainJar(resourceInfoMapreduce);
mapreduceParameters.setProgramType(ProgramType.JAVA);
taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters));
taskNode.setType(TaskType.MR.toString());
taskNode.setType(TaskType.MR.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// SparkParameters
SparkParameters sparkParameters = new SparkParameters();
taskNode.setParams(JSONUtils.toJsonString(sparkParameters));
taskNode.setType(TaskType.SPARK.toString());
taskNode.setType(TaskType.SPARK.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
sparkParameters.setMainJar(new ResourceInfo());
sparkParameters.setProgramType(ProgramType.SCALA);
@ -223,7 +223,7 @@ public class CheckUtilsTest {
// PythonParameters
PythonParameters pythonParameters = new PythonParameters();
taskNode.setParams(JSONUtils.toJsonString(pythonParameters));
taskNode.setType(TaskType.PYTHON.toString());
taskNode.setType(TaskType.PYTHON.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
pythonParameters.setRawScript("ss");
taskNode.setParams(JSONUtils.toJsonString(pythonParameters));
@ -232,42 +232,42 @@ public class CheckUtilsTest {
// DependentParameters
DependentParameters dependentParameters = new DependentParameters();
taskNode.setParams(JSONUtils.toJsonString(dependentParameters));
taskNode.setType(TaskType.DEPENDENT.toString());
taskNode.setType(TaskType.DEPENDENT.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// FlinkParameters
FlinkParameters flinkParameters = new FlinkParameters();
taskNode.setParams(JSONUtils.toJsonString(flinkParameters));
taskNode.setType(TaskType.FLINK.toString());
taskNode.setType(TaskType.FLINK.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
flinkParameters.setMainJar(new ResourceInfo());
flinkParameters.setProgramType(ProgramType.JAVA);
taskNode.setParams(JSONUtils.toJsonString(flinkParameters));
taskNode.setType(TaskType.FLINK.toString());
taskNode.setType(TaskType.FLINK.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// HTTP
HttpParameters httpParameters = new HttpParameters();
taskNode.setParams(JSONUtils.toJsonString(httpParameters));
taskNode.setType(TaskType.HTTP.toString());
taskNode.setType(TaskType.HTTP.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
httpParameters.setUrl("httpUrl");
taskNode.setParams(JSONUtils.toJsonString(httpParameters));
taskNode.setType(TaskType.HTTP.toString());
taskNode.setType(TaskType.HTTP.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// DataxParameters
DataxParameters dataxParameters = new DataxParameters();
taskNode.setParams(JSONUtils.toJsonString(dataxParameters));
taskNode.setType(TaskType.DATAX.toString());
taskNode.setType(TaskType.DATAX.getDesc());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
dataxParameters.setCustomConfig(0);
dataxParameters.setDataSource(111);
dataxParameters.setDataTarget(333);
dataxParameters.setSql("sql");
dataxParameters.setSql(TaskType.SQL.getDesc());
dataxParameters.setTargetTable("tar");
taskNode.setParams(JSONUtils.toJsonString(dataxParameters));
taskNode.setType(TaskType.DATAX.toString());
taskNode.setType(TaskType.DATAX.getDesc());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
}

62
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -16,9 +16,6 @@
*/
package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap;
import java.util.Map;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
@ -41,56 +38,35 @@ public enum TaskType {
* 12 SQOOP
* 13 WATERDROP
*/
SHELL(0, "shell"),
SQL(1, "sql"),
SUB_PROCESS(2, "sub_process"),
PROCEDURE(3, "procedure"),
MR(4, "mr"),
SPARK(5, "spark"),
PYTHON(6, "python"),
DEPENDENT(7, "dependent"),
FLINK(8, "flink"),
HTTP(9, "http"),
DATAX(10, "datax"),
CONDITIONS(11, "conditions"),
SQOOP(12, "sqoop"),
WATERDROP(13, "waterdrop");
SHELL(0, "SHELL"),
SQL(1, "SQL"),
SUB_PROCESS(2, "SUB_PROCESS"),
PROCEDURE(3, "PROCEDURE"),
MR(4, "MR"),
SPARK(5, "SPARK"),
PYTHON(6, "PYTHON"),
DEPENDENT(7, "DEPENDENT"),
FLINK(8, "FLINK"),
HTTP(9, "HTTP"),
DATAX(10, "DATAX"),
CONDITIONS(11, "CONDITIONS"),
SQOOP(12, "SQOOP"),
WATERDROP(13, "WATERDROP");
TaskType(int code, String descp) {
TaskType(int code, String desc) {
this.code = code;
this.descp = descp;
this.desc = desc;
}
@EnumValue
private final int code;
private final String descp;
public static boolean typeIsNormalTask(String typeName) {
TaskType taskType = TaskType.valueOf(typeName);
return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
}
private final String desc;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
private static final Map<String, TaskType> TASK_TYPE_MAP = new HashMap<>();
static {
for (TaskType taskType : TaskType.values()) {
TASK_TYPE_MAP.put(taskType.descp, taskType);
}
}
public static TaskType of(String descp) {
descp = descp.toLowerCase();
if (TASK_TYPE_MAP.containsKey(descp)) {
return TASK_TYPE_MAP.get(descp);
}
throw new IllegalArgumentException("invalid type : " + descp);
public String getDesc() {
return desc;
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -360,7 +360,7 @@ public class TaskNode {
}
public boolean isConditionsTask() {
return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType());
}
public List<PreviousTaskNode> getPreTaskNodeList() {

39
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
@ -41,7 +40,7 @@ import org.slf4j.LoggerFactory;
*/
public class TaskParametersUtils {
private static Logger logger = LoggerFactory.getLogger(TaskParametersUtils.class);
private static final Logger logger = LoggerFactory.getLogger(TaskParametersUtils.class);
private TaskParametersUtils() {
throw new UnsupportedOperationException("Construct TaskParametersUtils");
@ -55,40 +54,36 @@ public class TaskParametersUtils {
* @return task parameters
*/
public static AbstractParameters getParameters(String taskType, String parameter) {
TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskType);
if (anEnum == null) {
logger.error("not support task type: {}", taskType);
return null;
}
switch (anEnum) {
case SUB_PROCESS:
switch (taskType) {
case "SUB_PROCESS":
return JSONUtils.parseObject(parameter, SubProcessParameters.class);
case SHELL:
case WATERDROP:
case "SHELL":
case "WATERDROP":
return JSONUtils.parseObject(parameter, ShellParameters.class);
case PROCEDURE:
case "PROCEDURE":
return JSONUtils.parseObject(parameter, ProcedureParameters.class);
case SQL:
case "SQL":
return JSONUtils.parseObject(parameter, SqlParameters.class);
case MR:
case "MR":
return JSONUtils.parseObject(parameter, MapReduceParameters.class);
case SPARK:
case "SPARK":
return JSONUtils.parseObject(parameter, SparkParameters.class);
case PYTHON:
case "PYTHON":
return JSONUtils.parseObject(parameter, PythonParameters.class);
case DEPENDENT:
case "DEPENDENT":
return JSONUtils.parseObject(parameter, DependentParameters.class);
case FLINK:
case "FLINK":
return JSONUtils.parseObject(parameter, FlinkParameters.class);
case HTTP:
case "HTTP":
return JSONUtils.parseObject(parameter, HttpParameters.class);
case DATAX:
case "DATAX":
return JSONUtils.parseObject(parameter, DataxParameters.class);
case CONDITIONS:
case "CONDITIONS":
return JSONUtils.parseObject(parameter, ConditionsParameters.class);
case SQOOP:
case "SQOOP":
return JSONUtils.parseObject(parameter, SqoopParameters.class);
default:
logger.error("not support task type: {}", taskType);
return null;
}

24
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -30,16 +32,16 @@ public class TaskParametersUtilsTest {
@Test
public void testGetParameters() {
Assert.assertNull(TaskParametersUtils.getParameters("xx", "ttt"));
Assert.assertNull(TaskParametersUtils.getParameters("SHELL", "ttt"));
Assert.assertNotNull(TaskParametersUtils.getParameters("SHELL", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("SQL", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("SUB_PROCESS", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("PROCEDURE", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("MR", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("SPARK", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("PYTHON", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("DEPENDENT", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("FLINK", "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters("HTTP", "{}"));
Assert.assertNull(TaskParametersUtils.getParameters(TaskType.SHELL.getDesc(), "ttt"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SHELL.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SQL.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SUB_PROCESS.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.PROCEDURE.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.MR.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SPARK.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.PYTHON.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.DEPENDENT.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.FLINK.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.HTTP.getDesc(), "{}"));
}
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -82,7 +81,7 @@ public class TaskDefinition {
/**
* task type
*/
private TaskType taskType;
private String taskType;
/**
* user defined parameters
@ -318,11 +317,11 @@ public class TaskDefinition {
this.projectCode = projectCode;
}
public TaskType getTaskType() {
public String getTaskType() {
return taskType;
}
public void setTaskType(TaskType taskType) {
public void setTaskType(String taskType) {
this.taskType = taskType;
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.dao.entity;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.util.Date;
/**
* task definition log
*/

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -68,7 +68,7 @@ public class TaskInstance implements Serializable {
private long taskCode;
/**
* task defintion version
* task definition version
*/
private int taskDefinitionVersion;
@ -146,6 +146,12 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private ProcessDefinition processDefine;
/**
* task definition
*/
@TableField(exist = false)
private TaskDefinition taskDefine;
/**
* process id
*/
@ -268,6 +274,14 @@ public class TaskInstance implements Serializable {
this.processDefine = processDefine;
}
public TaskDefinition getTaskDefine() {
return taskDefine;
}
public void setTaskDefine(TaskDefinition taskDefine) {
this.taskDefine = taskDefine;
}
public int getId() {
return id;
}
@ -484,15 +498,15 @@ public class TaskInstance implements Serializable {
}
public boolean isSubProcess() {
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
return TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(this.taskType);
}
public boolean isDependTask() {
return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType));
return TaskType.DEPENDENT.getDesc().equalsIgnoreCase(this.taskType);
}
public boolean isConditionsTask() {
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType);
}
/**

9
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -39,19 +40,19 @@ public class TaskInstanceTest {
TaskInstance taskInstance = new TaskInstance();
//sub process
taskInstance.setTaskType("SUB_PROCESS");
taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc());
Assert.assertTrue(taskInstance.isSubProcess());
//not sub process
taskInstance.setTaskType("HTTP");
taskInstance.setTaskType(TaskType.HTTP.getDesc());
Assert.assertFalse(taskInstance.isSubProcess());
//sub process
taskInstance.setTaskType("CONDITIONS");
taskInstance.setTaskType(TaskType.CONDITIONS.getDesc());
Assert.assertTrue(taskInstance.isConditionsTask());
//sub process
taskInstance.setTaskType("DEPENDENT");
taskInstance.setTaskType(TaskType.DEPENDENT.getDesc());
Assert.assertTrue(taskInstance.isDependTask());
}

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java

@ -63,7 +63,7 @@ public class TaskDefinitionLogMapperTest {
taskDefinition.setCode(888888L);
taskDefinition.setName("unit-test");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType(TaskType.SHELL);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(userId);
taskDefinition.setVersion(1);
taskDefinition.setCreateTime(new Date());
@ -120,7 +120,7 @@ public class TaskDefinitionLogMapperTest {
taskDefinition.setCode(888888L);
taskDefinition.setName("unit-test");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType(TaskType.SHELL);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(1);
taskDefinition.setResourceIds("1");
taskDefinition.setVersion(1);

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java

@ -60,7 +60,7 @@ public class TaskDefinitionMapperTest {
taskDefinition.setCode(888888L);
taskDefinition.setName("unit-test");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType(TaskType.SHELL);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(userId);
taskDefinition.setResourceIds("1");
taskDefinition.setVersion(1);

14
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -17,9 +17,6 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -41,6 +38,9 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@ -77,7 +77,7 @@ public class TaskInstanceMapperTest {
*/
private TaskInstance insertTaskInstance(int processInstanceId) {
//insertOne
return insertTaskInstance("us task", processInstanceId, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString());
return insertTaskInstance(processInstanceId, TaskType.SHELL.getDesc());
}
/**
@ -100,11 +100,11 @@ public class TaskInstanceMapperTest {
/**
* construct a task instance and then insert
*/
private TaskInstance insertTaskInstance(String taskName, int processInstanceId, ExecutionStatus state, String taskType) {
private TaskInstance insertTaskInstance(int processInstanceId, String taskType) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setFlag(Flag.YES);
taskInstance.setName(taskName);
taskInstance.setState(state);
taskInstance.setName("us task");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setProcessInstanceId(processInstanceId);

19
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java

@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -205,7 +206,7 @@ public class DagHelperTest {
completeTaskList.put("2", new TaskInstance());
completeTaskList.put("4", new TaskInstance());
TaskNode node3 = dag.getNode("3");
node3.setType("CONDITIONS");
node3.setType(TaskType.CONDITIONS.getDesc());
node3.setConditionResult("{\n"
+
" \"successNode\": [5\n"
@ -273,13 +274,13 @@ public class DagHelperTest {
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setType("SHELL");
node1.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setType("SHELL");
node2.setType(TaskType.SHELL.getDesc());
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setDepList(dep2);
@ -289,13 +290,13 @@ public class DagHelperTest {
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setType("SHELL");
node4.setType(TaskType.SHELL.getDesc());
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setType("SHELL");
node3.setType(TaskType.SHELL.getDesc());
List<String> dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
@ -305,7 +306,7 @@ public class DagHelperTest {
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setType("SHELL");
node5.setType(TaskType.SHELL.getDesc());
List<String> dep5 = new ArrayList<>();
dep5.add("3");
dep5.add("8");
@ -315,7 +316,7 @@ public class DagHelperTest {
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setType("SHELL");
node6.setType(TaskType.SHELL.getDesc());
List<String> dep6 = new ArrayList<>();
dep6.add("3");
node6.setDepList(dep6);
@ -324,7 +325,7 @@ public class DagHelperTest {
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setType("SHELL");
node7.setType(TaskType.SHELL.getDesc());
List<String> dep7 = new ArrayList<>();
dep7.add("5");
node7.setDepList(dep7);
@ -333,7 +334,7 @@ public class DagHelperTest {
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setType("SHELL");
node8.setType(TaskType.SHELL.getDesc());
List<String> dep8 = new ArrayList<>();
dep8.add("2");
node8.setDepList(dep8);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.builder;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.*;
@ -54,12 +56,15 @@ public class TaskExecutionContextBuilder {
}
public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) {
int timeoutSeconds = taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT;
if (timeoutSeconds >= Integer.MAX_VALUE) {
timeoutSeconds = Integer.MAX_VALUE;
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) {
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy());
if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED
|| taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
taskExecutionContext.setTaskTimeout(Math.min(taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT, Integer.MAX_VALUE));
}
}
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode());
taskExecutionContext.setTaskTimeout(timeoutSeconds);
taskExecutionContext.setTaskParams(taskDefinition.getTaskParams());
return this;
}

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
@ -168,7 +169,7 @@ public class TaskExecutionContext implements Serializable {
/**
* task timeout strategy
*/
private int taskTimeoutStrategy;
private TaskTimeoutStrategy taskTimeoutStrategy;
/**
* task timeout
@ -420,11 +421,11 @@ public class TaskExecutionContext implements Serializable {
this.taskAppId = taskAppId;
}
public int getTaskTimeoutStrategy() {
public TaskTimeoutStrategy getTaskTimeoutStrategy() {
return taskTimeoutStrategy;
}
public void setTaskTimeoutStrategy(int taskTimeoutStrategy) {
public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java vendored

@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* taskInstance state manager
*/
@ -74,8 +75,8 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
taskInstance.setId(taskExecutionContext.getTaskInstanceId());
taskInstance.setName(taskExecutionContext.getTaskName());
taskInstance.setStartTime(taskExecutionContext.getStartTime());
taskInstance.setTaskType(taskInstance.getTaskType());
taskInstance.setExecutePath(taskInstance.getExecutePath());
taskInstance.setTaskType(taskExecutionContext.getTaskType());
taskInstance.setExecutePath(taskExecutionContext.getExecutePath());
taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance);
}

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -196,21 +196,17 @@ public class TaskPriorityQueueConsumer extends Thread {
protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) {
TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
// task type
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
@ -225,40 +221,41 @@ public class TaskPriorityQueueConsumer extends Thread {
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
// SQL task
if (taskType == TaskType.SQL) {
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
}
// DATAX task
if (taskType == TaskType.DATAX) {
if (TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
}
// procedure task
if (taskType == TaskType.PROCEDURE) {
if (TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance);
}
if (taskType == TaskType.SQOOP) {
if (TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
.buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
.create();
.buildTaskInstanceRelatedInfo(taskInstance)
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
.buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
.create();
}
/**
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
@ -271,7 +268,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
@ -296,7 +293,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
@ -327,7 +324,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
@ -337,7 +334,7 @@ public class TaskPriorityQueueConsumer extends Thread {
// whether udf type
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& StringUtils.isNotEmpty(sqlParameters.getUdfs());
&& StringUtils.isNotEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
@ -360,15 +357,15 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* whehter tenant is null
*
* @param tenant tenant
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
@ -388,10 +385,7 @@ public class TaskPriorityQueueConsumer extends Thread {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach(
(t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
);
oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
}
// get the resource id in order to get the resource names in batch
@ -402,9 +396,7 @@ public class TaskPriorityQueueConsumer extends Thread {
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach(
(t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
);
resources.forEach(t -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -484,7 +484,7 @@ public class MasterExecThread implements Runnable {
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
// task instance type
taskInstance.setTaskType(taskNode.getType());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
// task instance whether alert
taskInstance.setAlertFlag(Flag.NO);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -200,7 +200,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType()) || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType())) {
ackCommand.setExecutePath(null);
} else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());

19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -20,9 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
@ -132,7 +130,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
return;
}
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
@ -146,7 +143,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getResources(),
logger);
taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams());
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@ -218,7 +214,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* get global paras map
* @return
* @return map
*/
private Map<String, String> getGlobalParamsMap() {
Map<String, String> globalParamsMap = new HashMap<>(16);
@ -249,13 +245,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* download resource file
*
* @param execLocalPath
* @param projectRes
* @param logger
* @param execLocalPath execLocalPath
* @param projectRes projectRes
* @param logger logger
*/
private void downloadResource(String execLocalPath,
Map<String, String> projectRes,
Logger logger) throws Exception {
private void downloadResource(String execLocalPath, Map<String, String> projectRes, Logger logger) {
if (MapUtils.isEmpty(projectRes)) {
return;
}
@ -311,8 +305,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
ackCommand.setStartTime(taskExecutionContext.getStartTime());
ackCommand.setLogPath(taskExecutionContext.getLogPath());
ackCommand.setHost(taskExecutionContext.getHost());
if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name())
|| taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType()) || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType())) {
ackCommand.setExecutePath(null);
} else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -135,7 +135,7 @@ public abstract class AbstractCommandExecutor {
processBuilder.redirectErrorStream(true);
// setting up user to run commands
if (CommonUtils.isSudoEnable()) {
if (!OSUtils.isWindows() && CommonUtils.isSudoEnable()) {
command.add("sudo");
command.add("-u");
command.add(taskExecutionContext.getTenantCode());
@ -208,9 +208,7 @@ public abstract class AbstractCommandExecutor {
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskExecutionContext.getExecutePath(),
processId
, result.getExitStatusCode());
taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode());
// if SHELL task exit
if (status) {

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -194,8 +194,7 @@ public abstract class AbstractTask {
public void after() {
if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) {
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())) {
if (TaskRecordDao.getTaskRecordFlag() && typeIsNormalTask(taskExecutionContext.getTaskType())) {
AbstractParameters params = TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(), taskExecutionContext.getTaskParams());
// replace placeholder
@ -224,6 +223,9 @@ public abstract class AbstractTask {
}
}
private boolean typeIsNormalTask(String taskType) {
return !(TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(taskType) || TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType));
}
/**
* get exit status according to exitCode

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
@ -35,7 +33,7 @@ import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.slf4j.Logger;
/**
* task manaster
* task manager
*/
public class TaskManager {
@ -47,32 +45,27 @@ public class TaskManager {
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType());
if (anEnum == null) {
logger.error("not support task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
}
switch (anEnum) {
case SHELL:
case WATERDROP:
switch (taskExecutionContext.getTaskType()) {
case "SHELL":
case "WATERDROP":
return new ShellTask(taskExecutionContext, logger);
case PROCEDURE:
case "PROCEDURE":
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
case "SQL":
return new SqlTask(taskExecutionContext, logger, alertClientService);
case MR:
case "MR":
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:
case "SPARK":
return new SparkTask(taskExecutionContext, logger);
case FLINK:
case "FLINK":
return new FlinkTask(taskExecutionContext, logger);
case PYTHON:
case "PYTHON":
return new PythonTask(taskExecutionContext, logger);
case HTTP:
case "HTTP":
return new HttpTask(taskExecutionContext, logger);
case DATAX:
case "DATAX":
return new DataxTask(taskExecutionContext, logger);
case SQOOP:
case "SQOOP":
return new SqoopTask(taskExecutionContext, logger);
default:
logger.error("not support task type: {}", taskExecutionContext.getTaskType());

199
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

@ -16,32 +16,50 @@
*/
package org.apache.dolphinscheduler.server.worker.task.processdure;
import com.cronutils.utils.StringUtils;
import static org.apache.dolphinscheduler.common.enums.DataType.BOOLEAN;
import static org.apache.dolphinscheduler.common.enums.DataType.DATE;
import static org.apache.dolphinscheduler.common.enums.DataType.DOUBLE;
import static org.apache.dolphinscheduler.common.enums.DataType.FLOAT;
import static org.apache.dolphinscheduler.common.enums.DataType.INTEGER;
import static org.apache.dolphinscheduler.common.enums.DataType.LONG;
import static org.apache.dolphinscheduler.common.enums.DataType.TIME;
import static org.apache.dolphinscheduler.common.enums.DataType.TIMESTAMP;
import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.slf4j.Logger;
import java.sql.*;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.apache.dolphinscheduler.common.enums.DataType.*;
import org.slf4j.Logger;
/**
* procedure task
* procedure task
*/
public class ProcedureTask extends AbstractTask {
@ -63,8 +81,9 @@ public class ProcedureTask extends AbstractTask {
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @param logger logger
*/
public ProcedureTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
@ -111,7 +130,6 @@ public class ProcedureTask extends AbstractTask {
baseDataSource.getPassword());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
@ -122,13 +140,13 @@ public class ProcedureTask extends AbstractTask {
Collection<Property> userDefParamsList = null;
if (procedureParameters.getLocalParametersMap() != null){
if (procedureParameters.getLocalParametersMap() != null) {
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}
String method = getCallMethod(userDefParamsList);
logger.info("call method : {}",method);
logger.info("call method : {}", method);
// call method
stmt = connection.prepareCall(method);
@ -148,48 +166,48 @@ public class ProcedureTask extends AbstractTask {
printOutParameter(stmt, outParameterMap);
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
}catch (Exception e){
} catch (Exception e) {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
logger.error("procedure task error",e);
logger.error("procedure task error", e);
throw e;
}
finally {
close(stmt,connection);
} finally {
close(stmt, connection);
}
}
/**
* get call method
*
* @param userDefParamsList userDefParamsList
* @return method
*/
private String getCallMethod(Collection<Property> userDefParamsList) {
String method;// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
if (CollectionUtils.isEmpty(userDefParamsList)) {
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
} else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
for (int i = 0; i < size - 1; i++) {
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
method = "{call " + procedureParameters.getMethod() + parameter.toString() + "}";
}
return method;
}
/**
* print outParameter
*
* @param stmt CallableStatement
* @param outParameterMap outParameterMap
* @throws SQLException
*/
private void printOutParameter(CallableStatement stmt,
Map<Integer, Property> outParameterMap) throws SQLException {
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
while (iter.hasNext()) {
Map.Entry<Integer, Property> en = iter.next();
int index = en.getKey();
@ -208,27 +226,26 @@ public class ProcedureTask extends AbstractTask {
* @param paramsMap paramsMap
* @param userDefParamsList userDefParamsList
* @return outParameterMap
* @throws Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt,
Map<String, Property> paramsMap,
Collection<Property> userDefParamsList) throws Exception {
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
Map<Integer, Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0) {
int index = 1;
for (Property property : userDefParamsList){
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
, property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
outParameterMap.put(index, property);
}
index++;
}
@ -237,26 +254,22 @@ public class ProcedureTask extends AbstractTask {
}
/**
* set timtou
* set timeout
*
* @param stmt CallableStatement
* @throws SQLException
*/
private void setTimeout(CallableStatement stmt) throws SQLException {
Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED;
Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if(failed || warnfailed){
Boolean failed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED;
Boolean warnFailed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
if (failed || warnFailed) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
}
/**
* close jdbc resource
*
* @param stmt
* @param connection
*/
private void close(PreparedStatement stmt,
Connection connection){
private void close(PreparedStatement stmt, Connection connection) {
if (stmt != null) {
try {
stmt.close();
@ -275,40 +288,35 @@ public class ProcedureTask extends AbstractTask {
/**
* get output parameter
* @param stmt
* @param index
* @param prop
* @param dataType
* @throws SQLException
*/
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
switch (dataType){
switch (dataType) {
case VARCHAR:
logger.info("out prameter varchar key : {} , value : {}",prop,stmt.getString(index));
logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index));
break;
case INTEGER:
logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index));
break;
case LONG:
logger.info("out prameter long key : {} , value : {}",prop,stmt.getLong(index));
logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index));
break;
case FLOAT:
logger.info("out prameter float key : {} , value : {}",prop,stmt.getFloat(index));
logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index));
break;
case DOUBLE:
logger.info("out prameter double key : {} , value : {}",prop,stmt.getDouble(index));
logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index));
break;
case DATE:
logger.info("out prameter date key : {} , value : {}",prop,stmt.getDate(index));
logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index));
break;
case TIME:
logger.info("out prameter time key : {} , value : {}",prop,stmt.getTime(index));
logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index));
break;
case TIMESTAMP:
logger.info("out prameter timestamp key : {} , value : {}",prop,stmt.getTimestamp(index));
logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index));
break;
case BOOLEAN:
logger.info("out prameter boolean key : {} , value : {}",prop, stmt.getBoolean(index));
logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index));
break;
default:
break;
@ -322,72 +330,73 @@ public class ProcedureTask extends AbstractTask {
/**
* set out parameter
* @param index index
* @param stmt stmt
* @param dataType dataType
* @param value value
*
* @param index index
* @param stmt stmt
* @param dataType dataType
* @param value value
* @throws Exception exception
*/
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{
if (dataType.equals(VARCHAR)){
if (StringUtils.isEmpty(value)){
private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception {
if (dataType.equals(VARCHAR)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.VARCHAR);
}else {
} else {
stmt.registerOutParameter(index, Types.VARCHAR, value);
}
}else if (dataType.equals(INTEGER)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(INTEGER)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.INTEGER);
}else {
} else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
}else if (dataType.equals(LONG)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index,Types.INTEGER);
}else {
stmt.registerOutParameter(index,Types.INTEGER ,value);
} else if (dataType.equals(LONG)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.INTEGER);
} else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
}else if (dataType.equals(FLOAT)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(FLOAT)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.FLOAT);
}else {
stmt.registerOutParameter(index, Types.FLOAT,value);
} else {
stmt.registerOutParameter(index, Types.FLOAT, value);
}
}else if (dataType.equals(DOUBLE)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DOUBLE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DOUBLE);
}else {
stmt.registerOutParameter(index, Types.DOUBLE , value);
} else {
stmt.registerOutParameter(index, Types.DOUBLE, value);
}
}else if (dataType.equals(DATE)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DATE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DATE);
}else {
stmt.registerOutParameter(index, Types.DATE , value);
} else {
stmt.registerOutParameter(index, Types.DATE, value);
}
}else if (dataType.equals(TIME)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(TIME)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIME);
}else {
stmt.registerOutParameter(index, Types.TIME , value);
} else {
stmt.registerOutParameter(index, Types.TIME, value);
}
}else if (dataType.equals(TIMESTAMP)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(TIMESTAMP)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIMESTAMP);
}else {
stmt.registerOutParameter(index, Types.TIMESTAMP , value);
} else {
stmt.registerOutParameter(index, Types.TIMESTAMP, value);
}
}else if (dataType.equals(BOOLEAN)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(BOOLEAN)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.BOOLEAN);
}else {
stmt.registerOutParameter(index, Types.BOOLEAN , value);
} else {
stmt.registerOutParameter(index, Types.BOOLEAN, value);
}
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -429,8 +429,8 @@ public class SqlTask extends AbstractTask {
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
// is the timeout set
boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED
|| TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
|| taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
if (timeoutFlag) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -42,15 +42,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@RunWith(MockitoJUnitRunner.Silent.class)
public class ConditionsTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class);
/**
* TaskNode.runFlag : task can be run normally
*/
@ -120,7 +116,7 @@ public class ConditionsTaskTest {
}
@Test
public void testBasicFailure() throws Exception {
public void testBasicFailure() {
TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE);
ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance);
taskExecThread.call();
@ -131,7 +127,7 @@ public class ConditionsTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-1000");
taskNode.setName("C");
taskNode.setType(TaskType.CONDITIONS.toString());
taskNode.setType(TaskType.CONDITIONS.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
DependentItem dependentItem = new DependentItem();
@ -171,7 +167,7 @@ public class ConditionsTaskTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setProcessInstanceId(processInstance.getId());
return taskInstance;
}

26
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -37,8 +37,6 @@ import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -92,7 +90,7 @@ public class DependentTaskTest {
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance(100, 1);
processInstance = getProcessInstance();
// for MasterBaseTaskExecThread.call
// for DependentTaskExecThread.waitTaskQuit
@ -142,7 +140,7 @@ public class DependentTaskTest {
}
@Test
public void testBasicSuccess() throws Exception {
public void testBasicSuccess() {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE);
@ -165,7 +163,7 @@ public class DependentTaskTest {
}
@Test
public void testBasicFailure() throws Exception {
public void testBasicFailure() {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS);
@ -188,7 +186,7 @@ public class DependentTaskTest {
}
@Test
public void testDependentRelation() throws Exception {
public void testDependentRelation() {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
@ -271,7 +269,7 @@ public class DependentTaskTest {
}
@Test
public void testDependentOnAllSuccess() throws Exception {
public void testDependentOnAllSuccess() {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
@ -284,7 +282,7 @@ public class DependentTaskTest {
}
@Test
public void testDependentOnAllFailure() throws Exception {
public void testDependentOnAllFailure() {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
@ -300,7 +298,7 @@ public class DependentTaskTest {
* test whether waitTaskQuit has been well impl
*/
@Test
public void testWaitAndCancel() throws Exception {
public void testWaitAndCancel() {
// for the poor independence of UT, error on other place may causes the condition happens
if (!Stopper.isRunning()) {
return;
@ -346,9 +344,9 @@ public class DependentTaskTest {
Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState());
}
private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) {
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(processInstanceId);
processInstance.setId(100);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;
}
@ -361,7 +359,7 @@ public class DependentTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-10");
taskNode.setName("D");
taskNode.setType(TaskType.DEPENDENT.toString());
taskNode.setType(TaskType.DEPENDENT.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
}
@ -382,7 +380,7 @@ public class DependentTaskTest {
taskInstance.setTaskDefinitionVersion(TASK_VERSION);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
taskInstance.setTaskType(taskNode.getType());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class));
taskInstance.setName(taskNode.getName());
}
@ -414,7 +412,7 @@ public class DependentTaskTest {
String taskName, ProcessInstance processInstance
) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT");
taskInstance.setTaskType(TaskType.DEPENDENT.getDesc());
taskInstance.setId(taskInstanceId);
taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(processInstance.getId());

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -122,7 +121,7 @@ public class SubProcessTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-10");
taskNode.setName("S");
taskNode.setType(TaskType.SUB_PROCESS.toString());
taskNode.setType(TaskType.SUB_PROCESS.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
}
@ -147,9 +146,9 @@ public class SubProcessTaskTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000);
taskInstance.setName("S");
taskInstance.setTaskType(TaskType.SUB_PROCESS.toString());
taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc());
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -22,9 +22,8 @@ import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -106,7 +105,7 @@ public class TaskPriorityQueueConsumerTest {
public void testSHELLTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -137,7 +136,7 @@ public class TaskPriorityQueueConsumerTest {
public void testSQLTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SQL");
taskInstance.setTaskType(TaskType.SQL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -180,7 +179,7 @@ public class TaskPriorityQueueConsumerTest {
public void testDataxTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("DATAX");
taskInstance.setTaskType(TaskType.DATAX.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -221,7 +220,7 @@ public class TaskPriorityQueueConsumerTest {
public void testSqoopTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SQOOP");
taskInstance.setTaskType(TaskType.SQOOP.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -262,7 +261,7 @@ public class TaskPriorityQueueConsumerTest {
public void testTaskInstanceIsFinalState() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -279,7 +278,7 @@ public class TaskPriorityQueueConsumerTest {
public void testNotFoundWorkerGroup() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -311,10 +310,10 @@ public class TaskPriorityQueueConsumerTest {
}
@Test
public void testDispatch() throws Exception {
public void testDispatch() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -338,7 +337,7 @@ public class TaskPriorityQueueConsumerTest {
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
Assert.assertFalse(res);
}
@ -348,7 +347,7 @@ public class TaskPriorityQueueConsumerTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -370,16 +369,16 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
Assert.assertNotNull(taskExecutionContext);
}
@Test
public void testGetResourceFullNames() throws Exception {
public void testGetResourceFullNames() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -387,7 +386,7 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setExecutorId(2);
// task node
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
@ -401,19 +400,19 @@ public class TaskPriorityQueueConsumerTest {
}
@Test
public void testVerifyTenantIsNull() throws Exception {
public void testVerifyTenantIsNull() {
Tenant tenant = null;
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
taskInstance.setProcessInstance(processInstance);
boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance);
Assert.assertTrue(res);
tenant = new Tenant();
@ -423,7 +422,7 @@ public class TaskPriorityQueueConsumerTest {
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance);
res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance);
Assert.assertFalse(res);
}
@ -442,15 +441,15 @@ public class TaskPriorityQueueConsumerTest {
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
}
@Test
public void testRun() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
@ -478,7 +477,7 @@ public class TaskPriorityQueueConsumerTest {
taskPriorityQueueConsumer.run();
TimeUnit.SECONDS.sleep(10);
Assert.assertNotEquals(-1,taskPriorityQueue.size());
Assert.assertNotEquals(-1, taskPriorityQueue.size());
}

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -111,7 +112,7 @@ public class MasterTaskExecThreadTest {
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("SHELL");
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -171,7 +172,7 @@ public class TaskExecuteProcessorTest {
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("sql");
taskExecutionContext.setTaskType(TaskType.SQL.getDesc());
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
@ -77,7 +78,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
@ -124,7 +125,7 @@ public class TaskExecuteThreadTest {
@Test
public void testNormalExecution() {
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setTaskType(TaskType.SQL.getDesc());
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setTenantCode("test");
@ -139,7 +140,7 @@ public class TaskExecuteThreadTest {
@Test
public void testDelayExecution() {
taskExecutionContext.setTaskType("PYTHON");
taskExecutionContext.setTaskType(TaskType.PYTHON.getDesc());
taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
@ -82,7 +83,7 @@ public class WorkerManagerThreadTest {
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTenantCode("test");
taskExecutionContext.setTaskType("");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java

@ -19,9 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest;
@ -61,7 +60,7 @@ public class ShellTaskReturnTest {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");

27
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
@PrepareForTest({SpringApplicationContext.class})
public class TaskManagerTest {
private static Logger logger = LoggerFactory.getLogger(TaskManagerTest.class);
private static final Logger logger = LoggerFactory.getLogger(TaskManagerTest.class);
private TaskExecutionContext taskExecutionContext;
@ -65,7 +66,7 @@ public class TaskManagerTest {
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
@ -91,23 +92,23 @@ public class TaskManagerTest {
@Test
public void testNewTask() {
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("WATERDROP");
taskExecutionContext.setTaskType(TaskType.WATERDROP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("HTTP");
taskExecutionContext.setTaskType(TaskType.HTTP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("MR");
taskExecutionContext.setTaskType(TaskType.MR.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SPARK");
taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("FLINK");
taskExecutionContext.setTaskType(TaskType.FLINK.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("PYTHON");
taskExecutionContext.setTaskType(TaskType.PYTHON.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("DATAX");
taskExecutionContext.setTaskType(TaskType.DATAX.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SQOOP");
taskExecutionContext.setTaskType(TaskType.SQOOP.getDesc());
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
}
@ -120,7 +121,7 @@ public class TaskManagerTest {
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNotExists() {
taskExecutionContext.setTaskType("XXX");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
@ -128,7 +129,7 @@ public class TaskManagerTest {
public void testShellTaskReturnString() {
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task.shell;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
@ -27,11 +28,12 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -64,7 +66,7 @@ public class ShellTaskTest {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
taskExecutionContext.setTaskType("SHELL");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp");
taskExecutionContext.setLogPath("/log");

22
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1428,16 +1428,19 @@ public class ProcessService {
// get task instance
TaskInstance taskInstance = findTaskInstanceById(taskInstId);
if (taskInstance == null) {
return taskInstance;
return null;
}
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
taskInstance.setTaskDefine(taskDefinition);
return taskInstance;
}
@ -2223,8 +2226,8 @@ public class ProcessService {
private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) {
taskDefinition.setName(taskNode.getName());
taskDefinition.setDescription(taskNode.getDesc());
taskDefinition.setTaskType(TaskType.of(taskNode.getType()));
taskDefinition.setTaskParams(TaskType.of(taskNode.getType()) == TaskType.DEPENDENT ? taskNode.getDependence() : taskNode.getParams());
taskDefinition.setTaskType(taskNode.getType().toUpperCase());
taskDefinition.setTaskParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType()) ? taskNode.getDependence() : taskNode.getParams());
taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES);
taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority());
taskDefinition.setWorkerGroup(taskNode.getWorkerGroup());
@ -2244,8 +2247,7 @@ public class ProcessService {
*/
public String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null;
// TODO modify taskDefinition.getTaskType()
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp().toUpperCase(), taskDefinition.getTaskParams());
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList().
@ -2505,12 +2507,12 @@ public class ProcessService {
v.setCode(taskDefinitionLog.getCode() + "");
v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().getDescp().toUpperCase());
v.setType(taskDefinitionLog.getTaskType().toUpperCase());
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
v.setParams(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? null : taskDefinitionLog.getTaskParams());
v.setDependence(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? taskDefinitionLog.getTaskParams() : null);
v.setParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? null : taskDefinitionLog.getTaskParams());
v.setDependence(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? taskDefinitionLog.getTaskParams() : null);
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
@ -2550,7 +2552,7 @@ public class ProcessService {
taskNode.setName(taskDefinition.getName());
taskNode.setName(taskDefinition.getName());
taskNode.setDesc(taskDefinition.getDescription());
taskNode.setType(taskDefinition.getTaskType().getDescp());
taskNode.setType(taskDefinition.getTaskType());
taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL);
taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
taskNode.setRetryInterval(taskDefinition.getFailRetryInterval());

8
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -411,7 +411,7 @@ public class ProcessServiceTest {
taskDefinition.setCode(3L);
taskDefinition.setName("1-test");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType(TaskType.SHELL);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(1);
taskDefinition.setVersion(2);
taskDefinition.setCreateTime(new Date());
@ -421,7 +421,7 @@ public class ProcessServiceTest {
td2.setCode(2L);
td2.setName("unit-test");
td2.setProjectCode(1L);
td2.setTaskType(TaskType.SHELL);
td2.setTaskType(TaskType.SHELL.getDesc());
td2.setUserId(1);
td2.setVersion(1);
td2.setCreateTime(new Date());
@ -472,7 +472,7 @@ public class ProcessServiceTest {
taskDefinition.setCode(3L);
taskDefinition.setName("1-test");
taskDefinition.setProjectCode(1L);
taskDefinition.setTaskType(TaskType.SHELL);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(1);
taskDefinition.setVersion(2);
taskDefinition.setCreateTime(new Date());
@ -482,7 +482,7 @@ public class ProcessServiceTest {
td2.setCode(2L);
td2.setName("unit-test");
td2.setProjectCode(1L);
td2.setTaskType(TaskType.SHELL);
td2.setTaskType(TaskType.SHELL.getDesc());
td2.setUserId(1);
td2.setVersion(1);
td2.setCreateTime(new Date());

6
sql/dolphinscheduler_mysql.sql

@ -455,7 +455,7 @@ CREATE TABLE `t_ds_task_definition` (
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
`task_type` tinyint(4) DEFAULT NULL COMMENT '0 shell,1 sql,2 sub_process,3 procedure,4 mr,5 spark,6 python,7 dependent,8 flink,9 http,10 datax,11 conditions,12 sqoop,13 waterdrop',
`task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
`task_params` text COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@ -484,7 +484,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`description` text COMMENT 'description',
`project_code` bigint(20) NOT NULL COMMENT 'project code',
`user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id',
`task_type` tinyint(4) DEFAULT NULL COMMENT '0 shell,1 sql,2 sub_process,3 procedure,4 mr,5 spark,6 python,7 dependent,8 flink,9 http,10 datax,11 conditions,12 sqoop,13 waterdrop',
`task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
`task_params` text COMMENT 'job custom parameters',
`flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
`task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority',
@ -814,7 +814,7 @@ DROP TABLE IF EXISTS `t_ds_task_instance`;
CREATE TABLE `t_ds_task_instance` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`name` varchar(255) DEFAULT NULL COMMENT 'task name',
`task_type` varchar(64) DEFAULT NULL COMMENT 'task type',
`task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type',
`task_code` bigint(20) NOT NULL COMMENT 'task definition code',
`task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',

6
sql/dolphinscheduler_postgre.sql

@ -343,7 +343,7 @@ CREATE TABLE t_ds_task_definition (
description text ,
project_code bigint DEFAULT NULL ,
user_id int DEFAULT NULL ,
task_type int DEFAULT NULL ,
task_type varchar(50) DEFAULT NULL ,
task_params text ,
flag int DEFAULT NULL ,
task_priority int DEFAULT NULL ,
@ -371,7 +371,7 @@ CREATE TABLE t_ds_task_definition_log (
description text ,
project_code bigint DEFAULT NULL ,
user_id int DEFAULT NULL ,
task_type int DEFAULT NULL ,
task_type varchar(50) DEFAULT NULL ,
task_params text ,
flag int DEFAULT NULL ,
task_priority int DEFAULT NULL ,
@ -671,7 +671,7 @@ DROP TABLE IF EXISTS t_ds_task_instance;
CREATE TABLE t_ds_task_instance (
id int NOT NULL ,
name varchar(255) DEFAULT NULL ,
task_type varchar(64) DEFAULT NULL ,
task_type varchar(50) DEFAULT NULL ,
task_code bigint NOT NULL,
task_definition_version int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,

Loading…
Cancel
Save