diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java similarity index 92% rename from dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java rename to dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java index 3639b8eba3..ea88db9d1d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java @@ -24,15 +24,15 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * Responce Cache : cache worker send master result + * Response Cache : cache worker send master result */ -public class ResponceCache { +public class ResponseCache { - private static final ResponceCache instance = new ResponceCache(); + private static final ResponseCache instance = new ResponseCache(); - private ResponceCache(){} + private ResponseCache(){} - public static ResponceCache get(){ + public static ResponseCache get(){ return instance; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java index a340ad704e..2c9bfe745c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.*; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,7 @@ public class DBTaskAckProcessor implements NettyRequestProcessor { } if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ - ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); + ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java index 9c0815ff2f..fba6729c48 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor { } if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { - ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); + ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId()); logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 8dfaf34fbc..1f9770a957 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -34,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; @@ -178,7 +178,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private void doAck(TaskExecutionContext taskExecutionContext) { // tell master that task is in executing TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK); taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index 394eb79cde..d9beaaeb0f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -58,7 +57,7 @@ public class RetryReportTaskStatusThread implements Runnable { */ @Override public void run() { - ResponceCache responceCache = ResponceCache.get(); + ResponseCache instance = ResponseCache.get(); while (Stopper.isRunning()){ @@ -66,8 +65,8 @@ public class RetryReportTaskStatusThread implements Runnable { ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); try { - if (!responceCache.getAckCache().isEmpty()){ - Map ackCache = responceCache.getAckCache(); + if (!instance.getAckCache().isEmpty()){ + Map ackCache = instance.getAckCache(); for (Map.Entry entry : ackCache.entrySet()){ Integer taskInstanceId = entry.getKey(); Command ackCommand = entry.getValue(); @@ -75,8 +74,8 @@ public class RetryReportTaskStatusThread implements Runnable { } } - if (!responceCache.getResponseCache().isEmpty()){ - Map responseCache = responceCache.getResponseCache(); + if (!instance.getResponseCache().isEmpty()){ + Map responseCache = instance.getResponseCache(); for (Map.Entry entry : responseCache.entrySet()){ Integer taskInstanceId = entry.getKey(); Command responseCommand = entry.getValue(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index c02c04ccfb..4e03ba812c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,10 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; -import static java.util.Calendar.DAY_OF_MONTH; - import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -36,7 +33,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.utils.ProcessUtils; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; @@ -44,7 +41,6 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; import org.apache.dolphinscheduler.spi.task.TaskChannel; -import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; @@ -131,7 +127,7 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode()); responseCommand.setEndTime(new Date()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); return; } @@ -213,7 +209,7 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setAppIds(task.getAppIds()); } finally { TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); clearTaskExecPath(); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 930c435ca4..3484adbc5d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; -import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; @@ -107,7 +107,7 @@ public class WorkerManagerThread implements Runnable { TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class); TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); responseCommand.setStatus(ExecutionStatus.KILL.getCode()); - ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); }