Browse Source

fix AbstractTask's handle method exception (#1490)

* fix AbstractTask's handle method exception

* update ut
pull/2/head
DK.Pino 5 years ago committed by bao liang
parent
commit
15d5d66d05
  1. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
  2. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
  3. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  4. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
  5. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  6. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  7. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java

@ -68,6 +68,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
throw e;
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java

@ -99,7 +99,7 @@ public class DependentTask extends AbstractTask {
}
@Override
public void handle(){
public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
@ -135,6 +135,7 @@ public class DependentTask extends AbstractTask {
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = -1;
throw e;
}
}

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -113,23 +113,20 @@ public class HttpTask extends AbstractTask {
long startTime = System.currentTimeMillis();
String statusCode = null;
String body = null;
try(CloseableHttpClient client = createHttpClient()) {
try(CloseableHttpResponse response = sendRequest(client)) {
statusCode = String.valueOf(getStatusCode(response));
body = getResponseBody(response);
exitStatusCode = validResponse(body, statusCode);
long costTime = System.currentTimeMillis() - startTime;
logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}",
DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output);
}catch (Exception e) {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e);
}
} catch (Exception e) {
try(CloseableHttpClient client = createHttpClient();
CloseableHttpResponse response = sendRequest(client)) {
statusCode = String.valueOf(getStatusCode(response));
body = getResponseBody(response);
exitStatusCode = validResponse(body, statusCode);
long costTime = System.currentTimeMillis() - startTime;
logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}",
DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output);
}catch (Exception e){
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e);
throw e;
}
}

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

@ -97,14 +97,13 @@ public class ProcedureTask extends AbstractTask {
procedureParameters.getMethod(),
procedureParameters.getLocalParams());
// determine whether there is a data source
if (procedureParameters.getDatasource() == 0){
logger.error("datasource id not exists");
DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
throw new IllegalArgumentException("datasource not found");
}
DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -112,11 +111,6 @@ public class ProcedureTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection connection = null;
CallableStatement stmt = null;
try {

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -98,6 +98,7 @@ public class PythonTask extends AbstractTask {
} catch (Exception e) {
logger.error("python task failure", e);
exitStatusCode = -1;
throw e;
}
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -106,6 +106,7 @@ public class ShellTask extends AbstractTask {
} catch (Exception e) {
logger.error("shell task failure", e);
exitStatusCode = -1;
throw e;
}
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java

@ -29,7 +29,7 @@ public class DependentTaskTest {
@Test
public void testDependInit(){
public void testDependInit() throws Exception{
TaskProps taskProps = new TaskProps();

Loading…
Cancel
Save