Browse Source

[Feature-13511] Submit Spark task directly on Kubernetes (#13550)

3.2.0-release
Aaron Wang 1 year ago committed by GitHub
parent
commit
047fa2f65e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      deploy/kubernetes/dolphinscheduler/values.yaml
  2. 2
      docs/docs/en/architecture/configuration.md
  3. 2
      docs/docs/zh/architecture/configuration.md
  4. 26
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java
  5. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  6. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  7. 51
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  8. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  9. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  10. 4
      dolphinscheduler-master/src/main/resources/application.yaml
  11. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
  12. 4
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  13. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  14. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
  15. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
  16. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
  17. 39
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java
  18. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java
  19. 198
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
  20. 39
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java
  21. 326
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java
  22. 44
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java
  23. 359
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
  24. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml
  25. 32
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
  26. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
  27. 38
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  28. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
  29. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
  30. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts
  31. 3
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  32. 71
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  33. 14
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

2
deploy/kubernetes/dolphinscheduler/values.yaml

@ -313,7 +313,7 @@ master:
MASTER_MAX_CPU_LOAD_AVG: "-1"
MASTER_RESERVED_MEMORY: "0.3"
MASTER_FAILOVER_INTERVAL: "10m"
MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER: "true"
worker:
## PodManagementPolicy controls how pods are created during initial scale up, when replacing pods on nodes, or when scaling down.

2
docs/docs/en/architecture/configuration.md

@ -276,7 +276,7 @@ Location: `master-server/conf/application.yaml`
|master.max-cpu-load-avg|-1|master max CPU load avg, only higher than the system CPU load average, master server can schedule. default value -1: the number of CPU cores * 2|
|master.reserved-memory|0.3|master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G|
|master.failover-interval|10|failover interval, the unit is minute|
|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance|
|master.kill-application-when-task-failover|true|whether to kill yarn/k8s application when failover taskInstance|
|master.registry-disconnect-strategy.strategy|stop|Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely|
|master.worker-group-refresh-interval|10s|The interval to refresh worker group from db to memory|

2
docs/docs/zh/architecture/configuration.md

@ -271,7 +271,7 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
|master.max-cpu-load-avg|-1|master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores * 2|
|master.reserved-memory|0.3|master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G|
|master.failover-interval|10|failover间隔,单位为分钟|
|master.kill-yarn-job-when-task-failover|true|当任务实例failover时,是否kill掉yarn job|
|master.kill-application-when-task-failover|true|当任务实例failover时,是否kill掉yarn或k8s application|
|master.registry-disconnect-strategy.strategy|stop|当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Master与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
|master.master.worker-group-refresh-interval|10s|定期将workerGroup从数据库中同步到内存的时间间隔|

26
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceManagerType.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum ResourceManagerType {
YARN,
KUBERNETES;
}

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.constants.Constants.UTF_8;
@ -49,6 +50,8 @@ public class FileUtils {
public static final String APPINFO_PATH = "appInfo.log";
public static final String KUBE_CONFIG_FILE = "config";
private FileUtils() {
throw new UnsupportedOperationException("Construct FileUtils");
}
@ -116,6 +119,16 @@ public class FileUtils {
taskInstanceId);
}
/**
* absolute path of kubernetes configuration file
*
* @param execPath
* @return
*/
public static String getKubeConfigPath(String execPath) {
return String.format(FORMAT_S_S, execPath, KUBE_CONFIG_FILE);
}
/**
* absolute path of appInfo file
*

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -88,7 +88,7 @@ public class MasterConfig implements Validator {
private double maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
private boolean killApplicationWhenTaskFailover = true;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
@ -163,7 +163,7 @@ public class MasterConfig implements Validator {
log.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg);
log.info("Master config: reservedMemory -> {} ", reservedMemory);
log.info("Master config: failoverInterval -> {} ", failoverInterval);
log.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
log.info("Master config: killApplicationWhenTaskFailover -> {} ", killApplicationWhenTaskFailover);
log.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
log.info("Master config: masterAddress -> {} ", masterAddress);
log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);

51
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -25,8 +25,8 @@ import static org.apache.dolphinscheduler.common.constants.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.constants.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
@ -72,6 +72,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -328,11 +329,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
}
K8sTaskExecutionContext k8sTaskExecutionContext = null;
if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
k8sTaskExecutionContext = new K8sTaskExecutionContext();
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}
K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);
Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance);
@ -635,18 +633,39 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
/**
* set k8s task relation
* @param k8sTaskExecutionContext k8sTaskExecutionContext
* get k8s task execution context based on task type and deploy mode
*
* @param taskInstance taskInstance
*/
private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) {
K8sTaskParameters k8sTaskParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String clusterName = namespace.get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
k8sTaskExecutionContext.setConfigYaml(configYaml);
private K8sTaskExecutionContext setK8sTaskRelation(TaskInstance taskInstance) {
K8sTaskExecutionContext k8sTaskExecutionContext = null;
String namespace = "";
switch (taskInstance.getTaskType()) {
case "K8S":
case "KUBEFLOW":
K8sTaskParameters k8sTaskParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
namespace = k8sTaskParameters.getNamespace();
break;
case "SPARK":
SparkParameters sparkParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SparkParameters.class);
if (StringUtils.isNotEmpty(sparkParameters.getNamespace())) {
namespace = sparkParameters.getNamespace();
}
break;
default:
break;
}
if (StringUtils.isNotEmpty(namespace)) {
String clusterName = JSONUtils.toMap(namespace).get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
k8sTaskExecutionContext =
new K8sTaskExecutionContext(configYaml, JSONUtils.toMap(namespace).get(NAMESPACE_NAME));
}
}
return k8sTaskExecutionContext;
}
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -227,7 +227,7 @@ public class MasterFailoverService {
/**
* failover task instance
* <p>
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@ -248,10 +248,10 @@ public class MasterFailoverService {
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
if (masterConfig.isKillApplicationWhenTaskFailover()) {
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn or k8s job");
ProcessUtils.killApplication(logClient, taskExecutionContext);
}
// kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -154,7 +154,7 @@ public class WorkerFailoverService {
/**
* failover task instance
* <p>
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@ -175,10 +175,10 @@ public class WorkerFailoverService {
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
if (masterConfig.isKillApplicationWhenTaskFailover()) {
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn or k8s job");
ProcessUtils.killApplication(logClient, taskExecutionContext);
}
} else {
log.info("The failover taskInstance is a master task");

4
dolphinscheduler-master/src/main/resources/application.yaml

@ -108,8 +108,8 @@ master:
reserved-memory: 0.3
# failover interval, the unit is minute
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
# kill yarn / k8s application when failover taskInstance, default true
kill-application-when-task-failover: true
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
@ -161,8 +162,8 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
public static @Nullable List<String> killApplication(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
@ -172,6 +173,7 @@ public class ProcessUtils {
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath(),
taskExecutionContext.getAppInfoPath());
if (CollectionUtils.isNotEmpty(appIds)) {
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext
.setExecutePath(FileUtils.getProcessExecDir(
@ -183,9 +185,7 @@ public class ProcessUtils {
taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(appIds, log,
taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext);
return appIds;
} else {
log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}",

4
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -146,8 +146,8 @@ master:
reserved-memory: 0.03
# failover interval
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
# kill yarn/k8s application when failover taskInstance, default true
kill-application-when-task-failover: true
worker-group-refresh-interval: 10s
worker:

20
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.get
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
@ -38,6 +39,7 @@ import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@ -229,8 +231,11 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
// if SHELL task exit
if (status) {
if (status && kubernetesStatus.isSuccess()) {
// SHELL task state
result.setExitStatusCode(process.exitValue());
@ -334,7 +339,11 @@ public abstract class AbstractCommandExecutor {
LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
String logs = appendPodLogIfNeeded();
if (StringUtils.isNotEmpty(logs)) {
logBuffer.add("Dump logs from driver pod:");
logBuffer.add(logs);
}
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
@ -348,6 +357,13 @@ public abstract class AbstractCommandExecutor {
}
}
private String appendPodLogIfNeeded() {
if (Objects.isNull(taskRequest.getK8sTaskExecutionContext())) {
return "";
}
return ProcessUtils.getPodLog(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
}
/**
* get the standard output of the process
*

20
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java

@ -19,26 +19,34 @@ package org.apache.dolphinscheduler.plugin.task.api;
import java.io.Serializable;
import lombok.Value;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* k8s Task ExecutionContext
*/
@Value
public class K8sTaskExecutionContext implements Serializable {
private String configYaml;
public String getConfigYaml() {
return configYaml;
}
private String namespace;
public void setConfigYaml(String configYaml) {
@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public K8sTaskExecutionContext(
@JsonProperty("configYaml") String configYaml,
@JsonProperty("namespace") String namespace) {
this.configYaml = configYaml;
this.namespace = namespace;
}
@Override
public String toString() {
return "K8sTaskExecutionContext{"
+ "configYaml='" + configYaml + '\''
+ "namespace=" + namespace
+ ", configYaml='" + configYaml + '\''
+ '}';
}
}

10
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java

@ -29,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
@ -115,6 +116,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
}
if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) {
String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml();
Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils
.getKubeConfigPath(taskRequest.getExecutePath()));
FileUtils.createFileWith755(kubeConfigPath);
Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND);
sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator());
logger.info("Create kubernetes configuration file: {}.", kubeConfigPath);
}
}
sb.append(execCommand);
String commandContent = sb.toString();

