diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 167cae8668..986c9dc8a7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -205,6 +205,8 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { @Override public void eventReceived(Action action, Job job) { try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), + taskRequest.getTaskInstanceId()); LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); if (action == Action.DELETED) { @@ -222,14 +224,18 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { } } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } @Override public void onClose(WatcherException e) { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), + taskRequest.getTaskInstanceId()); log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); countDownLatch.countDown(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } }; try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) { @@ -260,10 +266,12 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { String containerName = String.format("%s-%s", taskName, taskInstanceId); podLogOutputFuture = collectPodLogExecutorService.submit(() -> { TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), + taskRequest.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); try ( LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId(), containerName)) { - LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) {