diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index af7190b9d4..115a4317cc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.plugin.task.api; +import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; +import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; @@ -40,6 +42,7 @@ import java.io.InputStreamReader; import java.lang.reflect.Field; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,6 +57,7 @@ import java.util.regex.Pattern; import org.slf4j.Logger; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.fabric8.kubernetes.client.dsl.LogWatch; /** * abstract command executor @@ -86,7 +90,9 @@ public abstract class AbstractCommandExecutor { */ protected LinkedBlockingQueue logBuffer; - protected boolean logOutputIsSuccess = false; + protected boolean processLogOutputIsSuccess = false; + + protected boolean podLogOutputIsFinished = false; /* * SHELL result string @@ -100,6 +106,8 @@ public abstract class AbstractCommandExecutor { protected Future taskOutputFuture; + protected Future podLogOutputFuture; + public AbstractCommandExecutor(Consumer> logHandler, TaskExecutionContext taskRequest, Logger logger) { @@ -107,6 +115,7 @@ public abstract class AbstractCommandExecutor { this.taskRequest = taskRequest; this.logger = logger; this.logBuffer = new LinkedBlockingQueue<>(); + this.logBuffer.add(EMPTY_STRING); if (this.taskRequest != null) { // set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages @@ -215,6 +224,9 @@ public abstract class AbstractCommandExecutor { // parse process output parseProcessOutput(process); + // collect pod log + collectPodLogIfNeeded(); + int processId = getProcessId(process); result.setProcessId(processId); @@ -251,6 +263,17 @@ public abstract class AbstractCommandExecutor { } } + if (podLogOutputFuture != null) { + try { + // Wait kubernetes pod log collection finished + podLogOutputFuture.get(); + // delete pod after successful execution and log collection + ProcessUtils.cancelApplication(taskRequest); + } catch (ExecutionException e) { + logger.error("Handle pod log error", e); + } + } + TaskExecutionStatus kubernetesStatus = ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); @@ -296,6 +319,45 @@ public abstract class AbstractCommandExecutor { logger.info("task run command: {}", String.join(" ", commands)); } + private void collectPodLogIfNeeded() { + if (null == taskRequest.getK8sTaskExecutionContext()) { + podLogOutputIsFinished = true; + return; + } + + // wait for launching (driver) pod + ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L); + LogWatch watcher = + ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); + if (watcher != null) { + ExecutorService collectPodLogExecutorService = ThreadUtils + .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName()); + + podLogOutputFuture = collectPodLogExecutorService.submit(() -> { + try { + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { + while ((line = reader.readLine()) != null) { + logBuffer.add(String.format("[K8S-pod-log-%s]: %s", taskRequest.getTaskName(), line)); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + watcher.close(); + podLogOutputIsFinished = true; + } + }); + + collectPodLogExecutorService.shutdown(); + } else { + CompletableFuture exceptionalFuture = new CompletableFuture<>(); + exceptionalFuture.completeExceptionally(new RuntimeException("The driver pod does not exist.")); + podLogOutputFuture = exceptionalFuture; + podLogOutputIsFinished = true; + } + } + private void parseProcessOutput(Process process) { // todo: remove this this thread pool. ExecutorService getOutputLogService = ThreadUtils @@ -315,10 +377,10 @@ public abstract class AbstractCommandExecutor { taskResultString = line; } } - logOutputIsSuccess = true; + processLogOutputIsSuccess = true; } catch (Exception e) { logger.error("Parse var pool error", e); - logOutputIsSuccess = true; + processLogOutputIsSuccess = true; } }); @@ -330,10 +392,11 @@ public abstract class AbstractCommandExecutor { try ( final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());) { - while (!logBuffer.isEmpty() || !logOutputIsSuccess) { - if (!logBuffer.isEmpty()) { + while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) { + if (logBuffer.size() > 1) { logHandler.accept(logBuffer); logBuffer.clear(); + logBuffer.add(EMPTY_STRING); } else { Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index e29df64bfe..f06e07a8db 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -41,6 +41,7 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.LogWatch; @Slf4j @AutoService(ApplicationManager.class) @@ -170,27 +171,23 @@ public class KubernetesApplicationManager implements ApplicationManager { } /** - * collect pod's log + * get pod's log watcher * * @param kubernetesApplicationManagerContext * @return */ - public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); FilterWatchListDeletable watchList = getDriverPod(kubernetesApplicationManagerContext); List driverPod = watchList.list().getItems(); if (CollectionUtils.isEmpty(driverPod)) { - return "The driver pod does not exist."; + return null; } Pod driver = driverPod.get(0); - String driverPodName = driver.getMetadata().getName(); - String logs = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withName(driverPodName).getLog(); - - // delete driver pod only after successful execution - killApplication(kubernetesApplicationManagerContext); - return logs; + + return client.pods().inNamespace(driver.getMetadata().getNamespace()) + .withName(driver.getMetadata().getName()) + .watchLog(); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index e260277020..db310d70ed 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -49,6 +49,7 @@ import java.util.regex.Pattern; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import io.fabric8.kubernetes.client.dsl.LogWatch; @Slf4j public final class ProcessUtils { @@ -204,12 +205,12 @@ public final class ProcessUtils { * @param taskAppId * @return */ - public static String getPodLog(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) { + public static LogWatch getPodLogWatcher(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) { KubernetesApplicationManager applicationManager = (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); return applicationManager - .collectPodLog(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); + .getPodLogWatcher(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml index 9cae62ad94..d0778c02d0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml @@ -54,5 +54,9 @@ druid + + io.fabric8 + kubernetes-client + diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index f98390bad2..dc513226ab 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -293,27 +293,17 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { } } - protected void writePodLodIfNeeded() { - if (null == taskExecutionContext.getK8sTaskExecutionContext()) { - return; - } - log.info("The current task is k8s task, begin to write pod log"); - ProcessUtils.getPodLog(taskExecutionContext.getK8sTaskExecutionContext(), taskExecutionContext.getTaskAppId()); - } - protected void closeLogAppender() { try { - writePodLodIfNeeded(); if (RemoteLogUtils.isRemoteLoggingEnable()) { RemoteLogUtils.sendRemoteLog(taskExecutionContext.getLogPath()); log.info("Log handler sends task log {} to remote storage asynchronously.", taskExecutionContext.getLogPath()); } } catch (Exception ex) { - log.error("Write k8s pod log failed", ex); + log.error("Send remote log failed", ex); } finally { log.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); - } }