Browse Source

[Feature][plugin] Add k8s task in task plugin (#9425)

* [Feature][plugin] Add k8s task in task plugin

* [Feature][plugin] fix dos and code problems

* [Feature][plugin] refactor some code based on sonar

* [Feature][UI] front-end for k8s task plugin

* [Feature][plugin] delete some front files

* [Feature][plugin] update document

* Update docs/docs/zh/guide/task/k8s.md

* Update docs/docs/en/guide/task/k8s.md

* Update docs/docs/en/guide/task/k8s.md

* Update docs/docs/en/guide/task/k8s.md

* [Feature][UI] front-end change from review comments

* [Feature][UI] replace get namespace list api

* [Feature][plugin] change file name

* Add kubernetes to zh task list

Co-authored-by: hezhao2 <hezhao2@cisco.com>
Co-authored-by: William Tong <weitong@cisco.com>
Co-authored-by: Jiajie Zhong <zhongjiajie955@gmail.com>
3.1.0-release
He Zhao 2 years ago committed by GitHub
parent
commit
5bb1eb04fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/configs/docsdev.js
  2. 44
      docs/docs/en/guide/task/kubernetes.md
  3. 45
      docs/docs/zh/guide/task/kubernetes.md
  4. BIN
      docs/img/tasks/demo/kubernetes-task-en.png
  5. BIN
      docs/img/tasks/icons/kubernetes.png
  6. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  7. 65
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  8. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  9. 22
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  10. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
  11. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
  12. 43
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
  13. 23
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
  14. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  15. 74
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
  16. 63
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
  17. 93
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
  18. 286
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
  19. 119
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
  20. 101
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
  21. 55
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml
  22. 87
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
  23. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
  24. 44
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java
  25. 88
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java
  26. 58
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
  27. 85
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
  28. 1
      dolphinscheduler-task-plugin/pom.xml
  29. BIN
      dolphinscheduler-ui/public/images/task-icons/k8s.png
  30. BIN
      dolphinscheduler-ui/public/images/task-icons/k8s_hover.png
  31. 8
      dolphinscheduler-ui/src/locales/modules/en_US.ts
  32. 8
      dolphinscheduler-ui/src/locales/modules/zh_CN.ts
  33. 7
      dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
  34. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  35. 58
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
  36. 71
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
  37. 7
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  38. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  39. 81
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
  40. 6
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  41. 5
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  42. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

8
docs/configs/docsdev.js

@ -174,6 +174,10 @@ export default {
title: 'Jupyter',
link: '/en-us/docs/dev/user_doc/guide/task/jupyter.html',
},
{
title: 'Kubernetes',
link: '/en-us/docs/dev/user_doc/guide/task/kubernetes.html',
},
],
},
{
@ -525,6 +529,10 @@ export default {
title: 'Jupyter',
link: '/zh-cn/docs/dev/user_doc/guide/task/jupyter.html',
},
{
title: 'Kubernetes',
link: '/zh-cn/docs/dev/user_doc/guide/task/kubernetes.html',
},
],
},
{

44
docs/docs/en/guide/task/kubernetes.md

@ -0,0 +1,44 @@
# K8S Node
## Overview
K8S task type used to execute a batch task. In this task, the worker submits the task by using a k8s client.
## Create Task
- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
- Drag from the toolbar <img src="/img/tasks/icons/kubernetes.png" width="15"/> to the canvas.
## Task Parameter
- **Node name**: The node name in a workflow definition is unique.
- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select the `prohibition execution`.
- **Descriptive information**: Describe the function of the node.
- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order.
- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
- **Environment Name**: Configure the environment name in which to run the task.
- **Times of failed retry attempts**: The number of times the task failed to resubmit.
- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
- **Delayed execution time**: The time (unit minute) that a task delays in execution.
- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
- **Namespace**::the namespace for running k8s task
- **Min CPU**:min CPU requirement for running k8s task
- **Min Memory**:min memory requirement for running k8s task
- **Image**:the registry url for image
- **Custom parameter**: It is a local user-defined parameter for K8S task, these params will pass to container as environment variables.
- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task.
## Task Example
### Configure the K8S Environment in DolphinScheduler
If you are using the K8S task type in a production environment, the K8S cluster environment is required.
### Configure K8S Nodes
Configure the required content according to the parameter descriptions above.
![K8S](/img/tasks/demo/kubernetes-task-en.png)
## Notice
Task name contains only lowercase alphanumeric characters or '-'

45
docs/docs/zh/guide/task/kubernetes.md

@ -0,0 +1,45 @@
# Kubernetes
## 综述
kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的任务。worker最终会通过使用kubernetes client提交任务。
## 创建任务
- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
- 工具栏中拖动 <img src="/img/tasks/icons/kubernetes.png" width="25"/> 到画板中,选择需要连接的数据源,即可完成创建。
## 任务参数
- 节点名称:设置任务的名称。一个工作流定义中的节点名称是唯一的。
- 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
- 描述:描述该节点的功能。
- 任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
- Worker 分组:任务分配给 worker 组的机器执行,选择 Default 会随机选择一台 worker 机执行。
- 环境名称:配置运行任务的环境。
- 失败重试次数:任务失败重新提交的次数。
- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
- 延迟执行时间:任务延迟执行的时间,以分为单位。
- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
- 命名空间:选择kubernetes集群上存在的命名空间
- 最小CPU:任务在kubernetes上运行所需的最小CPU
- 最小内存:任务在kubernetes上运行所需的最小内存
- 镜像:镜像地址
- 自定义参数:kubernetes任务局部的用户自定义参数,自定义参数最终会通过环境变量形式存在于容器中,提供给kubernetes任务使用
- 前置任务:在当前kubernetes任务之前需要执行的任务
## 任务样例
### 在 DolphinScheduler 中配置 kubernetes 集群环境
若生产环境中要是使用到 kubernetes 任务类型,则需要预先配置好所需的kubernetes集群环境
### 配置 kubernetes 任务节点
根据上述参数说明,配置所需的内容即可。
![kubernetes](/img/tasks/demo/kubernetes-task-en.png)
## 注意事项
任务名字限制在小写字母、数字和-这三种字符之中

BIN
docs/img/tasks/demo/kubernetes-task-en.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 475 KiB

BIN
docs/img/tasks/icons/kubernetes.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -70,7 +71,7 @@ public class TaskExecutionContextBuilder {
if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) {
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy());
if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED
|| taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
|| taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
taskExecutionContext.setTaskTimeout(Math.min(taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT, Integer.MAX_VALUE));
}
}
@ -117,12 +118,23 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setResourceParametersHelper(parametersHelper);
return this;
}
/**
* build k8sTask related info
*
* @param k8sTaskExecutionContext sqoopTaskExecutionContext
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionContext k8sTaskExecutionContext) {
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
return this;
}
/**
* create
*
* @return taskExecutionContext
*/
public TaskExecutionContext create() {
return taskExecutionContext;
}

