Browse Source

fix datax NPE issue (#12388) (#12407)

3.1.1-release
Kerwin 2 years ago committed by GitHub
parent
commit
b559d033d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  2. 327
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

34
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -134,7 +134,7 @@ public class DataxTask extends AbstractTaskExecutor {
logger.info("datax task params {}", taskExecutionContext.getTaskParams());
dataXParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DataxParameters.class);
if (!dataXParameters.checkParameters()) {
if (dataXParameters == null || !dataXParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
@ -144,7 +144,7 @@ public class DataxTask extends AbstractTaskExecutor {
/**
* run DataX process
*
* @throws Exception if error throws Exception
* @throws TaskException if error throws Exception
*/
@Override
public void handle() throws TaskException {
@ -152,7 +152,7 @@ public class DataxTask extends AbstractTaskExecutor {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
// run datax procesDataSourceService.s
// run datax processDataSourceService
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
@ -231,7 +231,6 @@ public class DataxTask extends AbstractTaskExecutor {
* build datax job config
*
* @return collection of datax job config JSONObject
* @throws SQLException if error throws SQLException
*/
private List<ObjectNode> buildDataxJobContentJson() {
@ -391,18 +390,17 @@ public class DataxTask extends AbstractTaskExecutor {
}
// datax python command
StringBuilder sbr = new StringBuilder();
sbr.append(getPythonCommand());
sbr.append(" ");
sbr.append(DATAX_PATH);
sbr.append(" ");
sbr.append(loadJvmEnv(dataXParameters));
sbr.append(addCustomParameters(paramsMap));
sbr.append(" ");
sbr.append(jobConfigFilePath);
String sbr = getPythonCommand() +
" " +
DATAX_PATH +
" " +
loadJvmEnv(dataXParameters) +
addCustomParameters(paramsMap) +
" " +
jobConfigFilePath;
// replace placeholder
String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap));
String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParamUtils.convert(paramsMap));
logger.debug("raw script : {}", dataxCommand);
@ -422,10 +420,14 @@ public class DataxTask extends AbstractTaskExecutor {
}
private StringBuilder addCustomParameters(Map<String, Property> paramsMap) {
if (paramsMap == null || paramsMap.size() == 0) {
return new StringBuilder();
}
StringBuilder customParameters = new StringBuilder("-p \"");
for (Map.Entry<String, Property> entry : paramsMap.entrySet()) {
customParameters.append(String.format(CUSTOM_PARAM, entry.getKey(), entry.getValue().getValue()));
}
customParameters.replace(4, 5, "");
customParameters.append("\"");
return customParameters;
}
@ -528,12 +530,12 @@ public class DataxTask extends AbstractTaskExecutor {
}
} else {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
String.format("grammatical analysis sql column [ %s ] failed", item));
}
if (columnName == null) {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
String.format("grammatical analysis sql column [ %s ] failed", item));
}
columnNames[i] = columnName;

327
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.datax;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskRunStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class DataxTaskTest {
private DataxTask dataxTask;
private final TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {};
@BeforeEach
public void before() throws Exception {
TaskExecutionContext taskExecutionContext = mock(TaskExecutionContext.class);
ResourceParametersHelper resourceParametersHelper = new ResourceParametersHelper();
String parameters = JSONUtils.toJsonString(createDataxParameters());
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
taskExecutionContext.setResourceParametersHelper(resourceParametersHelper);
this.dataxTask = new DataxTask(taskExecutionContext);
this.dataxTask.init();
}
@Test
public void testHandleNullParamsMap() throws Exception {
String parameters = JSONUtils.toJsonString(createDataxParameters());
TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
taskExecutionContext.setPrepareParamsMap(null);
taskExecutionContext.setTaskParams(parameters);
DataxTask dataxTask = new DataxTask(taskExecutionContext);
dataxTask.init();
ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
TaskResponse taskResponse = new TaskResponse();
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
File jsonFile = new File("/tmp/execution/app-id_job.json");
InputStream json = Files.newInputStream(jsonFile.toPath());
String resultStr = FileUtils.readFile2Str(json);
Assertions.assertEquals(resultStr, getJsonString());
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
File shellCommandFile = SystemUtils.IS_OS_WINDOWS ?
new File("/tmp/execution/app-id_node.bat") :
new File("/tmp/execution/app-id_node.sh");
InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
" /tmp/execution/app-id_job.json");
delete = shellCommandFile.delete();
Assertions.assertTrue(delete);
}
@Test
public void testHandleParamsMap() throws Exception {
String parameters = JSONUtils.toJsonString(createDataxParameters());
TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
taskExecutionContext.setPrepareParamsMap(createPrepareParamsMap());
taskExecutionContext.setTaskParams(parameters);
DataxTask dataxTask = new DataxTask(taskExecutionContext);
dataxTask.init();
ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
TaskResponse taskResponse = new TaskResponse();
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString())).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
File jsonFile = new File("/tmp/execution/app-id_job.json");
InputStream json = Files.newInputStream(jsonFile.toPath());
String resultStr = FileUtils.readFile2Str(json);
Assertions.assertEquals(resultStr, getJsonString());
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
File shellCommandFile = SystemUtils.IS_OS_WINDOWS ?
new File("/tmp/execution/app-id_node.bat") :
new File("/tmp/execution/app-id_node.sh");
InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
"-p \"-DDT=DT -DDS=DS\" /tmp/execution/app-id_job.json");
delete = shellCommandFile.delete();
Assertions.assertTrue(delete);
}
@Test
public void testHandleInterruptedException() throws Exception {
String parameters = JSONUtils.toJsonString(createDataxParameters());
TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
taskExecutionContext.setPrepareParamsMap(null);
taskExecutionContext.setTaskParams(parameters);
DataxTask dataxTask = new DataxTask(taskExecutionContext);
dataxTask.init();
ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString())).thenThrow(new InterruptedException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}
@Test
public void testHandleIOException() throws Exception {
String parameters = JSONUtils.toJsonString(createDataxParameters());
TaskExecutionContext taskExecutionContext = buildTestTaskExecutionContext();
taskExecutionContext.setPrepareParamsMap(null);
taskExecutionContext.setTaskParams(parameters);
DataxTask dataxTask = new DataxTask(taskExecutionContext);
dataxTask.init();
ShellCommandExecutor shellCommandExecutor = mock(ShellCommandExecutor.class);
Field shellCommandExecutorFiled = DataxTask.class.getDeclaredField("shellCommandExecutor");
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString())).thenThrow(new IOException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}
@Test
public void testTryExecuteSqlResolveColumnNames() throws Exception {
BaseConnectionParam baseConnectionParam = mock(BaseConnectionParam.class);
try (
MockedStatic<DataSourceClientProvider> mockedStaticDataSourceClientProvider =
mockStatic(DataSourceClientProvider.class)) {
DataSourceClientProvider clientProvider = mock(DataSourceClientProvider.class);
when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
mockedStaticDataSourceClientProvider.when(DataSourceClientProvider::getInstance).thenReturn(clientProvider);
Connection connection = mock(Connection.class);
when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
PreparedStatement stmt = mock(PreparedStatement.class);
when(connection.prepareStatement(anyString())).thenReturn(stmt);
ResultSetMetaData md = mock(ResultSetMetaData.class);
when(md.getColumnCount()).thenReturn(1);
when(md.getColumnName(eq(1))).thenReturn("something");
ResultSet resultSet = mock(ResultSet.class);
when(resultSet.getMetaData()).thenReturn(md);
when(stmt.executeQuery()).thenReturn(resultSet);
String[] rows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, "");
Assertions.assertEquals(rows.length, 1);
Assertions.assertEquals(rows[0], "something");
when(connection.prepareStatement(anyString())).thenThrow(new SQLException("Connection failed"));
String[] nullRows = this.dataxTask.tryExecuteSqlResolveColumnNames(DbType.MYSQL,baseConnectionParam, "");
Assertions.assertNull(nullRows);
}
}
@Test
public void testGetPythonCommand() {
Assertions.assertEquals(dataxTask.getPythonCommand(""), "python2.7");
Assertions.assertEquals(dataxTask.getPythonCommand("/bin/python"), "/bin/python2.7");
String pythonCommand = dataxTask.getPythonCommand("/opt/python");
pythonCommand = pythonCommand.replace(File.separator, "/");
Assertions.assertEquals(pythonCommand, "/opt/python/bin/python2.7");
}
@Test
public void testLoadJvmEnv() {
DataxParameters dataXParameters = createDataxParameters();
dataXParameters.setXms(3);
dataXParameters.setXmx(4);
Assertions.assertEquals(dataxTask.loadJvmEnv(dataXParameters), " --jvm=\"-Xms3G -Xmx4G\" ");
}
private DataxParameters createDataxParameters() {
DataxParameters dataxParameters = new DataxParameters();
dataxParameters.setCustomConfig(1);
dataxParameters.setDsType("mysql");
dataxParameters.setDataSource(1);
dataxParameters.setJson(getJsonString());
dataxParameters.setDataTarget(2);
dataxParameters.setSql("SELECT count(*) FROM table");
dataxParameters.setTargetTable("user.name");
return dataxParameters;
}
private Map<String, Property> createPrepareParamsMap() {
Map<String, Property> paramsMap = new HashMap<>();
Property dtProperty = new Property();
dtProperty.setProp("DT");
dtProperty.setDirect(Direct.IN);
dtProperty.setType(DataType.VARCHAR);
dtProperty.setValue("DT");
Property dsProperty = new Property();
dsProperty.setProp("DS");
dsProperty.setDirect(Direct.IN);
dsProperty.setType(DataType.VARCHAR);
dsProperty.setValue("DS");
paramsMap.put("DT", dtProperty);
paramsMap.put("DS", dsProperty);
return paramsMap;
}
private TaskExecutionContext buildTestTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId("app-id");
taskExecutionContext.setExecutePath("/tmp/execution");
return taskExecutionContext;
}
private String getJsonString() {
return "{\n" +
" \"job\": {\n" +
" \"content\": [\n" +
" {\n" +
" \"reader\": {\n" +
" \"name\": \"stream reader\",\n" +
" \"parameter\": {\n" +
" \"sliceRecordCount\": 10,\n" +
" \"column\": [\n" +
" {\n" +
" \"type\": \"long\",\n" +
" \"value\": \"10\"\n" +
" },\n" +
" {\n" +
" \"type\": \"string\",\n" +
" \"value\": \"Hello DataX\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"writer\": {\n" +
" \"name\": \"stream writer\",\n" +
" \"parameter\": {\n" +
" \"encoding\": \"UTF-8\",\n" +
" \"print\": true\n" +
" }\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"speed\": {\n" +
" \"channel\": 5\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
}
}
Loading…
Cancel
Save