Browse Source

Clear thread name in BaseTaskProcessor (#11422)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
47b50067ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 100
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

100
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -17,24 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner.task; package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.common.Constants.ADDRESS; import com.zaxxer.hikari.HikariDataSource;
import static org.apache.dolphinscheduler.common.Constants.DATABASE; import lombok.NonNull;
import static org.apache.dolphinscheduler.common.Constants.JDBC_URL; import org.apache.commons.collections.CollectionUtils;
import static org.apache.dolphinscheduler.common.Constants.OTHER;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -80,8 +65,8 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.apache.commons.collections.CollectionUtils; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -93,12 +78,23 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.slf4j.Logger; import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.Constants.DATABASE;
import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
import com.zaxxer.hikari.HikariDataSource; import static org.apache.dolphinscheduler.common.Constants.OTHER;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import lombok.NonNull; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
public abstract class BaseTaskProcessor implements ITaskProcessor { public abstract class BaseTaskProcessor implements ITaskProcessor {
@ -185,27 +181,39 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (StringUtils.isNotEmpty(threadLoggerInfoName)) { if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
} }
switch (taskAction) { boolean result = false;
case STOP: try {
return stop(); switch (taskAction) {
case PAUSE: case STOP:
return pause(); result = stop();
case TIMEOUT: break;
return timeout(); case PAUSE:
case SUBMIT: result = pause();
return submit(); break;
case RUN: case TIMEOUT:
return run(); result = timeout();
case DISPATCH: break;
return dispatch(); case SUBMIT:
case RESUBMIT: result = submit();
return resubmit(); break;
default: case RUN:
logger.error("unknown task action: {}", taskAction); result = run();
break;
case DISPATCH:
result = dispatch();
break;
case RESUBMIT:
result = resubmit();
break;
default:
logger.error("unknown task action: {}", taskAction);
}
return result;
} finally {
// reset thread name
Thread.currentThread().setName(threadName);
} }
// reset thread name
Thread.currentThread().setName(threadName);
return false;
} }
protected boolean resubmit() { protected boolean resubmit() {

Loading…
Cancel
Save