65
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -17,8 +17,24 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.collections.CollectionUtils;
import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
import static org.apache.dolphinscheduler.common.Constants.DATABASE;
import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
import static org.apache.dolphinscheduler.common.Constants.OTHER;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -35,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -52,6 +69,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncPa
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.dq.DataQualityParameters;
import org.apache.dolphinscheduler.plugin.task.k8s.K8sTaskParameters;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -60,8 +78,8 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
@ -73,21 +91,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
import static org.apache.dolphinscheduler.common.Constants.DATABASE;
import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
import static org.apache.dolphinscheduler.common.Constants.OTHER;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariDataSource;
public abstract class BaseTaskProcessor implements ITaskProcessor {
@ -278,6 +285,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
setDataQualityTaskRelation(dataQualityTaskExecutionContext,taskInstance,tenant.getTenantCode());
}
K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
if (TASK_TYPE_K8S.equalsIgnoreCase(taskInstance.getTaskType())) {
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
@ -286,6 +297,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildResourceParametersInfo(resources)
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
.buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
.create();
}
@ -579,4 +591,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return resourcesMap;
}
/**
* set k8s task relation
* @param k8sTaskExecutionContext k8sTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) {
K8sTaskParameters k8sTaskParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String clusterName = namespace.get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
k8sTaskExecutionContext.setConfigYaml(configYaml);
}
}
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -296,4 +296,6 @@ public interface ProcessService {
org.apache.dolphinscheduler.remote.command.CommandType taskType);
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
public String findConfigYamlByName(String clusterName) ;
}

22
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.K8s;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -90,6 +91,7 @@ import org.apache.dolphinscheduler.dao.mapper.DqRuleMapper;
import org.apache.dolphinscheduler.dao.mapper.DqTaskStatisticsValueMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.K8sMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
@ -269,6 +271,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private K8sMapper k8sMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@ -3042,4 +3047,21 @@ public class ProcessServiceImpl implements ProcessService {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
/**
* find k8s config yaml by clusterName
*
* @param clusterName clusterName
* @return datasource
*/
@Override
public String findConfigYamlByName(String clusterName) {
if (StringUtils.isEmpty(clusterName)) {
return null;
}
QueryWrapper<K8s> nodeWrapper = new QueryWrapper<>();
nodeWrapper.eq("k8s_name", clusterName);
K8s k8s = k8sMapper.selectOne(nodeWrapper);
return k8s.getK8sConfig();
}
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml

@ -153,6 +153,12 @@
<artifactId>dolphinscheduler-task-blocking</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-k8s</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml

@ -293,5 +293,9 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>
</dependencies>
</project>

43
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.io.Serializable;
/**
* k8s Task ExecutionContext
*/
public class K8sTaskExecutionContext implements Serializable {
private String configYaml;
public String getConfigYaml() {
return configYaml;
}
public void setConfigYaml(String configYaml) {
this.configYaml = configYaml;
}
@Override
public String toString() {
return "K8sTaskExecutionContext{"
+ "configYaml='" + configYaml + '\''
+ '}';
}
}

