diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index f36436176f..37905d30fe 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -134,7 +134,7 @@ public class DataxTask extends AbstractTaskExecutor { logger.info("datax task params {}", taskExecutionContext.getTaskParams()); dataXParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DataxParameters.class); - if (!dataXParameters.checkParameters()) { + if (dataXParameters == null || !dataXParameters.checkParameters()) { throw new RuntimeException("datax task params is not valid"); } @@ -144,7 +144,7 @@ public class DataxTask extends AbstractTaskExecutor { /** * run DataX process * - * @throws Exception if error throws Exception + * @throws TaskException if error throws Exception */ @Override public void handle() throws TaskException { @@ -152,7 +152,7 @@ public class DataxTask extends AbstractTaskExecutor { // replace placeholder,and combine local and global parameters Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - // run datax procesDataSourceService.s + // run datax processDataSourceService String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); @@ -231,7 +231,6 @@ public class DataxTask extends AbstractTaskExecutor { * build datax job config * * @return collection of datax job config JSONObject - * @throws SQLException if error throws SQLException */ private List buildDataxJobContentJson() { @@ -391,18 +390,17 @@ public class DataxTask extends AbstractTaskExecutor { } // datax python command - StringBuilder sbr = new StringBuilder(); - sbr.append(getPythonCommand()); - sbr.append(" "); - sbr.append(DATAX_PATH); - sbr.append(" "); - sbr.append(loadJvmEnv(dataXParameters)); - sbr.append(addCustomParameters(paramsMap)); - sbr.append(" "); - sbr.append(jobConfigFilePath); + String sbr = getPythonCommand() + + " " + + DATAX_PATH + + " " + + loadJvmEnv(dataXParameters) + + addCustomParameters(paramsMap) + + " " + + jobConfigFilePath; // replace placeholder - String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap)); + String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParamUtils.convert(paramsMap)); logger.debug("raw script : {}", dataxCommand); @@ -422,10 +420,14 @@ public class DataxTask extends AbstractTaskExecutor { } private StringBuilder addCustomParameters(Map paramsMap) { - StringBuilder customParameters = new StringBuilder("-p\""); + if (paramsMap == null || paramsMap.size() == 0) { + return new StringBuilder(); + } + StringBuilder customParameters = new StringBuilder("-p \""); for (Map.Entry entry : paramsMap.entrySet()) { customParameters.append(String.format(CUSTOM_PARAM, entry.getKey(), entry.getValue().getValue())); } + customParameters.replace(4, 5, ""); customParameters.append("\""); return customParameters; } @@ -528,12 +530,12 @@ public class DataxTask extends AbstractTaskExecutor { } } else { throw new RuntimeException( - String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + String.format("grammatical analysis sql column [ %s ] failed", item)); } if (columnName == null) { throw new RuntimeException( - String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + String.format("grammatical analysis sql column [ %s ] failed", item)); } columnNames[i] = columnName; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java new file mode 100644 index 0000000000..636ba839c0 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.datax; + +import org.apache.commons.lang3.SystemUtils; +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; +import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.spi.enums.DbType; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +@ExtendWith(MockitoExtension.class) +public class DataxTaskTest { + + private DataxTask dataxTask; + + private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {}; + + @BeforeEach + public void before() throws Exception { + TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class); + ResourceParametersHelper resourceParametersHelper = new ResourceParametersHelper(); + String parameters = JSONUtils.toJsonString(createDataxParameters()); + when(taskExecutionContext.getTaskParams()).thenReturn(parameters); + taskExecutionContext.setResourceParametersHelper(resourceParametersHelper); + this.dataxTask = new DataxTask(taskExecutionContext); + this.dataxTask.init(); + } + + @Test + public void testHandleNullParamsMap() throws Exception { + String parameters = JSONUtils.toJsonString(createDataxParameters()); + TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext(); + taskExecutionContext.setPrepareParamsMap(null); + taskExecutionContext.setTaskParams(parameters); + DataxTask dataxTask = new DataxTask(taskExecutionContext); + dataxTask.init(); + + ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class); + Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor"); + shellCommandExecutorFiled.setAccessible(true); + shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); + + TaskResponse taskResponse = new TaskResponse(); + taskResponse.setStatus(TaskRunStatus.SUCCESS); + taskResponse.setExitStatusCode(0); + taskResponse.setProcessId(1); + when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); + + dataxTask.handle(taskCallBack); + Assertions.assertEquals(0, dataxTask.getExitStatusCode()); + + File jsonFile = new File("/tmp/execution/app-id_job.json"); + InputStream json = Files.newInputStream(jsonFile.toPath()); + String resultStr = FileUtils.readFile2Str(json); + Assertions.assertEquals(resultStr, getJsonString()); + boolean delete = jsonFile.delete(); + Assertions.assertTrue(delete); + + File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? + new File("/tmp/execution/app-id_node.bat") : + new File("/tmp/execution/app-id_node.sh"); + InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath()); + String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream); + Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " + + " /tmp/execution/app-id_job.json"); + delete = shellCommandFile.delete(); + Assertions.assertTrue(delete); + } + + @Test + public void testHandleParamsMap() throws Exception { + String parameters = JSONUtils.toJsonString(createDataxParameters()); + TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext(); + + taskExecutionContext.setPrepareParamsMap(createPrepareParamsMap()); + taskExecutionContext.setTaskParams(parameters); + DataxTask dataxTask = new DataxTask(taskExecutionContext); + dataxTask.init(); + + ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class); + Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor"); + shellCommandExecutorFiled.setAccessible(true); + shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); + + TaskResponse taskResponse = new TaskResponse(); + taskResponse.setStatus(TaskRunStatus.SUCCESS); + taskResponse.setExitStatusCode(0); + taskResponse.setProcessId(1); + when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse); + + dataxTask.handle(taskCallBack); + Assertions.assertEquals(0, dataxTask.getExitStatusCode()); + + File jsonFile = new File("/tmp/execution/app-id_job.json"); + InputStream json = Files.newInputStream(jsonFile.toPath()); + String resultStr = FileUtils.readFile2Str(json); + Assertions.assertEquals(resultStr, getJsonString()); + boolean delete = jsonFile.delete(); + Assertions.assertTrue(delete); + + File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? + new File("/tmp/execution/app-id_node.bat") : + new File("/tmp/execution/app-id_node.sh"); + InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath()); + String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream); + Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " + + "-p \"-DDT=DT -DDS=DS\" /tmp/execution/app-id_job.json"); + delete = shellCommandFile.delete(); + Assertions.assertTrue(delete); + } + + @Test + public void testHandleInterruptedException() throws Exception { + String parameters = JSONUtils.toJsonString(createDataxParameters()); + TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext(); + taskExecutionContext.setPrepareParamsMap(null); + taskExecutionContext.setTaskParams(parameters); + DataxTask dataxTask = new DataxTask(taskExecutionContext); + dataxTask.init(); + + ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class); + Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor"); + shellCommandExecutorFiled.setAccessible(true); + shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); + + when(shellCommandExecutor.run(anyString())).thenThrow(new InterruptedException("Command execution failed")); + Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); + } + + @Test + public void testHandleIOException() throws Exception { + String parameters = JSONUtils.toJsonString(createDataxParameters()); + TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext(); + taskExecutionContext.setPrepareParamsMap(null); + taskExecutionContext.setTaskParams(parameters); + DataxTask dataxTask = new DataxTask(taskExecutionContext); + dataxTask.init(); + + ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class); + Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor"); + shellCommandExecutorFiled.setAccessible(true); + shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); + + when(shellCommandExecutor.run(anyString())).thenThrow(new IOException("Command execution failed")); + Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); + } + + @Test + public void testTryExecuteSqlResolveColumnNames() throws Exception { + BaseConnectionParam baseConnectionParam = mock(BaseConnectionParam.class); + try ( + MockedStatic mockedStaticDataSourceClientProvider = + mockStatic(DataSourceClientProvider.class)) { + DataSourceClientProvider clientProvider = mock(DataSourceClientProvider.class); + when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider); + mockedStaticDataSourceClientProvider.when(DataSourceClientProvider::getInstance).thenReturn(clientProvider); + + Connection connection = mock(Connection.class); + when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection); + + PreparedStatement stmt = mock(PreparedStatement.class); + when(connection.prepareStatement(anyString())).thenReturn(stmt); + + ResultSetMetaData md = mock(ResultSetMetaData.class); + when(md.getColumnCount()).thenReturn(1); + when(md.getColumnName(eq(1))).thenReturn("something"); + + ResultSet resultSet = mock(ResultSet.class); + when(resultSet.getMetaData()).thenReturn(md); + when(stmt.executeQuery()).thenReturn(resultSet); + + String[] rows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, ""); + Assertions.assertEquals(rows.length, 1); + Assertions.assertEquals(rows[0], "something"); + + when(connection.prepareStatement(anyString())).thenThrow(new SQLException("Connection failed")); + String[] nullRows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, ""); + Assertions.assertNull(nullRows); + } + } + + @Test + public void testGetPythonCommand() { + Assertions.assertEquals(dataxTask.getPythonCommand(""), "python2.7"); + Assertions.assertEquals(dataxTask.getPythonCommand("/bin/python"), "/bin/python2.7"); + + String pythonCommand = dataxTask.getPythonCommand("/opt/python"); + pythonCommand = pythonCommand.replace(File.separator, "/"); + Assertions.assertEquals(pythonCommand, "/opt/python/bin/python2.7"); + } + + @Test + public void testLoadJvmEnv() { + DataxParameters dataXParameters = createDataxParameters(); + dataXParameters.setXms(3); + dataXParameters.setXmx(4); + Assertions.assertEquals(dataxTask.loadJvmEnv(dataXParameters), " --jvm=\"-Xms3G -Xmx4G\" "); + } + + private DataxParameters createDataxParameters() { + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setCustomConfig(1); + dataxParameters.setDsType("mysql"); + dataxParameters.setDataSource(1); + dataxParameters.setJson(getJsonString()); + dataxParameters.setDataTarget(2); + dataxParameters.setSql("SELECT count(*) FROM table"); + dataxParameters.setTargetTable("user.name"); + return dataxParameters; + } + + private Map createPrepareParamsMap() { + Map paramsMap = new HashMap<>(); + Property dtProperty = new Property(); + dtProperty.setProp("DT"); + dtProperty.setDirect(Direct.IN); + dtProperty.setType(DataType.VARCHAR); + dtProperty.setValue("DT"); + Property dsProperty = new Property(); + dsProperty.setProp("DS"); + dsProperty.setDirect(Direct.IN); + dsProperty.setType(DataType.VARCHAR); + dsProperty.setValue("DS"); + paramsMap.put("DT", dtProperty); + paramsMap.put("DS", dsProperty); + return paramsMap; + } + + private TaskExecutionContext buildTestTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskAppId("app-id"); + taskExecutionContext.setExecutePath("/tmp/execution"); + return taskExecutionContext; + } + + private String getJsonString() { + return "{\n" + + " \"job\": {\n" + + " \"content\": [\n" + + " {\n" + + " \"reader\": {\n" + + " \"name\": \"stream reader\",\n" + + " \"parameter\": {\n" + + " \"sliceRecordCount\": 10,\n" + + " \"column\": [\n" + + " {\n" + + " \"type\": \"long\",\n" + + " \"value\": \"10\"\n" + + " },\n" + + " {\n" + + " \"type\": \"string\",\n" + + " \"value\": \"Hello DataX\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"writer\": {\n" + + " \"name\": \"stream writer\",\n" + + " \"parameter\": {\n" + + " \"encoding\": \"UTF-8\",\n" + + " \"print\": true\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"setting\": {\n" + + " \"speed\": {\n" + + " \"channel\": 5\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + } +} \ No newline at end of file