Browse Source

[DS-12154][worker] Optimize the log printing of the worker module (#12183)

* [DS-12154][worker] Optimize the log printing of the worker module according to the log specification.
3.1.1-release
sgw 2 years ago committed by zhuangchong
parent
commit
703f9991b4
  1. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  2. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
  3. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
  4. 15
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
  5. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -95,7 +95,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
return; return;
} }
final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress(); final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand); logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
@ -133,7 +133,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
if (!offer) { if (!offer) {
logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize()); logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT); workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
} } else
logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
} finally { } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
} }

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java

@ -54,10 +54,10 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
logger.error("task execute response ack command is null"); logger.error("task execute response ack command is null");
return; return;
} }
logger.info("task execute response ack command : {}", taskExecuteAckMessage);
try { try {
LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId()); LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
logger.info("Receive task execute response ack command : {}", taskExecuteAckMessage);
if (taskExecuteAckMessage.isSuccess()) { if (taskExecuteAckMessage.isSuccess()) {
messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(), messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
CommandType.TASK_EXECUTE_RESULT); CommandType.TASK_EXECUTE_RESULT);

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java

@ -47,10 +47,13 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor {
TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(), TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(),
TaskRejectAckCommand.class); TaskRejectAckCommand.class);
if (taskRejectAckMessage == null) { if (taskRejectAckMessage == null) {
logger.error("Receive task reject response, the response message is null");
return; return;
} }
try { try {
LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId()); LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
logger.info("Receive task reject response ack command: {}", taskRejectAckMessage);
if (taskRejectAckMessage.isSuccess()) { if (taskRejectAckMessage.isSuccess()) {
messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(), messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
CommandType.TASK_REJECT); CommandType.TASK_REJECT);

15
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java

@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
@ -67,7 +68,7 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
logger.error("task savepoint request command is null"); logger.error("task savepoint request command is null");
return; return;
} }
logger.info("task savepoint command : {}", taskSavePointRequestCommand); logger.info("Receive task savepoint command : {}", taskSavePointRequestCommand);
int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId(); int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
@ -76,9 +77,14 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
return; return;
} }
doSavePoint(taskInstanceId); try {
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
doSavePoint(taskInstanceId);
sendTaskSavePointResponseCommand(channel, taskExecutionContext); sendTaskSavePointResponseCommand(channel, taskExecutionContext);
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
} }
private void sendTaskSavePointResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { private void sendTaskSavePointResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@ -89,7 +95,8 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
logger.error("Submit kill response to master error, kill command: {}", taskSavePointResponseCommand); logger.error("Submit kill response to master error, kill command: {}", taskSavePointResponseCommand);
} } else
logger.info("Submit kill response to master success, kill command: {}", taskSavePointResponseCommand);
} }
}); });
} }

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -93,6 +93,7 @@ public class WorkerManagerThread implements Runnable {
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) { public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
if (waitSubmitQueue.size() > workerExecThreads) { if (waitSubmitQueue.size() > workerExecThreads) {
logger.warn("Wait submit queue is full, will retry submit task later");
WorkerServerMetrics.incWorkerSubmitQueueIsFullCount(); WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
// if waitSubmitQueue is full, it will wait 1s, then try add // if waitSubmitQueue is full, it will wait 1s, then try add
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);

Loading…
Cancel
Save