23
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -117,6 +117,10 @@ public class TaskConstants {
* exit code success
*/
public static final int EXIT_CODE_SUCCESS = 0;
/**
* running code
*/
public static final int RUNNING_CODE = 1;
public static final String SH = "sh";
@ -387,6 +391,8 @@ public class TaskConstants {
public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY";
public static final String TASK_TYPE_K8S = "K8S";
public static final String TASK_TYPE_BLOCKING = "BLOCKING";
public static final List<String> COMPLEX_TASK_TYPES = Arrays.asList(new String[]{TASK_TYPE_CONDITIONS, TASK_TYPE_SWITCH, TASK_TYPE_SUB_PROCESS, TASK_TYPE_DEPENDENT});
@ -398,6 +404,23 @@ public class TaskConstants {
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
/**
* use for k8s task
*/
public static final String API_VERSION = "batch/v1";
public static final String IMAGE_PULL_POLICY = "Always";
public static final String RESTART_POLICY = "Never";
public static final String MEMORY = "memory";
public static final String CPU = "cpu";
public static final String LAYER_LABEL = "k8s.cn/layer";
public static final String LAYER_LABEL_VALUE = "batch";
public static final String NAME_LABEL = "k8s.cn/name";
public static final String TASK_INSTANCE_ID = "taskInstanceId";
public static final String MI = "Mi";
public static final int JOB_TTL_SECONDS = 300;
public static final int LOG_LINES = 500;
public static final String NAMESPACE_NAME = "name";
public static final String CLUSTER = "cluster";
/**
* zeppelin config
*/

14
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -216,7 +216,10 @@ public class TaskExecutionContext {
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
/**
* k8s TaskExecutionContext
*/
private K8sTaskExecutionContext k8sTaskExecutionContext;
/**
* resources full name and tenant code
*/
@ -564,6 +567,14 @@ public class TaskExecutionContext {
this.endTime = endTime;
}
public K8sTaskExecutionContext getK8sTaskExecutionContext() {
return k8sTaskExecutionContext;
}
public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
this.k8sTaskExecutionContext = k8sTaskExecutionContext;
}
@Override
public String toString() {
return "TaskExecutionContext{"
@ -601,6 +612,7 @@ public class TaskExecutionContext {
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
+ ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ '}';
}

74
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
public abstract class AbstractK8sTask extends AbstractTaskExecutor {
/**
* process task
*/
private AbstractK8sTaskExecutor abstractK8sTaskExecutor;
/**
* Abstract k8s Task
*
* @param taskRequest taskRequest
*/
protected AbstractK8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.abstractK8sTaskExecutor = new K8sTaskExecutor(logger,taskRequest);
}
@Override
public void handle() throws Exception {
try {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
} catch (Exception e) {
exitStatusCode = -1;
throw new TaskException("k8s process failure",e);
}
}
/**
* cancel application
*
* @param status status
* @throws Exception exception
*/
@Override
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
abstractK8sTaskExecutor.cancelApplication(buildCommand());
}
/**
* create command
*
* @return String
* @throws Exception exception
*/
protected abstract String buildCommand();
}

63
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
import org.slf4j.Logger;
public abstract class AbstractK8sTaskExecutor {
protected Logger logger;
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
protected StringBuilder logStringBuffer;
protected AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
this.logger = logger;
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
this.logStringBuffer = new StringBuilder();
}
public abstract TaskResponse run(String k8sParameterStr) throws Exception;
public abstract void cancelApplication(String k8sParameterStr);
public void waitTimeout(Boolean timeout) throws TaskException {
if (Boolean.TRUE.equals(timeout)) {
throw new TaskException("K8sTask is timeout");
}
}
public void flushLog(TaskResponse taskResponse) {
if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) {
logger.error(logStringBuffer.toString());
} else if (logStringBuffer.length() != 0) {
logger.info(logStringBuffer.toString());
}
}
public abstract void submitJob2k8s(String k8sParameterStr);
public abstract void stopJobOnK8s(String k8sParameterStr);
}

93
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import java.util.Map;
/**
* k8s task parameters
*/
public class K8sTaskMainParameters {
private String image;
private String namespaceName;
private String clusterName;
private double minCpuCores;
private double minMemorySpace;
private Map<String, String> paramsMap;
public String getImage() {
return image;
}
public void setImage(String image) {
this.image = image;
}
public double getMinCpuCores() {
return minCpuCores;
}
public void setMinCpuCores(double minCpuCores) {
this.minCpuCores = minCpuCores;
}
public double getMinMemorySpace() {
return minMemorySpace;
}
public void setMinMemorySpace(double minMemorySpace) {
this.minMemorySpace = minMemorySpace;
}
public String getNamespaceName() {
return namespaceName;
}
public void setNamespaceName(String namespaceName) {
this.namespaceName = namespaceName;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public Map<String, String> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, String> paramsMap) {
this.paramsMap = paramsMap;
}
@Override
public String toString() {
return "K8sTaskMainParameters{"
+ "image='" + image + '\''
+ ", namespaceName='" + namespaceName + '\''
+ ", clusterName='" + clusterName + '\''
+ ", minCpuCores=" + minCpuCores
+ ", minMemorySpace=" + minMemorySpace
+ ", paramsMap=" + paramsMap
+ '}';
}
}