7
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -430,7 +430,7 @@ public class TaskConstants {
public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY";
public static final String TASK_TYPE_K8S = "K8S";
public static final String DEPLOY_MODE_KUBERNETES = "Kubernetes";
public static final Set<String> TASK_TYPE_SET_K8S = Sets.newHashSet("K8S", "KUBEFLOW");
@ -480,6 +480,11 @@ public class TaskConstants {
public static final String CLUSTER = "cluster";
public static final Pattern COMMAND_SPLIT_REGEX = Pattern.compile("[^\\s\"'`]+|\"([^\"]+)\"|'([^']+)'|`([^`]+)`");
/**
* spark / flink on k8s label name
*/
public static final String UNIQUE_LABEL_NAME = "dolphinscheduler-label";
/**
* conda config used by jupyter task plugin
*/

39
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManager.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
public interface ApplicationManager {
/**
* kill application by application manager context
*
* @param applicationManagerContext
* @return
*/
boolean killApplication(ApplicationManagerContext applicationManagerContext);
/**
* get resource manager type
*
* @return ResourceManagerType yarn / kubernetes
*/
ResourceManagerType getResourceManagerType();
}

22
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/ApplicationManagerContext.java

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
public interface ApplicationManagerContext {
}

198
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import com.google.auto.service.AutoService;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
@Slf4j
@AutoService(ApplicationManager.class)
public class KubernetesApplicationManager implements ApplicationManager {
private static final String PENDING = "Pending";
private static final String RUNNING = "Running";
private static final String FINISH = "Succeeded";
private static final String FAILED = "Failed";
private static final String UNKNOWN = "Unknown";
/**
* cache k8s client for same task
*/
private final Map<String, KubernetesClient> cacheClientMap = new ConcurrentHashMap<>();
@Override
public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
KubernetesApplicationManagerContext kubernetesApplicationManagerContext =
(KubernetesApplicationManagerContext) applicationManagerContext;
boolean isKill;
String labelValue = kubernetesApplicationManagerContext.getLabelValue();
FilterWatchListDeletable<Pod, PodList> watchList = getDriverPod(kubernetesApplicationManagerContext);
try {
if (getApplicationStatus(kubernetesApplicationManagerContext, watchList).isFailure()) {
log.error("Driver pod is in FAILED or UNKNOWN status.");
isKill = false;
} else {
watchList.delete();
isKill = true;
}
} catch (Exception e) {
throw new TaskException("Failed to kill Kubernetes application with label " + labelValue, e);
} finally {
// remove client cache after killing application
removeCache(labelValue);
}
return isKill;
}
@Override
public ResourceManagerType getResourceManagerType() {
return ResourceManagerType.KUBERNETES;
}
/**
* get driver pod
*
* @param kubernetesApplicationManagerContext
* @return
*/
private FilterWatchListDeletable<Pod, PodList> getDriverPod(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
KubernetesClient client = getClient(kubernetesApplicationManagerContext);
String labelValue = kubernetesApplicationManagerContext.getLabelValue();
FilterWatchListDeletable<Pod, PodList> watchList =
client.pods()
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
.withLabel(UNIQUE_LABEL_NAME, labelValue);
List<Pod> podList = watchList.list().getItems();
if (podList.size() != 1) {
log.warn("Expected driver pod 1, but get {}.", podList.size());
}
return watchList;
}
/**
* create client or get from cache map
*
* @param kubernetesApplicationManagerContext
* @return
*/
private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
K8sTaskExecutionContext k8sTaskExecutionContext =
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
key -> new DefaultKubernetesClient(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())));
}
public void removeCache(String cacheKey) {
try (KubernetesClient ignored = cacheClientMap.remove(cacheKey)) {
}
}
/**
* get application execution status
*
* @param kubernetesApplicationManagerContext
* @return TaskExecutionStatus SUCCESS / FAILURE
* @throws TaskException
*/
public TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) throws TaskException {
return getApplicationStatus(kubernetesApplicationManagerContext, null);
}
/**
* get application (driver pod) status
*
* @param kubernetesApplicationManagerContext
* @param watchList
* @return
* @throws TaskException
*/
private TaskExecutionStatus getApplicationStatus(KubernetesApplicationManagerContext kubernetesApplicationManagerContext,
FilterWatchListDeletable<Pod, PodList> watchList) throws TaskException {
String phase;
try {
if (Objects.isNull(watchList)) {
watchList = getDriverPod(kubernetesApplicationManagerContext);
}
List<Pod> driverPod = watchList.list().getItems();
if (!driverPod.isEmpty()) {
// cluster mode
Pod driver = driverPod.get(0);
phase = driver.getStatus().getPhase();
} else {
// client mode
phase = FINISH;
}
} catch (Exception e) {
throw new TaskException("Failed to get Kubernetes application status", e);
}
return phase.equals(FAILED) || phase.equals(UNKNOWN) ? TaskExecutionStatus.FAILURE
: TaskExecutionStatus.SUCCESS;
}
/**
* collect pod's log
*
* @param kubernetesApplicationManagerContext
* @return
*/
public String collectPodLog(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
try {
KubernetesClient client = getClient(kubernetesApplicationManagerContext);
FilterWatchListDeletable<Pod, PodList> watchList = getDriverPod(kubernetesApplicationManagerContext);
List<Pod> driverPod = watchList.list().getItems();
if (!driverPod.isEmpty()) {
Pod driver = driverPod.get(0);
String driverPodName = driver.getMetadata().getName();
String logs = client.pods()
.inNamespace(kubernetesApplicationManagerContext.getK8sTaskExecutionContext().getNamespace())
.withName(driverPodName).getLog();
// delete driver pod only after successful execution
killApplication(kubernetesApplicationManagerContext);
return logs;
}
} catch (Exception e) {
log.error("Collect pod log failed:", e.getMessage());
}
return "";
}
}

