Browse Source

fix [BUG] TaskExecutionContextCacheManagerImpl Do not execute removeByTaskInstanceId #2745 (#2754)

pull/3/MERGE
yaoyao 4 years ago committed by GitHub
parent
commit
009fd01047
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -93,6 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result); TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
} }
/** /**

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -27,9 +27,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -63,6 +66,11 @@ public class TaskExecuteThread implements Runnable {
*/ */
private TaskCallbackService taskCallbackService; private TaskCallbackService taskCallbackService;
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/** /**
* constructor * constructor
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
@ -71,6 +79,7 @@ public class TaskExecuteThread implements Runnable {
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){ public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService; this.taskCallbackService = taskCallbackService;
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
} }
@Override @Override
@ -134,6 +143,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
} finally { } finally {
try { try {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}catch (Exception e){ }catch (Exception e){
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

Loading…
Cancel
Save