286
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.k8s.impl;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.IMAGE_PULL_POLICY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MEMORY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
/**
* K8sTaskExecutor used to submit k8s task to K8S
*/
public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
private Job job;
public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
super(logger,taskRequest);
}
public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId());
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
String image = k8STaskMainParameters.getImage();
String namespaceName = k8STaskMainParameters.getNamespaceName();
Map<String, String> otherParams = k8STaskMainParameters.getParamsMap();
Double podMem = k8STaskMainParameters.getMinMemorySpace();
Double podCpu = k8STaskMainParameters.getMinCpuCores();
Double limitPodMem = podMem * 2;
Double limitPodCpu = podCpu * 2;
int retryNum = 0;
String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);
Map<String, Quantity> reqRes = new HashMap<>();
reqRes.put(MEMORY, new Quantity(String.format("%s%s", podMem, MI)));
reqRes.put(CPU, new Quantity(String.valueOf(podCpu)));
Map<String, Quantity> limitRes = new HashMap<>();
limitRes.put(MEMORY, new Quantity(String.format("%s%s", limitPodMem, MI)));
limitRes.put(CPU, new Quantity(String.valueOf(limitPodCpu)));
Map<String, String> labelMap = new HashMap<>();
labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
labelMap.put(NAME_LABEL, k8sJobName);
EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null);
List<EnvVar> envVars = new ArrayList<>();
envVars.add(taskInstanceIdVar);
if (MapUtils.isNotEmpty(otherParams)) {
for (Map.Entry<String,String> entry : otherParams.entrySet()) {
String param = entry.getKey();
String paramValue = entry.getValue();
EnvVar envVar = new EnvVar(param, paramValue, null);
envVars.add(envVar);
}
}
return new JobBuilder()
.withApiVersion(API_VERSION)
.withNewMetadata()
.withName(k8sJobName)
.withLabels(labelMap)
.withNamespace(namespaceName)
.endMetadata()
.withNewSpec()
.withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
.withNewTemplate()
.withNewSpec()
.addNewContainer()
.withName(k8sJobName)
.withImage(image)
.withImagePullPolicy(IMAGE_PULL_POLICY)
.withResources(new ResourceRequirements(limitRes, reqRes))
.withEnv(envVars)
.endContainer()
.withRestartPolicy(RESTART_POLICY)
.endSpec()
.endTemplate()
.withBackoffLimit(retryNum)
.endSpec()
.build();
}
public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
CountDownLatch countDownLatch = new CountDownLatch(1);
Watcher<Job> watcher = new Watcher<Job>() {
@Override
public void eventReceived(Action action, Job job) {
if (action != Action.ADDED) {
int jobStatus = getK8sJobStatus(job);
setTaskStatus(jobStatus,taskInstanceId, taskResponse, k8STaskMainParameters);
countDownLatch.countDown();
}
}
@Override
public void onClose(WatcherException e) {
logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s",job.getMetadata().getName(),e.getMessage()));
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
countDownLatch.countDown();
}
@Override
public void onClose() {
logger.warn("Watch gracefully closed");
}
};
Watch watch = null;
try {
watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
|| taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
if (timeoutFlag) {
Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS));
waitTimeout(timeout);
} else {
countDownLatch.await();
}
flushLog(taskResponse);
} catch (InterruptedException e) {
logger.error("job failed in k8s: {}",e.getMessage(), e);
Thread.currentThread().interrupt();
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
} catch (Exception e) {
logger.error("job failed in k8s: {}",e.getMessage(), e);
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
} finally {
if (watch != null) {
watch.close();
}
}
}
@Override
public TaskResponse run(String k8sParameterStr) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
try {
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(k8sParameterStr)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
String configYaml = k8sTaskExecutionContext.getConfigYaml();
k8sUtils.buildClient(configYaml);
submitJob2k8s(k8sParameterStr);
registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters);
} catch (Exception e) {
cancelApplication(k8sParameterStr);
result.setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
return result;
}
@Override
public void cancelApplication(String k8sParameterStr) {
if (job != null) {
stopJobOnK8s(k8sParameterStr);
}
}
@Override
public void submitJob2k8s(String k8sParameterStr) {
int taskInstanceId = taskRequest.getTaskInstanceId();
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
try {
logger.info("[K8sJobExecutor-{}-{}] start to submit job", taskName, taskInstanceId);
job = buildK8sJob(k8STaskMainParameters);
stopJobOnK8s(k8sParameterStr);
String namespaceName = k8STaskMainParameters.getNamespaceName();
k8sUtils.createJob(namespaceName, job);
logger.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId);
} catch (Exception e) {
logger.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId);
throw new TaskException("K8sJobExecutor fail to submit job", e);
}
}
@Override
public void stopJobOnK8s(String k8sParameterStr) {
K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
String namespaceName = k8STaskMainParameters.getNamespaceName();
String jobName = job.getMetadata().getName();
try {
if (Boolean.TRUE.equals(k8sUtils.jobExist(jobName, namespaceName))) {
k8sUtils.deleteJob(jobName, namespaceName);
}
} catch (Exception e) {
logger.error("[K8sJobExecutor-{}] fail to stop job", jobName);
throw new TaskException("K8sJobExecutor fail to stop job", e);
}
}
public int getK8sJobStatus(Job job) {
JobStatus jobStatus = job.getStatus();
if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
return EXIT_CODE_SUCCESS;
} else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) {
return EXIT_CODE_FAILURE;
} else {
return TaskConstants.RUNNING_CODE;
}
}
public void setTaskStatus(int jobStatus,String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) {
logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", job.getMetadata().getName()));
taskResponse.setExitStatusCode(EXIT_CODE_KILL);
} else if (jobStatus == EXIT_CODE_SUCCESS) {
logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s", job.getMetadata().getName()));
taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
} else {
String errorMessage = k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName());
logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), errorMessage));
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
}
}
}
public Job getJob() {
return job;
}
public void setJob(Job job) {
this.job = job;
}
}

