Browse Source

[Bug][worker] Fix sql task query result is empty (#5322) (#5324)

* [Bug][worker] Fix sql task query result is empty (#5322)

* add ut

* remove duplicate

* add ut
pull/3/MERGE
Wenjun 3 years ago committed by GitHub
parent
commit
abdd2337b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  2. 39
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
  3. 1
      pom.xml

47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -304,34 +304,37 @@ public class SqlTask extends AbstractTask {
*/
private String resultProcess(ResultSet resultSet) throws Exception {
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < LIMIT && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
if (resultSet != null) {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < LIMIT && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql result : {}", result);
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : Constants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, resultJSONArray.size());
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
String row = JSONUtils.toJsonString(resultJSONArray.get(i));
logger.info("row {} : {}", i + 1, row);
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : Constants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, resultJSONArray.size());
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
String row = JSONUtils.toJsonString(resultJSONArray.get(i));
logger.info("row {} : {}", i + 1, row);
}
}
String result = JSONUtils.toJsonString(resultJSONArray);
if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
JSONUtils.toJsonString(resultJSONArray));
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
? sqlParameters.getTitle()
: taskExecutionContext.getTaskName() + " query result sets", result);
}
logger.debug("execute sql result : {}", result);
return result;
}

39
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -29,6 +31,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.Date;
import org.junit.Assert;
@ -39,6 +43,7 @@ import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,7 +51,8 @@ import org.slf4j.LoggerFactory;
* sql task test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class,
ParameterUtils.class, AlertSendResponseCommand.class, BaseDataSource.class})
public class SqlTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
@ -112,6 +118,35 @@ public class SqlTaskTest {
PowerMockito.when(ParameterUtils.replaceScheduleTime(Mockito.any(), Mockito.any())).thenReturn("insert into tb_1 values('1','2')");
sqlTask.handle();
Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,sqlTask.getExitStatusCode());
Assert.assertEquals(Constants.EXIT_CODE_SUCCESS, sqlTask.getExitStatusCode());
}
@Test
public void testResultProcess() throws Exception {
// test input null and will not throw a exception
AlertSendResponseCommand mockResponseCommand = PowerMockito.mock(AlertSendResponseCommand.class);
PowerMockito.when(mockResponseCommand.getResStatus()).thenReturn(true);
PowerMockito.when(alertClientService.sendAlert(0, "null query result sets", "[]")).thenReturn(mockResponseCommand);
String result = Whitebox.invokeMethod(sqlTask, "resultProcess", null);
Assert.assertNotNull(result);
}
@Test
public void testResultProcess02() throws Exception {
// test input not null
ResultSet resultSet = PowerMockito.mock(ResultSet.class);
ResultSetMetaData mockResultMetaData = PowerMockito.mock(ResultSetMetaData.class);
PowerMockito.when(resultSet.getMetaData()).thenReturn(mockResultMetaData);
PowerMockito.when(mockResultMetaData.getColumnCount()).thenReturn(2);
PowerMockito.when(resultSet.next()).thenReturn(true);
PowerMockito.when(resultSet.getObject(Mockito.anyInt())).thenReturn(1);
PowerMockito.when(mockResultMetaData.getColumnLabel(Mockito.anyInt())).thenReturn("a");
AlertSendResponseCommand mockResponseCommand = PowerMockito.mock(AlertSendResponseCommand.class);
PowerMockito.when(mockResponseCommand.getResStatus()).thenReturn(true);
PowerMockito.when(alertClientService.sendAlert(Mockito.anyInt(), Mockito.anyString(), Mockito.anyString())).thenReturn(mockResponseCommand);
String result = Whitebox.invokeMethod(sqlTask, "resultProcess", resultSet);
Assert.assertNotNull(result);
}
}

1
pom.xml

@ -963,6 +963,7 @@
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
<include>**/server/worker/task/ShellTaskReturnTest.java</include>
<include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include>

Loading…
Cancel
Save