Browse Source

[Plugin][Task]Fix Yarn Task Kill Not valid (#6293)

2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
552fd17878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -31,14 +31,18 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
@ -108,6 +112,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
private TaskPluginManager taskPluginManager; private TaskPluginManager taskPluginManager;
/**
* process database access
*/
protected ProcessService processService;
/** /**
* constructor * constructor
* *
@ -120,6 +129,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService; this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService; this.alertClientService = alertClientService;
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
@ -130,6 +140,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
this.taskCallbackService = taskCallbackService; this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService; this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager; this.taskPluginManager = taskPluginManager;
this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
@Override @Override
@ -266,6 +277,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
if (task != null) { if (task != null) {
try { try {
task.cancelApplication(true); task.cancelApplication(true);
TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
if (taskInstance != null) {
ProcessUtils.killYarnJob(taskExecutionContext);
}
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -66,10 +66,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
cancel = true; cancel = true;
// cancel process // cancel process
shellCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
// TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
// if (status && taskInstance != null){
// ProcessUtils.killYarnJob(taskExecutionContext);
// }
} }
/** /**

Loading…
Cancel
Save