119
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOG_LINES;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
public class K8sUtils {
private static final Logger log = LoggerFactory.getLogger(K8sUtils.class);
private KubernetesClient client;
public void createJob(String namespace, Job job) {
try {
client.batch().v1()
.jobs()
.inNamespace(namespace)
.create(job);
} catch (Exception e) {
throw new TaskException("fail to create job",e);
}
}
public void deleteJob(String jobName, String namespace) {
try {
client.batch().v1()
.jobs()
.inNamespace(namespace)
.withName(jobName)
.delete();
} catch (Exception e) {
throw new TaskException("fail to delete job",e);
}
}
public Boolean jobExist(String jobName, String namespace) {
Optional<Job> result;
try {
JobList jobList = client.batch().v1().jobs().inNamespace(namespace).list();
List<Job> jobs = jobList.getItems();
result = jobs.stream()
.filter(job -> job.getMetadata().getName().equals(jobName))
.findFirst();
return result.isPresent();
} catch (Exception e) {
throw new TaskException("fail to check job: ", e);
}
}
public Watch createBatchJobWatcher(String jobName, Watcher<Job> watcher) {
try {
return client.batch().v1()
.jobs().withName(jobName).watch(watcher);
} catch (Exception e) {
throw new TaskException("fail to register batch job watcher",e);
}
}
public String getPodLog(String jobName, String namespace) {
try {
List<Pod> podList = client.pods().inNamespace(namespace).list().getItems();
String podName = null;
for (Pod pod : podList) {
podName = pod.getMetadata().getName();
if (jobName.equals(podName.substring(0, pod.getMetadata().getName().lastIndexOf("-")))) {
break;
}
}
return client.pods().inNamespace(namespace)
.withName(podName)
.tailingLines(LOG_LINES)
.getLog(Boolean.TRUE);
} catch (Exception e) {
log.error("fail to getPodLog", e);
log.error("response bodies : {}", e.getMessage());
}
return null;
}
public void buildClient(String configYaml) {
try {
Config config = Config.fromKubeconfig(configYaml);
client = new DefaultKubernetesClient(config);
} catch (Exception e) {
throw new TaskException("fail to build k8s ApiClient",e);
}
}
}

