Browse Source

Fix ResponseCache Class Name ERROR (#8015)

* fix ResponseCache name error
3.0.0/version-upgrade
yimaixinchen 2 years ago committed by GitHub
parent
commit
08e303f867
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
  2. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
  3. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
  4. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  5. 13
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
  6. 10
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  7. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

10
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java → dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java vendored

@ -24,15 +24,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Responce Cache : cache worker send master result
* Response Cache : cache worker send master result
*/
public class ResponceCache {
public class ResponseCache {
private static final ResponceCache instance = new ResponceCache();
private static final ResponseCache instance = new ResponseCache();
private ResponceCache(){}
private ResponseCache(){}
public static ResponceCache get(){
public static ResponseCache get(){
return instance;
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +49,7 @@ public class DBTaskAckProcessor implements NettyRequestProcessor {
}
if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
}
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,7 +52,7 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor {
}
if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -34,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
@ -178,7 +178,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext) {
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}

13
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -58,7 +57,7 @@ public class RetryReportTaskStatusThread implements Runnable {
*/
@Override
public void run() {
ResponceCache responceCache = ResponceCache.get();
ResponseCache instance = ResponseCache.get();
while (Stopper.isRunning()){
@ -66,8 +65,8 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
if (!responceCache.getAckCache().isEmpty()){
Map<Integer,Command> ackCache = responceCache.getAckCache();
if (!instance.getAckCache().isEmpty()){
Map<Integer,Command> ackCache = instance.getAckCache();
for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command ackCommand = entry.getValue();
@ -75,8 +74,8 @@ public class RetryReportTaskStatusThread implements Runnable {
}
}
if (!responceCache.getResponseCache().isEmpty()){
Map<Integer,Command> responseCache = responceCache.getResponseCache();
if (!instance.getResponseCache().isEmpty()){
Map<Integer,Command> responseCache = instance.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();

10
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -36,7 +33,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@ -44,7 +41,6 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@ -131,7 +127,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
responseCommand.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
return;
}
@ -213,7 +209,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setAppIds(task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
clearTaskExecPath();
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
@ -107,7 +107,7 @@ public class WorkerManagerThread implements Runnable {
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class);
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
responseCommand.setStatus(ExecutionStatus.KILL.getCode());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}

Loading…
Cancel
Save