diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 4c8edc6dac..2501e1e824 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -201,7 +201,7 @@ public abstract class AbstractCommandExecutor { // Wait the task log process finished. taskOutputFuture.get(); } catch (ExecutionException e) { - logger.info("Handle task log error", e); + logger.error("Handle task log error", e); } } @@ -272,7 +272,7 @@ public abstract class AbstractCommandExecutor { ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L); try ( LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), - taskRequest.getTaskAppId())) { + taskRequest.getTaskAppId(), "")) { if (watcher == null) { throw new RuntimeException("The driver pod does not exist."); } else { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java index 92aa6cb447..ac1ce69f76 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.plugin.task.api.am; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; import org.apache.dolphinscheduler.common.enums.ResourceManagerType; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -54,6 +56,8 @@ public class KubernetesApplicationManager implements ApplicationManager { private static final String FAILED = "Failed"; private static final String UNKNOWN = "Unknown"; + private static final int MAX_RETRY_TIMES = 10; + /** * cache k8s client for same task */ @@ -67,7 +71,7 @@ public class KubernetesApplicationManager implements ApplicationManager { boolean isKill; String labelValue = kubernetesApplicationManagerContext.getLabelValue(); FilterWatchListDeletable watchList = - getDriverPod(kubernetesApplicationManagerContext); + getListenPod(kubernetesApplicationManagerContext); try { if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) { log.error("Driver pod is in FAILED or UNKNOWN status."); @@ -97,16 +101,24 @@ public class KubernetesApplicationManager implements ApplicationManager { * @param kubernetesApplicationManagerContext * @return */ - private FilterWatchListDeletable getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + private FilterWatchListDeletable getListenPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); String labelValue = kubernetesApplicationManagerContext.getLabelValue(); - FilterWatchListDeletable watchList = client.pods() - .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) - .withLabel(UNIQUE_LABEL_NAME, labelValue); - List podList = watchList.list().getItems(); - if (podList.size() != 1) { - log.warn("Expected driver pod 1, but get {}.", podList.size()); + List podList = null; + FilterWatchListDeletable watchList = null; + int retryTimes = 0; + while (CollectionUtils.isEmpty(podList) && retryTimes < MAX_RETRY_TIMES) { + watchList = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + podList = watchList.list().getItems(); + if (!CollectionUtils.isEmpty(podList)) { + break; + } + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + retryTimes += 1; } + return watchList; } @@ -153,7 +165,7 @@ public class KubernetesApplicationManager implements ApplicationManager { String phase; try { if (Objects.isNull(watchList)) { - watchList = getDriverPod(kubernetesApplicationManagerContext); + watchList = getListenPod(kubernetesApplicationManagerContext); } List driverPod = watchList.list().getItems(); if (!driverPod.isEmpty()) { @@ -180,16 +192,27 @@ public class KubernetesApplicationManager implements ApplicationManager { */ public LogWatch getPodLogWatcher(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { KubernetesClient client = getClient(kubernetesApplicationManagerContext); - FilterWatchListDeletable watchList = - getDriverPod(kubernetesApplicationManagerContext); - List driverPod = watchList.list().getItems(); - if (CollectionUtils.isEmpty(driverPod)) { - return null; + boolean podIsReady = false; + Pod pod = null; + while (!podIsReady) { + FilterWatchListDeletable watchList = + getListenPod(kubernetesApplicationManagerContext); + List podList = watchList == null ? null : watchList.list().getItems(); + if (CollectionUtils.isEmpty(podList)) { + return null; + } + pod = podList.get(0); + String phase = pod.getStatus().getPhase(); + if (phase.equals(PENDING) || phase.equals(UNKNOWN)) { + ThreadUtils.sleep(SLEEP_TIME_MILLIS); + } else { + podIsReady = true; + } } - Pod driver = driverPod.get(0); - return client.pods().inNamespace(driver.getMetadata().getNamespace()) - .withName(driver.getMetadata().getName()) + return client.pods().inNamespace(pod.getMetadata().getNamespace()) + .withName(pod.getMetadata().getName()) + .inContainer(kubernetesApplicationManagerContext.getContainerName()) .watchLog(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java index 19dd223477..2a37d16d35 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java @@ -32,8 +32,13 @@ public class KubernetesApplicationManagerContext implements ApplicationManagerCo private final K8sTaskExecutionContext k8sTaskExecutionContext; /** - * driver pod label value + * pod label value */ private final String labelValue; + /** + * container name (optional) + */ + private final String containerName; + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java index 6f08dde14d..0dbff0ba33 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java @@ -48,7 +48,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask { setExitStatusCode(response.getExitStatusCode()); setAppIds(response.getAppIds()); } catch (Exception e) { - log.error("k8s task submit failed with error", e); + log.error("k8s task submit failed with error"); exitStatusCode = -1; throw new TaskException("Execute k8s task error", e); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java index 6bdf8d39ed..045af3a484 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; - import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; @@ -32,14 +30,12 @@ public abstract class AbstractK8sTaskExecutor { protected Logger log; protected TaskExecutionContext taskRequest; protected K8sUtils k8sUtils; - protected StringBuilder logStringBuffer; protected Yaml yaml; protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) { this.log = log; this.taskRequest = taskRequest; this.k8sUtils = new K8sUtils(); - this.logStringBuffer = new StringBuilder(); this.yaml = new Yaml(); } @@ -53,14 +49,6 @@ public abstract class AbstractK8sTaskExecutor { } } - public void flushLog(TaskResponse taskResponse) { - if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) { - log.error(logStringBuffer.toString()); - } else if (logStringBuffer.length() != 0) { - log.info(logStringBuffer.toString()); - } - } - public abstract void submitJob2k8s(String k8sParameterStr); public abstract void stopJobOnK8s(String k8sParameterStr); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f357713d39..aecb8b1aac 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -30,7 +30,9 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -41,16 +43,23 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.commons.lang3.StringUtils; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -67,6 +76,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.LogWatch; /** * K8sTaskExecutor used to submit k8s task to K8S @@ -74,6 +84,9 @@ import io.fabric8.kubernetes.client.WatcherException; public class K8sTaskExecutor extends AbstractK8sTaskExecutor { private Job job; + protected boolean podLogOutputIsFinished = false; + protected Future podLogOutputFuture; + public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) { super(logger, taskRequest); } @@ -100,6 +113,8 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { Map labelMap = k8STaskMainParameters.getLabelMap(); labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE); labelMap.put(NAME_LABEL, k8sJobName); + Map podLabelMap = new HashMap<>(); + podLabelMap.put(UNIQUE_LABEL_NAME, taskRequest.getTaskAppId()); EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null); List envVars = new ArrayList<>(); envVars.add(taskInstanceIdVar); @@ -150,6 +165,9 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { .withNewSpec() .withTtlSecondsAfterFinished(JOB_TTL_SECONDS) .withNewTemplate() + .withNewMetadata() + .withLabels(podLabelMap) + .endMetadata() .withNewSpec() .addNewContainer() .withName(k8sJobName) @@ -170,36 +188,36 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { return jobBuilder.build(); } - public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, - K8sTaskMainParameters k8STaskMainParameters) { + public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { CountDownLatch countDownLatch = new CountDownLatch(1); Watcher watcher = new Watcher() { @Override public void eventReceived(Action action, Job job) { - log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); - if (action != Action.ADDED) { - int jobStatus = getK8sJobStatus(job); - log.info("job {} status {}", job.getMetadata().getName(), jobStatus); - if (jobStatus == TaskConstants.RUNNING_CODE) { - return; + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath())) { + log.info("event received : job:{} action:{}", job.getMetadata().getName(), action); + if (action != Action.ADDED) { + int jobStatus = getK8sJobStatus(job); + log.info("job {} status {}", job.getMetadata().getName(), jobStatus); + if (jobStatus == TaskConstants.RUNNING_CODE) { + return; + } + setTaskStatus(jobStatus, taskInstanceId, taskResponse); + countDownLatch.countDown(); } - setTaskStatus(jobStatus, taskInstanceId, taskResponse, k8STaskMainParameters); - countDownLatch.countDown(); } } @Override public void onClose(WatcherException e) { - logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), - e.getMessage())); + log.error("[K8sJobExecutor-{}] fail in k8s: {}", job.getMetadata().getName(), e.getMessage()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); countDownLatch.countDown(); } }; - Watch watch = null; - try { - watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher); + try (Watch watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher)) { boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; if (timeoutFlag) { @@ -208,7 +226,6 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { } else { countDownLatch.await(); } - flushLog(taskResponse); } catch (InterruptedException e) { log.error("job failed in k8s: {}", e.getMessage(), e); Thread.currentThread().interrupt(); @@ -216,19 +233,42 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { } catch (Exception e) { log.error("job failed in k8s: {}", e.getMessage(), e); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); - } finally { - if (watch != null) { - watch.close(); - } } } + private void parsePodLogOutput() { + ExecutorService collectPodLogExecutorService = ThreadUtils + .newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName()); + + String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId()); + String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); + String containerName = String.format("%s-%s", taskName, taskInstanceId); + podLogOutputFuture = collectPodLogExecutorService.submit(() -> { + try ( + final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); + LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), + taskRequest.getTaskAppId(), containerName)) { + String line; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { + while ((line = reader.readLine()) != null) { + log.info("[K8S-pod-log] {}", line); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + podLogOutputIsFinished = true; + } + }); + + collectPodLogExecutorService.shutdown(); + } + @Override public TaskResponse run(String k8sParameterStr) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); - K8sTaskMainParameters k8STaskMainParameters = - JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class); try { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { result.setExitStatusCode(EXIT_CODE_KILL); @@ -242,9 +282,20 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { String configYaml = k8sTaskExecutionContext.getConfigYaml(); k8sUtils.buildClient(configYaml); submitJob2k8s(k8sParameterStr); - registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters); + parsePodLogOutput(); + registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result); + + if (podLogOutputFuture != null) { + try { + // Wait kubernetes pod log collection finished + podLogOutputFuture.get(); + } catch (ExecutionException e) { + log.error("Handle pod log error", e); + } + } } catch (Exception e) { cancelApplication(k8sParameterStr); + Thread.currentThread().interrupt(); result.setExitStatusCode(EXIT_CODE_FAILURE); throw e; } @@ -270,9 +321,9 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { stopJobOnK8s(k8sParameterStr); String namespaceName = k8STaskMainParameters.getNamespaceName(); k8sUtils.createJob(namespaceName, job); - log.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId); + log.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId); } catch (Exception e) { - log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId); + log.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId); throw new TaskException("K8sJobExecutor fail to submit job", e); } } @@ -288,7 +339,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { k8sUtils.deleteJob(jobName, namespaceName); } } catch (Exception e) { - log.error("[K8sJobExecutor-{}] fail to stop job", jobName); + log.error("[K8sJobExecutor-{}] fail to stop job", jobName); throw new TaskException("K8sJobExecutor fail to stop job", e); } } @@ -304,21 +355,16 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { } } - public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse, - K8sTaskMainParameters k8STaskMainParameters) { + public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) { if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) { - logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", job.getMetadata().getName())); + log.info("[K8sJobExecutor-{}] killed", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_KILL); } else if (jobStatus == EXIT_CODE_SUCCESS) { - logStringBuffer - .append(String.format("[K8sJobExecutor-%s] succeed in k8s", job.getMetadata().getName())); + log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); } else { - String errorMessage = - k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName()); - logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), - errorMessage)); + log.error("[K8sJobExecutor-{}] fail in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_FAILURE); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index 130159137c..f0c3aa0f9a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -142,12 +142,14 @@ public final class ProcessUtils { */ public static void cancelApplication(TaskExecutionContext taskExecutionContext) { try { - if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) && - !TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { - applicationManagerMap.get(ResourceManagerType.KUBERNETES) - .killApplication(new KubernetesApplicationManagerContext( - taskExecutionContext.getK8sTaskExecutionContext(), - taskExecutionContext.getTaskAppId())); + if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) { + if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { + // Set empty container name for Spark on K8S task + applicationManagerMap.get(ResourceManagerType.KUBERNETES) + .killApplication(new KubernetesApplicationManagerContext( + taskExecutionContext.getK8sTaskExecutionContext(), + taskExecutionContext.getTaskAppId(), "")); + } } else { String host = taskExecutionContext.getHost(); String executePath = taskExecutionContext.getExecutePath(); @@ -197,7 +199,7 @@ public final class ProcessUtils { KubernetesApplicationManager applicationManager = (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); return applicationManager - .getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); + .getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, "")); } /** @@ -207,12 +209,14 @@ public final class ProcessUtils { * @param taskAppId * @return */ - public static LogWatch getPodLogWatcher(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) { + public static LogWatch getPodLogWatcher(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId, + String containerName) { KubernetesApplicationManager applicationManager = (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); return applicationManager - .getPodLogWatcher(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); + .getPodLogWatcher( + new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId, containerName)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 9931a33ae5..46b226268c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -34,6 +34,8 @@ import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -41,6 +43,8 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus; public class K8sTaskExecutorTest { + private static final Logger logger = LoggerFactory.getLogger(K8sTaskExecutorTest.class); + private K8sTaskExecutor k8sTaskExecutor = null; private K8sTaskMainParameters k8sTaskMainParameters = null; private final String image = "ds-dev"; @@ -66,7 +70,7 @@ public class K8sTaskExecutorTest { requirement.setKey("node-label"); requirement.setOperator("In"); requirement.setValues(Arrays.asList("1234", "123456")); - k8sTaskExecutor = new K8sTaskExecutor(null, taskRequest); + k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest); k8sTaskMainParameters = new K8sTaskMainParameters(); k8sTaskMainParameters.setImage(image); k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy); @@ -90,9 +94,8 @@ public class K8sTaskExecutorTest { public void testSetTaskStatusNormal() { int jobStatus = 0; TaskResponse taskResponse = new TaskResponse(); - K8sTaskMainParameters k8STaskMainParameters = new K8sTaskMainParameters(); k8sTaskExecutor.setJob(job); - k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse, k8STaskMainParameters); + k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse); Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, taskResponse.getExitStatusCode())); } @Test