39
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManagerContext.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import lombok.RequiredArgsConstructor;
import lombok.Value;
@Value
@RequiredArgsConstructor
public class KubernetesApplicationManagerContext implements ApplicationManagerContext {
/**
* kubernetes execution context
*/
private final K8sTaskExecutionContext k8sTaskExecutionContext;
/**
* driver pod label value
*/
private final String labelValue;
}

326
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java

@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;
@Slf4j
@AutoService(ApplicationManager.class)
public class YarnApplicationManager implements ApplicationManager {
private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
private static final String JOB_HISTORY_ADDRESS =
PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
@Override
public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
YarnApplicationManagerContext yarnApplicationManagerContext =
(YarnApplicationManagerContext) applicationManagerContext;
String executePath = yarnApplicationManagerContext.getExecutePath();
String tenantCode = yarnApplicationManagerContext.getTenantCode();
List<String> appIds = yarnApplicationManagerContext.getAppIds();
for (String appId : appIds) {
try {
TaskExecutionStatus applicationStatus = getApplicationStatus(appId);
if (!applicationStatus.isFinished()) {
String commandFile = String.format("%s/%s.kill", executePath, appId);
String cmd = getKerberosInitCommand() + "yarn application -kill " + appId;
execYarnKillCommand(tenantCode, appId, commandFile, cmd);
}
} catch (Exception e) {
log.error("Get yarn application app id [{}}] status failed", appId, e);
throw new TaskException(e.getMessage());
}
}
return true;
}
@Override
public ResourceManagerType getResourceManagerType() {
return ResourceManagerType.YARN;
}
/**
* get the state of an application
*
* @param applicationId application id
* @return the return may be null or there may be other parse exceptions
*/
public TaskExecutionStatus getApplicationStatus(String applicationId) throws TaskException {
if (StringUtils.isEmpty(applicationId)) {
return null;
}
String result;
String applicationUrl = getApplicationUrl(applicationId);
log.debug("generate yarn application url, applicationUrl={}", applicationUrl);
String responseContent = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(applicationUrl)
: HttpUtils.get(applicationUrl);
if (responseContent != null) {
ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("app")) {
return TaskExecutionStatus.FAILURE;
}
result = jsonObject.path("app").path("finalStatus").asText();
} else {
// may be in job history
String jobHistoryUrl = getJobHistoryUrl(applicationId);
log.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
responseContent = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(jobHistoryUrl)
: HttpUtils.get(jobHistoryUrl);
if (null != responseContent) {
ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("job")) {
return TaskExecutionStatus.FAILURE;
}
result = jsonObject.path("job").path("state").asText();
} else {
return TaskExecutionStatus.FAILURE;
}
}
return getExecutionStatus(result);
}
/**
* get application url
* if rmHaIds contains xx, it signs not use resourcemanager
* otherwise:
* if rmHaIds is empty, single resourcemanager enabled
* if rmHaIds not empty: resourcemanager HA enabled
*
* @param applicationId application id
* @return url of application
*/
private String getApplicationUrl(String applicationId) throws TaskException {
String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS);
if (StringUtils.isBlank(appUrl)) {
throw new TaskException("yarn application url generation failed");
}
log.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId);
return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
}
private String getJobHistoryUrl(String applicationId) {
// eg:application_1587475402360_712719 -> job_1587475402360_712719
String jobId = applicationId.replace("application", "job");
return String.format(JOB_HISTORY_ADDRESS, jobId);
}
/**
* build kill command for yarn application
*
* @param tenantCode tenant code
* @param appId app id
* @param commandFile command file
* @param cmd cmd
*/
private void execYarnKillCommand(String tenantCode, String appId, String commandFile,
String cmd) {
try {
StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");
sb.append("\n\n");
sb.append(cmd);
File f = new File(commandFile);
if (!f.exists()) {
org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
StandardCharsets.UTF_8);
}
String runCmd = String.format("%s %s", Constants.SH, commandFile);
runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
log.info("kill cmd:{}", runCmd);
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
} catch (Exception e) {
log.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage()));
}
}
private TaskExecutionStatus getExecutionStatus(String result) {
switch (result) {
case Constants.ACCEPTED:
return TaskExecutionStatus.SUBMITTED_SUCCESS;
case Constants.SUCCEEDED:
case Constants.ENDED:
return TaskExecutionStatus.SUCCESS;
case Constants.NEW:
case Constants.NEW_SAVING:
case Constants.SUBMITTED:
case Constants.FAILED:
return TaskExecutionStatus.FAILURE;
case Constants.KILLED:
return TaskExecutionStatus.KILL;
case Constants.RUNNING:
default:
return TaskExecutionStatus.RUNNING_EXECUTION;
}
}
/**
* getAppAddress
*
* @param appAddress app address
* @param rmHa resource manager ha
* @return app address
*/
private String getAppAddress(String appAddress, String rmHa) {
String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
if (split1.length != 2) {
return null;
}
String start = split1[0] + Constants.DOUBLE_SLASH;
String[] split2 = split1[1].split(Constants.COLON);
if (split2.length != 2) {
return null;
}
String end = Constants.COLON + split2[1];
// get active ResourceManager
String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
if (StringUtils.isEmpty(activeRM)) {
return null;
}
return start + activeRM + end;
}
/**
* get kerberos init command
*/
private String getKerberosInitCommand() {
log.info("get kerberos init command");
StringBuilder kerberosCommandBuilder = new StringBuilder();
boolean hadoopKerberosState =
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
if (hadoopKerberosState) {
kerberosCommandBuilder.append("export KRB5_CONFIG=")
.append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH))
.append("\n\n")
.append(String.format("kinit -k -t %s %s || true",
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH),
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)))
.append("\n\n");
log.info("kerberos init command: {}", kerberosCommandBuilder);
}
return kerberosCommandBuilder.toString();
}
/**
* yarn ha admin utils
*/
private static final class YarnHAAdminUtils {
/**
* get active resourcemanager node
*
* @param protocol http protocol
* @param rmIds yarn ha ids
* @return yarn active node
*/
public static String getActiveRMName(String protocol, String rmIds) {
String[] rmIdArr = rmIds.split(Constants.COMMA);
String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
try {
/**
* send http get request to rm
*/
for (String rmId : rmIdArr) {
String state = getRMState(String.format(yarnUrl, rmId));
if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
return rmId;
}
}
} catch (Exception e) {
log.error("yarn ha application url generation failed, message:{}", e.getMessage());
}
return null;
}
/**
* get ResourceManager state
*/
public static String getRMState(String url) {
String retStr = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(url)
: HttpUtils.get(url);
if (StringUtils.isEmpty(retStr)) {
return null;
}
// to json
ObjectNode jsonObject = JSONUtils.parseObject(retStr);
// get ResourceManager state
if (!jsonObject.has("clusterInfo")) {
return null;
}
return jsonObject.get("clusterInfo").path("haState").asText();
}
}
}

