From 703f9991b42ef99ee9d153e527f13eea3f414c79 Mon Sep 17 00:00:00 2001 From: sgw <56352018+guowei-su@users.noreply.github.com> Date: Thu, 29 Sep 2022 16:01:29 +0800 Subject: [PATCH] [DS-12154][worker] Optimize the log printing of the worker module (#12183) * [DS-12154][worker] Optimize the log printing of the worker module according to the log specification. --- .../worker/processor/TaskDispatchProcessor.java | 5 +++-- .../processor/TaskExecuteResultAckProcessor.java | 2 +- .../worker/processor/TaskRejectAckProcessor.java | 3 +++ .../worker/processor/TaskSavePointProcessor.java | 15 +++++++++++---- .../server/worker/runner/WorkerManagerThread.java | 1 + 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index 4bb0c92124..288a5d9b33 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -95,7 +95,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { return; } final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress(); - logger.info("task execute request message: {}", taskDispatchCommand); + logger.info("Receive task dispatch request, command: {}", taskDispatchCommand); TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); @@ -133,7 +133,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { if (!offer) { logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT); - } + } else + logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize()); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java index e59902d6fc..bff92e5d11 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java @@ -54,10 +54,10 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor { logger.error("task execute response ack command is null"); return; } - logger.info("task execute response ack command : {}", taskExecuteAckMessage); try { LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId()); + logger.info("Receive task execute response ack command : {}", taskExecuteAckMessage); if (taskExecuteAckMessage.isSuccess()) { messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(), CommandType.TASK_EXECUTE_RESULT); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java index a18223b90d..b6eb954b5d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java @@ -47,10 +47,13 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor { TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(), TaskRejectAckCommand.class); if (taskRejectAckMessage == null) { + logger.error("Receive task reject response, the response message is null"); return; } + try { LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId()); + logger.info("Receive task reject response ack command: {}", taskRejectAckMessage); if (taskRejectAckMessage.isSuccess()) { messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(), CommandType.TASK_REJECT); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java index 899ac7d9b7..1a621a39e7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java @@ -22,6 +22,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; @@ -67,7 +68,7 @@ public class TaskSavePointProcessor implements NettyRequestProcessor { logger.error("task savepoint request command is null"); return; } - logger.info("task savepoint command : {}", taskSavePointRequestCommand); + logger.info("Receive task savepoint command : {}", taskSavePointRequestCommand); int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId(); TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); @@ -76,9 +77,14 @@ public class TaskSavePointProcessor implements NettyRequestProcessor { return; } - doSavePoint(taskInstanceId); + try { + LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); + doSavePoint(taskInstanceId); - sendTaskSavePointResponseCommand(channel, taskExecutionContext); + sendTaskSavePointResponseCommand(channel, taskExecutionContext); + } finally { + LoggerUtils.removeTaskInstanceIdMDC(); + } } private void sendTaskSavePointResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { @@ -89,7 +95,8 @@ public class TaskSavePointProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { logger.error("Submit kill response to master error, kill command: {}", taskSavePointResponseCommand); - } + } else + logger.info("Submit kill response to master success, kill command: {}", taskSavePointResponseCommand); } }); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 41824a9a44..1e6a0ba6b7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -93,6 +93,7 @@ public class WorkerManagerThread implements Runnable { public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) { if (waitSubmitQueue.size() > workerExecThreads) { + logger.warn("Wait submit queue is full, will retry submit task later"); WorkerServerMetrics.incWorkerSubmitQueueIsFullCount(); // if waitSubmitQueue is full, it will wait 1s, then try add ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);