101
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static org.hamcrest.Matchers.is;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class K8sTaskExecutorTest {
private K8sTaskExecutor k8sTaskExecutor = null;
private K8sTaskMainParameters k8sTaskMainParameters = null;
private final String image = "ds-dev";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
private final int taskInstanceId = 1000;
private final String taskName = "k8s_task_test";
private Job job;
@Before
public void before() {
TaskExecutionContext taskRequest = new TaskExecutionContext();
taskRequest.setTaskInstanceId(taskInstanceId);
taskRequest.setTaskName(taskName);
Map<String,String> namespace = JSONUtils.toMap(this.namespace);
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
k8sTaskExecutor = new K8sTaskExecutor(null,taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
k8sTaskMainParameters.setNamespaceName(namespaceName);
k8sTaskMainParameters.setClusterName(clusterName);
k8sTaskMainParameters.setMinCpuCores(minCpuCores);
k8sTaskMainParameters.setMinMemorySpace(minMemorySpace);
job = k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters);
}
@Test
public void testBuildK8sJobNormal() {
String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineSeconds=null, backoffLimit=0, completionMode=null, completions=null, manualSelector=null, parallelism=null, selector=null, suspend=null, template=PodTemplateSpec(metadata=null, spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[EnvVar(name=taskInstanceId, value=1000, valueFrom=null, additionalProperties={})], envFrom=[], image=ds-dev, imagePullPolicy=Always, lifecycle=null, livenessProbe=null, name=k8s_task_test-1000, ports=[], readinessProbe=null, resources=ResourceRequirements(limits={memory=20.0Mi, cpu=4.0}, requests={memory=10.0Mi, cpu=2.0}, additionalProperties={}), securityContext=null, startupProbe=null, stdin=null, stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, tty=null, volumeDevices=[], volumeMounts=[], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=null, enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=null, nodeSelector=null, overhead=null, preemptionPolicy=null, priority=null, priorityClassName=null, readinessGates=[], restartPolicy=Never, runtimeClassName=null, schedulerName=null, securityContext=null, serviceAccount=null, serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], topologySpreadConstraints=[], volumes=[], additionalProperties={}), additionalProperties={}), ttlSecondsAfterFinished=300, additionalProperties={}), status=null, additionalProperties={})";
Assert.assertEquals(jobStr, job.toString());
}
@Test
public void testGetJobNormal() {
k8sTaskExecutor.setJob(job);
String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineSeconds=null, backoffLimit=0, completionMode=null, completions=null, manualSelector=null, parallelism=null, selector=null, suspend=null, template=PodTemplateSpec(metadata=null, spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[EnvVar(name=taskInstanceId, value=1000, valueFrom=null, additionalProperties={})], envFrom=[], image=ds-dev, imagePullPolicy=Always, lifecycle=null, livenessProbe=null, name=k8s_task_test-1000, ports=[], readinessProbe=null, resources=ResourceRequirements(limits={memory=20.0Mi, cpu=4.0}, requests={memory=10.0Mi, cpu=2.0}, additionalProperties={}), securityContext=null, startupProbe=null, stdin=null, stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, tty=null, volumeDevices=[], volumeMounts=[], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=null, enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=null, nodeSelector=null, overhead=null, preemptionPolicy=null, priority=null, priorityClassName=null, readinessGates=[], restartPolicy=Never, runtimeClassName=null, schedulerName=null, securityContext=null, serviceAccount=null, serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], topologySpreadConstraints=[], volumes=[], additionalProperties={}), additionalProperties={}), ttlSecondsAfterFinished=300, additionalProperties={}), status=null, additionalProperties={})";
Assert.assertEquals(jobStr,k8sTaskExecutor.getJob().toString());
}
@Test
public void testGetK8sJobStatusNormal() {
JobStatus jobStatus = new JobStatus();
jobStatus.setSucceeded(1);
job.setStatus(jobStatus);
Assert.assertEquals(0, Integer.compare(0,k8sTaskExecutor.getK8sJobStatus(job)));
}
@Test
public void testSetTaskStatusNormal() {
int jobStatus = 0;
TaskResponse taskResponse = new TaskResponse();
K8sTaskMainParameters k8STaskMainParameters = new K8sTaskMainParameters();
k8sTaskExecutor.setJob(job);
k8sTaskExecutor.setTaskStatus(jobStatus,String.valueOf(taskInstanceId),taskResponse,k8STaskMainParameters);
Assert.assertEquals(0, Integer.compare(EXIT_CODE_KILL,taskResponse.getExitStatusCode()));
}
@Test
public void testWaitTimeoutNormal() {
try {
k8sTaskExecutor.waitTimeout(true);
} catch (TaskException e) {
Assert.assertThat(e.getMessage(),is("K8sTask is timeout"));
}
}
}

55
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-k8s</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

87
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.HashMap;
import java.util.Map;
public class K8sTask extends AbstractK8sTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* task parameters
*/
private final K8sTaskParameters k8sTaskParameters;
/**
* @param taskRequest taskRequest
*/
public K8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
this.k8sTaskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), K8sTaskParameters.class);
if (!k8sTaskParameters.checkParameters()) {
throw new TaskException("K8S task params is not valid");
}
}
@Override
public AbstractParameters getParameters() {
return k8sTaskParameters;
}
@Override
protected String buildCommand() {
K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
k8sTaskMainParameters.setNamespaceName(namespaceName);
k8sTaskMainParameters.setClusterName(clusterName);
k8sTaskMainParameters.setMinCpuCores(k8sTaskParameters.getMinCpuCores());
k8sTaskMainParameters.setMinMemorySpace(k8sTaskParameters.getMinMemorySpace());
k8sTaskMainParameters.setParamsMap(ParamUtils.convert(paramsMap));
return JSONUtils.toJsonString(k8sTaskMainParameters);
}
}

49
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
public class K8sTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), K8sTaskParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
@Override
public AbstractTask createTask(TaskExecutionContext taskRequest) {
return new K8sTask(taskRequest);
}
}

44
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class K8sTaskChannelFactory implements TaskChannelFactory {
@Override
public String getName() {
return "K8S";
}
@Override
public List<PluginParams> getParams() {
return null;
}
@Override
public TaskChannel create() {
return new K8sTaskChannel();
}
}