44
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManagerContext.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.am;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.Value;
@Value
@RequiredArgsConstructor
public class YarnApplicationManagerContext implements ApplicationManagerContext {
/**
* execute path
*/
private final String executePath;
/**
* tenant code
*/
private final String tenantCode;
/**
* appId list
*/
private final List<String> appIds;
}

359
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

@ -17,32 +17,39 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.KerberosHttpClient;
import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.am.ApplicationManager;
import org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManager;
import org.apache.dolphinscheduler.plugin.task.api.am.KubernetesApplicationManagerContext;
import org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManagerContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
public final class ProcessUtils {
@ -50,6 +57,14 @@ public final class ProcessUtils {
throw new IllegalStateException("Utility class");
}
private static final Map<ResourceManagerType, ApplicationManager> applicationManagerMap = new HashMap<>();
static {
ServiceLoader.load(ApplicationManager.class)
.forEach(applicationManager -> applicationManagerMap.put(applicationManager.getResourceManagerType(),
applicationManager));
}
/**
* Initialization regularization, solve the problem of pre-compilation performance,
* avoid the thread safety problem of multi-thread operation
@ -61,13 +76,6 @@ public final class ProcessUtils {
*/
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
private static final String JOB_HISTORY_ADDRESS =
PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE =
PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088);
/**
* kill tasks according to different task types.
*/
@ -124,278 +132,83 @@ public final class ProcessUtils {
}
/**
* kill yarn application.
* cacel k8s / yarn application
*
* @param appIds app id list
* @param logger logger
* @param tenantCode tenant code
* @param executePath execute path
* @param taskExecutionContext
* @return
*/
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
if (appIds == null || appIds.isEmpty()) {
return;
}
for (String appId : appIds) {
try {
TaskExecutionStatus applicationStatus = getApplicationStatus(appId);
if (!applicationStatus.isFinished()) {
String commandFile = String.format("%s/%s.kill", executePath, appId);
String cmd = getKerberosInitCommand() + "yarn application -kill " + appId;
execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
try {
if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext()) &&
!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
applicationManagerMap.get(ResourceManagerType.KUBERNETES)
.killApplication(new KubernetesApplicationManagerContext(
taskExecutionContext.getK8sTaskExecutionContext(),
taskExecutionContext.getTaskAppId()));
} else {
String host = taskExecutionContext.getHost();
String executePath = taskExecutionContext.getExecutePath();
String tenantCode = taskExecutionContext.getTenantCode();
List<String> appIds;
if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
// is failover
appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
} else {
String logPath = taskExecutionContext.getLogPath();
String appInfoPath = taskExecutionContext.getAppInfoPath();
if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
log.error(
"Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
host, logPath, appInfoPath, executePath, tenantCode);
throw new TaskException("Cancel application failed!");
}
log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath);
appIds = LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
}
} catch (Exception e) {
log.error("Get yarn application app id [{}}] status failed", appId, e);
}
}
}
/**
* get the state of an application
*
* @param applicationId application id
* @return the return may be null or there may be other parse exceptions
*/
public static TaskExecutionStatus getApplicationStatus(String applicationId) throws BaseException {
if (StringUtils.isEmpty(applicationId)) {
return null;
}
String result;
String applicationUrl = getApplicationUrl(applicationId);
log.debug("generate yarn application url, applicationUrl={}", applicationUrl);
String responseContent = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(applicationUrl)
: HttpUtils.get(applicationUrl);
if (responseContent != null) {
ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("app")) {
return TaskExecutionStatus.FAILURE;
}
result = jsonObject.path("app").path("finalStatus").asText();
} else {
// may be in job history
String jobHistoryUrl = getJobHistoryUrl(applicationId);
log.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl);
responseContent = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(jobHistoryUrl)
: HttpUtils.get(jobHistoryUrl);
if (null != responseContent) {
ObjectNode jsonObject = JSONUtils.parseObject(responseContent);
if (!jsonObject.has("job")) {
return TaskExecutionStatus.FAILURE;
if (CollectionUtils.isEmpty(appIds)) {
log.info("The appId is empty");
}
result = jsonObject.path("job").path("state").asText();
} else {
return TaskExecutionStatus.FAILURE;
ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
}
}
return getExecutionStatus(result);
}
/**
* get application url
* if rmHaIds contains xx, it signs not use resourcemanager
* otherwise:
* if rmHaIds is empty, single resourcemanager enabled
* if rmHaIds not empty: resourcemanager HA enabled
*
* @param applicationId application id
* @return url of application
*/
private static String getApplicationUrl(String applicationId) throws BaseException {
String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS);
if (StringUtils.isBlank(appUrl)) {
throw new BaseException("yarn application url generation failed");
}
log.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId);
return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId);
}
private static String getJobHistoryUrl(String applicationId) {
// eg:application_1587475402360_712719 -> job_1587475402360_712719
String jobId = applicationId.replace("application", "job");
return String.format(JOB_HISTORY_ADDRESS, jobId);
}
/**
* build kill command for yarn application
*
* @param logger logger
* @param tenantCode tenant code
* @param appId app id
* @param commandFile command file
* @param cmd cmd
*/
private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile,
String cmd) {
try {
StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");
sb.append("\n\n");
sb.append(cmd);
File f = new File(commandFile);
if (!f.exists()) {
org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
StandardCharsets.UTF_8);
}
String runCmd = String.format("%s %s", Constants.SH, commandFile);
runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
log.info("kill cmd:{}", runCmd);
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
} catch (Exception e) {
log.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage()));
}
}
private static TaskExecutionStatus getExecutionStatus(String result) {
switch (result) {
case Constants.ACCEPTED:
return TaskExecutionStatus.SUBMITTED_SUCCESS;
case Constants.SUCCEEDED:
case Constants.ENDED:
return TaskExecutionStatus.SUCCESS;
case Constants.NEW:
case Constants.NEW_SAVING:
case Constants.SUBMITTED:
case Constants.FAILED:
return TaskExecutionStatus.FAILURE;
case Constants.KILLED:
return TaskExecutionStatus.KILL;
case Constants.RUNNING:
default:
return TaskExecutionStatus.RUNNING_EXECUTION;
log.error("Cancel application failed: {}", e.getMessage());
}
}
/**
* getAppAddress
* get k8s application status
*
* @param appAddress app address
* @param rmHa resource manager ha
* @return app address
* @param k8sTaskExecutionContext
* @param taskAppId
* @return
*/
private static String getAppAddress(String appAddress, String rmHa) {
String[] split1 = appAddress.split(Constants.DOUBLE_SLASH);
if (split1.length != 2) {
return null;
}
String start = split1[0] + Constants.DOUBLE_SLASH;
String[] split2 = split1[1].split(Constants.COLON);
if (split2.length != 2) {
return null;
}
String end = Constants.COLON + split2[1];
// get active ResourceManager
String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa);
if (StringUtils.isEmpty(activeRM)) {
return null;
}
return start + activeRM + end;
public static TaskExecutionStatus getApplicationStatus(K8sTaskExecutionContext k8sTaskExecutionContext,
String taskAppId) {
if (Objects.isNull(k8sTaskExecutionContext)) {
return TaskExecutionStatus.SUCCESS;
}
KubernetesApplicationManager applicationManager =
(KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES);
return applicationManager
.getApplicationStatus(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
}
/**
* get kerberos init command
*/
private static String getKerberosInitCommand() {
log.info("get kerberos init command");
StringBuilder kerberosCommandBuilder = new StringBuilder();
boolean hadoopKerberosState =
PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
if (hadoopKerberosState) {
kerberosCommandBuilder.append("export KRB5_CONFIG=")
.append(PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH))
.append("\n\n")
.append(String.format("kinit -k -t %s %s || true",
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH),
PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)))
.append("\n\n");
log.info("kerberos init command: {}", kerberosCommandBuilder);
}
return kerberosCommandBuilder.toString();
}
/**
* yarn ha admin utils
* get driver pod logs
*
* @param k8sTaskExecutionContext
* @param taskAppId
* @return
*/
private static final class YarnHAAdminUtils {
/**
* get active resourcemanager node
*
* @param protocol http protocol
* @param rmIds yarn ha ids
* @return yarn active node
*/
public static String getActiveRMName(String protocol, String rmIds) {
String[] rmIdArr = rmIds.split(Constants.COMMA);
public static String getPodLog(K8sTaskExecutionContext k8sTaskExecutionContext, String taskAppId) {
KubernetesApplicationManager applicationManager =
(KubernetesApplicationManager) applicationManagerMap.get(ResourceManagerType.KUBERNETES);
String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info";
try {
/**
* send http get request to rm
*/
for (String rmId : rmIdArr) {
String state = getRMState(String.format(yarnUrl, rmId));
if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) {
return rmId;
}
}
} catch (Exception e) {
log.error("yarn ha application url generation failed, message:{}", e.getMessage());
}
return null;
}
/**
* get ResourceManager state
*/
public static String getRMState(String url) {
String retStr = Boolean.TRUE
.equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
? KerberosHttpClient.get(url)
: HttpUtils.get(url);
if (StringUtils.isEmpty(retStr)) {
return null;
}
// to json
ObjectNode jsonObject = JSONUtils.parseObject(retStr);
// get ResourceManager state
if (!jsonObject.has("clusterInfo")) {
return null;
}
return jsonObject.get("clusterInfo").path("haState").asText();
}
return applicationManager
.collectPodLog(new KubernetesApplicationManagerContext(k8sTaskExecutionContext, taskAppId));
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/pom.xml

@ -37,5 +37,9 @@
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>
</dependencies>
</project>

32
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java

@ -43,14 +43,14 @@ public class SparkConstants {
public static final String DEPLOY_MODE_LOCAL = "local";
/**
* --driver-cores NUM
* --conf spark.driver.cores NUM
*/
public static final String DRIVER_CORES = "--driver-cores";
public static final String DRIVER_CORES = "--conf spark.driver.cores=%d";
/**
* --driver-memory MEM
* --conf spark.driver.memory MEM
*/
public static final String DRIVER_MEMORY = "--driver-memory";
public static final String DRIVER_MEMORY = "--conf spark.driver.memory=%s";
/**
* master
@ -59,20 +59,32 @@ public class SparkConstants {
public static final String SPARK_ON_YARN = "yarn";
public static final String SPARK_ON_K8S_MASTER_PREFIX = "k8s://";
/**
* add label for driver pod
*/
public static final String DRIVER_LABEL_CONF = "--conf spark.kubernetes.driver.label.%s=%s";
/**
* spark kubernetes namespace
*/
public static final String SPARK_KUBERNETES_NAMESPACE = "--conf spark.kubernetes.namespace=%s";
/**
* --num-executors NUM
* --conf spark.executor.instances NUM
*/
public static final String NUM_EXECUTORS = "--num-executors";
public static final String NUM_EXECUTORS = "--conf spark.executor.instances=%d";
/**
* --executor-cores NUM
* --conf spark.executor.cores NUM
*/
public static final String EXECUTOR_CORES = "--executor-cores";
public static final String EXECUTOR_CORES = "--conf spark.executor.cores=%d";
/**
* --executor-memory MEM
* --conf spark.executor.memory MEM
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
public static final String EXECUTOR_MEMORY = "--conf spark.executor.memory=%s";
/**
* -f <filename> SQL from files

15
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java

@ -39,7 +39,7 @@ public class SparkParameters extends AbstractParameters {
private String mainClass;
/**
* deploy mode
* deploy mode local / cluster / client
*/
private String deployMode;
@ -100,6 +100,19 @@ public class SparkParameters extends AbstractParameters {
*/
private String rawScript;
/**
* kubernetes cluster namespace
*/
private String namespace;
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
/**
* resource list
*/

38
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.spark;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME;
import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.DRIVER_LABEL_CONF;
import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_KUBERNETES_NAMESPACE;
import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK_ON_K8S_MASTER_PREFIX;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
@ -45,6 +50,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import io.fabric8.kubernetes.client.Config;
public class SparkTask extends AbstractYarnTask {
/**
@ -132,8 +139,15 @@ public class SparkTask extends AbstractYarnTask {
String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
: SparkConstants.DEPLOY_MODE_LOCAL;
boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace());
String masterUrl = onNativeKubernetes ? SPARK_ON_K8S_MASTER_PREFIX +
Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()).getMasterUrl()
: SparkConstants.SPARK_ON_YARN;
if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
args.add(SparkConstants.SPARK_ON_YARN);
args.add(masterUrl);
args.add(SparkConstants.DEPLOY_MODE);
}
args.add(deployMode);
@ -168,6 +182,13 @@ public class SparkTask extends AbstractYarnTask {
args.add(others);
}
// add driver label for spark on native kubernetes
if (onNativeKubernetes) {
args.add(String.format(DRIVER_LABEL_CONF, UNIQUE_LABEL_NAME, taskExecutionContext.getTaskAppId()));
args.add(String.format(SPARK_KUBERNETES_NAMESPACE,
JSONUtils.toMap(sparkParameters.getNamespace()).get(NAMESPACE_NAME)));
}
ResourceInfo mainJar = sparkParameters.getMainJar();
if (programType != ProgramType.SQL) {
args.add(mainJar.getRes());
@ -189,32 +210,27 @@ public class SparkTask extends AbstractYarnTask {
private void populateSparkResourceDefinitions(List<String> args) {
int driverCores = sparkParameters.getDriverCores();
if (driverCores > 0) {
args.add(SparkConstants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
args.add(String.format(SparkConstants.DRIVER_CORES, driverCores));
}
String driverMemory = sparkParameters.getDriverMemory();
if (StringUtils.isNotEmpty(driverMemory)) {
args.add(SparkConstants.DRIVER_MEMORY);
args.add(driverMemory);
args.add(String.format(SparkConstants.DRIVER_MEMORY, driverMemory));
}
int numExecutors = sparkParameters.getNumExecutors();
if (numExecutors > 0) {
args.add(SparkConstants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
args.add(String.format(SparkConstants.NUM_EXECUTORS, numExecutors));
}
int executorCores = sparkParameters.getExecutorCores();
if (executorCores > 0) {
args.add(SparkConstants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
args.add(String.format(SparkConstants.EXECUTOR_CORES, executorCores));
}
String executorMemory = sparkParameters.getExecutorMemory();
if (StringUtils.isNotEmpty(executorMemory)) {
args.add(SparkConstants.EXECUTOR_MEMORY);
args.add(executorMemory);
args.add(String.format(SparkConstants.EXECUTOR_MEMORY, executorMemory));
}
}

20
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@ -46,11 +46,11 @@ public class SparkTaskTest {
"${SPARK_HOME}/bin/spark-sql " +
"--master yarn " +
"--deploy-mode client " +
"--driver-cores 1 " +
"--driver-memory 512M " +
"--num-executors 2 " +
"--executor-cores 2 " +
"--executor-memory 1G " +
"--conf spark.driver.cores=1 " +
"--conf spark.driver.memory=512M " +
"--conf spark.executor.instances=2 " +
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql");
}
@ -67,11 +67,11 @@ public class SparkTaskTest {
"--master yarn " +
"--deploy-mode client " +
"--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
"--driver-cores 1 " +
"--driver-memory 512M " +
"--num-executors 2 " +
"--executor-cores 2 " +
"--executor-memory 1G " +
"--conf spark.driver.cores=1 " +
"--conf spark.driver.memory=512M " +
"--conf spark.executor.instances=2 " +
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
"--name spark " +
"lib/dolphinscheduler-task-spark.jar");
}

3
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts

@ -57,7 +57,8 @@ export function useNamespace(): IJsonItem {
name: t('project.node.namespace_cluster'),
props: {
loading,
'render-label': renderLabel
'render-label': renderLabel,
'clearable': true
},
options: [
{

2
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-spark.ts

@ -25,6 +25,7 @@ import {
useExecutorMemory,
useExecutorCores,
useMainJar,
useNamespace,
useResources
} from '.'
import type { IJsonItem } from '../types'
@ -90,6 +91,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
}
},
useDeployMode(24, ref(true), showCluster),
useNamespace(),
{
type: 'input',
field: 'appName',

3
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -60,6 +60,9 @@ export function formatParams(data: INodeData): {
taskParams.appName = data.appName
taskParams.mainArgs = data.mainArgs
taskParams.others = data.others
if (data.namespace) {
taskParams.namespace = data.namespace
}
}
if (data.taskType === 'SPARK') {

71
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,14 +17,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -35,17 +32,11 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@ -55,7 +46,6 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -113,11 +103,10 @@ public class TaskKillProcessor implements NettyRequestProcessor {
// if processId > 0, it should call cancelApplication to cancel remote application too.
this.cancelApplication(taskInstanceId);
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
boolean result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
sendTaskKillResponseCommand(channel, taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
@ -155,17 +144,17 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
private boolean doKill(TaskExecutionContext taskExecutionContext) {
// kill system process
boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(),
taskExecutionContext.getAppInfoPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
// kill yarn or k8s application
try {
ProcessUtils.cancelApplication(taskExecutionContext);
} catch (TaskException e) {
return false;
}
return processFlag;
}
/**
@ -216,42 +205,4 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return processFlag;
}
/**
* kill yarn job
*
* @param host host
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
*/
private Pair<Boolean, List<String>> killYarnJob(@NonNull Host host,
String logPath,
String appInfoPath,
String executePath,
String tenantCode) {
if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
log.error(
"Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
host, logPath, appInfoPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try {
log.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
List<String> appIds = LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
if (CollectionUtils.isEmpty(appIds)) {
log.info("The appId is empty");
return Pair.of(true, Collections.emptyList());
}
ProcessUtils.cancelApplication(appIds, log, tenantCode, executePath);
return Pair.of(true, appIds);
} catch (Exception e) {
log.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath,
executePath, tenantCode, e);
}
return Pair.of(false, Collections.emptyList());
}
}

14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLECT;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;
import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
@ -26,7 +24,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@ -52,12 +49,9 @@ import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import javax.annotation.Nullable;
@ -143,13 +137,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (task != null) {
try {
task.cancel();
List<String> appIds =
LogUtils.getAppIds(taskExecutionContext.getLogPath(), taskExecutionContext.getExecutePath(),
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
if (CollectionUtils.isNotEmpty(appIds)) {
ProcessUtils.cancelApplication(appIds, log, taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
}
ProcessUtils.cancelApplication(taskExecutionContext);
} catch (Exception e) {
log.error(
"Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual",

Loading…
Cancel
Save