diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 70d77e5cf8..6dabcb22bc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -744,7 +744,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return if task has sub process return true else false */ private boolean checkTaskHasSubProcess(String taskType) { - return taskType.equals(TaskType.SUB_PROCESS.name()); + return taskType.equals(TaskType.SUB_PROCESS.getDesc()); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 28d1eb5bb5..21c79bd043 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -329,7 +329,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce */ private void addDependResultForTaskList(List taskInstanceList) throws IOException { for (TaskInstance taskInstance : taskInstanceList) { - if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) { + if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) { Result logResult = loggerService.queryLog( taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT); if (logResult.getCode() == Status.SUCCESS.ordinal()) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java index a13bb992d0..096bb298f5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java @@ -16,11 +16,6 @@ */ package org.apache.dolphinscheduler.api.utils; -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Pattern; - import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -30,6 +25,11 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + /** * check utils @@ -128,10 +128,10 @@ public class CheckUtils { public static boolean checkTaskNodeParameters(TaskNode taskNode) { AbstractParameters abstractParameters; - if (TaskType.of(taskNode.getType()) == TaskType.DEPENDENT) { - abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getDependence()); + if (TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType())) { + abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getDependence()); } else { - abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); + abstractParameters = TaskParametersUtils.getParameters(taskNode.getType().toUpperCase(), taskNode.getParams()); } if (abstractParameters != null) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java index f34554bb89..8572d7b482 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java @@ -78,7 +78,7 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { */ @Override public void afterPropertiesSet() { - TaskNodeParamFactory.register(TaskType.SQL.name(), this); - TaskNodeParamFactory.register(TaskType.PROCEDURE.name(), this); + TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this); + TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this); } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java index c5bc59cb9f..29746f8f2a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java @@ -109,6 +109,6 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { */ @Override public void afterPropertiesSet() { - TaskNodeParamFactory.register(TaskType.DEPENDENT.name(), this); + TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this); } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 2bc25e5e3d..2a4443d492 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.api.service; -import static org.assertj.core.api.Assertions.assertThat; import static org.powermock.api.mockito.PowerMockito.mock; import static org.powermock.api.mockito.PowerMockito.when; @@ -30,13 +29,12 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -61,11 +59,8 @@ import org.apache.http.entity.ContentType; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -82,7 +77,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.mock.web.MockMultipartFile; -import org.springframework.util.ReflectionUtils; import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -696,7 +690,7 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.PROCESS_NODE_HAS_CYCLE, taskCycleRes.get(Constants.STATUS)); //json abnormal - String abnormalJson = processDefinitionJson.replaceAll("SHELL", ""); + String abnormalJson = processDefinitionJson.replaceAll(TaskType.SHELL.getDesc(), ""); processData = JSONUtils.parseObject(abnormalJson, ProcessData.class); Map abnormalTaskRes = processDefinitionService.checkProcessNodeList(processData, abnormalJson); Assert.assertEquals(Status.PROCESS_NODE_S_PARAMETER_INVALID, abnormalTaskRes.get(Constants.STATUS)); @@ -795,7 +789,7 @@ public class ProcessDefinitionServiceTest { TaskInstance taskInstance = new TaskInstance(); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setId(1); taskInstance.setName("test_task_instance"); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); @@ -832,7 +826,7 @@ public class ProcessDefinitionServiceTest { TaskInstance taskInstance = new TaskInstance(); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); - taskInstance.setTaskType("SUB_PROCESS"); + taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc()); taskInstance.setId(1); taskInstance.setName("test_task_instance"); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); @@ -1128,7 +1122,6 @@ public class ProcessDefinitionServiceTest { * @return ProcessDefinition */ private ProcessDefinition getProcessDefinition() { - ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(46); processDefinition.setName("test_pdf"); @@ -1229,7 +1222,7 @@ public class ProcessDefinitionServiceTest { @Test public void testExportProcessMetaData() { - Integer processDefinitionId = 111; + int processDefinitionId = 111; ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(processDefinitionId); Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index f6777fdb95..e85449dc77 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -281,7 +281,7 @@ public class ProcessInstanceServiceTest { ProcessInstance processInstance = getProcessInstance(); processInstance.setState(ExecutionStatus.SUCCESS); TaskInstance taskInstance = new TaskInstance(); - taskInstance.setTaskType(TaskType.SHELL.getDescp()); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); List taskInstanceList = new ArrayList<>(); taskInstanceList.add(taskInstance); Result res = new Result(); @@ -332,7 +332,7 @@ public class ProcessInstanceServiceTest { //task not sub process TaskInstance taskInstance = getTaskInstance(); - taskInstance.setTaskType(TaskType.HTTP.toString()); + taskInstance.setTaskType(TaskType.HTTP.getDesc()); taskInstance.setProcessInstanceId(1); when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Map notSubprocessRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectName, 1); @@ -340,7 +340,7 @@ public class ProcessInstanceServiceTest { //sub process not exist TaskInstance subTask = getTaskInstance(); - subTask.setTaskType(TaskType.SUB_PROCESS.toString()); + subTask.setTaskType(TaskType.SUB_PROCESS.getDesc()); subTask.setProcessInstanceId(1); when(processService.findTaskInstanceById(subTask.getId())).thenReturn(subTask); when(processService.findSubProcessInstance(subTask.getProcessInstanceId(), subTask.getId())).thenReturn(null); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index 14e530f59c..119af06b8b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -152,18 +152,18 @@ public class CheckUtilsTest { // sub SubProcessParameters SubProcessParameters subProcessParameters = new SubProcessParameters(); taskNode.setParams(JSONUtils.toJsonString(subProcessParameters)); - taskNode.setType(TaskType.SUB_PROCESS.toString()); + taskNode.setType(TaskType.SUB_PROCESS.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); subProcessParameters.setProcessDefinitionId(1234); taskNode.setParams(JSONUtils.toJsonString(subProcessParameters)); - taskNode.setType(TaskType.SUB_PROCESS.toString()); + taskNode.setType(TaskType.SUB_PROCESS.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); // ShellParameters ShellParameters shellParameters = new ShellParameters(); taskNode.setParams(JSONUtils.toJsonString(shellParameters)); - taskNode.setType(TaskType.SHELL.toString()); + taskNode.setType(TaskType.SHELL.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); shellParameters.setRawScript(""); taskNode.setParams(JSONUtils.toJsonString(shellParameters)); @@ -175,7 +175,7 @@ public class CheckUtilsTest { // ProcedureParameters ProcedureParameters procedureParameters = new ProcedureParameters(); taskNode.setParams(JSONUtils.toJsonString(procedureParameters)); - taskNode.setType(TaskType.PROCEDURE.toString()); + taskNode.setType(TaskType.PROCEDURE.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); procedureParameters.setDatasource(1); procedureParameters.setType("xx"); @@ -186,7 +186,7 @@ public class CheckUtilsTest { // SqlParameters SqlParameters sqlParameters = new SqlParameters(); taskNode.setParams(JSONUtils.toJsonString(sqlParameters)); - taskNode.setType(TaskType.SQL.toString()); + taskNode.setType(TaskType.SQL.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); sqlParameters.setDatasource(1); sqlParameters.setType("xx"); @@ -197,7 +197,7 @@ public class CheckUtilsTest { // MapReduceParameters MapReduceParameters mapreduceParameters = new MapReduceParameters(); taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters)); - taskNode.setType(TaskType.MR.toString()); + taskNode.setType(TaskType.MR.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); ResourceInfo resourceInfoMapreduce = new ResourceInfo(); @@ -206,13 +206,13 @@ public class CheckUtilsTest { mapreduceParameters.setMainJar(resourceInfoMapreduce); mapreduceParameters.setProgramType(ProgramType.JAVA); taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters)); - taskNode.setType(TaskType.MR.toString()); + taskNode.setType(TaskType.MR.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); // SparkParameters SparkParameters sparkParameters = new SparkParameters(); taskNode.setParams(JSONUtils.toJsonString(sparkParameters)); - taskNode.setType(TaskType.SPARK.toString()); + taskNode.setType(TaskType.SPARK.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); sparkParameters.setMainJar(new ResourceInfo()); sparkParameters.setProgramType(ProgramType.SCALA); @@ -223,7 +223,7 @@ public class CheckUtilsTest { // PythonParameters PythonParameters pythonParameters = new PythonParameters(); taskNode.setParams(JSONUtils.toJsonString(pythonParameters)); - taskNode.setType(TaskType.PYTHON.toString()); + taskNode.setType(TaskType.PYTHON.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); pythonParameters.setRawScript("ss"); taskNode.setParams(JSONUtils.toJsonString(pythonParameters)); @@ -232,42 +232,42 @@ public class CheckUtilsTest { // DependentParameters DependentParameters dependentParameters = new DependentParameters(); taskNode.setParams(JSONUtils.toJsonString(dependentParameters)); - taskNode.setType(TaskType.DEPENDENT.toString()); + taskNode.setType(TaskType.DEPENDENT.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); // FlinkParameters FlinkParameters flinkParameters = new FlinkParameters(); taskNode.setParams(JSONUtils.toJsonString(flinkParameters)); - taskNode.setType(TaskType.FLINK.toString()); + taskNode.setType(TaskType.FLINK.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); flinkParameters.setMainJar(new ResourceInfo()); flinkParameters.setProgramType(ProgramType.JAVA); taskNode.setParams(JSONUtils.toJsonString(flinkParameters)); - taskNode.setType(TaskType.FLINK.toString()); + taskNode.setType(TaskType.FLINK.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); // HTTP HttpParameters httpParameters = new HttpParameters(); taskNode.setParams(JSONUtils.toJsonString(httpParameters)); - taskNode.setType(TaskType.HTTP.toString()); + taskNode.setType(TaskType.HTTP.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); httpParameters.setUrl("httpUrl"); taskNode.setParams(JSONUtils.toJsonString(httpParameters)); - taskNode.setType(TaskType.HTTP.toString()); + taskNode.setType(TaskType.HTTP.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); // DataxParameters DataxParameters dataxParameters = new DataxParameters(); taskNode.setParams(JSONUtils.toJsonString(dataxParameters)); - taskNode.setType(TaskType.DATAX.toString()); + taskNode.setType(TaskType.DATAX.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); dataxParameters.setCustomConfig(0); dataxParameters.setDataSource(111); dataxParameters.setDataTarget(333); - dataxParameters.setSql("sql"); + dataxParameters.setSql(TaskType.SQL.getDesc()); dataxParameters.setTargetTable("tar"); taskNode.setParams(JSONUtils.toJsonString(dataxParameters)); - taskNode.setType(TaskType.DATAX.toString()); + taskNode.setType(TaskType.DATAX.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 59b0387e49..d0842e4ba7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -16,9 +16,6 @@ */ package org.apache.dolphinscheduler.common.enums; -import java.util.HashMap; -import java.util.Map; - import com.baomidou.mybatisplus.annotation.EnumValue; /** @@ -41,56 +38,35 @@ public enum TaskType { * 12 SQOOP * 13 WATERDROP */ - SHELL(0, "shell"), - SQL(1, "sql"), - SUB_PROCESS(2, "sub_process"), - PROCEDURE(3, "procedure"), - MR(4, "mr"), - SPARK(5, "spark"), - PYTHON(6, "python"), - DEPENDENT(7, "dependent"), - FLINK(8, "flink"), - HTTP(9, "http"), - DATAX(10, "datax"), - CONDITIONS(11, "conditions"), - SQOOP(12, "sqoop"), - WATERDROP(13, "waterdrop"); + SHELL(0, "SHELL"), + SQL(1, "SQL"), + SUB_PROCESS(2, "SUB_PROCESS"), + PROCEDURE(3, "PROCEDURE"), + MR(4, "MR"), + SPARK(5, "SPARK"), + PYTHON(6, "PYTHON"), + DEPENDENT(7, "DEPENDENT"), + FLINK(8, "FLINK"), + HTTP(9, "HTTP"), + DATAX(10, "DATAX"), + CONDITIONS(11, "CONDITIONS"), + SQOOP(12, "SQOOP"), + WATERDROP(13, "WATERDROP"); - TaskType(int code, String descp) { + TaskType(int code, String desc) { this.code = code; - this.descp = descp; + this.desc = desc; } @EnumValue private final int code; - private final String descp; - - public static boolean typeIsNormalTask(String typeName) { - TaskType taskType = TaskType.valueOf(typeName); - return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT); - } + private final String desc; public int getCode() { return code; } - public String getDescp() { - return descp; - } - - private static final Map TASK_TYPE_MAP = new HashMap<>(); - - static { - for (TaskType taskType : TaskType.values()) { - TASK_TYPE_MAP.put(taskType.descp, taskType); - } - } - - public static TaskType of(String descp) { - descp = descp.toLowerCase(); - if (TASK_TYPE_MAP.containsKey(descp)) { - return TASK_TYPE_MAP.get(descp); - } - throw new IllegalArgumentException("invalid type : " + descp); + public String getDesc() { + return desc; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index c6adc30d36..404b1b26d0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -360,7 +360,7 @@ public class TaskNode { } public boolean isConditionsTask() { - return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType()); + return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.getType()); } public List getPreTaskNodeList() { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 43654d6a0f..740635cd0e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; @@ -41,7 +40,7 @@ import org.slf4j.LoggerFactory; */ public class TaskParametersUtils { - private static Logger logger = LoggerFactory.getLogger(TaskParametersUtils.class); + private static final Logger logger = LoggerFactory.getLogger(TaskParametersUtils.class); private TaskParametersUtils() { throw new UnsupportedOperationException("Construct TaskParametersUtils"); @@ -55,40 +54,36 @@ public class TaskParametersUtils { * @return task parameters */ public static AbstractParameters getParameters(String taskType, String parameter) { - TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskType); - if (anEnum == null) { - logger.error("not support task type: {}", taskType); - return null; - } - switch (anEnum) { - case SUB_PROCESS: + switch (taskType) { + case "SUB_PROCESS": return JSONUtils.parseObject(parameter, SubProcessParameters.class); - case SHELL: - case WATERDROP: + case "SHELL": + case "WATERDROP": return JSONUtils.parseObject(parameter, ShellParameters.class); - case PROCEDURE: + case "PROCEDURE": return JSONUtils.parseObject(parameter, ProcedureParameters.class); - case SQL: + case "SQL": return JSONUtils.parseObject(parameter, SqlParameters.class); - case MR: + case "MR": return JSONUtils.parseObject(parameter, MapReduceParameters.class); - case SPARK: + case "SPARK": return JSONUtils.parseObject(parameter, SparkParameters.class); - case PYTHON: + case "PYTHON": return JSONUtils.parseObject(parameter, PythonParameters.class); - case DEPENDENT: + case "DEPENDENT": return JSONUtils.parseObject(parameter, DependentParameters.class); - case FLINK: + case "FLINK": return JSONUtils.parseObject(parameter, FlinkParameters.class); - case HTTP: + case "HTTP": return JSONUtils.parseObject(parameter, HttpParameters.class); - case DATAX: + case "DATAX": return JSONUtils.parseObject(parameter, DataxParameters.class); - case CONDITIONS: + case "CONDITIONS": return JSONUtils.parseObject(parameter, ConditionsParameters.class); - case SQOOP: + case "SQOOP": return JSONUtils.parseObject(parameter, SqoopParameters.class); default: + logger.error("not support task type: {}", taskType); return null; } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java index b316b17469..731f94142c 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.apache.dolphinscheduler.common.enums.TaskType; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,16 +32,16 @@ public class TaskParametersUtilsTest { @Test public void testGetParameters() { Assert.assertNull(TaskParametersUtils.getParameters("xx", "ttt")); - Assert.assertNull(TaskParametersUtils.getParameters("SHELL", "ttt")); - Assert.assertNotNull(TaskParametersUtils.getParameters("SHELL", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("SQL", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("SUB_PROCESS", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("PROCEDURE", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("MR", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("SPARK", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("PYTHON", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("DEPENDENT", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("FLINK", "{}")); - Assert.assertNotNull(TaskParametersUtils.getParameters("HTTP", "{}")); + Assert.assertNull(TaskParametersUtils.getParameters(TaskType.SHELL.getDesc(), "ttt")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SHELL.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SQL.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SUB_PROCESS.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.PROCEDURE.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.MR.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.SPARK.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.PYTHON.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.DEPENDENT.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.FLINK.getDesc(), "{}")); + Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.HTTP.getDesc(), "{}")); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index b33f6e7aed..e757ea30ea 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.entity; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; -import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -82,7 +81,7 @@ public class TaskDefinition { /** * task type */ - private TaskType taskType; + private String taskType; /** * user defined parameters @@ -318,11 +317,11 @@ public class TaskDefinition { this.projectCode = projectCode; } - public TaskType getTaskType() { + public String getTaskType() { return taskType; } - public void setTaskType(TaskType taskType) { + public void setTaskType(String taskType) { this.taskType = taskType; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index c1da572761..dfade537ce 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -17,11 +17,11 @@ package org.apache.dolphinscheduler.dao.entity; +import java.util.Date; + import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; -import java.util.Date; - /** * task definition log */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index a2de601916..f1da73f3a6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -68,7 +68,7 @@ public class TaskInstance implements Serializable { private long taskCode; /** - * task defintion version + * task definition version */ private int taskDefinitionVersion; @@ -146,6 +146,12 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private ProcessDefinition processDefine; + /** + * task definition + */ + @TableField(exist = false) + private TaskDefinition taskDefine; + /** * process id */ @@ -268,6 +274,14 @@ public class TaskInstance implements Serializable { this.processDefine = processDefine; } + public TaskDefinition getTaskDefine() { + return taskDefine; + } + + public void setTaskDefine(TaskDefinition taskDefine) { + this.taskDefine = taskDefine; + } + public int getId() { return id; } @@ -484,15 +498,15 @@ public class TaskInstance implements Serializable { } public boolean isSubProcess() { - return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType)); + return TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(this.taskType); } public boolean isDependTask() { - return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType)); + return TaskType.DEPENDENT.getDesc().equalsIgnoreCase(this.taskType); } public boolean isConditionsTask() { - return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType)); + return TaskType.CONDITIONS.getDesc().equalsIgnoreCase(this.taskType); } /** diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java index d2117dc620..3768bb36c8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.entity; import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -39,19 +40,19 @@ public class TaskInstanceTest { TaskInstance taskInstance = new TaskInstance(); //sub process - taskInstance.setTaskType("SUB_PROCESS"); + taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc()); Assert.assertTrue(taskInstance.isSubProcess()); //not sub process - taskInstance.setTaskType("HTTP"); + taskInstance.setTaskType(TaskType.HTTP.getDesc()); Assert.assertFalse(taskInstance.isSubProcess()); //sub process - taskInstance.setTaskType("CONDITIONS"); + taskInstance.setTaskType(TaskType.CONDITIONS.getDesc()); Assert.assertTrue(taskInstance.isConditionsTask()); //sub process - taskInstance.setTaskType("DEPENDENT"); + taskInstance.setTaskType(TaskType.DEPENDENT.getDesc()); Assert.assertTrue(taskInstance.isDependTask()); } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java index 42c32f6602..f268a4f226 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java @@ -63,7 +63,7 @@ public class TaskDefinitionLogMapperTest { taskDefinition.setCode(888888L); taskDefinition.setName("unit-test"); taskDefinition.setProjectCode(1L); - taskDefinition.setTaskType(TaskType.SHELL); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); taskDefinition.setUserId(userId); taskDefinition.setVersion(1); taskDefinition.setCreateTime(new Date()); @@ -120,7 +120,7 @@ public class TaskDefinitionLogMapperTest { taskDefinition.setCode(888888L); taskDefinition.setName("unit-test"); taskDefinition.setProjectCode(1L); - taskDefinition.setTaskType(TaskType.SHELL); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); taskDefinition.setUserId(1); taskDefinition.setResourceIds("1"); taskDefinition.setVersion(1); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java index 946ed82fe1..5e93dc04c8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java @@ -60,7 +60,7 @@ public class TaskDefinitionMapperTest { taskDefinition.setCode(888888L); taskDefinition.setName("unit-test"); taskDefinition.setProjectCode(1L); - taskDefinition.setTaskType(TaskType.SHELL); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); taskDefinition.setUserId(userId); taskDefinition.setResourceIds("1"); taskDefinition.setVersion(1); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index 4b26cae58c..9c852b5943 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -17,9 +17,6 @@ package org.apache.dolphinscheduler.dao.mapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -41,6 +38,9 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + @RunWith(SpringRunner.class) @SpringBootTest @Transactional @@ -77,7 +77,7 @@ public class TaskInstanceMapperTest { */ private TaskInstance insertTaskInstance(int processInstanceId) { //insertOne - return insertTaskInstance("us task", processInstanceId, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString()); + return insertTaskInstance(processInstanceId, TaskType.SHELL.getDesc()); } /** @@ -100,11 +100,11 @@ public class TaskInstanceMapperTest { /** * construct a task instance and then insert */ - private TaskInstance insertTaskInstance(String taskName, int processInstanceId, ExecutionStatus state, String taskType) { + private TaskInstance insertTaskInstance(int processInstanceId, String taskType) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setFlag(Flag.YES); - taskInstance.setName(taskName); - taskInstance.setState(state); + taskInstance.setName("us task"); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); taskInstance.setEndTime(new Date()); taskInstance.setProcessInstanceId(processInstanceId); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java index 88e71c4891..1bc30e56ec 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/DagHelperTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -205,7 +206,7 @@ public class DagHelperTest { completeTaskList.put("2", new TaskInstance()); completeTaskList.put("4", new TaskInstance()); TaskNode node3 = dag.getNode("3"); - node3.setType("CONDITIONS"); + node3.setType(TaskType.CONDITIONS.getDesc()); node3.setConditionResult("{\n" + " \"successNode\": [5\n" @@ -273,13 +274,13 @@ public class DagHelperTest { TaskNode node1 = new TaskNode(); node1.setId("1"); node1.setName("1"); - node1.setType("SHELL"); + node1.setType(TaskType.SHELL.getDesc()); taskNodeList.add(node1); TaskNode node2 = new TaskNode(); node2.setId("2"); node2.setName("2"); - node2.setType("SHELL"); + node2.setType(TaskType.SHELL.getDesc()); List dep2 = new ArrayList<>(); dep2.add("1"); node2.setDepList(dep2); @@ -289,13 +290,13 @@ public class DagHelperTest { TaskNode node4 = new TaskNode(); node4.setId("4"); node4.setName("4"); - node4.setType("SHELL"); + node4.setType(TaskType.SHELL.getDesc()); taskNodeList.add(node4); TaskNode node3 = new TaskNode(); node3.setId("3"); node3.setName("3"); - node3.setType("SHELL"); + node3.setType(TaskType.SHELL.getDesc()); List dep3 = new ArrayList<>(); dep3.add("2"); dep3.add("4"); @@ -305,7 +306,7 @@ public class DagHelperTest { TaskNode node5 = new TaskNode(); node5.setId("5"); node5.setName("5"); - node5.setType("SHELL"); + node5.setType(TaskType.SHELL.getDesc()); List dep5 = new ArrayList<>(); dep5.add("3"); dep5.add("8"); @@ -315,7 +316,7 @@ public class DagHelperTest { TaskNode node6 = new TaskNode(); node6.setId("6"); node6.setName("6"); - node6.setType("SHELL"); + node6.setType(TaskType.SHELL.getDesc()); List dep6 = new ArrayList<>(); dep6.add("3"); node6.setDepList(dep6); @@ -324,7 +325,7 @@ public class DagHelperTest { TaskNode node7 = new TaskNode(); node7.setId("7"); node7.setName("7"); - node7.setType("SHELL"); + node7.setType(TaskType.SHELL.getDesc()); List dep7 = new ArrayList<>(); dep7.add("5"); node7.setDepList(dep7); @@ -333,7 +334,7 @@ public class DagHelperTest { TaskNode node8 = new TaskNode(); node8.setId("8"); node8.setName("8"); - node8.setType("SHELL"); + node8.setType(TaskType.SHELL.getDesc()); List dep8 = new ArrayList<>(); dep8.add("2"); node8.setDepList(dep8); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index dc00335c91..da46e4dce3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.builder; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.entity.*; @@ -54,12 +56,15 @@ public class TaskExecutionContextBuilder { } public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition taskDefinition) { - int timeoutSeconds = taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT; - if (timeoutSeconds >= Integer.MAX_VALUE) { - timeoutSeconds = Integer.MAX_VALUE; + taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); + if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) { + taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy()); + if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED + || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) { + taskExecutionContext.setTaskTimeout(Math.min(taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT, Integer.MAX_VALUE)); + } } - taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy().getCode()); - taskExecutionContext.setTaskTimeout(timeoutSeconds); + taskExecutionContext.setTaskParams(taskDefinition.getTaskParams()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 2a4c089d76..84908496d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.entity; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; @@ -168,7 +169,7 @@ public class TaskExecutionContext implements Serializable { /** * task timeout strategy */ - private int taskTimeoutStrategy; + private TaskTimeoutStrategy taskTimeoutStrategy; /** * task timeout @@ -420,11 +421,11 @@ public class TaskExecutionContext implements Serializable { this.taskAppId = taskAppId; } - public int getTaskTimeoutStrategy() { + public TaskTimeoutStrategy getTaskTimeoutStrategy() { return taskTimeoutStrategy; } - public void setTaskTimeoutStrategy(int taskTimeoutStrategy) { + public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) { this.taskTimeoutStrategy = taskTimeoutStrategy; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java index 0dc7035f23..366a6c4a9c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + /** * taskInstance state manager */ @@ -74,8 +75,8 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { taskInstance.setId(taskExecutionContext.getTaskInstanceId()); taskInstance.setName(taskExecutionContext.getTaskName()); taskInstance.setStartTime(taskExecutionContext.getStartTime()); - taskInstance.setTaskType(taskInstance.getTaskType()); - taskInstance.setExecutePath(taskInstance.getExecutePath()); + taskInstance.setTaskType(taskExecutionContext.getTaskType()); + taskInstance.setExecutePath(taskExecutionContext.getExecutePath()); taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index b51764a644..747625dd43 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -196,21 +196,17 @@ public class TaskPriorityQueueConsumer extends Thread { protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) { TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId); - // task type - TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); - - int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); // verify tenant is null if (verifyTenantIsNull(tenant, taskInstance)) { processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE, - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - null, - taskInstance.getId()); + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + null, + taskInstance.getId()); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue @@ -225,40 +221,41 @@ public class TaskPriorityQueueConsumer extends Thread { SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); // SQL task - if (taskType == TaskType.SQL) { + if (TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) { setSQLTaskRelation(sqlTaskExecutionContext, taskInstance); } // DATAX task - if (taskType == TaskType.DATAX) { + if (TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) { setDataxTaskRelation(dataxTaskExecutionContext, taskInstance); } // procedure task - if (taskType == TaskType.PROCEDURE) { + if (TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) { setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance); } - if (taskType == TaskType.SQOOP) { + if (TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) { setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance); } return TaskExecutionContextBuilder.get() - .buildTaskInstanceRelatedInfo(taskInstance) - .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) - .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) - .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) - .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) - .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext) - .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext) - .create(); + .buildTaskInstanceRelatedInfo(taskInstance) + .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine()) + .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) + .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) + .buildSQLTaskRelatedInfo(sqlTaskExecutionContext) + .buildDataxTaskRelatedInfo(dataxTaskExecutionContext) + .buildProcedureTaskRelatedInfo(procedureTaskExecutionContext) + .buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext) + .create(); } /** * set procedure task relation * * @param procedureTaskExecutionContext procedureTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) { ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class); @@ -271,7 +268,7 @@ public class TaskPriorityQueueConsumer extends Thread { * set datax task relation * * @param dataxTaskExecutionContext dataxTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) { DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class); @@ -296,7 +293,7 @@ public class TaskPriorityQueueConsumer extends Thread { * set sqoop task relation * * @param sqoopTaskExecutionContext sqoopTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) { SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class); @@ -327,7 +324,7 @@ public class TaskPriorityQueueConsumer extends Thread { * set SQL task relation * * @param sqlTaskExecutionContext sqlTaskExecutionContext - * @param taskInstance taskInstance + * @param taskInstance taskInstance */ private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) { SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class); @@ -337,7 +334,7 @@ public class TaskPriorityQueueConsumer extends Thread { // whether udf type boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) - && StringUtils.isNotEmpty(sqlParameters.getUdfs()); + && StringUtils.isNotEmpty(sqlParameters.getUdfs()); if (udfTypeFlag) { String[] udfFunIds = sqlParameters.getUdfs().split(","); @@ -360,15 +357,15 @@ public class TaskPriorityQueueConsumer extends Thread { /** * whehter tenant is null * - * @param tenant tenant + * @param tenant tenant * @param taskInstance taskInstance * @return result */ protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { if (tenant == null) { logger.error("tenant not exists,process instance id : {},task instance id : {}", - taskInstance.getProcessInstance().getId(), - taskInstance.getId()); + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); return true; } return false; @@ -388,10 +385,7 @@ public class TaskPriorityQueueConsumer extends Thread { // filter the resources that the resource id equals 0 Set oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet()); if (CollectionUtils.isNotEmpty(oldVersionResources)) { - - oldVersionResources.forEach( - (t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) - ); + oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))); } // get the resource id in order to get the resource names in batch @@ -402,9 +396,7 @@ public class TaskPriorityQueueConsumer extends Thread { Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); List resources = processService.listResourceByIds(resourceIds); - resources.forEach( - (t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) - ); + resources.forEach(t -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))); } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index d89ded788d..fc1cc973f9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -484,7 +484,7 @@ public class MasterExecThread implements Runnable { // process instance id taskInstance.setProcessInstanceId(processInstance.getId()); // task instance type - taskInstance.setTaskType(taskNode.getType()); + taskInstance.setTaskType(taskNode.getType().toUpperCase()); // task instance whether alert taskInstance.setAlertFlag(Flag.NO); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 7589e8567e..e246f711cb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -200,7 +200,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(taskExecutionContext.getStartTime()); - if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) { + if (TaskType.SQL.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType()) || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType())) { ackCommand.setExecutePath(null); } else { ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index ee43d36cfe..6fd4f34b2f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -20,9 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; @@ -132,7 +130,6 @@ public class TaskExecuteThread implements Runnable, Delayed { return; } - if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); } @@ -146,7 +143,6 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.getResources(), logger); - taskExecutionContext.setTaskParams(taskExecutionContext.getTaskParams()); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); @@ -218,7 +214,7 @@ public class TaskExecuteThread implements Runnable, Delayed { /** * get global paras map - * @return + * @return map */ private Map getGlobalParamsMap() { Map globalParamsMap = new HashMap<>(16); @@ -249,13 +245,11 @@ public class TaskExecuteThread implements Runnable, Delayed { /** * download resource file * - * @param execLocalPath - * @param projectRes - * @param logger + * @param execLocalPath execLocalPath + * @param projectRes projectRes + * @param logger logger */ - private void downloadResource(String execLocalPath, - Map projectRes, - Logger logger) throws Exception { + private void downloadResource(String execLocalPath, Map projectRes, Logger logger) { if (MapUtils.isEmpty(projectRes)) { return; } @@ -311,8 +305,7 @@ public class TaskExecuteThread implements Runnable, Delayed { ackCommand.setStartTime(taskExecutionContext.getStartTime()); ackCommand.setLogPath(taskExecutionContext.getLogPath()); ackCommand.setHost(taskExecutionContext.getHost()); - if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) - || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) { + if (TaskType.SQL.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType()) || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType())) { ackCommand.setExecutePath(null); } else { ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 788d5441df..36e1bcd9bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -135,7 +135,7 @@ public abstract class AbstractCommandExecutor { processBuilder.redirectErrorStream(true); // setting up user to run commands - if (CommonUtils.isSudoEnable()) { + if (!OSUtils.isWindows() && CommonUtils.isSudoEnable()) { command.add("sudo"); command.add("-u"); command.add(taskExecutionContext.getTenantCode()); @@ -208,9 +208,7 @@ public abstract class AbstractCommandExecutor { boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}", - taskExecutionContext.getExecutePath(), - processId - , result.getExitStatusCode()); + taskExecutionContext.getExecutePath(), processId, result.getExitStatusCode()); // if SHELL task exit if (status) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index bf36b24ae2..a5221dd224 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -194,8 +194,7 @@ public abstract class AbstractTask { public void after() { if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS) { // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskExecutionContext.getTaskType())) { + if (TaskRecordDao.getTaskRecordFlag() && typeIsNormalTask(taskExecutionContext.getTaskType())) { AbstractParameters params = TaskParametersUtils.getParameters(taskExecutionContext.getTaskType(), taskExecutionContext.getTaskParams()); // replace placeholder @@ -224,6 +223,9 @@ public abstract class AbstractTask { } } + private boolean typeIsNormalTask(String taskType) { + return !(TaskType.SUB_PROCESS.getDesc().equalsIgnoreCase(taskType) || TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskType)); + } /** * get exit status according to exitCode diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index b89c8d4ca6..d44202c169 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; @@ -35,7 +33,7 @@ import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.slf4j.Logger; /** - * task manaster + * task manager */ public class TaskManager { @@ -47,32 +45,27 @@ public class TaskManager { * @throws IllegalArgumentException illegal argument exception */ public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException { - TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType()); - if (anEnum == null) { - logger.error("not support task type: {}", taskExecutionContext.getTaskType()); - throw new IllegalArgumentException("not support task type"); - } - switch (anEnum) { - case SHELL: - case WATERDROP: + switch (taskExecutionContext.getTaskType()) { + case "SHELL": + case "WATERDROP": return new ShellTask(taskExecutionContext, logger); - case PROCEDURE: + case "PROCEDURE": return new ProcedureTask(taskExecutionContext, logger); - case SQL: + case "SQL": return new SqlTask(taskExecutionContext, logger, alertClientService); - case MR: + case "MR": return new MapReduceTask(taskExecutionContext, logger); - case SPARK: + case "SPARK": return new SparkTask(taskExecutionContext, logger); - case FLINK: + case "FLINK": return new FlinkTask(taskExecutionContext, logger); - case PYTHON: + case "PYTHON": return new PythonTask(taskExecutionContext, logger); - case HTTP: + case "HTTP": return new HttpTask(taskExecutionContext, logger); - case DATAX: + case "DATAX": return new DataxTask(taskExecutionContext, logger); - case SQOOP: + case "SQOOP": return new SqoopTask(taskExecutionContext, logger); default: logger.error("not support task type: {}", taskExecutionContext.getTaskType()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index a11a8c0d45..2eae1abdfa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -16,32 +16,50 @@ */ package org.apache.dolphinscheduler.server.worker.task.processdure; -import com.cronutils.utils.StringUtils; +import static org.apache.dolphinscheduler.common.enums.DataType.BOOLEAN; +import static org.apache.dolphinscheduler.common.enums.DataType.DATE; +import static org.apache.dolphinscheduler.common.enums.DataType.DOUBLE; +import static org.apache.dolphinscheduler.common.enums.DataType.FLOAT; +import static org.apache.dolphinscheduler.common.enums.DataType.INTEGER; +import static org.apache.dolphinscheduler.common.enums.DataType.LONG; +import static org.apache.dolphinscheduler.common.enums.DataType.TIME; +import static org.apache.dolphinscheduler.common.enums.DataType.TIMESTAMP; +import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR; + import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.slf4j.Logger; -import java.sql.*; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import static org.apache.dolphinscheduler.common.enums.DataType.*; +import org.slf4j.Logger; /** - * procedure task + * procedure task */ public class ProcedureTask extends AbstractTask { @@ -63,8 +81,9 @@ public class ProcedureTask extends AbstractTask { /** * constructor + * * @param taskExecutionContext taskExecutionContext - * @param logger logger + * @param logger logger */ public ProcedureTask(TaskExecutionContext taskExecutionContext, Logger logger) { super(taskExecutionContext, logger); @@ -111,7 +130,6 @@ public class ProcedureTask extends AbstractTask { baseDataSource.getPassword()); - // combining local and global parameters Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), @@ -122,13 +140,13 @@ public class ProcedureTask extends AbstractTask { Collection userDefParamsList = null; - if (procedureParameters.getLocalParametersMap() != null){ + if (procedureParameters.getLocalParametersMap() != null) { userDefParamsList = procedureParameters.getLocalParametersMap().values(); } String method = getCallMethod(userDefParamsList); - logger.info("call method : {}",method); + logger.info("call method : {}", method); // call method stmt = connection.prepareCall(method); @@ -148,48 +166,48 @@ public class ProcedureTask extends AbstractTask { printOutParameter(stmt, outParameterMap); setExitStatusCode(Constants.EXIT_CODE_SUCCESS); - }catch (Exception e){ + } catch (Exception e) { setExitStatusCode(Constants.EXIT_CODE_FAILURE); - logger.error("procedure task error",e); + logger.error("procedure task error", e); throw e; - } - finally { - close(stmt,connection); + } finally { + close(stmt, connection); } } /** * get call method + * * @param userDefParamsList userDefParamsList * @return method */ private String getCallMethod(Collection userDefParamsList) { String method;// no parameters - if (CollectionUtils.isEmpty(userDefParamsList)){ + if (CollectionUtils.isEmpty(userDefParamsList)) { method = "{call " + procedureParameters.getMethod() + "}"; - }else { // exists parameters + } else { // exists parameters int size = userDefParamsList.size(); StringBuilder parameter = new StringBuilder(); parameter.append("("); - for (int i = 0 ;i < size - 1; i++){ + for (int i = 0; i < size - 1; i++) { parameter.append("?,"); } parameter.append("?)"); - method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}"; + method = "{call " + procedureParameters.getMethod() + parameter.toString() + "}"; } return method; } /** * print outParameter + * * @param stmt CallableStatement * @param outParameterMap outParameterMap - * @throws SQLException */ private void printOutParameter(CallableStatement stmt, Map outParameterMap) throws SQLException { Iterator> iter = outParameterMap.entrySet().iterator(); - while (iter.hasNext()){ + while (iter.hasNext()) { Map.Entry en = iter.next(); int index = en.getKey(); @@ -208,27 +226,26 @@ public class ProcedureTask extends AbstractTask { * @param paramsMap paramsMap * @param userDefParamsList userDefParamsList * @return outParameterMap - * @throws Exception */ private Map getOutParameterMap(CallableStatement stmt, Map paramsMap, Collection userDefParamsList) throws Exception { - Map outParameterMap = new HashMap<>(); - if (userDefParamsList != null && userDefParamsList.size() > 0){ + Map outParameterMap = new HashMap<>(); + if (userDefParamsList != null && userDefParamsList.size() > 0) { int index = 1; - for (Property property : userDefParamsList){ + for (Property property : userDefParamsList) { logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - ,property.getProp(), + , property.getProp(), property.getDirect(), property.getType(), property.getValue()); // set parameters - if (property.getDirect().equals(Direct.IN)){ + if (property.getDirect().equals(Direct.IN)) { ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); - }else if (property.getDirect().equals(Direct.OUT)){ - setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + } else if (property.getDirect().equals(Direct.OUT)) { + setOutParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue()); property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index,property); + outParameterMap.put(index, property); } index++; } @@ -237,26 +254,22 @@ public class ProcedureTask extends AbstractTask { } /** - * set timtou + * set timeout + * * @param stmt CallableStatement - * @throws SQLException */ private void setTimeout(CallableStatement stmt) throws SQLException { - Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED; - Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; - if(failed || warnfailed){ + Boolean failed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED; + Boolean warnFailed = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if (failed || warnFailed) { stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); } } /** * close jdbc resource - * - * @param stmt - * @param connection */ - private void close(PreparedStatement stmt, - Connection connection){ + private void close(PreparedStatement stmt, Connection connection) { if (stmt != null) { try { stmt.close(); @@ -275,40 +288,35 @@ public class ProcedureTask extends AbstractTask { /** * get output parameter - * @param stmt - * @param index - * @param prop - * @param dataType - * @throws SQLException */ private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { - switch (dataType){ + switch (dataType) { case VARCHAR: - logger.info("out prameter varchar key : {} , value : {}",prop,stmt.getString(index)); + logger.info("out prameter varchar key : {} , value : {}", prop, stmt.getString(index)); break; case INTEGER: logger.info("out prameter integer key : {} , value : {}", prop, stmt.getInt(index)); break; case LONG: - logger.info("out prameter long key : {} , value : {}",prop,stmt.getLong(index)); + logger.info("out prameter long key : {} , value : {}", prop, stmt.getLong(index)); break; case FLOAT: - logger.info("out prameter float key : {} , value : {}",prop,stmt.getFloat(index)); + logger.info("out prameter float key : {} , value : {}", prop, stmt.getFloat(index)); break; case DOUBLE: - logger.info("out prameter double key : {} , value : {}",prop,stmt.getDouble(index)); + logger.info("out prameter double key : {} , value : {}", prop, stmt.getDouble(index)); break; case DATE: - logger.info("out prameter date key : {} , value : {}",prop,stmt.getDate(index)); + logger.info("out prameter date key : {} , value : {}", prop, stmt.getDate(index)); break; case TIME: - logger.info("out prameter time key : {} , value : {}",prop,stmt.getTime(index)); + logger.info("out prameter time key : {} , value : {}", prop, stmt.getTime(index)); break; case TIMESTAMP: - logger.info("out prameter timestamp key : {} , value : {}",prop,stmt.getTimestamp(index)); + logger.info("out prameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index)); break; case BOOLEAN: - logger.info("out prameter boolean key : {} , value : {}",prop, stmt.getBoolean(index)); + logger.info("out prameter boolean key : {} , value : {}", prop, stmt.getBoolean(index)); break; default: break; @@ -322,72 +330,73 @@ public class ProcedureTask extends AbstractTask { /** * set out parameter - * @param index index - * @param stmt stmt - * @param dataType dataType - * @param value value + * + * @param index index + * @param stmt stmt + * @param dataType dataType + * @param value value * @throws Exception exception */ - private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{ - if (dataType.equals(VARCHAR)){ - if (StringUtils.isEmpty(value)){ + private void setOutParameter(int index, CallableStatement stmt, DataType dataType, String value) throws Exception { + if (dataType.equals(VARCHAR)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.VARCHAR); - }else { + } else { stmt.registerOutParameter(index, Types.VARCHAR, value); } - }else if (dataType.equals(INTEGER)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(INTEGER)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.INTEGER); - }else { + } else { stmt.registerOutParameter(index, Types.INTEGER, value); } - }else if (dataType.equals(LONG)){ - if (StringUtils.isEmpty(value)){ - stmt.registerOutParameter(index,Types.INTEGER); - }else { - stmt.registerOutParameter(index,Types.INTEGER ,value); + } else if (dataType.equals(LONG)) { + if (StringUtils.isEmpty(value)) { + stmt.registerOutParameter(index, Types.INTEGER); + } else { + stmt.registerOutParameter(index, Types.INTEGER, value); } - }else if (dataType.equals(FLOAT)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(FLOAT)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.FLOAT); - }else { - stmt.registerOutParameter(index, Types.FLOAT,value); + } else { + stmt.registerOutParameter(index, Types.FLOAT, value); } - }else if (dataType.equals(DOUBLE)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(DOUBLE)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.DOUBLE); - }else { - stmt.registerOutParameter(index, Types.DOUBLE , value); + } else { + stmt.registerOutParameter(index, Types.DOUBLE, value); } - }else if (dataType.equals(DATE)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(DATE)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.DATE); - }else { - stmt.registerOutParameter(index, Types.DATE , value); + } else { + stmt.registerOutParameter(index, Types.DATE, value); } - }else if (dataType.equals(TIME)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(TIME)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.TIME); - }else { - stmt.registerOutParameter(index, Types.TIME , value); + } else { + stmt.registerOutParameter(index, Types.TIME, value); } - }else if (dataType.equals(TIMESTAMP)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(TIMESTAMP)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.TIMESTAMP); - }else { - stmt.registerOutParameter(index, Types.TIMESTAMP , value); + } else { + stmt.registerOutParameter(index, Types.TIMESTAMP, value); } - }else if (dataType.equals(BOOLEAN)){ - if (StringUtils.isEmpty(value)){ + } else if (dataType.equals(BOOLEAN)) { + if (StringUtils.isEmpty(value)) { stmt.registerOutParameter(index, Types.BOOLEAN); - }else { - stmt.registerOutParameter(index, Types.BOOLEAN , value); + } else { + stmt.registerOutParameter(index, Types.BOOLEAN, value); } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 06df56491d..6efa2430b4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -429,8 +429,8 @@ public class SqlTask extends AbstractTask { */ private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { // is the timeout set - boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED - || TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED; + boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); if (timeoutFlag) { stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index 6837a87276..7135d6bbf9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -42,15 +42,11 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @RunWith(MockitoJUnitRunner.Silent.class) public class ConditionsTaskTest { - private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); - /** * TaskNode.runFlag : task can be run normally */ @@ -120,7 +116,7 @@ public class ConditionsTaskTest { } @Test - public void testBasicFailure() throws Exception { + public void testBasicFailure() { TaskInstance taskInstance = testBasicInit(ExecutionStatus.FAILURE); ConditionsTaskExecThread taskExecThread = new ConditionsTaskExecThread(taskInstance); taskExecThread.call(); @@ -131,7 +127,7 @@ public class ConditionsTaskTest { TaskNode taskNode = new TaskNode(); taskNode.setId("tasks-1000"); taskNode.setName("C"); - taskNode.setType(TaskType.CONDITIONS.toString()); + taskNode.setType(TaskType.CONDITIONS.getDesc()); taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); DependentItem dependentItem = new DependentItem(); @@ -171,7 +167,7 @@ public class ConditionsTaskTest { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1000); taskInstance.setName(taskNode.getName()); - taskInstance.setTaskType(taskNode.getType()); + taskInstance.setTaskType(taskNode.getType().toUpperCase()); taskInstance.setProcessInstanceId(processInstance.getId()); return taskInstance; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index abe841fc41..4f80f5d36b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -37,8 +37,6 @@ import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import java.util.ArrayList; -import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -92,7 +90,7 @@ public class DependentTaskTest { processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - processInstance = getProcessInstance(100, 1); + processInstance = getProcessInstance(); // for MasterBaseTaskExecThread.call // for DependentTaskExecThread.waitTaskQuit @@ -142,7 +140,7 @@ public class DependentTaskTest { } @Test - public void testBasicSuccess() throws Exception { + public void testBasicSuccess() { testBasicInit(); ProcessInstance dependentProcessInstance = getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE); @@ -165,7 +163,7 @@ public class DependentTaskTest { } @Test - public void testBasicFailure() throws Exception { + public void testBasicFailure() { testBasicInit(); ProcessInstance dependentProcessInstance = getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS); @@ -188,7 +186,7 @@ public class DependentTaskTest { } @Test - public void testDependentRelation() throws Exception { + public void testDependentRelation() { DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); dependentTaskModel1.setRelation(DependentRelation.AND); dependentTaskModel1.setDependItemList(Stream.of( @@ -271,7 +269,7 @@ public class DependentTaskTest { } @Test - public void testDependentOnAllSuccess() throws Exception { + public void testDependentOnAllSuccess() { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService @@ -284,7 +282,7 @@ public class DependentTaskTest { } @Test - public void testDependentOnAllFailure() throws Exception { + public void testDependentOnAllFailure() { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService @@ -300,7 +298,7 @@ public class DependentTaskTest { * test whether waitTaskQuit has been well impl */ @Test - public void testWaitAndCancel() throws Exception { + public void testWaitAndCancel() { // for the poor independence of UT, error on other place may causes the condition happens if (!Stopper.isRunning()) { return; @@ -346,9 +344,9 @@ public class DependentTaskTest { Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState()); } - private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) { + private ProcessInstance getProcessInstance() { ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(processInstanceId); + processInstance.setId(100); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); return processInstance; } @@ -361,7 +359,7 @@ public class DependentTaskTest { TaskNode taskNode = new TaskNode(); taskNode.setId("tasks-10"); taskNode.setName("D"); - taskNode.setType(TaskType.DEPENDENT.toString()); + taskNode.setType(TaskType.DEPENDENT.getDesc()); taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); return taskNode; } @@ -382,7 +380,7 @@ public class DependentTaskTest { taskInstance.setTaskDefinitionVersion(TASK_VERSION); taskInstance.setProcessInstanceId(processInstance.getId()); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - taskInstance.setTaskType(taskNode.getType()); + taskInstance.setTaskType(taskNode.getType().toUpperCase()); taskInstance.setDependency(JSONUtils.parseObject(taskNode.getDependence(), DependentParameters.class)); taskInstance.setName(taskNode.getName()); } @@ -414,7 +412,7 @@ public class DependentTaskTest { String taskName, ProcessInstance processInstance ) { TaskInstance taskInstance = new TaskInstance(); - taskInstance.setTaskType("DEPENDENT"); + taskInstance.setTaskType(TaskType.DEPENDENT.getDesc()); taskInstance.setId(taskInstanceId); taskInstance.setName(taskName); taskInstance.setProcessInstanceId(processInstance.getId()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 3912d2e5ae..94d4238630 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -122,7 +121,7 @@ public class SubProcessTaskTest { TaskNode taskNode = new TaskNode(); taskNode.setId("tasks-10"); taskNode.setName("S"); - taskNode.setType(TaskType.SUB_PROCESS.toString()); + taskNode.setType(TaskType.SUB_PROCESS.getDesc()); taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); return taskNode; } @@ -147,9 +146,9 @@ public class SubProcessTaskTest { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1000); taskInstance.setName("S"); - taskInstance.setTaskType(TaskType.SUB_PROCESS.toString()); + taskInstance.setTaskType(TaskType.SUB_PROCESS.getDesc()); taskInstance.setName(taskNode.getName()); - taskInstance.setTaskType(taskNode.getType()); + taskInstance.setTaskType(taskNode.getType().toUpperCase()); taskInstance.setProcessInstanceId(processInstance.getId()); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); return taskInstance; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 9523727669..b3049fdb4b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -22,9 +22,8 @@ import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ResourceType; -import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -106,7 +105,7 @@ public class TaskPriorityQueueConsumerTest { public void testSHELLTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -137,7 +136,7 @@ public class TaskPriorityQueueConsumerTest { public void testSQLTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SQL"); + taskInstance.setTaskType(TaskType.SQL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -180,7 +179,7 @@ public class TaskPriorityQueueConsumerTest { public void testDataxTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("DATAX"); + taskInstance.setTaskType(TaskType.DATAX.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -221,7 +220,7 @@ public class TaskPriorityQueueConsumerTest { public void testSqoopTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SQOOP"); + taskInstance.setTaskType(TaskType.SQOOP.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -262,7 +261,7 @@ public class TaskPriorityQueueConsumerTest { public void testTaskInstanceIsFinalState() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -279,7 +278,7 @@ public class TaskPriorityQueueConsumerTest { public void testNotFoundWorkerGroup() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -311,10 +310,10 @@ public class TaskPriorityQueueConsumerTest { } @Test - public void testDispatch() throws Exception { + public void testDispatch() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -338,7 +337,7 @@ public class TaskPriorityQueueConsumerTest { TaskPriority taskPriority = new TaskPriority(); taskPriority.setTaskId(1); - boolean res = taskPriorityQueueConsumer.dispatch(taskPriority); + boolean res = taskPriorityQueueConsumer.dispatch(taskPriority); Assert.assertFalse(res); } @@ -348,7 +347,7 @@ public class TaskPriorityQueueConsumerTest { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -370,16 +369,16 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1); + TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1); Assert.assertNotNull(taskExecutionContext); } @Test - public void testGetResourceFullNames() throws Exception { + public void testGetResourceFullNames() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -387,7 +386,7 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setExecutorId(2); // task node - Map map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance); + Map map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance); List resourcesList = new ArrayList(); Resource resource = new Resource(); @@ -401,19 +400,19 @@ public class TaskPriorityQueueConsumerTest { } @Test - public void testVerifyTenantIsNull() throws Exception { + public void testVerifyTenantIsNull() { Tenant tenant = null; TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(1); taskInstance.setProcessInstance(processInstance); - boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance); + boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance); Assert.assertTrue(res); tenant = new Tenant(); @@ -423,7 +422,7 @@ public class TaskPriorityQueueConsumerTest { tenant.setQueueId(1); tenant.setCreateTime(new Date()); tenant.setUpdateTime(new Date()); - res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance); + res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance); Assert.assertFalse(res); } @@ -442,15 +441,15 @@ public class TaskPriorityQueueConsumerTest { taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance); - Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId()); - Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId()); + Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId()); + Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId()); } @Test public void testRun() throws Exception { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); taskInstance.setProcessInstancePriority(Priority.MEDIUM); @@ -478,7 +477,7 @@ public class TaskPriorityQueueConsumerTest { taskPriorityQueueConsumer.run(); TimeUnit.SECONDS.sleep(10); - Assert.assertNotEquals(-1,taskPriorityQueue.size()); + Assert.assertNotEquals(-1, taskPriorityQueue.size()); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 9229e59d59..e8aeda0c9b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -111,7 +112,7 @@ public class MasterTaskExecThreadTest { private TaskInstance getTaskInstance(){ TaskInstance taskInstance = new TaskInstance(); - taskInstance.setTaskType("SHELL"); + taskInstance.setTaskType(TaskType.SHELL.getDesc()); taskInstance.setId(252612); taskInstance.setName("C"); taskInstance.setProcessInstanceId(10111); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java index 8fef8aba0b..8aee9c3360 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -171,7 +172,7 @@ public class TaskExecuteProcessorTest { taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); - taskExecutionContext.setTaskType("sql"); + taskExecutionContext.setTaskType(TaskType.SQL.getDesc()); taskExecutionContext.setFirstSubmitTime(new Date()); taskExecutionContext.setDelayTime(0); taskExecutionContext.setLogPath("/tmp/test.log"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index 72c63f6cee..ee7c2bb995 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.CommonUtils; @@ -77,7 +78,7 @@ public class TaskExecuteThreadTest { taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); - taskExecutionContext.setTaskType(""); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setFirstSubmitTime(new Date()); taskExecutionContext.setDelayTime(0); taskExecutionContext.setLogPath("/tmp/test.log"); @@ -124,7 +125,7 @@ public class TaskExecuteThreadTest { @Test public void testNormalExecution() { - taskExecutionContext.setTaskType("SQL"); + taskExecutionContext.setTaskType(TaskType.SQL.getDesc()); taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); taskExecutionContext.setTenantCode("test"); @@ -139,7 +140,7 @@ public class TaskExecuteThreadTest { @Test public void testDelayExecution() { - taskExecutionContext.setTaskType("PYTHON"); + taskExecutionContext.setTaskType(TaskType.PYTHON.getDesc()); taskExecutionContext.setStartTime(null); taskExecutionContext.setDelayTime(1); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java index 99a1143fd9..db52111199 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -82,7 +83,7 @@ public class WorkerManagerThreadTest { taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTenantCode("test"); - taskExecutionContext.setTaskType(""); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setFirstSubmitTime(new Date()); taskExecutionContext.setDelayTime(0); taskExecutionContext.setLogPath("/tmp/test.log"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java index e5bf3bfc40..892299cab0 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java @@ -19,9 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task; import static org.mockito.ArgumentMatchers.anyString; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; -import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest; @@ -61,7 +60,7 @@ public class ShellTaskReturnTest { taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); - taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setHost("127.0.0.1:1234"); taskExecutionContext.setExecutePath("/tmp"); taskExecutionContext.setLogPath("/log"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index 76539ae361..856b30e972 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory; @PrepareForTest({SpringApplicationContext.class}) public class TaskManagerTest { - private static Logger logger = LoggerFactory.getLogger(TaskManagerTest.class); + private static final Logger logger = LoggerFactory.getLogger(TaskManagerTest.class); private TaskExecutionContext taskExecutionContext; @@ -65,7 +66,7 @@ public class TaskManagerTest { taskExecutionContext.setProcessId(12345); taskExecutionContext.setProcessInstanceId(1); taskExecutionContext.setTaskInstanceId(1); - taskExecutionContext.setTaskType(""); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setFirstSubmitTime(new Date()); taskExecutionContext.setDelayTime(0); taskExecutionContext.setLogPath("/tmp/test.log"); @@ -91,23 +92,23 @@ public class TaskManagerTest { @Test public void testNewTask() { - taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("WATERDROP"); + taskExecutionContext.setTaskType(TaskType.WATERDROP.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("HTTP"); + taskExecutionContext.setTaskType(TaskType.HTTP.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("MR"); + taskExecutionContext.setTaskType(TaskType.MR.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("SPARK"); + taskExecutionContext.setTaskType(TaskType.SPARK.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("FLINK"); + taskExecutionContext.setTaskType(TaskType.FLINK.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("PYTHON"); + taskExecutionContext.setTaskType(TaskType.PYTHON.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("DATAX"); + taskExecutionContext.setTaskType(TaskType.DATAX.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); - taskExecutionContext.setTaskType("SQOOP"); + taskExecutionContext.setTaskType(TaskType.SQOOP.getDesc()); Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); } @@ -120,7 +121,7 @@ public class TaskManagerTest { @Test(expected = IllegalArgumentException.class) public void testNewTaskIsNotExists() { - taskExecutionContext.setTaskType("XXX"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); } @@ -128,7 +129,7 @@ public class TaskManagerTest { public void testShellTaskReturnString() { taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); - taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setHost("127.0.0.1:1234"); taskExecutionContext.setExecutePath("/tmp"); taskExecutionContext.setLogPath("/log"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index 160e92f612..bd02f61f17 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task.shell; import static org.mockito.ArgumentMatchers.anyString; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; @@ -27,11 +28,12 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.sql.DriverManager; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Map; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,7 +66,7 @@ public class ShellTaskTest { taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); - taskExecutionContext.setTaskType("SHELL"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); taskExecutionContext.setHost("127.0.0.1:1234"); taskExecutionContext.setExecutePath("/tmp"); taskExecutionContext.setLogPath("/log"); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index dcf8502676..20c88a8417 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1428,16 +1428,19 @@ public class ProcessService { // get task instance TaskInstance taskInstance = findTaskInstanceById(taskInstId); if (taskInstance == null) { - return taskInstance; + return null; } // get process instance ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); // get process define ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processDefine); + TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + taskInstance.getTaskCode(), + taskInstance.getTaskDefinitionVersion()); + taskInstance.setTaskDefine(taskDefinition); return taskInstance; } @@ -2223,8 +2226,8 @@ public class ProcessService { private void setTaskFromTaskNode(TaskNode taskNode, TaskDefinition taskDefinition) { taskDefinition.setName(taskNode.getName()); taskDefinition.setDescription(taskNode.getDesc()); - taskDefinition.setTaskType(TaskType.of(taskNode.getType())); - taskDefinition.setTaskParams(TaskType.of(taskNode.getType()) == TaskType.DEPENDENT ? taskNode.getDependence() : taskNode.getParams()); + taskDefinition.setTaskType(taskNode.getType().toUpperCase()); + taskDefinition.setTaskParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskNode.getType()) ? taskNode.getDependence() : taskNode.getParams()); taskDefinition.setFlag(taskNode.isForbidden() ? Flag.NO : Flag.YES); taskDefinition.setTaskPriority(taskNode.getTaskInstancePriority()); taskDefinition.setWorkerGroup(taskNode.getWorkerGroup()); @@ -2244,8 +2247,7 @@ public class ProcessService { */ public String getResourceIds(TaskDefinition taskDefinition) { Set resourceIds = null; - // TODO modify taskDefinition.getTaskType() - AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp().toUpperCase(), taskDefinition.getTaskParams()); + AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams()); if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { resourceIds = params.getResourceFilesList(). @@ -2505,12 +2507,12 @@ public class ProcessService { v.setCode(taskDefinitionLog.getCode() + ""); v.setName(taskDefinitionLog.getName()); v.setDesc(taskDefinitionLog.getDescription()); - v.setType(taskDefinitionLog.getTaskType().getDescp().toUpperCase()); + v.setType(taskDefinitionLog.getTaskType().toUpperCase()); v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes()); v.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); - v.setParams(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? null : taskDefinitionLog.getTaskParams()); - v.setDependence(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? taskDefinitionLog.getTaskParams() : null); + v.setParams(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? null : taskDefinitionLog.getTaskParams()); + v.setDependence(TaskType.DEPENDENT.getDesc().equalsIgnoreCase(taskDefinitionLog.getTaskType()) ? taskDefinitionLog.getTaskParams() : null); v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); v.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, @@ -2550,7 +2552,7 @@ public class ProcessService { taskNode.setName(taskDefinition.getName()); taskNode.setName(taskDefinition.getName()); taskNode.setDesc(taskDefinition.getDescription()); - taskNode.setType(taskDefinition.getTaskType().getDescp()); + taskNode.setType(taskDefinition.getTaskType()); taskNode.setRunFlag(taskDefinition.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : Constants.FLOWNODE_RUN_FLAG_NORMAL); taskNode.setMaxRetryTimes(taskDefinition.getFailRetryTimes()); taskNode.setRetryInterval(taskDefinition.getFailRetryInterval()); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index dc9cc414ec..3bee978a30 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -411,7 +411,7 @@ public class ProcessServiceTest { taskDefinition.setCode(3L); taskDefinition.setName("1-test"); taskDefinition.setProjectCode(1L); - taskDefinition.setTaskType(TaskType.SHELL); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); taskDefinition.setUserId(1); taskDefinition.setVersion(2); taskDefinition.setCreateTime(new Date()); @@ -421,7 +421,7 @@ public class ProcessServiceTest { td2.setCode(2L); td2.setName("unit-test"); td2.setProjectCode(1L); - td2.setTaskType(TaskType.SHELL); + td2.setTaskType(TaskType.SHELL.getDesc()); td2.setUserId(1); td2.setVersion(1); td2.setCreateTime(new Date()); @@ -472,7 +472,7 @@ public class ProcessServiceTest { taskDefinition.setCode(3L); taskDefinition.setName("1-test"); taskDefinition.setProjectCode(1L); - taskDefinition.setTaskType(TaskType.SHELL); + taskDefinition.setTaskType(TaskType.SHELL.getDesc()); taskDefinition.setUserId(1); taskDefinition.setVersion(2); taskDefinition.setCreateTime(new Date()); @@ -482,7 +482,7 @@ public class ProcessServiceTest { td2.setCode(2L); td2.setName("unit-test"); td2.setProjectCode(1L); - td2.setTaskType(TaskType.SHELL); + td2.setTaskType(TaskType.SHELL.getDesc()); td2.setUserId(1); td2.setVersion(1); td2.setCreateTime(new Date()); diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index b9147bea84..5997046f41 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -455,7 +455,7 @@ CREATE TABLE `t_ds_task_definition` ( `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', - `task_type` tinyint(4) DEFAULT NULL COMMENT '0 shell,1 sql,2 sub_process,3 procedure,4 mr,5 spark,6 python,7 dependent,8 flink,9 http,10 datax,11 conditions,12 sqoop,13 waterdrop', + `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', @@ -484,7 +484,7 @@ CREATE TABLE `t_ds_task_definition_log` ( `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', - `task_type` tinyint(4) DEFAULT NULL COMMENT '0 shell,1 sql,2 sub_process,3 procedure,4 mr,5 spark,6 python,7 dependent,8 flink,9 http,10 datax,11 conditions,12 sqoop,13 waterdrop', + `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', @@ -814,7 +814,7 @@ DROP TABLE IF EXISTS `t_ds_task_instance`; CREATE TABLE `t_ds_task_instance` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `name` varchar(255) DEFAULT NULL COMMENT 'task name', - `task_type` varchar(64) DEFAULT NULL COMMENT 'task type', + `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', `task_code` bigint(20) NOT NULL COMMENT 'task definition code', `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version', `process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id', diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index 42ebade38c..ae12df56cb 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -343,7 +343,7 @@ CREATE TABLE t_ds_task_definition ( description text , project_code bigint DEFAULT NULL , user_id int DEFAULT NULL , - task_type int DEFAULT NULL , + task_type varchar(50) DEFAULT NULL , task_params text , flag int DEFAULT NULL , task_priority int DEFAULT NULL , @@ -371,7 +371,7 @@ CREATE TABLE t_ds_task_definition_log ( description text , project_code bigint DEFAULT NULL , user_id int DEFAULT NULL , - task_type int DEFAULT NULL , + task_type varchar(50) DEFAULT NULL , task_params text , flag int DEFAULT NULL , task_priority int DEFAULT NULL , @@ -671,7 +671,7 @@ DROP TABLE IF EXISTS t_ds_task_instance; CREATE TABLE t_ds_task_instance ( id int NOT NULL , name varchar(255) DEFAULT NULL , - task_type varchar(64) DEFAULT NULL , + task_type varchar(50) DEFAULT NULL , task_code bigint NOT NULL, task_definition_version int DEFAULT NULL , process_instance_id int DEFAULT NULL ,