88
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* k8s task parameters
*/
public class K8sTaskParameters extends AbstractParameters {
private String image;
private String namespace;
private double minCpuCores;
private double minMemorySpace;
public String getImage() {
return image;
}
public void setImage(String image) {
this.image = image;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public double getMinCpuCores() {
return minCpuCores;
}
public void setMinCpuCores(double minCpuCores) {
this.minCpuCores = minCpuCores;
}
public double getMinMemorySpace() {
return minMemorySpace;
}
public void setMinMemorySpace(double minMemorySpace) {
this.minMemorySpace = minMemorySpace;
}
@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(image) && StringUtils.isNotEmpty(namespace)
;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public String toString() {
return "K8sTaskParameters{"
+ "image='" + image + '\''
+ ", namespace='" + namespace + '\''
+ ", minCpuCores=" + minCpuCores
+ ", minMemorySpace=" + minMemorySpace
+ '}';
}
}

58
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class K8sParametersTest {
private K8sTaskParameters k8sTaskParameters = null;
private final String image = "ds-dev";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
@Before
public void before() {
k8sTaskParameters = new K8sTaskParameters();
k8sTaskParameters.setImage(image);
k8sTaskParameters.setNamespace(namespace);
k8sTaskParameters.setMinCpuCores(minCpuCores);
k8sTaskParameters.setMinMemorySpace(minMemorySpace);
}
@Test
public void testCheckParameterNormal() {
Assert.assertTrue(k8sTaskParameters.checkParameters());
}
@Test
public void testGetResourceFilesListNormal() {
Assert.assertNotNull(k8sTaskParameters.getResourceFilesList());
Assert.assertEquals(0, k8sTaskParameters.getResourceFilesList().size());
}
@Test
public void testK8sParameters() {
Assert.assertEquals(image, k8sTaskParameters.getImage());
Assert.assertEquals(namespace, k8sTaskParameters.getNamespace());
Assert.assertEquals(0, Double.compare(minCpuCores, k8sTaskParameters.getMinCpuCores()));
Assert.assertEquals(0,Double.compare(minMemorySpace, k8sTaskParameters.getMinMemorySpace()));
}
}

85
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.k8s;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class K8sTaskTest {
private K8sTaskParameters k8sTaskParameters = null;
private K8sTask k8sTask = null;
private final String image = "ds-dev";
private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
private final double minCpuCores = 2;
private final double minMemorySpace = 10;
private final int taskInstanceId = 1000;
private final String taskName = "k8s_task_test";
private final String DAY = "day";
private final String date = "20220507";
@Before
public void before() {
k8sTaskParameters = new K8sTaskParameters();
k8sTaskParameters.setImage(image);
k8sTaskParameters.setNamespace(namespace);
k8sTaskParameters.setMinCpuCores(minCpuCores);
k8sTaskParameters.setMinMemorySpace(minMemorySpace);
TaskExecutionContext taskRequest = new TaskExecutionContext();
taskRequest.setTaskInstanceId(taskInstanceId);
taskRequest.setTaskName(taskName);
taskRequest.setTaskParams(JSONUtils.toJsonString(k8sTaskParameters));
Property property = new Property();
property.setProp(DAY);
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue(date);
Map<String, Property> paramsMap = new HashMap<>();
paramsMap.put(DAY,property);
taskRequest.setParamsMap(paramsMap);
k8sTask = new K8sTask(taskRequest);
}
@Test
public void testBuildCommandNormal() {
String expectedStr = "{\"image\":\"ds-dev\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"}}";
String commandStr = k8sTask.buildCommand();
Assert.assertEquals(expectedStr, commandStr);
}
@Test
public void testGetParametersNormal() {
String expectedStr = "K8sTaskParameters{image='ds-dev', namespace='{\"name\":\"default\",\"cluster\":\"lab\"}', minCpuCores=2.0, minMemorySpace=10.0}";
String result = k8sTask.getParameters().toString();
Assert.assertEquals(expectedStr, result);
}
}

1
dolphinscheduler-task-plugin/pom.xml

@ -50,6 +50,7 @@
<module>dolphinscheduler-task-all</module>
<module>dolphinscheduler-task-emr</module>
<module>dolphinscheduler-task-blocking</module>
<module>dolphinscheduler-task-k8s</module>
<module>dolphinscheduler-task-zeppelin</module>
<module>dolphinscheduler-task-jupyter</module>
</modules>

BIN
dolphinscheduler-ui/public/images/task-icons/k8s.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/k8s_hover.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

8
dolphinscheduler-ui/src/locales/modules/en_US.ts

