diff --git a/deploy/kubernetes/dolphinscheduler/values.yaml b/deploy/kubernetes/dolphinscheduler/values.yaml index 329511a226..20527f9e39 100644 --- a/deploy/kubernetes/dolphinscheduler/values.yaml +++ b/deploy/kubernetes/dolphinscheduler/values.yaml @@ -313,7 +313,7 @@ master: MASTER_MAX_CPU_LOAD_AVG: "-1" MASTER_RESERVED_MEMORY: "0.3" MASTER_FAILOVER_INTERVAL: "10m" - MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true" + MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER: "true" worker: ## PodManagementPolicy controls how pods are created during initial scale up, when replacing pods on nodes, or when scaling down. diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 80cf3060e8..3821c7fefd 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -276,7 +276,7 @@ Location: `master-server/conf/application.yaml` |master.max-cpu-load-avg|-1|master max CPU load avg, only higher than the system CPU load average, master server can schedule. default value -1: the number of CPU cores * 2| |master.reserved-memory|0.3|master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G| |master.failover-interval|10|failover interval, the unit is minute| -|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance| +|master.kill-application-when-task-failover|true|whether to kill yarn/k8s application when failover taskInstance| |master.registry-disconnect-strategy.strategy|stop|Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting| |master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely| |master.worker-group-refresh-interval|10s|The interval to refresh worker group from db to memory| diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 105b433b7c..6db0975606 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -271,7 +271,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId |master.max-cpu-load-avg|-1|master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2| |master.reserved-memory|0.3|master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G| |master.failover-interval|10|failover间隔,单位为分钟| -|master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job| +|master.kill-application-when-task-failover|true|当任务实例failover时,是否kill掉yarn或k8s application| |master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting| |master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 | |master.master.worker-group-refresh-interval|10s|定期将workerGroup从数据库中同步到内存的时间间隔| diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java new file mode 100644 index 0000000000..265b0765bf --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum ResourceManagerType { + + YARN, + + KUBERNETES; + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index 36f075e00e..b2776bc7b9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_BASEDIR_PATH; import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES; import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE; import static org.apache.dolphinscheduler.common.constants.Constants.UTF_8; @@ -49,6 +50,8 @@ public class FileUtils { public static final String APPINFO_PATH = "appInfo.log"; + public static final String KUBE_CONFIG_FILE = "config"; + private FileUtils() { throw new UnsupportedOperationException("Construct FileUtils"); } @@ -116,6 +119,16 @@ public class FileUtils { taskInstanceId); } + /** + * absolute path of kubernetes configuration file + * + * @param execPath + * @return + */ + public static String getKubeConfigPath(String execPath) { + return String.format(FORMAT_S_S, execPath, KUBE_CONFIG_FILE); + } + /** * absolute path of appInfo file * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 3971d1e290..bbd90eb477 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -88,7 +88,7 @@ public class MasterConfig implements Validator { private double maxCpuLoadAvg = -1; private double reservedMemory = 0.3; private Duration failoverInterval = Duration.ofMinutes(10); - private boolean killYarnJobWhenTaskFailover = true; + private boolean killApplicationWhenTaskFailover = true; private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); @@ -163,7 +163,7 @@ public class MasterConfig implements Validator { log.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg); log.info("Master config: reservedMemory -> {} ", reservedMemory); log.info("Master config: failoverInterval -> {} ", failoverInterval); - log.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover); + log.info("Master config: killApplicationWhenTaskFailover -> {} ", killApplicationWhenTaskFailover); log.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy); log.info("Master config: masterAddress -> {} ", masterAddress); log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 757d479b9e..559163f363 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -25,8 +25,8 @@ import static org.apache.dolphinscheduler.common.constants.Constants.PASSWORD; import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.constants.Constants.USER; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE; @@ -72,6 +72,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters; import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -328,11 +329,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext(); setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode()); } - K8sTaskExecutionContext k8sTaskExecutionContext = null; - if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) { - k8sTaskExecutionContext = new K8sTaskExecutionContext(); - setK8sTaskRelation(k8sTaskExecutionContext, taskInstance); - } + + K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance); Map businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance); @@ -635,18 +633,39 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { } /** - * set k8s task relation - * @param k8sTaskExecutionContext k8sTaskExecutionContext + * get k8s task execution context based on task type and deploy mode + * * @param taskInstance taskInstance */ - private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) { - K8sTaskParameters k8sTaskParameters = - JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class); - Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace()); - String clusterName = namespace.get(CLUSTER); - String configYaml = processService.findConfigYamlByName(clusterName); - if (configYaml != null) { - k8sTaskExecutionContext.setConfigYaml(configYaml); + private K8sTaskExecutionContext setK8sTaskRelation(TaskInstance taskInstance) { + K8sTaskExecutionContext k8sTaskExecutionContext = null; + String namespace = ""; + switch (taskInstance.getTaskType()) { + case "K8S": + case "KUBEFLOW": + K8sTaskParameters k8sTaskParameters = + JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class); + namespace = k8sTaskParameters.getNamespace(); + break; + case "SPARK": + SparkParameters sparkParameters = + JSONUtils.parseObject(taskInstance.getTaskParams(), SparkParameters.class); + if (StringUtils.isNotEmpty(sparkParameters.getNamespace())) { + namespace = sparkParameters.getNamespace(); + } + break; + default: + break; + } + + if (StringUtils.isNotEmpty(namespace)) { + String clusterName = JSONUtils.toMap(namespace).get(CLUSTER); + String configYaml = processService.findConfigYamlByName(clusterName); + if (configYaml != null) { + k8sTaskExecutionContext = + new K8sTaskExecutionContext(configYaml, JSONUtils.toMap(namespace).get(NAMESPACE_NAME)); + } } + return k8sTaskExecutionContext; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 3979401425..441556cb79 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -227,7 +227,7 @@ public class MasterFailoverService { /** * failover task instance *

