From c9066e8de93a7dddf55c65507598f1099dfe4980 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 6 Mar 2023 17:44:23 +0800 Subject: [PATCH] Use MDC to filter task instance log (#13673) * Use MDC to collect task instance log * Use MDCAutoClosableContext to remove the MDC key --- .../builder/TaskExecutionContextBuilder.java | 8 +- .../master/processor/StateEventProcessor.java | 11 +- .../master/processor/TaskEventProcessor.java | 10 +- .../TaskExecuteResponseProcessor.java | 10 +- .../master/processor/TaskRecallProcessor.java | 10 +- .../queue/StateEventResponseService.java | 28 ++-- .../processor/queue/TaskExecuteRunnable.java | 7 +- .../master/runner/EventExecuteService.java | 10 +- .../runner/MasterSchedulerBootstrap.java | 6 +- .../runner/StateWheelExecuteThread.java | 18 +-- .../runner/StreamTaskExecuteRunnable.java | 6 +- .../runner/StreamTaskExecuteThreadPool.java | 10 +- .../master/runner/WorkflowEventLooper.java | 6 +- .../runner/WorkflowExecuteRunnable.java | 38 ++--- .../runner/WorkflowExecuteThreadPool.java | 17 +- .../master/runner/task/BaseTaskProcessor.java | 22 +-- .../runner/task/BlockingTaskProcessor.java | 6 +- .../runner/task/ConditionTaskProcessor.java | 8 +- .../runner/task/DependentTaskProcessor.java | 7 +- .../master/runner/task/SubTaskProcessor.java | 4 +- .../runner/task/SwitchTaskProcessor.java | 8 +- .../master/service/MasterFailoverService.java | 10 +- .../master/service/WorkerFailoverService.java | 10 +- .../src/main/resources/logback-spring.xml | 6 +- .../src/test/resources/logback.xml | 77 +++++++++ .../processor/LoggerRequestProcessor.java | 5 +- .../service/log/LogClient.java | 3 +- .../service/utils/LogUtils.java | 85 ---------- .../service/utils/LoggerUtils.java | 119 -------------- .../service/log/LogClientTest.java | 151 ------------------ .../log/LoggerRequestProcessorTest.java | 14 +- .../service/log/TaskLogDiscriminatorTest.java | 108 ------------- .../service/log/TaskLogFilterTest.java | 71 -------- .../service/utils/LogUtilsTest.java | 73 --------- .../src/main/resources/logback-spring.xml | 6 +- .../task/api/AbstractCommandExecutor.java | 96 +++-------- .../plugin/task/api/AbstractTask.java | 21 +-- .../plugin/task/api/TaskConstants.java | 25 --- .../plugin/task/api/TaskExecutionContext.java | 6 - .../api/am/KubernetesApplicationManager.java | 34 ++-- .../task/api/log/TaskLogDiscriminator.java | 35 +--- .../plugin/task/api/log/TaskLogFilter.java | 33 +--- .../plugin/task/api/utils/LogUtils.java | 133 ++++++++++----- .../task/datafactory/DatafactoryHook.java | 2 +- .../plugin/task/datasync/DatasyncHook.java | 2 +- .../plugin/task/dms/DmsHook.java | 2 +- .../plugin/kubeflow/KubeflowHelper.java | 3 +- .../plugin/task/pigeon/PigeonTaskTest.java | 1 - .../task/pytorch/GitProjectManager.java | 8 +- .../plugin/task/sagemaker/PipelineUtils.java | 7 +- .../server/worker/WorkerServer.java | 9 +- .../processor/TaskDispatchProcessor.java | 10 +- ...DefaultWorkerDelayTaskExecuteRunnable.java | 2 +- .../worker/runner/TaskCallbackImpl.java | 9 +- .../runner/WorkerTaskExecuteRunnable.java | 69 ++++---- .../worker/utils/TaskFilesTransferUtils.java | 9 +- .../src/main/resources/logback-spring.xml | 6 +- ...ultWorkerDelayTaskExecuteRunnableTest.java | 2 - .../src/test/resources/logback.xml | 78 +++++++++ 59 files changed, 506 insertions(+), 1084 deletions(-) create mode 100644 dolphinscheduler-master/src/test/resources/logback.xml delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java create mode 100644 dolphinscheduler-worker/src/test/resources/logback.xml diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java index ca89ca2337..8d8fd77f6a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; @@ -36,18 +35,15 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; /** * TaskExecutionContext builder */ +@Slf4j public class TaskExecutionContextBuilder { - protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); - public static TaskExecutionContextBuilder get() { return new TaskExecutionContextBuilder(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index de856c393a..167aa156b7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; @@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import lombok.extern.slf4j.Slf4j; @@ -62,14 +62,11 @@ public class StateEventProcessor implements NettyRequestProcessor { stateEvent = createTaskStateEvent(workflowStateEventChangeCommand); } - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); - + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) { log.info("Received state change command, event: {}", stateEvent); stateEventResponseService.addStateChangeEvent(stateEvent); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java index a1ea9822d2..d4fa52445e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -19,13 +19,13 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import lombok.extern.slf4j.Slf4j; @@ -59,13 +59,11 @@ public class TaskEventProcessor implements NettyRequestProcessor { .key(taskEventChangeCommand.getKey()) .type(StateEventType.WAKE_UP_TASK_GROUP) .build(); - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); + try ( + LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) { log.info("Received task event change command, event: {}", stateEvent); stateEventResponseService.addEvent2WorkflowExecute(stateEvent); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java index 4bc43184f9..f40473e0f9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java @@ -18,13 +18,13 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import lombok.extern.slf4j.Slf4j; @@ -61,14 +61,12 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor { TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel, taskExecuteResultMessage.getMessageSenderAddress()); - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(), - taskResultEvent.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + taskResultEvent.getProcessInstanceId(), taskResultEvent.getTaskInstanceId())) { log.info("Received task execute result, event: {}", taskResultEvent); taskEventService.addEvent(taskResultEvent); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java index 54ee517ae5..a903131c9f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java @@ -18,13 +18,13 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import lombok.extern.slf4j.Slf4j; @@ -56,13 +56,11 @@ public class TaskRecallProcessor implements NettyRequestProcessor { String.format("invalid command type : %s", command.getType())); TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class); TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel); - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), - recallCommand.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId())) { log.info("Receive task recall command: {}", recallCommand); taskEventService.addEvent(taskEvent); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 6633f11c7d..41dc8948f2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -19,12 +19,12 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.ArrayList; import java.util.List; @@ -74,13 +74,11 @@ public class StateEventResponseService { List remainEvents = new ArrayList<>(eventQueue.size()); eventQueue.drainTo(remainEvents); for (StateEvent event : remainEvents) { - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), - event.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), + event.getTaskInstanceId())) { this.persist(event); - - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } @@ -112,18 +110,20 @@ public class StateEventResponseService { public void run() { log.info("State event loop service started"); while (!ServerLifeCycleManager.isStopped()) { + StateEvent stateEvent; try { - // if not task , blocking here - StateEvent stateEvent = eventQueue.take(); - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); - persist(stateEvent); + stateEvent = eventQueue.take(); } catch (InterruptedException e) { log.warn("State event loop service interrupted, will stop this loop", e); Thread.currentThread().interrupt(); break; - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), + stateEvent.getTaskInstanceId())) { + // if not task , blocking here + persist(stateEvent); } } log.info("State event loop service stopped"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index 78199f517d..ab022a762b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -50,8 +50,9 @@ public class TaskExecuteRunnable implements Runnable { while (!this.events.isEmpty()) { // we handle the task event belongs to one task serial, so if the event comes in wrong order, TaskEvent event = this.events.peek(); - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils + .setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId())) { log.info("Handle task event begin: {}", event); taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event); events.remove(event); @@ -68,8 +69,6 @@ public class TaskExecuteRunnable implements Runnable { log.error("Handle task event error, get a unknown exception, this event will be removed, event: {}", event, unknownException); events.remove(event); - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 305d0ee680..735fb227b7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -20,9 +20,9 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.concurrent.TimeUnit; @@ -78,11 +78,11 @@ public class EventExecuteService extends BaseDaemonThread { private void workflowEventHandler() { for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { try { - LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); + LogUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); workflowExecuteThreadPool.executeEvent(workflowExecuteThread); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } @@ -90,11 +90,11 @@ public class EventExecuteService extends BaseDaemonThread { private void streamTaskEventHandler() { for (StreamTaskExecuteRunnable streamTaskExecuteRunnable : streamTaskInstanceExecCacheManager.getAll()) { try { - LoggerUtils.setTaskInstanceIdMDC(streamTaskExecuteRunnable.getTaskInstance().getId()); + LogUtils.setTaskInstanceIdMDC(streamTaskExecuteRunnable.getTaskInstance().getId()); streamTaskExecuteThreadPool.executeEvent(streamTaskExecuteRunnable); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index e688e758d1..0934693e41 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -43,7 +44,6 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.commons.collections4.CollectionUtils; @@ -180,7 +180,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl processInstances.forEach(processInstance -> { try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + LogUtils.setWorkflowInstanceIdMDC(processInstance.getId()); if (processInstanceExecCacheManager.contains(processInstance.getId())) { log.error( "The workflow instance is already been cached, this case shouldn't be happened"); @@ -200,7 +200,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId())); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } }); } catch (InterruptedException interruptedException) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 1b2c603b8e..90ce4ff00e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -28,12 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; @@ -137,7 +137,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } for (Integer processInstanceId : processInstanceTimeoutCheckList) { try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + LogUtils.setWorkflowInstanceIdMDC(processInstanceId); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId( processInstanceId); if (workflowExecuteThread == null) { @@ -163,7 +163,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } catch (Exception ex) { log.error("Check workflow instance timeout error"); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } @@ -252,7 +252,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) { try { int processInstanceId = taskInstanceKey.getProcessInstanceId(); - LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + LogUtils.setWorkflowInstanceIdMDC(processInstanceId); long taskCode = taskInstanceKey.getTaskCode(); WorkflowExecuteRunnable workflowExecuteThread = @@ -287,7 +287,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } catch (Exception ex) { log.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } @@ -301,7 +301,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + LogUtils.setWorkflowInstanceIdMDC(processInstanceId); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); @@ -353,7 +353,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } catch (Exception ex) { log.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } @@ -367,7 +367,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { long taskCode = taskInstanceKey.getTaskCode(); try { - LoggerUtils.setTaskInstanceIdMDC(processInstanceId); + LogUtils.setTaskInstanceIdMDC(processInstanceId); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { @@ -392,7 +392,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } catch (Exception ex) { log.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 0d6eabfad1..a869207a24 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage; import org.apache.dolphinscheduler.remote.command.TaskExecuteStartCommand; @@ -58,7 +59,6 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections4.CollectionUtils; @@ -221,7 +221,7 @@ public class StreamTaskExecuteRunnable implements Runnable { while (!this.taskEvents.isEmpty()) { try { taskEvent = this.taskEvents.peek(); - LoggerUtils.setTaskInstanceIdMDC(taskEvent.getTaskInstanceId()); + LogUtils.setTaskInstanceIdMDC(taskEvent.getTaskInstanceId()); log.info("Begin to handle state event, {}", taskEvent); if (this.handleTaskEvent(taskEvent)) { @@ -244,7 +244,7 @@ public class StreamTaskExecuteRunnable implements Runnable { e); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java index 7c9508f5ed..5c336a3aea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteThreadPool.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import javax.annotation.PostConstruct; @@ -61,16 +61,16 @@ public class StreamTaskExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onFailure(Throwable ex) { - LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); + LogUtils.setTaskInstanceIdMDC(taskInstanceId); log.error("Stream task instance events handle failed", ex); - LoggerUtils.removeTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceIdMDC(); } @Override public void onSuccess(Object result) { - LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); + LogUtils.setTaskInstanceIdMDC(taskInstanceId); log.info("Stream task instance is finished."); - LoggerUtils.removeTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceIdMDC(); } }); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java index eb978bc766..2e3b5844d5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java @@ -21,13 +21,13 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError; import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException; import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler; import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.HashMap; import java.util.List; @@ -75,7 +75,7 @@ public class WorkflowEventLooper extends BaseDaemonThread { while (!ServerLifeCycleManager.isStopped()) { try { workflowEvent = workflowEventQueue.poolEvent(); - LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId()); + LogUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId()); log.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent); WorkflowEventHandler workflowEventHandler = workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType()); @@ -100,7 +100,7 @@ public class WorkflowEventLooper extends BaseDaemonThread { workflowEventQueue.addEvent(workflowEvent); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 3596bcae5a..7e594f77fe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -93,7 +93,6 @@ import org.apache.dolphinscheduler.service.process.ProcessDag; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -297,12 +296,13 @@ public class WorkflowExecuteRunnable implements Callable { int loopTimes = stateEvents.size() * 2; for (int i = 0; i < loopTimes; i++) { final StateEvent stateEvent = this.stateEvents.peek(); - try { - if (stateEvent == null) { - return; - } - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId()); + if (stateEvent == null) { + return; + } + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), + stateEvent.getTaskInstanceId())) { // if state handle success then will remove this state, otherwise will retry this state next time. // The state should always handle success except database error. checkProcessInstance(stateEvent); @@ -338,8 +338,6 @@ public class WorkflowExecuteRunnable implements Callable { stateEvent, e); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } @@ -725,7 +723,7 @@ public class WorkflowExecuteRunnable implements Callable { } try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + LogUtils.setWorkflowInstanceIdMDC(processInstance.getId()); if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) { buildFlowDag(); workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG; @@ -746,7 +744,7 @@ public class WorkflowExecuteRunnable implements Callable { log.error("Start workflow error", e); return WorkflowSubmitStatue.FAILED; } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } @@ -862,8 +860,9 @@ public class WorkflowExecuteRunnable implements Callable { taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance task : validTaskInstanceList) { - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());) { log.info( "Check the taskInstance from a exist workflowInstance, existTaskInstanceCode: {}, taskInstanceStatus: {}", task.getTaskCode(), @@ -911,8 +910,6 @@ public class WorkflowExecuteRunnable implements Callable { if (task.getState().isFailure()) { errorTaskMap.put(task.getTaskCode(), task.getId()); } - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } clearDataIfExecuteTask(); @@ -993,7 +990,7 @@ public class WorkflowExecuteRunnable implements Callable { } // in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); if (validTaskMap.containsKey(taskInstance.getTaskCode())) { int oldTaskInstanceId = validTaskMap.get(taskInstance.getTaskCode()); if (taskInstance.getId() != oldTaskInstanceId) { @@ -1063,7 +1060,7 @@ public class WorkflowExecuteRunnable implements Callable { taskInstance.getTaskCode(), e); return Optional.empty(); } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } @@ -1830,8 +1827,9 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstanceId == null || taskInstanceId.equals(0)) { continue; } - LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId); - try { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId)) { TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); if (taskInstance == null || taskInstance.getState().isFinished()) { continue; @@ -1846,8 +1844,6 @@ public class WorkflowExecuteRunnable implements Callable { .build(); this.addStateEvent(taskStateEvent); } - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index e2194d6e50..00a54cc5d9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; @@ -31,7 +32,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -117,20 +117,20 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onFailure(Throwable ex) { - LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + LogUtils.setWorkflowInstanceIdMDC(processInstanceId); try { log.error("Workflow instance events handle failed", ex); notifyProcessChanged(workflowExecuteThread.getProcessInstance()); multiThreadFilterMap.remove(workflowExecuteThread.getKey()); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } @Override public void onSuccess(Object result) { try { - LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); + LogUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); if (workflowExecuteThread.workFlowFinish() && workflowExecuteThread.eventSize() == 0) { stateWheelExecuteThread .removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId()); @@ -143,7 +143,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { } finally { // make sure the process has been removed from multiThreadFilterMap multiThreadFilterMap.remove(workflowExecuteThread.getKey()); - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } }); @@ -161,8 +161,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { ProcessInstance processInstance = entry.getKey(); TaskInstance taskInstance = entry.getValue(); String address = NetUtils.getAddr(masterConfig.getListenPort()); - try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstance.getId())) { if (processInstance.getHost().equalsIgnoreCase(address)) { log.info("Process host is local master, will notify it"); this.notifyMyself(processInstance, taskInstance); @@ -170,8 +171,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { log.info("Process host is remote master, will notify it"); this.notifyProcess(finishProcessInstance, processInstance, taskInstance); } - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 559163f363..31bfe4c772 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -53,7 +53,6 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -71,6 +70,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourc import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters; import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder; @@ -78,7 +78,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.plugin.SPIIdentify; @@ -103,7 +102,7 @@ import com.zaxxer.hikari.HikariDataSource; public abstract class BaseTaskProcessor implements ITaskProcessor { protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + LoggerFactory.getLogger(BaseTaskProcessor.class); protected boolean killed = false; @@ -190,9 +189,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { @Override public boolean action(TaskAction taskAction) { - String threadName = Thread.currentThread().getName(); + String oldTaskInstanceLogPathMdc = LogUtils.getTaskInstanceLogFullPathMdc(); if (StringUtils.isNotEmpty(threadLoggerInfoName)) { - Thread.currentThread().setName(threadLoggerInfoName); + LogUtils.setTaskInstanceLogFullPathMDC(threadLoggerInfoName); } boolean result = false; try { @@ -223,9 +222,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { } return result; } finally { - // reset thread name - Thread.currentThread().setName(threadName); - + LogUtils.removeTaskInstanceLogFullPathMDC(); + // reset MDC value, this should be removed. + if (oldTaskInstanceLogPathMdc != null) { + LogUtils.setTaskInstanceLogFullPathMDC(oldTaskInstanceLogPathMdc); + } } } @@ -288,12 +289,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * set master task running log. */ public void setTaskExecutionLogger() { - threadLoggerInfoName = LoggerUtils.buildTaskId(taskInstance.getFirstSubmitTime(), + threadLoggerInfoName = LogUtils.getTaskInstanceLogFullPath( + taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId()); - Thread.currentThread().setName(threadLoggerInfoName); + LogUtils.setTaskInstanceLogFullPathMDC(threadLoggerInfoName); } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java index 9f11f4b26d..81739e7642 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.service.utils.LogUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -70,7 +70,9 @@ public class BlockingTaskProcessor extends BaseTaskProcessor { private void initTaskParameters() { taskInstance.setLogPath( - LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), + LogUtils.getTaskInstanceLogFullPath( + taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index f3435a21a6..7bdd48d793 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -28,7 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.service.utils.LogUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -131,11 +130,12 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { private void initTaskParameters() { taskInstance.setLogPath( - LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), + LogUtils.getTaskInstanceLogFullPath(taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + this.taskInstance.setHost(masterConfig.getMasterAddress()); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); this.taskInstanceDao.upsertTaskInstance(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 84fb55a14d..5b97a88696 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -34,9 +33,9 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.utils.DependentExecute; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.utils.LogUtils; import java.util.ArrayList; import java.util.Date; @@ -106,12 +105,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor { } this.setTaskExecutionLogger(); log.info("Dependent task submit success"); - taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), + taskInstance.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + taskInstance.setHost(masterConfig.getMasterAddress()); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); taskInstanceDao.updateTaskInstance(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 7ef817f5b0..1f8f125b19 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -28,11 +28,11 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.utils.LogUtils; import org.apache.commons.lang3.StringUtils; @@ -71,7 +71,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { return false; } this.setTaskExecutionLogger(); - taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), + taskInstance.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 4893f44731..c2d78be5bc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -20,15 +20,14 @@ package org.apache.dolphinscheduler.server.master.runner.task; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils; -import org.apache.dolphinscheduler.service.utils.LogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -73,11 +72,12 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { public boolean runTask() { log.info("switch task starting"); taskInstance.setLogPath( - LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), + LogUtils.getTaskInstanceLogFullPath(taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + taskInstance.setHost(masterConfig.getMasterAddress()); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); taskInstanceDao.updateTaskInstance(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 441556cb79..1d36b1b47b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; @@ -40,7 +41,6 @@ import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.service.utils.ProcessUtils; import org.apache.commons.collections4.CollectionUtils; @@ -167,7 +167,7 @@ public class MasterFailoverService { for (ProcessInstance processInstance : needFailoverProcessInstanceList) { try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + LogUtils.setWorkflowInstanceIdMDC(processInstance.getId()); log.info("WorkflowInstance failover starting"); if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) { log.info("WorkflowInstance doesn't need to failover"); @@ -180,7 +180,7 @@ public class MasterFailoverService { taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { try { - LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId()); + LogUtils.setTaskInstanceIdMDC(taskInstance.getId()); log.info("TaskInstance failover starting"); if (!checkTaskInstanceNeedFailover(taskInstance)) { log.info("The taskInstance doesn't need to failover"); @@ -189,7 +189,7 @@ public class MasterFailoverService { failoverTaskInstance(processInstance, taskInstance); log.info("TaskInstance failover finished"); } finally { - LoggerUtils.removeTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceIdMDC(); } } @@ -200,7 +200,7 @@ public class MasterFailoverService { processService.processNeedFailoverProcessInstances(processInstance); log.info("WorkflowInstance failover finished"); } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + LogUtils.removeWorkflowInstanceIdMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index a04ce18a93..315c02ec09 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPoo import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.service.utils.ProcessUtils; import org.apache.commons.collections4.CollectionUtils; @@ -119,8 +119,10 @@ public class WorkerFailoverService { needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList())); final Map processInstanceCacheMap = new HashMap<>(); for (TaskInstance taskInstance : needFailoverTaskInstanceList) { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); - try { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), + taskInstance.getId())) { ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent( taskInstance.getProcessInstanceId(), k -> { WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId( @@ -141,8 +143,6 @@ public class WorkerFailoverService { log.info("Worker[{}] failover: Finish failover taskInstance", workerHost); } catch (Exception ex) { log.info("Worker[{}] failover taskInstance occur exception", workerHost, ex); - } finally { - LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } failoverTimeCost.stop(); diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index c3b342bffb..deb791fae2 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -32,12 +32,12 @@ - taskAppId + taskInstanceLogFullPath ${log.base} - - ${log.base}/${taskAppId}.log + + ${taskInstanceLogFullPath} [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n diff --git a/dolphinscheduler-master/src/test/resources/logback.xml b/dolphinscheduler-master/src/test/resources/logback.xml new file mode 100644 index 0000000000..deb791fae2 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/logback.xml @@ -0,0 +1,77 @@ + + + + + + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + + UTF-8 + + + + + + + + taskInstanceLogFullPath + ${log.base} + + + + ${taskInstanceLogFullPath} + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n + + UTF-8 + + true + + + + + ${log.base}/dolphinscheduler-master.log + + ${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + 50GB + true + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + + UTF-8 + + + + + + + + + + + + + diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java index 6259998146..ff16ae253d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/LoggerRequestProcessor.java @@ -45,7 +45,6 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -206,11 +205,11 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); } catch (IOException e) { log.error("read file error", e); + throw new RuntimeException(String.format("Read file: %s error", filePath), e); } } else { - log.info("file path: {} not exists", filePath); + throw new RuntimeException("The file path: " + filePath + " not exists"); } - return Collections.emptyList(); } private List readPartFileContentFromRemote(String filePath, diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java index dc208c2763..1c279ce026 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java @@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.factory.NettyRemotingClientFactory; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import java.util.List; @@ -115,7 +114,7 @@ public class LogClient implements AutoCloseable { final Host address = new Host(host, port); try { if (NetUtils.getHost().equals(host)) { - return LoggerUtils.readWholeFileContent(request.getPath()); + return LogUtils.readWholeFileContentFromLocal(request.getPath()); } else { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java deleted file mode 100644 index 7f7188fb2c..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LogUtils.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.constants.DateConstants; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Date; -import java.util.Optional; - -import org.slf4j.LoggerFactory; - -import ch.qos.logback.classic.sift.SiftingAppender; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.spi.AppenderAttachable; - -public class LogUtils { - - public static final String LOG_TAILFIX = ".log"; - - private LogUtils() throws IllegalStateException { - throw new IllegalStateException("Utility class"); - } - - /** - * get task log path - */ - public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode, int processDefineVersion, - int processInstanceId, int taskInstanceId) { - // format /logs/YYYYMMDD/defintion-code_defintion_version-processInstanceId-taskInstanceId.log - final String taskLogFileName = new StringBuilder(String.valueOf(processDefineCode)) - .append(Constants.UNDERLINE) - .append(processDefineVersion) - .append(Constants.SUBTRACT_CHAR) - .append(processInstanceId) - .append(Constants.SUBTRACT_CHAR) - .append(taskInstanceId) - .append(LOG_TAILFIX) - .toString(); - // Optional.map will be skipped if null - return Optional.of(LoggerFactory.getILoggerFactory()) - .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) - .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) - .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) - .map(TaskLogDiscriminator::getLogBase) - .map(e -> Paths.get(e) - .toAbsolutePath() - .resolve(DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null)) - .resolve(taskLogFileName)) - .map(Path::toString) - .orElse(""); - } - - /** - * get task log path by TaskExecutionContext - */ - public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) { - return getTaskLogPath(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java deleted file mode 100644 index b8ac5647c2..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.constants.DateConstants; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Date; - -import lombok.experimental.UtilityClass; -import lombok.extern.slf4j.Slf4j; - -import org.slf4j.MDC; - -/** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils}. - */ -@Deprecated -@UtilityClass -@Slf4j -public class LoggerUtils { - - public static String buildTaskId(Date firstSubmitTime, - Long processDefineCode, - int processDefineVersion, - int processInstId, - int taskId) { - // like TaskAppId=TASK-20211107-798_1-4084-15210 - String firstSubmitTimeStr = DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null); - return String.format("%s=%s-%s-%s_%s-%s-%s", - TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, - processDefineCode, processDefineVersion, processInstId, taskId); - } - - /** - * read whole file content - * - * @param filePath file path - * @return whole file content - */ - public static String readWholeFileContent(String filePath) { - String line; - StringBuilder sb = new StringBuilder(); - try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) { - while ((line = br.readLine()) != null) { - sb.append(line + "\r\n"); - } - return sb.toString(); - } catch (IOException e) { - log.error("read file error", e); - } - return ""; - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) { - setWorkflowInstanceIdMDC(workflowInstanceId); - setTaskInstanceIdMDC(taskInstanceId); - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) { - MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void setTaskInstanceIdMDC(Integer taskInstanceId) { - MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void removeWorkflowAndTaskInstanceIdMDC() { - removeWorkflowInstanceIdMDC(); - removeTaskInstanceIdMDC(); - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void removeWorkflowInstanceIdMDC() { - MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY); - } - - /** - * Please use {@link org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils} - */ - public static void removeTaskInstanceIdMDC() { - MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java deleted file mode 100644 index 3864816f67..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.log; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; -import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; -import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; -import org.apache.dolphinscheduler.remote.factory.NettyRemotingClientFactory; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; - -import java.nio.charset.StandardCharsets; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.MockedStatic; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -public class LogClientTest { - - @Test - public void testViewLogFromLocal() { - String localMachine = "LOCAL_MACHINE"; - int port = 1234; - String path = "/tmp/log"; - - try ( - MockedStatic mockedNetUtils = Mockito.mockStatic(NetUtils.class); - MockedStatic mockedLoggerUtils = Mockito.mockStatic(LoggerUtils.class)) { - mockedNetUtils.when(NetUtils::getHost) - .thenReturn(localMachine); - mockedLoggerUtils.when(() -> LoggerUtils.readWholeFileContent(Mockito.anyString())) - .thenReturn("application_xx_11"); - LogClient logClient = new LogClient(); - String log = logClient.viewLog(localMachine, port, path); - Assertions.assertNotNull(log); - } - } - - @Test - public void testViewLogFromRemote() throws Exception { - String localMachine = "127.0.0.1"; - int port = 1234; - String path = "/tmp/log"; - - try (MockedStatic mockedNetUtils = Mockito.mockStatic(NetUtils.class)) { - mockedNetUtils.when(NetUtils::getHost) - .thenReturn(localMachine + "1"); - LogClient logClient = new LogClient(); - String log = logClient.viewLog(localMachine, port, path); - Assertions.assertNotNull(log); - } - - Command command = new Command(); - command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8)); - LogClient logClient = new LogClient(); - String log = logClient.viewLog(localMachine, port, path); - Assertions.assertNotNull(log); - } - - @Test - public void testClose() { - try ( - MockedStatic mockedNettyRemotingClientFactory = - Mockito.mockStatic(NettyRemotingClientFactory.class)) { - NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class); - mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient) - .thenReturn(remotingClient); - LogClient logClient = new LogClient(); - logClient.close(); - } - } - - @Test - public void testRollViewLog() throws Exception { - try ( - MockedStatic mockedNettyRemotingClientFactory = - Mockito.mockStatic(NettyRemotingClientFactory.class)) { - NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class); - mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient) - .thenReturn(remotingClient); - Command command = new Command(); - command.setBody(JSONUtils.toJsonByteArray(new RollViewLogResponseCommand("success"))); - Mockito.when( - remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) - .thenReturn(command); - - LogClient logClient = new LogClient(); - String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10); - Assertions.assertNotNull(msg); - } - } - - @Test - public void testGetLogBytes() throws Exception { - try ( - MockedStatic mockedNettyRemotingClientFactory = - Mockito.mockStatic(NettyRemotingClientFactory.class)) { - NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class); - mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient) - .thenReturn(remotingClient); - Command command = new Command(); - command.setBody( - JSONUtils.toJsonByteArray(new GetLogBytesResponseCommand("log".getBytes(StandardCharsets.UTF_8)))); - Mockito.when( - remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) - .thenReturn(command); - - LogClient logClient = new LogClient(); - byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log"); - Assertions.assertNotNull(logBytes); - } - } - - @Test - public void testRemoveTaskLog() { - - try ( - MockedStatic mockedNettyRemotingClientFactory = - Mockito.mockStatic(NettyRemotingClientFactory.class)) { - NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class); - mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient) - .thenReturn(remotingClient); - - LogClient logClient = new LogClient(); - Assertions.assertDoesNotThrow(() -> logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path")); - } - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java index fb02a3be48..31912e2c6a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LoggerRequestProcessorTest.java @@ -18,11 +18,11 @@ package org.apache.dolphinscheduler.service.log; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.processor.LoggerRequestProcessor; -import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,11 +37,11 @@ import io.netty.channel.Channel; @ExtendWith(MockitoExtension.class) public class LoggerRequestProcessorTest { - private MockedStatic mockedStaticLoggerUtils; + private MockedStatic mockedStaticLoggerUtils; @BeforeEach public void setUp() { - mockedStaticLoggerUtils = Mockito.mockStatic(LoggerUtils.class); + mockedStaticLoggerUtils = Mockito.mockStatic(LogUtils.class); } @AfterEach @@ -54,7 +54,7 @@ public class LoggerRequestProcessorTest { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); Channel channel = Mockito.mock(Channel.class); Mockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null); - Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Mockito.when(LogUtils.readWholeFileContentFromLocal(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a.log"); @@ -70,7 +70,7 @@ public class LoggerRequestProcessorTest { public void testProcessViewWholeLogRequestError() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); Channel channel = Mockito.mock(Channel.class); - Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Mockito.when(LogUtils.readWholeFileContentFromLocal(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/path/a"); @@ -86,7 +86,7 @@ public class LoggerRequestProcessorTest { public void testProcessViewWholeLogRequestErrorRelativePath() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); Channel channel = Mockito.mock(Channel.class); - Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Mockito.when(LogUtils.readWholeFileContentFromLocal(Mockito.anyString())).thenReturn(""); String userDir = System.getProperty("user.dir"); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand(userDir + "/log/../../a.log"); @@ -102,7 +102,7 @@ public class LoggerRequestProcessorTest { public void testProcessViewWholeLogRequestErrorStartWith() { System.setProperty("DOLPHINSCHEDULER_WORKER_HOME", System.getProperty("user.dir")); Channel channel = Mockito.mock(Channel.class); - Mockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn(""); + Mockito.when(LogUtils.readWholeFileContentFromLocal(Mockito.anyString())).thenReturn(""); ViewLogRequestCommand logRequestCommand = new ViewLogRequestCommand("/log/a.log"); Command command = new Command(); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java deleted file mode 100644 index d8a79c8465..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogDiscriminatorTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.service.log; - -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.spi.LoggingEvent; - -public class TaskLogDiscriminatorTest { - - /** - * log base - */ - private String logBase = "logs"; - - TaskLogDiscriminator taskLogDiscriminator; - - @BeforeEach - public void before() { - taskLogDiscriminator = new TaskLogDiscriminator(); - taskLogDiscriminator.setLogBase("logs"); - taskLogDiscriminator.setKey("123"); - } - - @Test - public void getDiscriminatingValue() { - String result = taskLogDiscriminator.getDiscriminatingValue(new LoggingEvent() { - - @Override - public String getThreadName() { - return "taskAppId=TASK-20220105-101-1-1001"; - } - - @Override - public Level getLevel() { - return null; - } - - @Override - public String getMessage() { - return null; - } - - @Override - public Object[] getArgumentArray() { - return new Object[0]; - } - - @Override - public String getFormattedMessage() { - return null; - } - - @Override - public String getLoggerName() { - return TaskConstants.TASK_LOG_LOGGER_NAME; - } - }); - Assertions.assertEquals("20220105/101-1-1001", result); - } - - @Test - public void start() { - taskLogDiscriminator.start(); - Assertions.assertEquals(true, taskLogDiscriminator.isStarted()); - } - - @Test - public void getKey() { - Assertions.assertEquals("123", taskLogDiscriminator.getKey()); - } - - @Test - public void setKey() { - - taskLogDiscriminator.setKey("123"); - } - - @Test - public void getLogBase() { - Assertions.assertEquals("logs", taskLogDiscriminator.getLogBase()); - } - - @Test - public void setLogBase() { - taskLogDiscriminator.setLogBase("logs"); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java deleted file mode 100644 index 0c9c43fd10..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/TaskLogFilterTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.service.log; - -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.spi.LoggingEvent; -import ch.qos.logback.core.spi.FilterReply; - -public class TaskLogFilterTest { - - @Test - public void decide() { - TaskLogFilter taskLogFilter = new TaskLogFilter(); - - FilterReply filterReply = taskLogFilter.decide(new LoggingEvent() { - - @Override - public String getThreadName() { - return TaskConstants.TASK_APPID_LOG_FORMAT; - } - - @Override - public Level getLevel() { - return Level.INFO; - } - - @Override - public String getMessage() { - return "raw script : echo 222"; - } - - @Override - public Object[] getArgumentArray() { - return new Object[0]; - } - - @Override - public String getFormattedMessage() { - return "raw script : echo 222"; - } - - @Override - public String getLoggerName() { - return TaskConstants.TASK_LOG_LOGGER_NAME; - } - }); - - Assertions.assertEquals(FilterReply.ACCEPT, filterReply); - - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java deleted file mode 100644 index f03801091d..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/LogUtilsTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.DateConstants; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Date; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.LoggerFactory; - -import ch.qos.logback.classic.Logger; -import ch.qos.logback.classic.sift.SiftingAppender; - -@ExtendWith(MockitoExtension.class) -public class LogUtilsTest { - - @Test - public void testGetTaskLogPath() { - Date firstSubmitTime = new Date(); - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setProcessInstanceId(100); - taskExecutionContext.setTaskInstanceId(1000); - taskExecutionContext.setProcessDefineCode(1L); - taskExecutionContext.setProcessDefineVersion(1); - taskExecutionContext.setFirstSubmitTime(firstSubmitTime.getTime()); - - Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT"); - Assertions.assertNotNull(rootLogger); - - SiftingAppender appender = Mockito.mock(SiftingAppender.class); - // it's a trick to mock logger.getAppend("TASKLOGFILE") - Mockito.when(appender.getName()).thenReturn("TASKLOGFILE"); - rootLogger.addAppender(appender); - - Path logBase = Paths.get("path").resolve("to").resolve("test"); - - TaskLogDiscriminator taskLogDiscriminator = Mockito.mock(TaskLogDiscriminator.class); - Mockito.when(taskLogDiscriminator.getLogBase()).thenReturn(logBase.toString()); - Mockito.when(appender.getDiscriminator()).thenReturn(taskLogDiscriminator); - - Path logPath = Paths.get(".").toAbsolutePath().getParent() - .resolve(logBase) - .resolve(DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null)) - .resolve("1_1-100-1000.log"); - Assertions.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskExecutionContext)); - } - -} diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 4191bc47ab..90f7290e15 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -52,12 +52,12 @@ - taskAppId + taskInstanceLogFullPath ${log.base} - - ${log.base}/${taskAppId}.log + + ${taskInstanceLogFullPath} [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index e7bece0596..92297558d7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -21,11 +21,12 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.getPidsStr; -import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; @@ -39,7 +40,6 @@ import java.io.InputStreamReader; import java.lang.reflect.Field; import java.util.LinkedList; import java.util.List; -import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -288,19 +288,18 @@ public abstract class AbstractCommandExecutor { return; } - // clear log - clear(); - int processId = getProcessId(process); - - logger.info("cancel process: {}", processId); - + logger.info("Begin to kill process process, pid is : {}", processId); // kill , waiting for completion boolean alive = softKill(processId); if (alive) { - // hard kill - hardKill(processId); + String cmd = String.format("kill -9 %s", getPidsStr(processId)); + cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd); + OSUtils.exeCmd(cmd); + logger.info("Success kill task: {}, pid: {}, cmd: {}", taskRequest.getTaskAppId(), processId, cmd); + } else { + logger.info("The process: {} is not alive, no need to kill", processId); } } @@ -328,71 +327,19 @@ public abstract class AbstractCommandExecutor { return process.isAlive(); } - /** - * hard kill - * - * @param processId process id - */ - private void hardKill(int processId) { - if (processId != 0 && process.isAlive()) { - try { - String cmd = String.format("kill -9 %s", getPidsStr(processId)); - cmd = OSUtils.getSudoCmd(taskRequest.getTenantCode(), cmd); - logger.info("hard kill task:{}, process id:{}, cmd:{}", taskRequest.getTaskAppId(), processId, cmd); - - OSUtils.exeCmd(cmd); - } catch (Exception e) { - logger.error("kill attempt failed ", e); - } - } - } - private void printCommand(List commands) { logger.info("task run command: {}", String.join(" ", commands)); } - /** - * clear - */ - private void clear() { - - LinkedBlockingQueue markerLog = new LinkedBlockingQueue<>(1); - markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); - String logs = appendPodLogIfNeeded(); - if (StringUtils.isNotEmpty(logs)) { - logBuffer.add("Dump logs from driver pod:"); - logBuffer.add(logs); - } - if (!logBuffer.isEmpty()) { - // log handle - logHandler.accept(logBuffer); - logBuffer.clear(); - } - logHandler.accept(markerLog); - - if (RemoteLogUtils.isRemoteLoggingEnable()) { - RemoteLogUtils.sendRemoteLog(taskRequest.getLogPath()); - logger.info("Log handler sends task log {} to remote storage asynchronously.", taskRequest.getLogPath()); - } - } - - private String appendPodLogIfNeeded() { - if (Objects.isNull(taskRequest.getK8sTaskExecutionContext())) { - return ""; - } - return ProcessUtils.getPodLog(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); - } - - /** - * get the standard output of the process - * - * @param process process - */ private void parseProcessOutput(Process process) { - String threadLoggerInfoName = taskRequest.getTaskLogName(); - ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName); + // todo: remove this this thread pool. + ExecutorService getOutputLogService = ThreadUtils + .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName()); getOutputLogService.submit(() -> { - try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = inReader.readLine()) != null) { if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) { @@ -412,9 +359,12 @@ public abstract class AbstractCommandExecutor { getOutputLogService.shutdown(); - ExecutorService parseProcessOutputExecutorService = newDaemonSingleThreadExecutor(threadLoggerInfoName); + ExecutorService parseProcessOutputExecutorService = ThreadUtils + .newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName()); taskOutputFuture = parseProcessOutputExecutorService.submit(() -> { - try { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());) { while (!logBuffer.isEmpty() || !logOutputIsSuccess) { if (!logBuffer.isEmpty()) { logHandler.accept(logBuffer); @@ -425,8 +375,6 @@ public abstract class AbstractCommandExecutor { } } catch (Exception e) { logger.error("Output task log error", e); - } finally { - clear(); } }); parseProcessOutputExecutorService.shutdown(); @@ -477,7 +425,7 @@ public abstract class AbstractCommandExecutor { processId = f.getInt(process); } catch (Throwable e) { - logger.error(e.getMessage(), e); + logger.error("Get task pid failed", e); } return processId; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index b012a5fff3..1c51594b9e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -30,18 +30,13 @@ import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; /** * executive task */ public abstract class AbstractTask { - public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION"); - - protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + protected final Logger log = LoggerFactory.getLogger(AbstractTask.class); public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; @@ -197,16 +192,12 @@ public abstract class AbstractTask { * @param logs log list */ public void logHandle(LinkedBlockingQueue logs) { - // note that the "new line" is added here to facilitate log parsing - if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { - log.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); - } else { - StringJoiner joiner = new StringJoiner("\n\t"); - while (!logs.isEmpty()) { - joiner.add(logs.poll()); - } - log.info(" -> {}", joiner); + + StringJoiner joiner = new StringJoiner("\n\t"); + while (!logs.isEmpty()) { + joiner.add(logs.poll()); } + log.info(" -> {}", joiner); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 2187102a7d..ca8924bc7f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -146,31 +146,6 @@ public class TaskConstants { public static final String RWXR_XR_X = "rwxr-xr-x"; - /** - * task log info format - */ - public static final String TASK_LOG_LOGGER_NAME = "TaskLogLogger"; - - /** - * task log logger name format - */ - public static final String TASK_LOG_LOGGER_NAME_FORMAT = TASK_LOG_LOGGER_NAME + "-%s"; - - /** - * Task Logger's prefix - */ - public static final String TASK_LOGGER_INFO_PREFIX = "TASK"; - - /** - * Task Logger Thread's name - */ - public static final String TASK_APPID_LOG_FORMAT = "taskAppId"; - - /** - * get output log service - */ - public static final String GET_OUTPUT_LOG_SERVICE = "-getOutputLogService"; - /** * date format of yyyyMMdd */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 2a676755fc..efc0289bf8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -214,12 +214,6 @@ public class TaskExecutionContext implements Serializable { */ private TaskExecutionStatus currentExecutionStatus; - /** - * Task Logger name should be like: - * TaskAppId=TASK-{firstSubmitTime}-{processDefineCode}_{processDefineVersion}-{processInstanceId}-{taskInstanceId} - */ - private String taskLogName; - private ResourceParametersHelper resourceParametersHelper; /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index a5950c7b0e..e29df64bfe 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.commons.collections4.CollectionUtils; + import java.util.List; import java.util.Map; import java.util.Objects; @@ -174,25 +176,21 @@ public class KubernetesApplicationManager implements ApplicationManager { * @return */ public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { - try { - KubernetesClient client = getClient(kubernetesApplicationManagerContext); - FilterWatchListDeletable watchList = getDriverPod(kubernetesApplicationManagerContext); - List driverPod = watchList.list().getItems(); - if (!driverPod.isEmpty()) { - Pod driver = driverPod.get(0); - String driverPodName = driver.getMetadata().getName(); - String logs = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withName(driverPodName).getLog(); - - // delete driver pod only after successful execution - killApplication(kubernetesApplicationManagerContext); - return logs; - } - } catch (Exception e) { - log.error("Collect pod log failed", e); + KubernetesClient client = getClient(kubernetesApplicationManagerContext); + FilterWatchListDeletable watchList = getDriverPod(kubernetesApplicationManagerContext); + List driverPod = watchList.list().getItems(); + if (CollectionUtils.isEmpty(driverPod)) { + return "The driver pod does not exist."; } - return ""; + Pod driver = driverPod.get(0); + String driverPodName = driver.getMetadata().getName(); + String logs = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withName(driverPodName).getLog(); + + // delete driver pod only after successful execution + killApplication(kubernetesApplicationManagerContext); + return logs; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java index 503d9baf0f..e002802d76 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java @@ -16,10 +16,12 @@ */ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import lombok.extern.slf4j.Slf4j; + +import org.slf4j.MDC; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; @@ -29,38 +31,17 @@ import ch.qos.logback.core.sift.AbstractDiscriminator; @Slf4j public class TaskLogDiscriminator extends AbstractDiscriminator { - /** - * key - */ private String key; - /** - * log base - */ private String logBase; - /** - * log name should be like: - * Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId} - */ @Override public String getDiscriminatingValue(ILoggingEvent event) { - String key = "unknown_task"; - if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) { - String threadName = event.getThreadName(); - if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) { - threadName = - threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length()); - } - String part1 = threadName.split(Constants.EQUAL_SIGN)[1]; - String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-"; - if (part1.startsWith(prefix)) { - key = part1.substring(prefix.length()).replaceFirst("-", "/"); - } + String taskInstanceLogPath = MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); + if (taskInstanceLogPath == null) { + log.error("The task instance log path is null, please check the logback configuration, log: {}", event); } - log.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), - event.getLoggerName()); - return key; + return taskInstanceLogPath; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java index f6724ea19e..8e71427559 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java @@ -17,45 +17,24 @@ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import lombok.extern.slf4j.Slf4j; -import ch.qos.logback.classic.Level; + +import org.slf4j.MDC; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; /** - * task log filter + * This class is used to filter the log of the task instance. */ @Slf4j public class TaskLogFilter extends Filter { - /** - * level - */ - private Level level; - - public void setLevel(String level) { - this.level = Level.toLevel(level); - } - - /** - * Accept or reject based on thread name - * - * @param event event - * @return FilterReply - */ @Override public FilterReply decide(ILoggingEvent event) { - FilterReply filterReply = FilterReply.DENY; - if ((event.getThreadName().startsWith(TaskConstants.TASK_APPID_LOG_FORMAT) - && event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) - || event.getLevel().isGreaterOrEqual(level)) { - filterReply = FilterReply.ACCEPT; - } - log.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), - event.getLoggerName(), filterReply.name(), level); - return filterReply; + return MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY) == null ? FilterReply.DENY : FilterReply.ACCEPT; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index 490b07d4af..5b9411167c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -45,6 +45,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -60,9 +61,19 @@ import ch.qos.logback.core.spi.AppenderAttachable; @UtilityClass public class LogUtils { - private static final String LOG_TAILFIX = ".log"; + private static Path TASK_INSTANCE_LOG_BASE_PATH = getTaskInstanceLogBasePath(); + public static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY = "taskInstanceLogFullPath"; + private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); + /** + * Get application_id from log file. + * + * @param logPath log file path + * @param appInfoPath appInfo file path + * @param fetchWay fetch way + * @return application id list. + */ public List getAppIds(@NonNull String logPath, @NonNull String appInfoPath, String fetchWay) { if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) { log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay); @@ -73,53 +84,65 @@ public class LogUtils { } } - public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) { - return getTaskLogPath(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), + /** + * Get task instance log full path. + * + * @param taskExecutionContext task execution context. + * @return task instance log full path. + */ + public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) { + return getTaskInstanceLogFullPath( + DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); } - public static String getTaskLogPath(Date firstSubmitTime, - Long processDefineCode, - int processDefineVersion, - int processInstanceId, - int taskInstanceId) { - // format /logs/YYYYMMDD/defintion-code_defintion_version-processInstanceId-taskInstanceId.log - final String taskLogFileName = new StringBuilder(String.valueOf(processDefineCode)) - .append(Constants.UNDERLINE) - .append(processDefineVersion) - .append(Constants.SUBTRACT_CHAR) - .append(processInstanceId) - .append(Constants.SUBTRACT_CHAR) - .append(taskInstanceId) - .append(LOG_TAILFIX) + /** + * todo: Remove the submitTime parameter? + * The task instance log full path, the path is like:{log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{}workflowInstance}/{taskInstance}.log + * + * @param taskFirstSubmitTime task first submit time + * @param workflowDefinitionCode workflow definition code + * @param workflowDefinitionVersion workflow definition version + * @param workflowInstanceId workflow instance id + * @param taskInstanceId task instance id. + * @return task instance log full path. + */ + public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime, + Long workflowDefinitionCode, + int workflowDefinitionVersion, + int workflowInstanceId, + int taskInstanceId) { + if (TASK_INSTANCE_LOG_BASE_PATH == null) { + throw new IllegalArgumentException( + "Cannot find the task instance log base path, please check your logback.xml file"); + } + final String taskLogFileName = Paths.get( + String.valueOf(workflowDefinitionCode), + String.valueOf(workflowDefinitionVersion), + String.valueOf(workflowInstanceId), + String.format("%s.log", taskInstanceId)).toString(); + return TASK_INSTANCE_LOG_BASE_PATH + .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null)) + .resolve(taskLogFileName) .toString(); - // Optional.map will be skipped if null + } + + /** + * Get task instance log base absolute path, this is defined in logback.xml + * + * @return + */ + public static Path getTaskInstanceLogBasePath() { return Optional.of(LoggerFactory.getILoggerFactory()) .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) .map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE"))) .map(e -> ((TaskLogDiscriminator) (e.getDiscriminator()))) .map(TaskLogDiscriminator::getLogBase) - .map(e -> Paths.get(e) - .toAbsolutePath() - .resolve(DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null)) - .resolve(taskLogFileName)) - .map(Path::toString) - .orElse(""); - } - - public static String buildTaskId(Date firstSubmitTime, - Long processDefineCode, - int processDefineVersion, - int processInstId, - int taskId) { - // like TaskAppId=TASK-20211107-798_1-4084-15210 - String firstSubmitTimeStr = DateUtils.format(firstSubmitTime, DateConstants.YYYYMMDD, null); - return String.format("%s=%s-%s-%s_%s-%s-%s", - TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, - processDefineCode, processDefineVersion, processInstId, taskId); + .map(e -> Paths.get(e).toAbsolutePath()) + .orElse(null); } public List getAppIdsFromAppInfoFile(@NonNull String appInfoPath) { @@ -177,17 +200,34 @@ public class LogUtils { return ""; } - public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) { - setWorkflowInstanceIdMDC(workflowInstanceId); - setTaskInstanceIdMDC(taskInstanceId); + public static String getTaskInstanceLogFullPathMdc() { + return MDC.get(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); } - public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) { + public static MDCAutoClosableContext setTaskInstanceLogFullPathMDC(String taskInstanceLogFullPath) { + MDC.put(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY, taskInstanceLogFullPath); + return new MDCAutoClosableContext(LogUtils::removeTaskInstanceLogFullPathMDC); + } + + public static void removeTaskInstanceLogFullPathMDC() { + MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); + } + + public static MDCAutoClosableContext setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, + Integer taskInstanceId) { MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); + MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); + return new MDCAutoClosableContext(LogUtils::removeWorkflowAndTaskInstanceIdMDC); } - public static void setTaskInstanceIdMDC(Integer taskInstanceId) { + public static MDCAutoClosableContext setWorkflowInstanceIdMDC(Integer workflowInstanceId) { + MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); + return new MDCAutoClosableContext(LogUtils::removeWorkflowInstanceIdMDC); + } + + public static MDCAutoClosableContext setTaskInstanceIdMDC(Integer taskInstanceId) { MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); + return new MDCAutoClosableContext(LogUtils::removeTaskInstanceIdMDC); } public static void removeWorkflowAndTaskInstanceIdMDC() { @@ -202,4 +242,15 @@ public class LogUtils { public static void removeTaskInstanceIdMDC() { MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY); } + + @AllArgsConstructor + public static class MDCAutoClosableContext implements AutoCloseable { + + private final Runnable closeAction; + + @Override + public void close() { + closeAction.run(); + } + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryHook.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryHook.java index 8467af86f9..8adc8accf7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryHook.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryHook.java @@ -47,7 +47,7 @@ public class DatafactoryHook { public static DatafactoryStatus[] taskFinishFlags = {DatafactoryStatus.Failed, DatafactoryStatus.Succeeded, DatafactoryStatus.Cancelled}; protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + LoggerFactory.getLogger(DatafactoryHook.class); private final int QUERY_INTERVAL = PropertyUtils.getInt(TaskConstants.QUERY_INTERVAL, 10000); private DataFactoryManager client; private static AzureProfile profile; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncHook.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncHook.java index f8586179d2..a9f855503a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncHook.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncHook.java @@ -63,7 +63,7 @@ public class DatasyncHook { {TaskExecutionStatus.ERROR, TaskExecutionStatus.SUCCESS, TaskExecutionStatus.UNKNOWN_TO_SDK_VERSION}; public static TaskStatus[] taskFinishFlags = {TaskStatus.UNAVAILABLE, TaskStatus.UNKNOWN_TO_SDK_VERSION}; protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + LoggerFactory.getLogger(DatasyncHook.class); private DataSyncClient client; private String taskArn; private String taskExecArn; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java index dfa1d3830b..40aa6a527d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsHook.java @@ -64,7 +64,7 @@ import com.amazonaws.services.databasemigrationservice.model.TestConnectionReque public class DmsHook { protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + LoggerFactory.getLogger(DmsHook.class); private AWSDatabaseMigrationService client; private String replicationTaskIdentifier; private String sourceEndpointArn; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java index 302f23700b..a3f5d650f3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.plugin.kubeflow; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.commons.lang3.StringUtils; @@ -39,7 +38,7 @@ import com.google.common.collect.Sets; public class KubeflowHelper { protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); + LoggerFactory.getLogger(KubeflowHelper.class); private final String clusterConfigPath; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java index cf4fd789fe..3eed4c20c4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java @@ -55,7 +55,6 @@ public class PigeonTaskTest { String taskParams = "{\"targetJobName\":\"mysql_elastic\"}"; taskExecutionContext = Mockito.mock(TaskExecutionContext.class); - Mockito.when(taskExecutionContext.getTaskLogName()).thenReturn("pigeonlogger"); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams); Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java index 5e4321cb4f..51d325be5f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.pytorch; import static org.apache.dolphinscheduler.plugin.task.api.AbstractShell.ExitCodeException; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; import java.io.File; @@ -27,17 +26,14 @@ import java.nio.file.Paths; import java.util.regex.Pattern; import lombok.Data; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; @Data +@Slf4j public class GitProjectManager { public static final String GIT_PATH_LOCAL = "GIT_PROJECT"; private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)"); - protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); private String path; private String baseDir = "."; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java index 3ffc14d58c..ab3ba7f805 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java @@ -26,9 +26,7 @@ import java.util.List; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import com.amazonaws.services.sagemaker.AmazonSageMaker; import com.amazonaws.services.sagemaker.model.DescribePipelineExecutionRequest; @@ -41,10 +39,9 @@ import com.amazonaws.services.sagemaker.model.StartPipelineExecutionResult; import com.amazonaws.services.sagemaker.model.StopPipelineExecutionRequest; import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult; +@Slf4j public class PipelineUtils { - protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); private static final String EXECUTING = "Executing"; private static final String SUCCEEDED = "Succeeded"; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 4841379de6..5e5d6219a4 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -147,14 +147,13 @@ public class WorkerServer implements IStoppable { int killNumber = 0; for (TaskExecutionContext taskRequest : taskRequests) { // kill task when it's not finished yet - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), - taskRequest.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), + taskRequest.getTaskInstanceId())) { if (ProcessUtils.kill(taskRequest)) { killNumber++; } - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } log.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index d5f9fe544d..9d01e8e277 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -95,14 +95,14 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { log.error("task execution context is null"); return; } - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())) { TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); // set cache, it will be used when kill task TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); taskExecutionContext.setHost(workerConfig.getWorkerAddress()); - taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); + taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); // delay task process long remainTime = @@ -137,8 +137,6 @@ public class TaskDispatchProcessor implements NettyRequestProcessor { log.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize()); } - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java index 583f1713a2..92c9baf718 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java @@ -51,7 +51,7 @@ public class DefaultWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecut @Override public void executeTask(TaskCallBack taskCallBack) throws TaskException { if (task == null) { - throw new TaskException("The task plugin instance is not initialized"); + throw new IllegalArgumentException("The task plugin instance is not initialized"); } task.handle(taskCallBack); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java index 13dc72bfa1..506325e485 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; @@ -26,16 +25,12 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import lombok.Builder; +import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +@Slf4j @Builder public class TaskCallbackImpl implements TaskCallBack { - protected final Logger log = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskCallbackImpl.class)); - private final WorkerMessageSender workerMessageSender; private final String masterAddress; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 22a10124b0..c018fe094f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -17,19 +17,18 @@ package org.apache.dolphinscheduler.server.worker.runner; +import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES; import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; @@ -65,8 +64,7 @@ import com.google.common.base.Strings; public abstract class WorkerTaskExecuteRunnable implements Runnable { - protected final Logger log = LoggerFactory - .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class)); + protected static final Logger log = LoggerFactory.getLogger(WorkerTaskExecuteRunnable.class); protected final TaskExecutionContext taskExecutionContext; protected final WorkerConfig workerConfig; @@ -93,14 +91,6 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { this.workerRpcClient = workerRpcClient; this.taskPluginManager = taskPluginManager; this.storageOperate = storageOperate; - String taskLogName = - LogUtils.buildTaskId(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setTaskLogName(taskLogName); - log.info("Set task log name: {}", taskLogName); } protected abstract void executeTask(TaskCallBack taskCallBack); @@ -117,7 +107,6 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { log.info("Remove the current task execute context from worker cache"); clearTaskExecPathIfNeeded(); - sendTaskLogOnWorkerToRemoteIfNeeded(); } protected void afterThrowing(Throwable throwable) throws TaskException { @@ -130,7 +119,6 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { "Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", TaskExecutionStatus.FAILURE); - sendTaskLogOnWorkerToRemoteIfNeeded(); } public void cancelTask() { @@ -149,13 +137,11 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { @Override public void run() { - try { - // set the thread name to make sure the log be written to the task log file - Thread.currentThread().setName(taskExecutionContext.getTaskLogName()); - - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId()); - + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( + taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { log.info("\n{}", TaskInstanceLogHeader.INITIALIZE_TASK_CONTEXT_HEADER); initializeTask(); @@ -183,12 +169,11 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { log.info("\n{}", TaskInstanceLogHeader.FINALIZE_TASK_HEADER); afterExecute(); - + closeLogAppender(); } catch (Throwable ex) { log.error("Task execute failed, due to meet an exception", ex); afterThrowing(ex); - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + closeLogAppender(); } } @@ -279,18 +264,6 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.getCurrentExecutionStatus()); } - protected void sendTaskLogOnWorkerToRemoteIfNeeded() { - if (taskExecutionContext.isLogBufferEnable()) { - return; - } - - if (RemoteLogUtils.isRemoteLoggingEnable()) { - RemoteLogUtils.sendRemoteLog(taskExecutionContext.getLogPath()); - log.info("Worker sends task log {} to remote storage asynchronously.", - taskExecutionContext.getLogPath()); - } - } - protected void clearTaskExecPathIfNeeded() { String execLocalPath = taskExecutionContext.getExecutePath(); if (!CommonUtils.isDevelopMode()) { @@ -325,6 +298,30 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { } } + protected void writePodLodIfNeeded() { + if (null == taskExecutionContext.getK8sTaskExecutionContext()) { + return; + } + log.info("The current task is k8s task, begin to write pod log"); + ProcessUtils.getPodLog(taskExecutionContext.getK8sTaskExecutionContext(), taskExecutionContext.getTaskAppId()); + } + + protected void closeLogAppender() { + try { + writePodLodIfNeeded(); + if (RemoteLogUtils.isRemoteLoggingEnable()) { + RemoteLogUtils.sendRemoteLog(taskExecutionContext.getLogPath()); + log.info("Log handler sends task log {} to remote storage asynchronously.", + taskExecutionContext.getLogPath()); + } + } catch (Exception ex) { + log.error("Write k8s pod log failed", ex); + } finally { + log.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); + + } + } + public @NonNull TaskExecutionContext getTaskExecutionContext() { return taskExecutionContext; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java index 364e1d4b67..daa2af5db5 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; @@ -40,17 +39,15 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; + import org.zeroturnaround.zip.ZipUtil; import com.fasterxml.jackson.databind.JsonNode; +@Slf4j public class TaskFilesTransferUtils { - protected final static Logger log = LoggerFactory - .getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, TaskFilesTransferUtils.class)); - // tmp path in local path for transfer final static String DOWNLOAD_TMP = ".DT_TMP"; diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 8be620d7aa..d63ea4a0b5 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -33,12 +33,12 @@ - taskAppId + taskInstanceLogFullPath ${log.base} - - ${log.base}/${taskAppId}.log + + ${taskInstanceLogFullPath} [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java index 4063f14cac..6ebdf6f3ec 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java @@ -53,7 +53,6 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest { .taskInstanceId(0) .processDefineId(0) .firstSubmitTime(System.currentTimeMillis()) - .taskLogName("TestLogName") .build(); WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable( taskExecutionContext, @@ -76,7 +75,6 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest { .taskInstanceId(0) .processDefineId(0) .firstSubmitTime(System.currentTimeMillis()) - .taskLogName("TestLogName") .taskType("SQL") .taskParams( "{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select * from t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}") diff --git a/dolphinscheduler-worker/src/test/resources/logback.xml b/dolphinscheduler-worker/src/test/resources/logback.xml new file mode 100644 index 0000000000..d63ea4a0b5 --- /dev/null +++ b/dolphinscheduler-worker/src/test/resources/logback.xml @@ -0,0 +1,78 @@ + + + + + + + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + + UTF-8 + + + + + + + + taskInstanceLogFullPath + ${log.base} + + + + ${taskInstanceLogFullPath} + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n + + UTF-8 + + true + + + + + ${log.base}/dolphinscheduler-worker.log + + ${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log + 168 + 200MB + 50GB + true + + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n + + UTF-8 + + + + + + + + + + + + +