@ -650,6 +650,14 @@ const project = {
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
delay_execution_time: 'Delay execution time',
namespace_cluster: 'Namespace(Cluster)',
min_cpu: 'Min cpu',
min_memory: 'Min memory',
cores: 'Cores',
mb: 'MB',
image: 'Image',
image_tips: 'Please enter image',
min_memory_tips: 'Please enter min memory',
state: 'State',
branch_flow: 'Branch flow',
cancel: 'Cancel',

8
dolphinscheduler-ui/src/locales/modules/zh_CN.ts

@ -645,6 +645,14 @@ const project = {
failed_retry_interval: '失败重试间隔',
minute: '分',
delay_execution_time: '延时执行时间',
namespace_cluster: '命名空间(集群)',
min_cpu: '最小cpu',
min_memory: '最小内存',
cores: '核',
mb: 'MB',
image: '镜像',
image_tips: '请输入镜像',
min_memory_tips: '请输入最小内存',
state: '状态',
branch_flow: '分支流转',
cancel: '取消',

7
dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts

@ -27,6 +27,13 @@ export function queryNamespaceListPaging(params: ListReq): any {
})
}
export function getAllNamespaces(): any {
return axios({
url: '/k8s-namespace/available-list',
method: 'get'
})
}
export function verifyNamespaceK8s(params: K8SReq): any {
return axios({
url: '/k8s-namespace/verify',

2
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -62,4 +62,6 @@ export { useConditions } from './use-conditions'
export { useDependent } from './use-dependent'
export { useEmr } from './use-emr'
export { useZeppelin } from './use-zeppelin'
export { useNamespace } from './use-namespace'
export { useK8s } from './use-k8s'
export { useJupyter } from './use-jupyter'

58
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useCustomParams, useNamespace } from '.'
import type { IJsonItem } from '../types'
import { useI18n } from 'vue-i18n'
export function useK8s(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
return [
useNamespace(),
{
type: 'input-number',
field: 'minCpuCores',
span: 12,
name: t('project.node.min_cpu'),
slots: {
suffix: () => t('project.node.cores')
}
},
{
type: 'input-number',
field: 'minMemorySpace',
span: 12,
name: t('project.node.min_memory'),
slots: {
suffix: () => t('project.node.mb')
}
},
{
type: 'input',
field: 'image',
name: t('project.node.image'),
props: {
placeholder: t('project.node.image_tips')
},
validate: {
trigger: ['input', 'blur'],
message: t('project.node.min_memory_tips')
}
},
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}

71
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { IJsonItem } from '../types'
import { useI18n } from 'vue-i18n'
import { onMounted, ref, VNodeChild } from 'vue'
import { getAllNamespaces } from '@/service/modules/k8s-namespace'
import { SelectOption } from 'naive-ui'
export function useNamespace(): IJsonItem {
const { t } = useI18n()
const options = ref([])
const loading = ref(false)
const getNamespaceList = async () => {
if (loading.value) return
loading.value = true
const totalList = await getAllNamespaces()
options.value = (totalList || []).map(
(item: { id: string; namespace: string; k8s: string }) => ({
label: `${item.namespace}(${item.k8s})`,
value: JSON.stringify({
name: item.namespace,
cluster: item.k8s
})
})
)
loading.value = false
}
onMounted(() => {
getNamespaceList()
})
const renderLabel = (option: SelectOption): VNodeChild => {
if (option.type === 'group') return option.label as string
return [option.label as string]
}
return {
type: 'select',
field: 'namespace',
name: t('project.node.namespace_cluster'),
props: {
loading,
'render-label': renderLabel
},
options: [
{
type: 'group',
label: t('project.node.namespace_cluster'),
key: t('project.node.namespace_cluster'),
children: options as any
}
]
}
}

7
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -310,6 +310,13 @@ export function formatParams(data: INodeData): {
taskParams.paragraphId = data.zeppelinParagraphId
}
if (data.taskType === 'K8S') {
taskParams.namespace = data.namespace
taskParams.minCpuCores = data.minCpuCores
taskParams.minMemorySpace = data.minMemorySpace
taskParams.image = data.image
}
if (data.taskType === 'JUPYTER') {
taskParams.condaEnvName = data.condaEnvName
taskParams.inputNotePath = data.inputNotePath

2
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -34,6 +34,7 @@ import { useDependent } from './use-dependent'
import { useDataQuality } from './use-data-quality'
import { useEmr } from './use-emr'
import { useZeppelin } from './use-zeppelin'
import { useK8s } from './use-k8s'
import { useJupyter } from './use-jupyter'
export default {
@ -56,5 +57,6 @@ export default {
DATA_QUALITY: useDataQuality,
EMR: useEmr,
ZEPPELIN: useZeppelin,
K8S: useK8s,
JUPYTER: useJupyter
}

81
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useK8s({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'K8S',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30
} as INodeData)
let extra: IJsonItem[] = []
if (from === 1) {
extra = [
Fields.useTaskType(model, readonly),
Fields.useProcessName({
model,
projectCode,
isCreate: !data?.id,
from,
processName: data?.processName
})
]
}
return {
json: [
Fields.useName(),
...extra,
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useK8s(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

6
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -306,6 +306,12 @@ interface ITaskParams {
udfs?: string
connParams?: string
targetJobName?: string
cluster?: string
namespace?: string
clusterNamespace?: string
minCpuCores?: string
minMemorySpace?: string
image?: string
}
interface INodeData

5
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -34,6 +34,7 @@ export type TaskType =
| 'SEATUNNEL'
| 'EMR'
| 'ZEPPELIN'
| 'K8S'
| 'JUPYTER'
export const TASK_TYPES_MAP = {
@ -103,5 +104,9 @@ export const TASK_TYPES_MAP = {
JUPYTER: {
alias: 'JUPYTER',
helperLinkDisable: true
},
K8S: {
alias: 'K8S',
helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -158,6 +158,9 @@ $bgLight: #ffffff;
&.icon-zeppelin {
background-image: url('/images/task-icons/zeppelin.png');
}
&.icon-k8s {
background-image: url('/images/task-icons/k8s.png');
}
&.icon-jupyter {
background-image: url('/images/task-icons/jupyter.png');
}
@ -222,6 +225,9 @@ $bgLight: #ffffff;
&.icon-zeppelin {
background-image: url('/images/task-icons/zeppelin_hover.png');
}
&.icon-k8s {
background-image: url('/images/task-icons/k8s_hover.png');
}
&.icon-jupyter {
background-image: url('/images/task-icons/jupyter_hover.png');
}

Loading…
Cancel
Save