- * 1. kill yarn job if run on worker and there are yarn jobs in tasks. + * 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks. * 2. change task state from running to need failover. * 3. try to notify local master * @@ -248,10 +248,10 @@ public class MasterFailoverService { .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition()) .create(); - if (masterConfig.isKillYarnJobWhenTaskFailover()) { - // only kill yarn job if exists , the local thread has exited - log.info("TaskInstance failover begin kill the task related yarn job"); - ProcessUtils.killYarnJob(logClient, taskExecutionContext); + if (masterConfig.isKillApplicationWhenTaskFailover()) { + // only kill yarn/k8s job if exists , the local thread has exited + log.info("TaskInstance failover begin kill the task related yarn or k8s job"); + ProcessUtils.killApplication(logClient, taskExecutionContext); } // kill worker task, When the master failover and worker failover happened in the same time, // the task may not be failover if we don't set NEED_FAULT_TOLERANCE. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 928580fb82..a04ce18a93 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -154,7 +154,7 @@ public class WorkerFailoverService { /** * failover task instance *

- * 1. kill yarn job if run on worker and there are yarn jobs in tasks. + * 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks. * 2. change task state from running to need failover. * 3. try to notify local master * @@ -175,10 +175,10 @@ public class WorkerFailoverService { .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition()) .create(); - if (masterConfig.isKillYarnJobWhenTaskFailover()) { - // only kill yarn job if exists , the local thread has exited - log.info("TaskInstance failover begin kill the task related yarn job"); - ProcessUtils.killYarnJob(logClient, taskExecutionContext); + if (masterConfig.isKillApplicationWhenTaskFailover()) { + // only kill yarn/k8s job if exists , the local thread has exited + log.info("TaskInstance failover begin kill the task related yarn or k8s job"); + ProcessUtils.killApplication(logClient, taskExecutionContext); } } else { log.info("The failover taskInstance is a master task"); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 7f9c948e77..e333b566b6 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -108,8 +108,8 @@ master: reserved-memory: 0.3 # failover interval, the unit is minute failover-interval: 10m - # kill yarn jon when failover taskInstance, default true - kill-yarn-job-when-task-failover: true + # kill yarn / k8s application when failover taskInstance, default true + kill-application-when-task-failover: true registry-disconnect-strategy: # The disconnect strategy: stop, waiting strategy: waiting diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java index 78f5cb44a6..f926d3ee31 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClient; @@ -161,8 +162,8 @@ public class ProcessUtils { * @param taskExecutionContext taskExecutionContext * @return yarn application ids */ - public static @Nullable List killYarnJob(@NonNull LogClient logClient, - @NonNull TaskExecutionContext taskExecutionContext) { + public static @Nullable List killApplication(@NonNull LogClient logClient, + @NonNull TaskExecutionContext taskExecutionContext) { if (taskExecutionContext.getLogPath() == null) { return Collections.emptyList(); } @@ -172,6 +173,7 @@ public class ProcessUtils { List appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath(), taskExecutionContext.getAppInfoPath()); if (CollectionUtils.isNotEmpty(appIds)) { + taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds)); if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { taskExecutionContext .setExecutePath(FileUtils.getProcessExecDir( @@ -183,9 +185,7 @@ public class ProcessUtils { taskExecutionContext.getTaskInstanceId())); } FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath()); - org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(appIds, log, - taskExecutionContext.getTenantCode(), - taskExecutionContext.getExecutePath()); + org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext); return appIds; } else { log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}", diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 5a995a6dcc..afacc8f7d7 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -146,8 +146,8 @@ master: reserved-memory: 0.03 # failover interval failover-interval: 10m - # kill yarn jon when failover taskInstance, default true - kill-yarn-job-when-task-failover: true + # kill yarn/k8s application when failover taskInstance, default true + kill-application-when-task-failover: true worker-group-refresh-interval: 10s worker: diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index faceab2b3d..e2e0ce9338 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.get import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants; import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils; @@ -38,6 +39,7 @@ import java.io.InputStreamReader; import java.lang.reflect.Field; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -229,8 +231,11 @@ public abstract class AbstractCommandExecutor { // waiting for the run to finish boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); + TaskExecutionStatus kubernetesStatus = + ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); + // if SHELL task exit - if (status) { + if (status && kubernetesStatus.isSuccess()) { // SHELL task state result.setExitStatusCode(process.exitValue()); @@ -334,7 +339,11 @@ public abstract class AbstractCommandExecutor { LinkedBlockingQueue markerLog = new LinkedBlockingQueue<>(1); markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); - + String logs = appendPodLogIfNeeded(); + if (StringUtils.isNotEmpty(logs)) { + logBuffer.add("Dump logs from driver pod:"); + logBuffer.add(logs); + } if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); @@ -348,6 +357,13 @@ public abstract class AbstractCommandExecutor { } } + private String appendPodLogIfNeeded() { + if (Objects.isNull(taskRequest.getK8sTaskExecutionContext())) { + return ""; + } + return ProcessUtils.getPodLog(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); + } + /** * get the standard output of the process * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java index 7e39f184fa..aa5bf62fe2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java @@ -19,26 +19,34 @@ package org.apache.dolphinscheduler.plugin.task.api; import java.io.Serializable; +import lombok.Value; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + /** * k8s Task ExecutionContext */ - +@Value public class K8sTaskExecutionContext implements Serializable { private String configYaml; - public String getConfigYaml() { - return configYaml; - } + private String namespace; - public void setConfigYaml(String configYaml) { + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public K8sTaskExecutionContext( + @JsonProperty("configYaml") String configYaml, + @JsonProperty("namespace") String namespace) { this.configYaml = configYaml; + this.namespace = namespace; } @Override public String toString() { return "K8sTaskExecutionContext{" - + "configYaml='" + configYaml + '\'' + + "namespace=" + namespace + + ", configYaml='" + configYaml + '\'' + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java index 18799227b1..1a30864a94 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java @@ -29,6 +29,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; @@ -115,6 +116,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator()); } + if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) { + String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml(); + Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils + .getKubeConfigPath(taskRequest.getExecutePath())); + FileUtils.createFileWith755(kubeConfigPath); + Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND); + sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator()); + logger.info("Create kubernetes configuration file: {}.", kubeConfigPath); + } } sb.append(execCommand); String commandContent = sb.toString(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index a0928d0eac..2187102a7d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -430,7 +430,7 @@ public class TaskConstants { public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY"; - public static final String TASK_TYPE_K8S = "K8S"; + public static final String DEPLOY_MODE_KUBERNETES = "Kubernetes"; public static final Set TASK_TYPE_SET_K8S = Sets.newHashSet("K8S", "KUBEFLOW"); @@ -480,6 +480,11 @@ public class TaskConstants { public static final String CLUSTER = "cluster"; public static final Pattern COMMAND_SPLIT_REGEX = Pattern.compile("[^\\s\"'`]+|\"([^\"]+)\"|'([^']+)'|`([^`]+)`"); + /** + * spark / flink on k8s label name + */ + public static final String UNIQUE_LABEL_NAME = "dolphinscheduler-label"; + /** * conda config used by jupyter task plugin */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java new file mode 100644 index 0000000000..0d7761ae6d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import org.apache.dolphinscheduler.common.enums.ResourceManagerType; + +public interface ApplicationManager { + + /** + * kill application by application manager context + * + * @param applicationManagerContext + * @return + */ + boolean killApplication(ApplicationManagerContext applicationManagerContext); + + /** + * get resource manager type + * + * @return ResourceManagerType yarn / kubernetes + */ + ResourceManagerType getResourceManagerType(); + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java new file mode 100644 index 0000000000..b6fad893de --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +public interface ApplicationManagerContext { + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java new file mode 100644 index 0000000000..551bd4465d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; + +import org.apache.dolphinscheduler.common.enums.ResourceManagerType; +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import com.google.auto.service.AutoService; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; + +@Slf4j +@AutoService(ApplicationManager.class) +public class KubernetesApplicationManager implements ApplicationManager { + + private static final String PENDING = "Pending"; + private static final String RUNNING = "Running"; + private static final String FINISH = "Succeeded"; + private static final String FAILED = "Failed"; + private static final String UNKNOWN = "Unknown"; + + /** + * cache k8s client for same task + */ + private final Map cacheClientMap = new ConcurrentHashMap<>(); + + @Override + public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException { + KubernetesApplicationManagerContext kubernetesApplicationManagerContext = + (KubernetesApplicationManagerContext) applicationManagerContext; + + boolean isKill; + String labelValue = kubernetesApplicationManagerContext.getLabelValue(); + FilterWatchListDeletable watchList = getDriverPod(kubernetesApplicationManagerContext); + try { + if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) { + log.error("Driver pod is in FAILED or UNKNOWN status."); + isKill = false; + } else { + watchList.delete(); + isKill = true; + } + } catch (Exception e) { + throw new TaskException("Failed to kill Kubernetes application with label " + labelValue, e); + } finally { + // remove client cache after killing application + removeCache(labelValue); + } + + return isKill; + } + + @Override + public ResourceManagerType getResourceManagerType() { + return ResourceManagerType.KUBERNETES; + } + + /** + * get driver pod + * + * @param kubernetesApplicationManagerContext + * @return + */ + private FilterWatchListDeletable getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + KubernetesClient client = getClient(kubernetesApplicationManagerContext); + String labelValue = kubernetesApplicationManagerContext.getLabelValue(); + FilterWatchListDeletable watchList = + client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withLabel(UNIQUE_LABEL_NAME, labelValue); + List podList = watchList.list().getItems(); + if (podList.size() != 1) { + log.warn("Expected driver pod 1, but get {}.", podList.size()); + } + return watchList; + } + + /** + * create client or get from cache map + * + * @param kubernetesApplicationManagerContext + * @return + */ + private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + K8sTaskExecutionContext k8sTaskExecutionContext = + kubernetesApplicationManagerContext.getK8sTaskExecutionContext(); + return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(), + key -> new DefaultKubernetesClient(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml()))); + } + + public void removeCache(String cacheKey) { + try (KubernetesClient ignored = cacheClientMap.remove(cacheKey)) { + } + } + + /** + * get application execution status + * + * @param kubernetesApplicationManagerContext + * @return TaskExecutionStatus SUCCESS / FAILURE + * @throws TaskException + */ + public TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException { + return getApplicationStatus(kubernetesApplicationManagerContext, null); + } + + /** + * get application (driver pod) status + * + * @param kubernetesApplicationManagerContext + * @param watchList + * @return + * @throws TaskException + */ + private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext, + FilterWatchListDeletable watchList) throws TaskException { + String phase; + try { + if (Objects.isNull(watchList)) { + watchList = getDriverPod(kubernetesApplicationManagerContext); + } + List driverPod = watchList.list().getItems(); + if (!driverPod.isEmpty()) { + // cluster mode + Pod driver = driverPod.get(0); + phase = driver.getStatus().getPhase(); + } else { + // client mode + phase = FINISH; + } + } catch (Exception e) { + throw new TaskException("Failed to get Kubernetes application status", e); + } + + return phase.equals(FAILED) || phase.equals(UNKNOWN) ? TaskExecutionStatus.FAILURE + : TaskExecutionStatus.SUCCESS; + } + + /** + * collect pod's log + * + * @param kubernetesApplicationManagerContext + * @return + */ + public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) { + try { + KubernetesClient client = getClient(kubernetesApplicationManagerContext); + FilterWatchListDeletable watchList = getDriverPod(kubernetesApplicationManagerContext); + List driverPod = watchList.list().getItems(); + if (!driverPod.isEmpty()) { + Pod driver = driverPod.get(0); + String driverPodName = driver.getMetadata().getName(); + String logs = client.pods() + .inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace()) + .withName(driverPodName).getLog(); + + // delete driver pod only after successful execution + killApplication(kubernetesApplicationManagerContext); + return logs; + } + } catch (Exception e) { + log.error("Collect pod log failed:", e.getMessage()); + } + return ""; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java new file mode 100644 index 0000000000..19dd223477 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; + +import lombok.RequiredArgsConstructor; +import lombok.Value; + +@Value +@RequiredArgsConstructor +public class KubernetesApplicationManagerContext implements ApplicationManagerContext { + + /** + * kubernetes execution context + */ + private final K8sTaskExecutionContext k8sTaskExecutionContext; + + /** + * driver pod label value + */ + private final String labelValue; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java new file mode 100644 index 0000000000..d7f3ffcf10 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ResourceManagerType; +import org.apache.dolphinscheduler.common.utils.HttpUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.KerberosHttpClient; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; + +import org.apache.commons.lang3.StringUtils; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.auto.service.AutoService; + +@Slf4j +@AutoService(ApplicationManager.class) +public class YarnApplicationManager implements ApplicationManager { + + private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); + private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); + private static final String JOB_HISTORY_ADDRESS = + PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); + private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE = + PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088); + + @Override + public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException { + YarnApplicationManagerContext yarnApplicationManagerContext = + (YarnApplicationManagerContext) applicationManagerContext; + String executePath = yarnApplicationManagerContext.getExecutePath(); + String tenantCode = yarnApplicationManagerContext.getTenantCode(); + List appIds = yarnApplicationManagerContext.getAppIds(); + for (String appId : appIds) { + try { + TaskExecutionStatus applicationStatus = getApplicationStatus(appId); + + if (!applicationStatus.isFinished()) { + String commandFile = String.format("%s/%s.kill", executePath, appId); + String cmd = getKerberosInitCommand() + "yarn application -kill " + appId; + execYarnKillCommand(tenantCode, appId, commandFile, cmd); + } + } catch (Exception e) { + log.error("Get yarn application app id [{}}] status failed", appId, e); + throw new TaskException(e.getMessage()); + } + } + return true; + } + + @Override + public ResourceManagerType getResourceManagerType() { + return ResourceManagerType.YARN; + } + + /** + * get the state of an application + * + * @param applicationId application id + * @return the return may be null or there may be other parse exceptions + */ + public TaskExecutionStatus getApplicationStatus(String applicationId) throws TaskException { + if (StringUtils.isEmpty(applicationId)) { + return null; + } + + String result; + String applicationUrl = getApplicationUrl(applicationId); + log.debug("generate yarn application url, applicationUrl={}", applicationUrl); + + String responseContent = Boolean.TRUE + .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) + ? KerberosHttpClient.get(applicationUrl) + : HttpUtils.get(applicationUrl); + if (responseContent != null) { + ObjectNode jsonObject = JSONUtils.parseObject(responseContent); + if (!jsonObject.has("app")) { + return TaskExecutionStatus.FAILURE; + } + result = jsonObject.path("app").path("finalStatus").asText(); + + } else { + // may be in job history + String jobHistoryUrl = getJobHistoryUrl(applicationId); + log.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl); + responseContent = Boolean.TRUE + .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) + ? KerberosHttpClient.get(jobHistoryUrl) + : HttpUtils.get(jobHistoryUrl); + + if (null != responseContent) { + ObjectNode jsonObject = JSONUtils.parseObject(responseContent); + if (!jsonObject.has("job")) { + return TaskExecutionStatus.FAILURE; + } + result = jsonObject.path("job").path("state").asText(); + } else { + return TaskExecutionStatus.FAILURE; + } + } + + return getExecutionStatus(result); + } + + /** + * get application url + * if rmHaIds contains xx, it signs not use resourcemanager + * otherwise: + * if rmHaIds is empty, single resourcemanager enabled + * if rmHaIds not empty: resourcemanager HA enabled + * + * @param applicationId application id + * @return url of application + */ + private String getApplicationUrl(String applicationId) throws TaskException { + + String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS); + if (StringUtils.isBlank(appUrl)) { + throw new TaskException("yarn application url generation failed"); + } + log.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId); + return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId); + } + + private String getJobHistoryUrl(String applicationId) { + // eg:application_1587475402360_712719 -> job_1587475402360_712719 + String jobId = applicationId.replace("application", "job"); + return String.format(JOB_HISTORY_ADDRESS, jobId); + } + + /** + * build kill command for yarn application + * + * @param tenantCode tenant code + * @param appId app id + * @param commandFile command file + * @param cmd cmd + */ + private void execYarnKillCommand(String tenantCode, String appId, String commandFile, + String cmd) { + try { + StringBuilder sb = new StringBuilder(); + sb.append("#!/bin/sh\n"); + sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); + sb.append("cd $BASEDIR\n"); + + sb.append("\n\n"); + sb.append(cmd); + + File f = new File(commandFile); + + if (!f.exists()) { + org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(), + StandardCharsets.UTF_8); + } + + String runCmd = String.format("%s %s", Constants.SH, commandFile); + runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd); + log.info("kill cmd:{}", runCmd); + org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd); + } catch (Exception e) { + log.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage())); + } + } + + private TaskExecutionStatus getExecutionStatus(String result) { + switch (result) { + case Constants.ACCEPTED: + return TaskExecutionStatus.SUBMITTED_SUCCESS; + case Constants.SUCCEEDED: + case Constants.ENDED: + return TaskExecutionStatus.SUCCESS; + case Constants.NEW: + case Constants.NEW_SAVING: + case Constants.SUBMITTED: + case Constants.FAILED: + return TaskExecutionStatus.FAILURE; + case Constants.KILLED: + return TaskExecutionStatus.KILL; + case Constants.RUNNING: + default: + return TaskExecutionStatus.RUNNING_EXECUTION; + } + } + + /** + * getAppAddress + * + * @param appAddress app address + * @param rmHa resource manager ha + * @return app address + */ + private String getAppAddress(String appAddress, String rmHa) { + + String[] split1 = appAddress.split(Constants.DOUBLE_SLASH); + + if (split1.length != 2) { + return null; + } + + String start = split1[0] + Constants.DOUBLE_SLASH; + String[] split2 = split1[1].split(Constants.COLON); + + if (split2.length != 2) { + return null; + } + + String end = Constants.COLON + split2[1]; + + // get active ResourceManager + String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa); + + if (StringUtils.isEmpty(activeRM)) { + return null; + } + + return start + activeRM + end; + } + + /** + * get kerberos init command + */ + private String getKerberosInitCommand() { + log.info("get kerberos init command"); + StringBuilder kerberosCommandBuilder = new StringBuilder(); + boolean hadoopKerberosState = + PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); + if (hadoopKerberosState) { + kerberosCommandBuilder.append("export KRB5_CONFIG=") + .append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)) + .append("\n\n") + .append(String.format("kinit -k -t %s %s || true", + PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH), + PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME))) + .append("\n\n"); + log.info("kerberos init command: {}", kerberosCommandBuilder); + } + return kerberosCommandBuilder.toString(); + } + + /** + * yarn ha admin utils + */ + private static final class YarnHAAdminUtils { + + /** + * get active resourcemanager node + * + * @param protocol http protocol + * @param rmIds yarn ha ids + * @return yarn active node + */ + public static String getActiveRMName(String protocol, String rmIds) { + + String[] rmIdArr = rmIds.split(Constants.COMMA); + + String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info"; + + try { + + /** + * send http get request to rm + */ + + for (String rmId : rmIdArr) { + String state = getRMState(String.format(yarnUrl, rmId)); + if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) { + return rmId; + } + } + + } catch (Exception e) { + log.error("yarn ha application url generation failed, message:{}", e.getMessage()); + } + return null; + } + + /** + * get ResourceManager state + */ + public static String getRMState(String url) { + + String retStr = Boolean.TRUE + .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) + ? KerberosHttpClient.get(url) + : HttpUtils.get(url); + + if (StringUtils.isEmpty(retStr)) { + return null; + } + // to json + ObjectNode jsonObject = JSONUtils.parseObject(retStr); + + // get ResourceManager state + if (!jsonObject.has("clusterInfo")) { + return null; + } + return jsonObject.get("clusterInfo").path("haState").asText(); + } + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java new file mode 100644 index 0000000000..44dc047f13 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.am; + +import java.util.List; + +import lombok.RequiredArgsConstructor; +import lombok.Value; + +@Value +@RequiredArgsConstructor +public class YarnApplicationManagerContext implements ApplicationManagerContext { + + /** + * execute path + */ + private final String executePath; + + /** + * tenant code + */ + private final String tenantCode; + + /** + * appId list + */ + private final List appIds; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index abaa688c9a..9ec8480589 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -17,32 +17,39 @@ package org.apache.dolphinscheduler.plugin.task.api.utils; -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.exception.BaseException; -import org.apache.dolphinscheduler.common.utils.HttpUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.KerberosHttpClient; +import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT; +import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S; + +import org.apache.dolphinscheduler.common.enums.ResourceManagerType; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.am.ApplicationManager; +import org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManager; +import org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManagerContext; +import org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManagerContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; -import java.io.File; -import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; - -import com.fasterxml.jackson.databind.node.ObjectNode; - @Slf4j public final class ProcessUtils { @@ -50,6 +57,14 @@ public final class ProcessUtils { throw new IllegalStateException("Utility class"); } + private static final Map applicationManagerMap = new HashMap<>(); + + static { + ServiceLoader.load(ApplicationManager.class) + .forEach(applicationManager -> applicationManagerMap.put(applicationManager.getResourceManagerType(), + applicationManager)); + } + /** * Initialization regularization, solve the problem of pre-compilation performance, * avoid the thread safety problem of multi-thread operation @@ -61,13 +76,6 @@ public final class ProcessUtils { */ private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)"); - private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); - private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); - private static final String JOB_HISTORY_ADDRESS = - PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); - private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE = - PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088); - /** * kill tasks according to different task types. */ @@ -124,278 +132,83 @@ public final class ProcessUtils { } /** - * kill yarn application. + * cacel k8s / yarn application * - * @param appIds app id list - * @param logger logger - * @param tenantCode tenant code - * @param executePath execute path + * @param taskExecutionContext + * @return */ - public static void cancelApplication(List appIds, Logger logger, String tenantCode, String executePath) { - if (appIds == null || appIds.isEmpty()) { - return; - } - - for (String appId : appIds) { - try { - TaskExecutionStatus applicationStatus = getApplicationStatus(appId); - - if (!applicationStatus.isFinished()) { - String commandFile = String.format("%s/%s.kill", executePath, appId); - String cmd = getKerberosInitCommand() + "yarn application -kill " + appId; - execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd); + public static void cancelApplication(TaskExecutionContext taskExecutionContext) { + try { + if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) && + !TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) { + applicationManagerMap.get(ResourceManagerType.KUBERNETES) + .killApplication(new KubernetesApplicationManagerContext( + taskExecutionContext.getK8sTaskExecutionContext(), + taskExecutionContext.getTaskAppId())); + } else { + String host = taskExecutionContext.getHost(); + String executePath = taskExecutionContext.getExecutePath(); + String tenantCode = taskExecutionContext.getTenantCode(); + List appIds; + if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) { + // is failover + appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA)); + } else { + String logPath = taskExecutionContext.getLogPath(); + String appInfoPath = taskExecutionContext.getAppInfoPath(); + if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) { + log.error( + "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}", + host, logPath, appInfoPath, executePath, tenantCode); + throw new TaskException("Cancel application failed!"); + } + log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath); + appIds = LogUtils.getAppIds(logPath, appInfoPath, + PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); + taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds)); } - } catch (Exception e) { - log.error("Get yarn application app id [{}}] status failed", appId, e); - } - } - } - - /** - * get the state of an application - * - * @param applicationId application id - * @return the return may be null or there may be other parse exceptions - */ - public static TaskExecutionStatus getApplicationStatus(String applicationId) throws BaseException { - if (StringUtils.isEmpty(applicationId)) { - return null; - } - - String result; - String applicationUrl = getApplicationUrl(applicationId); - log.debug("generate yarn application url, applicationUrl={}", applicationUrl); - - String responseContent = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(applicationUrl) - : HttpUtils.get(applicationUrl); - if (responseContent != null) { - ObjectNode jsonObject = JSONUtils.parseObject(responseContent); - if (!jsonObject.has("app")) { - return TaskExecutionStatus.FAILURE; - } - result = jsonObject.path("app").path("finalStatus").asText(); - - } else { - // may be in job history - String jobHistoryUrl = getJobHistoryUrl(applicationId); - log.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl); - responseContent = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(jobHistoryUrl) - : HttpUtils.get(jobHistoryUrl); - - if (null != responseContent) { - ObjectNode jsonObject = JSONUtils.parseObject(responseContent); - if (!jsonObject.has("job")) { - return TaskExecutionStatus.FAILURE; + if (CollectionUtils.isEmpty(appIds)) { + log.info("The appId is empty"); } - result = jsonObject.path("job").path("state").asText(); - } else { - return TaskExecutionStatus.FAILURE; + ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN); + applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds)); } - } - - return getExecutionStatus(result); - } - - /** - * get application url - * if rmHaIds contains xx, it signs not use resourcemanager - * otherwise: - * if rmHaIds is empty, single resourcemanager enabled - * if rmHaIds not empty: resourcemanager HA enabled - * - * @param applicationId application id - * @return url of application - */ - private static String getApplicationUrl(String applicationId) throws BaseException { - - String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS); - if (StringUtils.isBlank(appUrl)) { - throw new BaseException("yarn application url generation failed"); - } - log.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId); - return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId); - } - - private static String getJobHistoryUrl(String applicationId) { - // eg:application_1587475402360_712719 -> job_1587475402360_712719 - String jobId = applicationId.replace("application", "job"); - return String.format(JOB_HISTORY_ADDRESS, jobId); - } - - /** - * build kill command for yarn application - * - * @param logger logger - * @param tenantCode tenant code - * @param appId app id - * @param commandFile command file - * @param cmd cmd - */ - private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile, - String cmd) { - try { - StringBuilder sb = new StringBuilder(); - sb.append("#!/bin/sh\n"); - sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); - sb.append("cd $BASEDIR\n"); - - sb.append("\n\n"); - sb.append(cmd); - - File f = new File(commandFile); - - if (!f.exists()) { - org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(), - StandardCharsets.UTF_8); - } - - String runCmd = String.format("%s %s", Constants.SH, commandFile); - runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd); - log.info("kill cmd:{}", runCmd); - org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd); } catch (Exception e) { - log.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage())); - } - } - - private static TaskExecutionStatus getExecutionStatus(String result) { - switch (result) { - case Constants.ACCEPTED: - return TaskExecutionStatus.SUBMITTED_SUCCESS; - case Constants.SUCCEEDED: - case Constants.ENDED: - return TaskExecutionStatus.SUCCESS; - case Constants.NEW: - case Constants.NEW_SAVING: - case Constants.SUBMITTED: - case Constants.FAILED: - return TaskExecutionStatus.FAILURE; - case Constants.KILLED: - return TaskExecutionStatus.KILL; - case Constants.RUNNING: - default: - return TaskExecutionStatus.RUNNING_EXECUTION; + log.error("Cancel application failed: {}", e.getMessage()); } } /** - * getAppAddress + * get k8s application status * - * @param appAddress app address - * @param rmHa resource manager ha - * @return app address + * @param k8sTaskExecutionContext + * @param taskAppId + * @return */ - private static String getAppAddress(String appAddress, String rmHa) { - - String[] split1 = appAddress.split(Constants.DOUBLE_SLASH); - - if (split1.length != 2) { - return null; - } - - String start = split1[0] + Constants.DOUBLE_SLASH; - String[] split2 = split1[1].split(Constants.COLON); - - if (split2.length != 2) { - return null; - } - - String end = Constants.COLON + split2[1]; - - // get active ResourceManager - String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa); - - if (StringUtils.isEmpty(activeRM)) { - return null; - } - - return start + activeRM + end; + public static TaskExecutionStatus getApplicationStatus(K8sTaskExecutionContext k8sTaskExecutionContext, + String taskAppId) { + if (Objects.isNull(k8sTaskExecutionContext)) { + return TaskExecutionStatus.SUCCESS; + } + KubernetesApplicationManager applicationManager = + (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); + return applicationManager + .getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); } /** - * get kerberos init command - */ - private static String getKerberosInitCommand() { - log.info("get kerberos init command"); - StringBuilder kerberosCommandBuilder = new StringBuilder(); - boolean hadoopKerberosState = - PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); - if (hadoopKerberosState) { - kerberosCommandBuilder.append("export KRB5_CONFIG=") - .append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)) - .append("\n\n") - .append(String.format("kinit -k -t %s %s || true", - PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH), - PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME))) - .append("\n\n"); - log.info("kerberos init command: {}", kerberosCommandBuilder); - } - return kerberosCommandBuilder.toString(); - } - - /** - * yarn ha admin utils + * get driver pod logs + * + * @param k8sTaskExecutionContext + * @param taskAppId + * @return */ - private static final class YarnHAAdminUtils { - - /** - * get active resourcemanager node - * - * @param protocol http protocol - * @param rmIds yarn ha ids - * @return yarn active node - */ - public static String getActiveRMName(String protocol, String rmIds) { - - String[] rmIdArr = rmIds.split(Constants.COMMA); + public static String getPodLog(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) { + KubernetesApplicationManager applicationManager = + (KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES); - String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info"; - - try { - - /** - * send http get request to rm - */ - - for (String rmId : rmIdArr) { - String state = getRMState(String.format(yarnUrl, rmId)); - if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) { - return rmId; - } - } - - } catch (Exception e) { - log.error("yarn ha application url generation failed, message:{}", e.getMessage()); - } - return null; - } - - /** - * get ResourceManager state - */ - public static String getRMState(String url) { - - String retStr = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(url) - : HttpUtils.get(url); - - if (StringUtils.isEmpty(retStr)) { - return null; - } - // to json - ObjectNode jsonObject = JSONUtils.parseObject(retStr); - - // get ResourceManager state - if (!jsonObject.has("clusterInfo")) { - return null; - } - return jsonObject.get("clusterInfo").path("haState").asText(); - } + return applicationManager + .collectPodLog(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml index 6d0d6d79b1..046a89d082 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml @@ -37,5 +37,9 @@ dolphinscheduler-task-api ${project.version} + + io.fabric8 + kubernetes-client + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java index 0ca069ca34..7ad2e27b07 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java @@ -43,14 +43,14 @@ public class SparkConstants { public static final String DEPLOY_MODE_LOCAL = "local"; /** - * --driver-cores NUM + * --conf spark.driver.cores NUM */ - public static final String DRIVER_CORES = "--driver-cores"; + public static final String DRIVER_CORES = "--conf spark.driver.cores=%d"; /** - * --driver-memory MEM + * --conf spark.driver.memory MEM */ - public static final String DRIVER_MEMORY = "--driver-memory"; + public static final String DRIVER_MEMORY = "--conf spark.driver.memory=%s"; /** * master @@ -59,20 +59,32 @@ public class SparkConstants { public static final String SPARK_ON_YARN = "yarn"; + public static final String SPARK_ON_K8S_MASTER_PREFIX = "k8s://"; + + /** + * add label for driver pod + */ + public static final String DRIVER_LABEL_CONF = "--conf spark.kubernetes.driver.label.%s=%s"; + + /** + * spark kubernetes namespace + */ + public static final String SPARK_KUBERNETES_NAMESPACE = "--conf spark.kubernetes.namespace=%s"; + /** - * --num-executors NUM + * --conf spark.executor.instances NUM */ - public static final String NUM_EXECUTORS = "--num-executors"; + public static final String NUM_EXECUTORS = "--conf spark.executor.instances=%d"; /** - * --executor-cores NUM + * --conf spark.executor.cores NUM */ - public static final String EXECUTOR_CORES = "--executor-cores"; + public static final String EXECUTOR_CORES = "--conf spark.executor.cores=%d"; /** - * --executor-memory MEM + * --conf spark.executor.memory MEM */ - public static final String EXECUTOR_MEMORY = "--executor-memory"; + public static final String EXECUTOR_MEMORY = "--conf spark.executor.memory=%s"; /** * -f SQL from files diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java index 2532c978b9..aa7cd27c85 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java @@ -39,7 +39,7 @@ public class SparkParameters extends AbstractParameters { private String mainClass; /** - * deploy mode + * deploy mode local / cluster / client */ private String deployMode; @@ -100,6 +100,19 @@ public class SparkParameters extends AbstractParameters { */ private String rawScript; + /** + * kubernetes cluster namespace + */ + private String namespace; + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + /** * resource list */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 80d5be232f..16533397da 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -17,7 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.spark; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME; +import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.DRIVER_LABEL_CONF; +import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_KUBERNETES_NAMESPACE; +import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_ON_K8S_MASTER_PREFIX; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; @@ -45,6 +50,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import io.fabric8.kubernetes.client.Config; + public class SparkTask extends AbstractYarnTask { /** @@ -132,8 +139,15 @@ public class SparkTask extends AbstractYarnTask { String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL; + + boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace()); + + String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX + + Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl() + : SparkConstants.SPARK_ON_YARN; + if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) { - args.add(SparkConstants.SPARK_ON_YARN); + args.add(masterUrl); args.add(SparkConstants.DEPLOY_MODE); } args.add(deployMode); @@ -168,6 +182,13 @@ public class SparkTask extends AbstractYarnTask { args.add(others); } + // add driver label for spark on native kubernetes + if (onNativeKubernetes) { + args.add(String.format(DRIVER_LABEL_CONF, UNIQUE_LABEL_NAME, taskExecutionContext.getTaskAppId())); + args.add(String.format(SPARK_KUBERNETES_NAMESPACE, + JSONUtils.toMap(sparkParameters.getNamespace()).get(NAMESPACE_NAME))); + } + ResourceInfo mainJar = sparkParameters.getMainJar(); if (programType != ProgramType.SQL) { args.add(mainJar.getRes()); @@ -189,32 +210,27 @@ public class SparkTask extends AbstractYarnTask { private void populateSparkResourceDefinitions(List args) { int driverCores = sparkParameters.getDriverCores(); if (driverCores > 0) { - args.add(SparkConstants.DRIVER_CORES); - args.add(String.format("%d", driverCores)); + args.add(String.format(SparkConstants.DRIVER_CORES, driverCores)); } String driverMemory = sparkParameters.getDriverMemory(); if (StringUtils.isNotEmpty(driverMemory)) { - args.add(SparkConstants.DRIVER_MEMORY); - args.add(driverMemory); + args.add(String.format(SparkConstants.DRIVER_MEMORY, driverMemory)); } int numExecutors = sparkParameters.getNumExecutors(); if (numExecutors > 0) { - args.add(SparkConstants.NUM_EXECUTORS); - args.add(String.format("%d", numExecutors)); + args.add(String.format(SparkConstants.NUM_EXECUTORS, numExecutors)); } int executorCores = sparkParameters.getExecutorCores(); if (executorCores > 0) { - args.add(SparkConstants.EXECUTOR_CORES); - args.add(String.format("%d", executorCores)); + args.add(String.format(SparkConstants.EXECUTOR_CORES, executorCores)); } String executorMemory = sparkParameters.getExecutorMemory(); if (StringUtils.isNotEmpty(executorMemory)) { - args.add(SparkConstants.EXECUTOR_MEMORY); - args.add(executorMemory); + args.add(String.format(SparkConstants.EXECUTOR_MEMORY, executorMemory)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java index 53a05b9eb7..3c2c41e787 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java @@ -46,11 +46,11 @@ public class SparkTaskTest { "${SPARK_HOME}/bin/spark-sql " + "--master yarn " + "--deploy-mode client " + - "--driver-cores 1 " + - "--driver-memory 512M " + - "--num-executors 2 " + - "--executor-cores 2 " + - "--executor-memory 1G " + + "--conf spark.driver.cores=1 " + + "--conf spark.driver.memory=512M " + + "--conf spark.executor.instances=2 " + + "--conf spark.executor.cores=2 " + + "--conf spark.executor.memory=1G " + "--name sparksql " + "-f /tmp/5536_node.sql"); } @@ -67,11 +67,11 @@ public class SparkTaskTest { "--master yarn " + "--deploy-mode client " + "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " + - "--driver-cores 1 " + - "--driver-memory 512M " + - "--num-executors 2 " + - "--executor-cores 2 " + - "--executor-memory 1G " + + "--conf spark.driver.cores=1 " + + "--conf spark.driver.memory=512M " + + "--conf spark.executor.instances=2 " + + "--conf spark.executor.cores=2 " + + "--conf spark.executor.memory=1G " + "--name spark " + "lib/dolphinscheduler-task-spark.jar"); } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts index fc4a8925f1..2c0e52aaad 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts @@ -57,7 +57,8 @@ export function useNamespace(): IJsonItem { name: t('project.node.namespace_cluster'), props: { loading, - 'render-label': renderLabel + 'render-label': renderLabel, + 'clearable': true }, options: [ { diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts index de100d770e..7c68a25920 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts @@ -25,6 +25,7 @@ import { useExecutorMemory, useExecutorCores, useMainJar, + useNamespace, useResources } from '.' import type { IJsonItem } from '../types' @@ -90,6 +91,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] { } }, useDeployMode(24, ref(true), showCluster), + useNamespace(), { type: 'input', field: 'appName', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 7d227db0e7..75e69079df 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -60,6 +60,9 @@ export function formatParams(data: INodeData): { taskParams.appName = data.appName taskParams.mainArgs = data.mainArgs taskParams.others = data.others + if (data.namespace) { + taskParams.namespace = data.namespace + } } if (data.taskType === 'SPARK') { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 3c99ea492e..9afc4708c7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,14 +17,11 @@ package org.apache.dolphinscheduler.server.worker.processor; -import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT; -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY; - import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -35,17 +32,11 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; -import org.apache.commons.collections4.CollectionUtils; - import java.util.Arrays; -import java.util.Collections; -import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -55,7 +46,6 @@ import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import io.micrometer.core.lang.NonNull; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -113,11 +103,10 @@ public class TaskKillProcessor implements NettyRequestProcessor { // if processId > 0, it should call cancelApplication to cancel remote application too. this.cancelApplication(taskInstanceId); - Pair> result = doKill(taskExecutionContext); + boolean result = doKill(taskExecutionContext); taskExecutionContext.setCurrentExecutionStatus( - result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); - taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); + result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); sendTaskKillResponseCommand(channel, taskExecutionContext); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); @@ -155,17 +144,17 @@ public class TaskKillProcessor implements NettyRequestProcessor { * * @return kill result */ - private Pair> doKill(TaskExecutionContext taskExecutionContext) { + private boolean doKill(TaskExecutionContext taskExecutionContext) { // kill system process boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId()); - // find log and kill yarn job - Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), - taskExecutionContext.getLogPath(), - taskExecutionContext.getAppInfoPath(), - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTenantCode()); - return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight()); + // kill yarn or k8s application + try { + ProcessUtils.cancelApplication(taskExecutionContext); + } catch (TaskException e) { + return false; + } + return processFlag; } /** @@ -216,42 +205,4 @@ public class TaskKillProcessor implements NettyRequestProcessor { return processFlag; } - /** - * kill yarn job - * - * @param host host - * @param logPath logPath - * @param executePath executePath - * @param tenantCode tenantCode - * @return Pair> yarn kill result - */ - private Pair> killYarnJob(@NonNull Host host, - String logPath, - String appInfoPath, - String executePath, - String tenantCode) { - if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) { - log.error( - "Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}", - host, logPath, appInfoPath, executePath, tenantCode); - return Pair.of(false, Collections.emptyList()); - } - try { - log.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath); - List appIds = LogUtils.getAppIds(logPath, appInfoPath, - PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); - if (CollectionUtils.isEmpty(appIds)) { - log.info("The appId is empty"); - return Pair.of(true, Collections.emptyList()); - } - - ProcessUtils.cancelApplication(appIds, log, tenantCode, executePath); - return Pair.of(true, appIds); - } catch (Exception e) { - log.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, - executePath, tenantCode, e); - } - return Pair.of(false, Collections.emptyList()); - } - } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 103e253800..2bc30ba346 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.worker.runner; -import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT; -import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY; import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES; import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH; @@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; @@ -52,12 +49,9 @@ import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils; import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils; -import org.apache.commons.collections4.CollectionUtils; - import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.List; import javax.annotation.Nullable; @@ -143,13 +137,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { if (task != null) { try { task.cancel(); - List appIds = - LogUtils.getAppIds(taskExecutionContext.getLogPath(), taskExecutionContext.getExecutePath(), - PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); - if (CollectionUtils.isNotEmpty(appIds)) { - ProcessUtils.cancelApplication(appIds, log, taskExecutionContext.getTenantCode(), - taskExecutionContext.getExecutePath()); - } + ProcessUtils.cancelApplication(taskExecutionContext); } catch (Exception e) { log.error( "Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual",