From da09c00cba4d02164199cfc59dbd78d9817e6e4d Mon Sep 17 00:00:00 2001 From: yimaixinchen Date: Fri, 11 Feb 2022 09:48:04 +0800 Subject: [PATCH] optimize worker taskProcessor logger. (#8242) --- .../server/worker/processor/DBTaskAckProcessor.java | 2 ++ .../server/worker/processor/DBTaskResponseProcessor.java | 3 +++ .../server/worker/processor/HostUpdateProcessor.java | 4 ++++ .../server/worker/processor/TaskExecuteProcessor.java | 3 +-- .../server/worker/processor/TaskKillProcessor.java | 6 +++++- 5 files changed, 15 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java index 186b99da69..6ac8601484 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -46,8 +46,10 @@ public class DBTaskAckProcessor implements NettyRequestProcessor { command.getBody(), DBTaskAckCommand.class); if (taskAckCommand == null){ + logger.error("dBTask ACK request command is null"); return; } + logger.info("dBTask ACK request command : {}", taskAckCommand); if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java index 07fbf06413..b70f8567fb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -50,8 +50,11 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor { command.getBody(), DBTaskResponseCommand.class); if (taskResponseCommand == null) { + logger.error("dBTask Response command is null"); return; } + logger.info("dBTask Response command : {}", taskResponseCommand); + if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java index 37eb07bd7d..09e786be63 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java @@ -49,6 +49,10 @@ public class HostUpdateProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class); + if (updateCommand == null){ + logger.error("host update command is null"); + return; + } logger.info("received host update command : {}", updateCommand); taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index bca87c3737..c46e97ecfe 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -110,12 +110,11 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject( command.getBody(), TaskExecuteRequestCommand.class); - logger.info("received command : {}", taskRequestCommand); - if (taskRequestCommand == null) { logger.error("task execute request command is null"); return; } + logger.info("task execute request command : {}", taskRequestCommand); String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index d0361108b0..4668c7e9c8 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -80,7 +80,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); - logger.info("received kill command : {}", killCommand); + if (killCommand == null){ + logger.error("task kill request command is null"); + return; + } + logger.info("task kill command : {}", killCommand); Pair> result = doKill(killCommand);