diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js
index 9991cf08d8..6feba383d6 100644
--- a/docs/configs/docsdev.js
+++ b/docs/configs/docsdev.js
@@ -174,6 +174,10 @@ export default {
title: 'Jupyter',
link: '/en-us/docs/dev/user_doc/guide/task/jupyter.html',
},
+ {
+ title: 'Kubernetes',
+ link: '/en-us/docs/dev/user_doc/guide/task/kubernetes.html',
+ },
],
},
{
@@ -525,6 +529,10 @@ export default {
title: 'Jupyter',
link: '/zh-cn/docs/dev/user_doc/guide/task/jupyter.html',
},
+ {
+ title: 'Kubernetes',
+ link: '/zh-cn/docs/dev/user_doc/guide/task/kubernetes.html',
+ },
],
},
{
diff --git a/docs/docs/en/guide/task/kubernetes.md b/docs/docs/en/guide/task/kubernetes.md
new file mode 100644
index 0000000000..bc024b6100
--- /dev/null
+++ b/docs/docs/en/guide/task/kubernetes.md
@@ -0,0 +1,44 @@
+# K8S Node
+
+## Overview
+
+K8S task type used to execute a batch task. In this task, the worker submits the task by using a k8s client.
+
+## Create Task
+
+- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
+- Drag from the toolbar to the canvas.
+
+## Task Parameter
+
+- **Node name**: The node name in a workflow definition is unique.
+- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select the `prohibition execution`.
+- **Descriptive information**: Describe the function of the node.
+- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order.
+- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution.
+- **Environment Name**: Configure the environment name in which to run the task.
+- **Times of failed retry attempts**: The number of times the task failed to resubmit.
+- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
+- **Delayed execution time**: The time (unit minute) that a task delays in execution.
+- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
+- **Namespace**::the namespace for running k8s task
+- **Min CPU**:min CPU requirement for running k8s task
+- **Min Memory**:min memory requirement for running k8s task
+- **Image**:the registry url for image
+- **Custom parameter**: It is a local user-defined parameter for K8S task, these params will pass to container as environment variables.
+- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as upstream of the current task.
+## Task Example
+
+### Configure the K8S Environment in DolphinScheduler
+
+If you are using the K8S task type in a production environment, the K8S cluster environment is required.
+
+### Configure K8S Nodes
+
+Configure the required content according to the parameter descriptions above.
+
+![K8S](/img/tasks/demo/kubernetes-task-en.png)
+
+## Notice
+
+Task name contains only lowercase alphanumeric characters or '-'
diff --git a/docs/docs/zh/guide/task/kubernetes.md b/docs/docs/zh/guide/task/kubernetes.md
new file mode 100644
index 0000000000..7802774b7c
--- /dev/null
+++ b/docs/docs/zh/guide/task/kubernetes.md
@@ -0,0 +1,45 @@
+# Kubernetes
+
+## 综述
+
+kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的任务。worker最终会通过使用kubernetes client提交任务。
+
+## 创建任务
+
+- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。
+- 工具栏中拖动 到画板中,选择需要连接的数据源,即可完成创建。
+
+## 任务参数
+
+- 节点名称:设置任务的名称。一个工作流定义中的节点名称是唯一的。
+- 运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
+- 描述:描述该节点的功能。
+- 任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
+- Worker 分组:任务分配给 worker 组的机器执行,选择 Default 会随机选择一台 worker 机执行。
+- 环境名称:配置运行任务的环境。
+- 失败重试次数:任务失败重新提交的次数。
+- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
+- 延迟执行时间:任务延迟执行的时间,以分为单位。
+- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
+- 命名空间:选择kubernetes集群上存在的命名空间
+- 最小CPU:任务在kubernetes上运行所需的最小CPU
+- 最小内存:任务在kubernetes上运行所需的最小内存
+- 镜像:镜像地址
+- 自定义参数:kubernetes任务局部的用户自定义参数,自定义参数最终会通过环境变量形式存在于容器中,提供给kubernetes任务使用
+- 前置任务:在当前kubernetes任务之前需要执行的任务
+
+## 任务样例
+
+### 在 DolphinScheduler 中配置 kubernetes 集群环境
+
+若生产环境中要是使用到 kubernetes 任务类型,则需要预先配置好所需的kubernetes集群环境
+
+### 配置 kubernetes 任务节点
+
+根据上述参数说明,配置所需的内容即可。
+
+![kubernetes](/img/tasks/demo/kubernetes-task-en.png)
+
+## 注意事项
+
+任务名字限制在小写字母、数字和-这三种字符之中
\ No newline at end of file
diff --git a/docs/img/tasks/demo/kubernetes-task-en.png b/docs/img/tasks/demo/kubernetes-task-en.png
new file mode 100644
index 0000000000..30e732b627
Binary files /dev/null and b/docs/img/tasks/demo/kubernetes-task-en.png differ
diff --git a/docs/img/tasks/icons/kubernetes.png b/docs/img/tasks/icons/kubernetes.png
new file mode 100644
index 0000000000..ba79a67d22
Binary files /dev/null and b/docs/img/tasks/icons/kubernetes.png differ
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 25a55fd94e..305af6b867 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@@ -70,7 +71,7 @@ public class TaskExecutionContextBuilder {
if (taskDefinition.getTimeoutFlag() == TimeoutFlag.OPEN) {
taskExecutionContext.setTaskTimeoutStrategy(taskDefinition.getTimeoutNotifyStrategy());
if (taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.FAILED
- || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
+ || taskDefinition.getTimeoutNotifyStrategy() == TaskTimeoutStrategy.WARNFAILED) {
taskExecutionContext.setTaskTimeout(Math.min(taskDefinition.getTimeout() * SEC_2_MINUTES_TIME_UNIT, Integer.MAX_VALUE));
}
}
@@ -117,12 +118,23 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setResourceParametersHelper(parametersHelper);
return this;
}
+ /**
+ * build k8sTask related info
+ *
+ * @param k8sTaskExecutionContext sqoopTaskExecutionContext
+ * @return TaskExecutionContextBuilder
+ */
+ public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionContext k8sTaskExecutionContext) {
+ taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
+ return this;
+ }
/**
* create
*
* @return taskExecutionContext
*/
+
public TaskExecutionContext create() {
return taskExecutionContext;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 38fef8f44f..2ca0d6cb19 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -17,8 +17,24 @@
package org.apache.dolphinscheduler.server.master.runner.task;
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.commons.collections.CollectionUtils;
+import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
+import static org.apache.dolphinscheduler.common.Constants.DATABASE;
+import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
+import static org.apache.dolphinscheduler.common.Constants.OTHER;
+import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.USER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
+import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -35,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -52,6 +69,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncPa
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.dq.DataQualityParameters;
+import org.apache.dolphinscheduler.plugin.task.k8s.K8sTaskParameters;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -60,8 +78,8 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
@@ -73,21 +91,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.dolphinscheduler.common.Constants.ADDRESS;
-import static org.apache.dolphinscheduler.common.Constants.DATABASE;
-import static org.apache.dolphinscheduler.common.Constants.JDBC_URL;
-import static org.apache.dolphinscheduler.common.Constants.OTHER;
-import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.USER;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
-import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariDataSource;
public abstract class BaseTaskProcessor implements ITaskProcessor {
@@ -278,6 +285,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
setDataQualityTaskRelation(dataQualityTaskExecutionContext,taskInstance,tenant.getTenantCode());
}
+ K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
+ if (TASK_TYPE_K8S.equalsIgnoreCase(taskInstance.getTaskType())) {
+ setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
+ }
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
@@ -286,6 +297,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildResourceParametersInfo(resources)
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
+ .buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
.create();
}
@@ -579,4 +591,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return resourcesMap;
}
+
+ /**
+ * set k8s task relation
+ * @param k8sTaskExecutionContext k8sTaskExecutionContext
+ * @param taskInstance taskInstance
+ */
+ private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) {
+ K8sTaskParameters k8sTaskParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
+ Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+ String clusterName = namespace.get(CLUSTER);
+ String configYaml = processService.findConfigYamlByName(clusterName);
+ if (configYaml != null) {
+ k8sTaskExecutionContext.setConfigYaml(configYaml);
+ }
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index bf48ab9cea..9ff8689fd3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -296,4 +296,6 @@ public interface ProcessService {
org.apache.dolphinscheduler.remote.command.CommandType taskType);
ProcessInstance loadNextProcess4Serial(long code, int state, int id);
+
+ public String findConfigYamlByName(String clusterName) ;
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 5136891280..20058539a1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
+import org.apache.dolphinscheduler.dao.entity.K8s;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@@ -90,6 +91,7 @@ import org.apache.dolphinscheduler.dao.mapper.DqRuleMapper;
import org.apache.dolphinscheduler.dao.mapper.DqTaskStatisticsValueMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
+import org.apache.dolphinscheduler.dao.mapper.K8sMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
@@ -269,6 +271,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private TaskPluginManager taskPluginManager;
+ @Autowired
+ private K8sMapper k8sMapper;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -3042,4 +3047,21 @@ public class ProcessServiceImpl implements ProcessService {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
+ /**
+ * find k8s config yaml by clusterName
+ *
+ * @param clusterName clusterName
+ * @return datasource
+ */
+
+ @Override
+ public String findConfigYamlByName(String clusterName) {
+ if (StringUtils.isEmpty(clusterName)) {
+ return null;
+ }
+ QueryWrapper nodeWrapper = new QueryWrapper<>();
+ nodeWrapper.eq("k8s_name", clusterName);
+ K8s k8s = k8sMapper.selectOne(nodeWrapper);
+ return k8s.getK8sConfig();
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
index 5185b3ce28..2c5c7fc6df 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
@@ -153,6 +153,12 @@
dolphinscheduler-task-blocking
${project.version}
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-k8s
+ ${project.version}
+
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
index 64b2aa05d5..b271dda0f0 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml
@@ -293,5 +293,9 @@
+
+ io.fabric8
+ kubernetes-client
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
new file mode 100644
index 0000000000..4916d8e57f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/K8sTaskExecutionContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api;
+
+import java.io.Serializable;
+
+/**
+ * k8s Task ExecutionContext
+ */
+
+public class K8sTaskExecutionContext implements Serializable {
+ private String configYaml;
+
+ public String getConfigYaml() {
+ return configYaml;
+ }
+
+ public void setConfigYaml(String configYaml) {
+ this.configYaml = configYaml;
+ }
+
+ @Override
+ public String toString() {
+ return "K8sTaskExecutionContext{"
+ + "configYaml='" + configYaml + '\''
+ + '}';
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
index 12b6c1c19e..c1a1ddadc1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
@@ -117,6 +117,10 @@ public class TaskConstants {
* exit code success
*/
public static final int EXIT_CODE_SUCCESS = 0;
+ /**
+ * running code
+ */
+ public static final int RUNNING_CODE = 1;
public static final String SH = "sh";
@@ -387,6 +391,8 @@ public class TaskConstants {
public static final String TASK_TYPE_DATA_QUALITY = "DATA_QUALITY";
+ public static final String TASK_TYPE_K8S = "K8S";
+
public static final String TASK_TYPE_BLOCKING = "BLOCKING";
public static final List COMPLEX_TASK_TYPES = Arrays.asList(new String[]{TASK_TYPE_CONDITIONS, TASK_TYPE_SWITCH, TASK_TYPE_SUB_PROCESS, TASK_TYPE_DEPENDENT});
@@ -398,6 +404,23 @@ public class TaskConstants {
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
+ /**
+ * use for k8s task
+ */
+ public static final String API_VERSION = "batch/v1";
+ public static final String IMAGE_PULL_POLICY = "Always";
+ public static final String RESTART_POLICY = "Never";
+ public static final String MEMORY = "memory";
+ public static final String CPU = "cpu";
+ public static final String LAYER_LABEL = "k8s.cn/layer";
+ public static final String LAYER_LABEL_VALUE = "batch";
+ public static final String NAME_LABEL = "k8s.cn/name";
+ public static final String TASK_INSTANCE_ID = "taskInstanceId";
+ public static final String MI = "Mi";
+ public static final int JOB_TTL_SECONDS = 300;
+ public static final int LOG_LINES = 500;
+ public static final String NAMESPACE_NAME = "name";
+ public static final String CLUSTER = "cluster";
/**
* zeppelin config
*/
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 581c73c8f1..022dbf1327 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -216,7 +216,10 @@ public class TaskExecutionContext {
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
-
+ /**
+ * k8s TaskExecutionContext
+ */
+ private K8sTaskExecutionContext k8sTaskExecutionContext;
/**
* resources full name and tenant code
*/
@@ -564,6 +567,14 @@ public class TaskExecutionContext {
this.endTime = endTime;
}
+ public K8sTaskExecutionContext getK8sTaskExecutionContext() {
+ return k8sTaskExecutionContext;
+ }
+
+ public void setK8sTaskExecutionContext(K8sTaskExecutionContext k8sTaskExecutionContext) {
+ this.k8sTaskExecutionContext = k8sTaskExecutionContext;
+ }
+
@Override
public String toString() {
return "TaskExecutionContext{"
@@ -601,6 +612,7 @@ public class TaskExecutionContext {
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ + ", k8sTaskExecutionContext=" + k8sTaskExecutionContext
+ ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ '}';
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
new file mode 100644
index 0000000000..8d7f7eac7f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+
+public abstract class AbstractK8sTask extends AbstractTaskExecutor {
+ /**
+ * process task
+ */
+ private AbstractK8sTaskExecutor abstractK8sTaskExecutor;
+ /**
+ * Abstract k8s Task
+ *
+ * @param taskRequest taskRequest
+ */
+ protected AbstractK8sTask(TaskExecutionContext taskRequest) {
+ super(taskRequest);
+ this.abstractK8sTaskExecutor = new K8sTaskExecutor(logger,taskRequest);
+ }
+
+ @Override
+ public void handle() throws Exception {
+ try {
+ TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
+ setExitStatusCode(response.getExitStatusCode());
+ setAppIds(response.getAppIds());
+ } catch (Exception e) {
+ exitStatusCode = -1;
+ throw new TaskException("k8s process failure",e);
+ }
+ }
+
+ /**
+ * cancel application
+ *
+ * @param status status
+ * @throws Exception exception
+ */
+ @Override
+ public void cancelApplication(boolean status) throws Exception {
+ cancel = true;
+ // cancel process
+ abstractK8sTaskExecutor.cancelApplication(buildCommand());
+ }
+
+ /**
+ * create command
+ *
+ * @return String
+ * @throws Exception exception
+ */
+ protected abstract String buildCommand();
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
new file mode 100644
index 0000000000..1d619b26b2
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
+
+import org.slf4j.Logger;
+
+public abstract class AbstractK8sTaskExecutor {
+ protected Logger logger;
+ protected TaskExecutionContext taskRequest;
+ protected K8sUtils k8sUtils;
+ protected StringBuilder logStringBuffer;
+
+ protected AbstractK8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
+ this.logger = logger;
+ this.taskRequest = taskRequest;
+ this.k8sUtils = new K8sUtils();
+ this.logStringBuffer = new StringBuilder();
+ }
+
+ public abstract TaskResponse run(String k8sParameterStr) throws Exception;
+
+ public abstract void cancelApplication(String k8sParameterStr);
+
+ public void waitTimeout(Boolean timeout) throws TaskException {
+ if (Boolean.TRUE.equals(timeout)) {
+ throw new TaskException("K8sTask is timeout");
+ }
+ }
+
+ public void flushLog(TaskResponse taskResponse) {
+ if (logStringBuffer.length() != 0 && taskResponse.getExitStatusCode() == EXIT_CODE_FAILURE) {
+ logger.error(logStringBuffer.toString());
+ } else if (logStringBuffer.length() != 0) {
+ logger.info(logStringBuffer.toString());
+ }
+ }
+
+ public abstract void submitJob2k8s(String k8sParameterStr);
+
+ public abstract void stopJobOnK8s(String k8sParameterStr);
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
new file mode 100644
index 0000000000..86caf43934
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+
+import java.util.Map;
+
+/**
+ * k8s task parameters
+ */
+public class K8sTaskMainParameters {
+
+ private String image;
+ private String namespaceName;
+ private String clusterName;
+ private double minCpuCores;
+ private double minMemorySpace;
+ private Map paramsMap;
+
+ public String getImage() {
+ return image;
+ }
+
+ public void setImage(String image) {
+ this.image = image;
+ }
+
+ public double getMinCpuCores() {
+ return minCpuCores;
+ }
+
+ public void setMinCpuCores(double minCpuCores) {
+ this.minCpuCores = minCpuCores;
+ }
+
+ public double getMinMemorySpace() {
+ return minMemorySpace;
+ }
+
+ public void setMinMemorySpace(double minMemorySpace) {
+ this.minMemorySpace = minMemorySpace;
+ }
+
+ public String getNamespaceName() {
+ return namespaceName;
+ }
+
+ public void setNamespaceName(String namespaceName) {
+ this.namespaceName = namespaceName;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public Map getParamsMap() {
+ return paramsMap;
+ }
+
+ public void setParamsMap(Map paramsMap) {
+ this.paramsMap = paramsMap;
+ }
+
+ @Override
+ public String toString() {
+ return "K8sTaskMainParameters{"
+ + "image='" + image + '\''
+ + ", namespaceName='" + namespaceName + '\''
+ + ", clusterName='" + clusterName + '\''
+ + ", minCpuCores=" + minCpuCores
+ + ", minMemorySpace=" + minMemorySpace
+ + ", paramsMap=" + paramsMap
+ + '}';
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
new file mode 100644
index 0000000000..79e1c48af3
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s.impl;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.IMAGE_PULL_POLICY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MEMORY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID;
+
+import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import io.fabric8.kubernetes.client.WatcherException;
+
+/**
+ * K8sTaskExecutor used to submit k8s task to K8S
+ */
+public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
+ private Job job;
+ public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
+ super(logger,taskRequest);
+ }
+
+ public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
+ String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId());
+ String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+ String image = k8STaskMainParameters.getImage();
+ String namespaceName = k8STaskMainParameters.getNamespaceName();
+ Map otherParams = k8STaskMainParameters.getParamsMap();
+ Double podMem = k8STaskMainParameters.getMinMemorySpace();
+ Double podCpu = k8STaskMainParameters.getMinCpuCores();
+ Double limitPodMem = podMem * 2;
+ Double limitPodCpu = podCpu * 2;
+ int retryNum = 0;
+ String k8sJobName = String.format("%s-%s", taskName, taskInstanceId);
+ Map reqRes = new HashMap<>();
+ reqRes.put(MEMORY, new Quantity(String.format("%s%s", podMem, MI)));
+ reqRes.put(CPU, new Quantity(String.valueOf(podCpu)));
+ Map limitRes = new HashMap<>();
+ limitRes.put(MEMORY, new Quantity(String.format("%s%s", limitPodMem, MI)));
+ limitRes.put(CPU, new Quantity(String.valueOf(limitPodCpu)));
+ Map labelMap = new HashMap<>();
+ labelMap.put(LAYER_LABEL, LAYER_LABEL_VALUE);
+ labelMap.put(NAME_LABEL, k8sJobName);
+ EnvVar taskInstanceIdVar = new EnvVar(TASK_INSTANCE_ID, taskInstanceId, null);
+ List envVars = new ArrayList<>();
+ envVars.add(taskInstanceIdVar);
+ if (MapUtils.isNotEmpty(otherParams)) {
+ for (Map.Entry entry : otherParams.entrySet()) {
+ String param = entry.getKey();
+ String paramValue = entry.getValue();
+ EnvVar envVar = new EnvVar(param, paramValue, null);
+ envVars.add(envVar);
+ }
+ }
+ return new JobBuilder()
+ .withApiVersion(API_VERSION)
+ .withNewMetadata()
+ .withName(k8sJobName)
+ .withLabels(labelMap)
+ .withNamespace(namespaceName)
+ .endMetadata()
+ .withNewSpec()
+ .withTtlSecondsAfterFinished(JOB_TTL_SECONDS)
+ .withNewTemplate()
+ .withNewSpec()
+ .addNewContainer()
+ .withName(k8sJobName)
+ .withImage(image)
+ .withImagePullPolicy(IMAGE_PULL_POLICY)
+ .withResources(new ResourceRequirements(limitRes, reqRes))
+ .withEnv(envVars)
+ .endContainer()
+ .withRestartPolicy(RESTART_POLICY)
+ .endSpec()
+ .endTemplate()
+ .withBackoffLimit(retryNum)
+ .endSpec()
+ .build();
+ }
+
+ public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Watcher watcher = new Watcher() {
+ @Override
+ public void eventReceived(Action action, Job job) {
+ if (action != Action.ADDED) {
+ int jobStatus = getK8sJobStatus(job);
+ setTaskStatus(jobStatus,taskInstanceId, taskResponse, k8STaskMainParameters);
+ countDownLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onClose(WatcherException e) {
+ logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s",job.getMetadata().getName(),e.getMessage()));
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onClose() {
+ logger.warn("Watch gracefully closed");
+ }
+ };
+ Watch watch = null;
+ try {
+ watch = k8sUtils.createBatchJobWatcher(job.getMetadata().getName(), watcher);
+ boolean timeoutFlag = taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
+ || taskRequest.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
+ if (timeoutFlag) {
+ Boolean timeout = !(countDownLatch.await(taskRequest.getTaskTimeout(), TimeUnit.SECONDS));
+ waitTimeout(timeout);
+ } else {
+ countDownLatch.await();
+ }
+ flushLog(taskResponse);
+ } catch (InterruptedException e) {
+ logger.error("job failed in k8s: {}",e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ } catch (Exception e) {
+ logger.error("job failed in k8s: {}",e.getMessage(), e);
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ } finally {
+ if (watch != null) {
+ watch.close();
+ }
+ }
+ }
+
+ @Override
+ public TaskResponse run(String k8sParameterStr) throws Exception {
+ TaskResponse result = new TaskResponse();
+ int taskInstanceId = taskRequest.getTaskInstanceId();
+ K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ try {
+ if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
+ result.setExitStatusCode(EXIT_CODE_KILL);
+ return result;
+ }
+ if (StringUtils.isEmpty(k8sParameterStr)) {
+ TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
+ return result;
+ }
+ K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
+ String configYaml = k8sTaskExecutionContext.getConfigYaml();
+ k8sUtils.buildClient(configYaml);
+ submitJob2k8s(k8sParameterStr);
+ registerBatchJobWatcher(job, Integer.toString(taskInstanceId), result, k8STaskMainParameters);
+ } catch (Exception e) {
+ cancelApplication(k8sParameterStr);
+ result.setExitStatusCode(EXIT_CODE_FAILURE);
+ throw e;
+ }
+ return result;
+ }
+
+ @Override
+ public void cancelApplication(String k8sParameterStr) {
+ if (job != null) {
+ stopJobOnK8s(k8sParameterStr);
+ }
+ }
+
+ @Override
+ public void submitJob2k8s(String k8sParameterStr) {
+ int taskInstanceId = taskRequest.getTaskInstanceId();
+ String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
+ K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ try {
+ logger.info("[K8sJobExecutor-{}-{}] start to submit job", taskName, taskInstanceId);
+ job = buildK8sJob(k8STaskMainParameters);
+ stopJobOnK8s(k8sParameterStr);
+ String namespaceName = k8STaskMainParameters.getNamespaceName();
+ k8sUtils.createJob(namespaceName, job);
+ logger.info("[K8sJobExecutor-{}-{}] submitted job successfully", taskName, taskInstanceId);
+ } catch (Exception e) {
+ logger.error("[K8sJobExecutor-{}-{}] fail to submit job", taskName, taskInstanceId);
+ throw new TaskException("K8sJobExecutor fail to submit job", e);
+ }
+ }
+
+ @Override
+ public void stopJobOnK8s(String k8sParameterStr) {
+ K8sTaskMainParameters k8STaskMainParameters = JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class);
+ String namespaceName = k8STaskMainParameters.getNamespaceName();
+ String jobName = job.getMetadata().getName();
+ try {
+ if (Boolean.TRUE.equals(k8sUtils.jobExist(jobName, namespaceName))) {
+ k8sUtils.deleteJob(jobName, namespaceName);
+ }
+ } catch (Exception e) {
+ logger.error("[K8sJobExecutor-{}] fail to stop job", jobName);
+ throw new TaskException("K8sJobExecutor fail to stop job", e);
+ }
+ }
+
+ public int getK8sJobStatus(Job job) {
+ JobStatus jobStatus = job.getStatus();
+ if (jobStatus.getSucceeded() != null && jobStatus.getSucceeded() == 1) {
+ return EXIT_CODE_SUCCESS;
+ } else if (jobStatus.getFailed() != null && jobStatus.getFailed() == 1) {
+ return EXIT_CODE_FAILURE;
+ } else {
+ return TaskConstants.RUNNING_CODE;
+ }
+ }
+
+ public void setTaskStatus(int jobStatus,String taskInstanceId, TaskResponse taskResponse, K8sTaskMainParameters k8STaskMainParameters) {
+ if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
+ if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) {
+ logStringBuffer.append(String.format("[K8sJobExecutor-%s] killed", job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_KILL);
+ } else if (jobStatus == EXIT_CODE_SUCCESS) {
+ logStringBuffer.append(String.format("[K8sJobExecutor-%s] succeed in k8s", job.getMetadata().getName()));
+ taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
+ } else {
+ String errorMessage = k8sUtils.getPodLog(job.getMetadata().getName(), k8STaskMainParameters.getNamespaceName());
+ logStringBuffer.append(String.format("[K8sJobExecutor-%s] fail in k8s: %s", job.getMetadata().getName(), errorMessage));
+ taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
+ }
+ }
+ }
+
+ public Job getJob() {
+ return job;
+ }
+
+ public void setJob(Job job) {
+ this.job = job;
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
new file mode 100644
index 0000000000..f2b104875b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.utils;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOG_LINES;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobList;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class K8sUtils {
+ private static final Logger log = LoggerFactory.getLogger(K8sUtils.class);
+ private KubernetesClient client;
+
+ public void createJob(String namespace, Job job) {
+ try {
+ client.batch().v1()
+ .jobs()
+ .inNamespace(namespace)
+ .create(job);
+ } catch (Exception e) {
+ throw new TaskException("fail to create job",e);
+ }
+ }
+
+ public void deleteJob(String jobName, String namespace) {
+ try {
+ client.batch().v1()
+ .jobs()
+ .inNamespace(namespace)
+ .withName(jobName)
+ .delete();
+ } catch (Exception e) {
+ throw new TaskException("fail to delete job",e);
+ }
+ }
+
+ public Boolean jobExist(String jobName, String namespace) {
+ Optional result;
+ try {
+ JobList jobList = client.batch().v1().jobs().inNamespace(namespace).list();
+ List jobs = jobList.getItems();
+ result = jobs.stream()
+ .filter(job -> job.getMetadata().getName().equals(jobName))
+ .findFirst();
+ return result.isPresent();
+ } catch (Exception e) {
+ throw new TaskException("fail to check job: ", e);
+ }
+ }
+
+ public Watch createBatchJobWatcher(String jobName, Watcher watcher) {
+ try {
+ return client.batch().v1()
+ .jobs().withName(jobName).watch(watcher);
+ } catch (Exception e) {
+ throw new TaskException("fail to register batch job watcher",e);
+ }
+ }
+
+ public String getPodLog(String jobName, String namespace) {
+ try {
+ List podList = client.pods().inNamespace(namespace).list().getItems();
+ String podName = null;
+ for (Pod pod : podList) {
+ podName = pod.getMetadata().getName();
+ if (jobName.equals(podName.substring(0, pod.getMetadata().getName().lastIndexOf("-")))) {
+ break;
+ }
+ }
+ return client.pods().inNamespace(namespace)
+ .withName(podName)
+ .tailingLines(LOG_LINES)
+ .getLog(Boolean.TRUE);
+ } catch (Exception e) {
+ log.error("fail to getPodLog", e);
+ log.error("response bodies : {}", e.getMessage());
+ }
+ return null;
+ }
+
+ public void buildClient(String configYaml) {
+ try {
+ Config config = Config.fromKubeconfig(configYaml);
+ client = new DefaultKubernetesClient(config);
+ } catch (Exception e) {
+ throw new TaskException("fail to build k8s ApiClient",e);
+ }
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
new file mode 100644
index 0000000000..db30248487
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.k8s;
+import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
+import static org.hamcrest.Matchers.is;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sTaskExecutorTest {
+ private K8sTaskExecutor k8sTaskExecutor = null;
+ private K8sTaskMainParameters k8sTaskMainParameters = null;
+ private final String image = "ds-dev";
+ private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+ private final double minCpuCores = 2;
+ private final double minMemorySpace = 10;
+ private final int taskInstanceId = 1000;
+ private final String taskName = "k8s_task_test";
+ private Job job;
+ @Before
+ public void before() {
+ TaskExecutionContext taskRequest = new TaskExecutionContext();
+ taskRequest.setTaskInstanceId(taskInstanceId);
+ taskRequest.setTaskName(taskName);
+ Map namespace = JSONUtils.toMap(this.namespace);
+ String namespaceName = namespace.get(NAMESPACE_NAME);
+ String clusterName = namespace.get(CLUSTER);
+ k8sTaskExecutor = new K8sTaskExecutor(null,taskRequest);
+ k8sTaskMainParameters = new K8sTaskMainParameters();
+ k8sTaskMainParameters.setImage(image);
+ k8sTaskMainParameters.setNamespaceName(namespaceName);
+ k8sTaskMainParameters.setClusterName(clusterName);
+ k8sTaskMainParameters.setMinCpuCores(minCpuCores);
+ k8sTaskMainParameters.setMinMemorySpace(minMemorySpace);
+ job = k8sTaskExecutor.buildK8sJob(k8sTaskMainParameters);
+ }
+ @Test
+ public void testBuildK8sJobNormal() {
+ String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineSeconds=null, backoffLimit=0, completionMode=null, completions=null, manualSelector=null, parallelism=null, selector=null, suspend=null, template=PodTemplateSpec(metadata=null, spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[EnvVar(name=taskInstanceId, value=1000, valueFrom=null, additionalProperties={})], envFrom=[], image=ds-dev, imagePullPolicy=Always, lifecycle=null, livenessProbe=null, name=k8s_task_test-1000, ports=[], readinessProbe=null, resources=ResourceRequirements(limits={memory=20.0Mi, cpu=4.0}, requests={memory=10.0Mi, cpu=2.0}, additionalProperties={}), securityContext=null, startupProbe=null, stdin=null, stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, tty=null, volumeDevices=[], volumeMounts=[], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=null, enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=null, nodeSelector=null, overhead=null, preemptionPolicy=null, priority=null, priorityClassName=null, readinessGates=[], restartPolicy=Never, runtimeClassName=null, schedulerName=null, securityContext=null, serviceAccount=null, serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], topologySpreadConstraints=[], volumes=[], additionalProperties={}), additionalProperties={}), ttlSecondsAfterFinished=300, additionalProperties={}), status=null, additionalProperties={})";
+ Assert.assertEquals(jobStr, job.toString());
+ }
+ @Test
+ public void testGetJobNormal() {
+ k8sTaskExecutor.setJob(job);
+ String jobStr = "Job(apiVersion=batch/v1, kind=Job, metadata=ObjectMeta(annotations=null, clusterName=null, creationTimestamp=null, deletionGracePeriodSeconds=null, deletionTimestamp=null, finalizers=[], generateName=null, generation=null, labels={k8s.cn/layer=batch, k8s.cn/name=k8s_task_test-1000}, managedFields=[], name=k8s_task_test-1000, namespace=default, ownerReferences=[], resourceVersion=null, selfLink=null, uid=null, additionalProperties={}), spec=JobSpec(activeDeadlineSeconds=null, backoffLimit=0, completionMode=null, completions=null, manualSelector=null, parallelism=null, selector=null, suspend=null, template=PodTemplateSpec(metadata=null, spec=PodSpec(activeDeadlineSeconds=null, affinity=null, automountServiceAccountToken=null, containers=[Container(args=[], command=[], env=[EnvVar(name=taskInstanceId, value=1000, valueFrom=null, additionalProperties={})], envFrom=[], image=ds-dev, imagePullPolicy=Always, lifecycle=null, livenessProbe=null, name=k8s_task_test-1000, ports=[], readinessProbe=null, resources=ResourceRequirements(limits={memory=20.0Mi, cpu=4.0}, requests={memory=10.0Mi, cpu=2.0}, additionalProperties={}), securityContext=null, startupProbe=null, stdin=null, stdinOnce=null, terminationMessagePath=null, terminationMessagePolicy=null, tty=null, volumeDevices=[], volumeMounts=[], workingDir=null, additionalProperties={})], dnsConfig=null, dnsPolicy=null, enableServiceLinks=null, ephemeralContainers=[], hostAliases=[], hostIPC=null, hostNetwork=null, hostPID=null, hostname=null, imagePullSecrets=[], initContainers=[], nodeName=null, nodeSelector=null, overhead=null, preemptionPolicy=null, priority=null, priorityClassName=null, readinessGates=[], restartPolicy=Never, runtimeClassName=null, schedulerName=null, securityContext=null, serviceAccount=null, serviceAccountName=null, setHostnameAsFQDN=null, shareProcessNamespace=null, subdomain=null, terminationGracePeriodSeconds=null, tolerations=[], topologySpreadConstraints=[], volumes=[], additionalProperties={}), additionalProperties={}), ttlSecondsAfterFinished=300, additionalProperties={}), status=null, additionalProperties={})";
+ Assert.assertEquals(jobStr,k8sTaskExecutor.getJob().toString());
+ }
+ @Test
+ public void testGetK8sJobStatusNormal() {
+ JobStatus jobStatus = new JobStatus();
+ jobStatus.setSucceeded(1);
+ job.setStatus(jobStatus);
+ Assert.assertEquals(0, Integer.compare(0,k8sTaskExecutor.getK8sJobStatus(job)));
+ }
+ @Test
+ public void testSetTaskStatusNormal() {
+ int jobStatus = 0;
+ TaskResponse taskResponse = new TaskResponse();
+ K8sTaskMainParameters k8STaskMainParameters = new K8sTaskMainParameters();
+ k8sTaskExecutor.setJob(job);
+ k8sTaskExecutor.setTaskStatus(jobStatus,String.valueOf(taskInstanceId),taskResponse,k8STaskMainParameters);
+ Assert.assertEquals(0, Integer.compare(EXIT_CODE_KILL,taskResponse.getExitStatusCode()));
+ }
+ @Test
+ public void testWaitTimeoutNormal() {
+ try {
+ k8sTaskExecutor.waitTimeout(true);
+ } catch (TaskException e) {
+ Assert.assertThat(e.getMessage(),is("K8sTask is timeout"));
+ }
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml
new file mode 100644
index 0000000000..c49e74d102
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+
+ dolphinscheduler-task-plugin
+ org.apache.dolphinscheduler
+ dev-SNAPSHOT
+
+ 4.0.0
+
+ dolphinscheduler-task-k8s
+ jar
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-datasource-all
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-spi
+ provided
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-task-api
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-datasource-api
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
new file mode 100644
index 0000000000..a5ce9992f1
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTask;
+import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class K8sTask extends AbstractK8sTask {
+
+ /**
+ * taskExecutionContext
+ */
+ private final TaskExecutionContext taskExecutionContext;
+
+ /**
+ * task parameters
+ */
+ private final K8sTaskParameters k8sTaskParameters;
+
+ /**
+ * @param taskRequest taskRequest
+ */
+ public K8sTask(TaskExecutionContext taskRequest) {
+ super(taskRequest);
+ this.taskExecutionContext = taskRequest;
+ this.k8sTaskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), K8sTaskParameters.class);
+ if (!k8sTaskParameters.checkParameters()) {
+ throw new TaskException("K8S task params is not valid");
+ }
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return k8sTaskParameters;
+ }
+
+ @Override
+ protected String buildCommand() {
+ K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
+ Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
+ Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
+ String namespaceName = namespace.get(NAMESPACE_NAME);
+ String clusterName = namespace.get(CLUSTER);
+ k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());
+ k8sTaskMainParameters.setNamespaceName(namespaceName);
+ k8sTaskMainParameters.setClusterName(clusterName);
+ k8sTaskMainParameters.setMinCpuCores(k8sTaskParameters.getMinCpuCores());
+ k8sTaskMainParameters.setMinMemorySpace(k8sTaskParameters.getMinMemorySpace());
+ k8sTaskMainParameters.setParamsMap(ParamUtils.convert(paramsMap));
+ return JSONUtils.toJsonString(k8sTaskMainParameters);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
new file mode 100644
index 0000000000..3edb1c448d
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannel.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+public class K8sTaskChannel implements TaskChannel {
+ @Override
+ public void cancelApplication(boolean status) {
+
+ }
+
+ @Override
+ public AbstractParameters parseParameters(ParametersNode parametersNode) {
+ return JSONUtils.parseObject(parametersNode.getTaskParams(), K8sTaskParameters.class);
+ }
+
+ @Override
+ public ResourceParametersHelper getResources(String parameters) {
+ return null;
+ }
+
+ @Override
+ public AbstractTask createTask(TaskExecutionContext taskRequest) {
+ return new K8sTask(taskRequest);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java
new file mode 100644
index 0000000000..dd8caac035
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskChannelFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
+import org.apache.dolphinscheduler.spi.params.base.PluginParams;
+
+import java.util.List;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(TaskChannelFactory.class)
+public class K8sTaskChannelFactory implements TaskChannelFactory {
+ @Override
+ public String getName() {
+ return "K8S";
+ }
+
+ @Override
+ public List getParams() {
+ return null;
+ }
+
+ @Override
+ public TaskChannel create() {
+ return new K8sTaskChannel();
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java
new file mode 100644
index 0000000000..35f816ecba
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskParameters.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * k8s task parameters
+ */
+public class K8sTaskParameters extends AbstractParameters {
+ private String image;
+ private String namespace;
+ private double minCpuCores;
+ private double minMemorySpace;
+
+ public String getImage() {
+ return image;
+ }
+
+ public void setImage(String image) {
+ this.image = image;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public double getMinCpuCores() {
+ return minCpuCores;
+ }
+
+ public void setMinCpuCores(double minCpuCores) {
+ this.minCpuCores = minCpuCores;
+ }
+
+ public double getMinMemorySpace() {
+ return minMemorySpace;
+ }
+
+ public void setMinMemorySpace(double minMemorySpace) {
+ this.minMemorySpace = minMemorySpace;
+ }
+
+ @Override
+ public boolean checkParameters() {
+ return StringUtils.isNotEmpty(image) && StringUtils.isNotEmpty(namespace)
+ ;
+ }
+
+ @Override
+ public List getResourceFilesList() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public String toString() {
+ return "K8sTaskParameters{"
+ + "image='" + image + '\''
+ + ", namespace='" + namespace + '\''
+ + ", minCpuCores=" + minCpuCores
+ + ", minMemorySpace=" + minMemorySpace
+ + '}';
+ }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
new file mode 100644
index 0000000000..6a8892b4e4
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sParametersTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sParametersTest {
+ private K8sTaskParameters k8sTaskParameters = null;
+ private final String image = "ds-dev";
+ private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+ private final double minCpuCores = 2;
+ private final double minMemorySpace = 10;
+
+ @Before
+ public void before() {
+ k8sTaskParameters = new K8sTaskParameters();
+ k8sTaskParameters.setImage(image);
+ k8sTaskParameters.setNamespace(namespace);
+ k8sTaskParameters.setMinCpuCores(minCpuCores);
+ k8sTaskParameters.setMinMemorySpace(minMemorySpace);
+ }
+
+ @Test
+ public void testCheckParameterNormal() {
+ Assert.assertTrue(k8sTaskParameters.checkParameters());
+ }
+
+ @Test
+ public void testGetResourceFilesListNormal() {
+ Assert.assertNotNull(k8sTaskParameters.getResourceFilesList());
+ Assert.assertEquals(0, k8sTaskParameters.getResourceFilesList().size());
+ }
+
+ @Test
+ public void testK8sParameters() {
+ Assert.assertEquals(image, k8sTaskParameters.getImage());
+ Assert.assertEquals(namespace, k8sTaskParameters.getNamespace());
+ Assert.assertEquals(0, Double.compare(minCpuCores, k8sTaskParameters.getMinCpuCores()));
+ Assert.assertEquals(0,Double.compare(minMemorySpace, k8sTaskParameters.getMinMemorySpace()));
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
new file mode 100644
index 0000000000..60506999e4
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.k8s;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class K8sTaskTest {
+ private K8sTaskParameters k8sTaskParameters = null;
+
+ private K8sTask k8sTask = null;
+ private final String image = "ds-dev";
+
+ private final String namespace = "{\"name\":\"default\",\"cluster\":\"lab\"}";
+
+ private final double minCpuCores = 2;
+
+ private final double minMemorySpace = 10;
+ private final int taskInstanceId = 1000;
+ private final String taskName = "k8s_task_test";
+
+ private final String DAY = "day";
+ private final String date = "20220507";
+ @Before
+ public void before() {
+ k8sTaskParameters = new K8sTaskParameters();
+ k8sTaskParameters.setImage(image);
+ k8sTaskParameters.setNamespace(namespace);
+ k8sTaskParameters.setMinCpuCores(minCpuCores);
+ k8sTaskParameters.setMinMemorySpace(minMemorySpace);
+ TaskExecutionContext taskRequest = new TaskExecutionContext();
+ taskRequest.setTaskInstanceId(taskInstanceId);
+ taskRequest.setTaskName(taskName);
+ taskRequest.setTaskParams(JSONUtils.toJsonString(k8sTaskParameters));
+ Property property = new Property();
+ property.setProp(DAY);
+ property.setDirect(Direct.IN);
+ property.setType(DataType.VARCHAR);
+ property.setValue(date);
+ Map paramsMap = new HashMap<>();
+ paramsMap.put(DAY,property);
+ taskRequest.setParamsMap(paramsMap);
+ k8sTask = new K8sTask(taskRequest);
+ }
+
+ @Test
+ public void testBuildCommandNormal() {
+ String expectedStr = "{\"image\":\"ds-dev\",\"namespaceName\":\"default\",\"clusterName\":\"lab\",\"minCpuCores\":2.0,\"minMemorySpace\":10.0,\"paramsMap\":{\"day\":\"20220507\"}}";
+ String commandStr = k8sTask.buildCommand();
+ Assert.assertEquals(expectedStr, commandStr);
+ }
+
+ @Test
+ public void testGetParametersNormal() {
+ String expectedStr = "K8sTaskParameters{image='ds-dev', namespace='{\"name\":\"default\",\"cluster\":\"lab\"}', minCpuCores=2.0, minMemorySpace=10.0}";
+ String result = k8sTask.getParameters().toString();
+ Assert.assertEquals(expectedStr, result);
+ }
+
+}
diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml
index 65a479019c..bfd8ea7b05 100644
--- a/dolphinscheduler-task-plugin/pom.xml
+++ b/dolphinscheduler-task-plugin/pom.xml
@@ -50,6 +50,7 @@
dolphinscheduler-task-all
dolphinscheduler-task-emr
dolphinscheduler-task-blocking
+ dolphinscheduler-task-k8s
dolphinscheduler-task-zeppelin
dolphinscheduler-task-jupyter
diff --git a/dolphinscheduler-ui/public/images/task-icons/k8s.png b/dolphinscheduler-ui/public/images/task-icons/k8s.png
new file mode 100644
index 0000000000..efce2bf14a
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/k8s.png differ
diff --git a/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png b/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png
new file mode 100644
index 0000000000..ba79a67d22
Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/k8s_hover.png differ
diff --git a/dolphinscheduler-ui/src/locales/modules/en_US.ts b/dolphinscheduler-ui/src/locales/modules/en_US.ts
index 3395adaa85..1a88fabb6b 100644
--- a/dolphinscheduler-ui/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui/src/locales/modules/en_US.ts
@@ -650,6 +650,14 @@ const project = {
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
delay_execution_time: 'Delay execution time',
+ namespace_cluster: 'Namespace(Cluster)',
+ min_cpu: 'Min cpu',
+ min_memory: 'Min memory',
+ cores: 'Cores',
+ mb: 'MB',
+ image: 'Image',
+ image_tips: 'Please enter image',
+ min_memory_tips: 'Please enter min memory',
state: 'State',
branch_flow: 'Branch flow',
cancel: 'Cancel',
diff --git a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
index f3a5e38578..576a782120 100644
--- a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
@@ -645,6 +645,14 @@ const project = {
failed_retry_interval: '失败重试间隔',
minute: '分',
delay_execution_time: '延时执行时间',
+ namespace_cluster: '命名空间(集群)',
+ min_cpu: '最小cpu',
+ min_memory: '最小内存',
+ cores: '核',
+ mb: 'MB',
+ image: '镜像',
+ image_tips: '请输入镜像',
+ min_memory_tips: '请输入最小内存',
state: '状态',
branch_flow: '分支流转',
cancel: '取消',
diff --git a/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts b/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
index 2df1d5f703..524cac5b23 100644
--- a/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
+++ b/dolphinscheduler-ui/src/service/modules/k8s-namespace/index.ts
@@ -27,6 +27,13 @@ export function queryNamespaceListPaging(params: ListReq): any {
})
}
+export function getAllNamespaces(): any {
+ return axios({
+ url: '/k8s-namespace/available-list',
+ method: 'get'
+ })
+}
+
export function verifyNamespaceK8s(params: K8SReq): any {
return axios({
url: '/k8s-namespace/verify',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 59550af8fa..d4a733814f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -62,4 +62,6 @@ export { useConditions } from './use-conditions'
export { useDependent } from './use-dependent'
export { useEmr } from './use-emr'
export { useZeppelin } from './use-zeppelin'
+export { useNamespace } from './use-namespace'
+export { useK8s } from './use-k8s'
export { useJupyter } from './use-jupyter'
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
new file mode 100644
index 0000000000..f1635bea32
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-k8s.ts
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { useCustomParams, useNamespace } from '.'
+import type { IJsonItem } from '../types'
+import { useI18n } from 'vue-i18n'
+
+export function useK8s(model: { [field: string]: any }): IJsonItem[] {
+ const { t } = useI18n()
+
+ return [
+ useNamespace(),
+ {
+ type: 'input-number',
+ field: 'minCpuCores',
+ span: 12,
+ name: t('project.node.min_cpu'),
+ slots: {
+ suffix: () => t('project.node.cores')
+ }
+ },
+ {
+ type: 'input-number',
+ field: 'minMemorySpace',
+ span: 12,
+ name: t('project.node.min_memory'),
+ slots: {
+ suffix: () => t('project.node.mb')
+ }
+ },
+ {
+ type: 'input',
+ field: 'image',
+ name: t('project.node.image'),
+ props: {
+ placeholder: t('project.node.image_tips')
+ },
+ validate: {
+ trigger: ['input', 'blur'],
+ message: t('project.node.min_memory_tips')
+ }
+ },
+ ...useCustomParams({ model, field: 'localParams', isSimple: true })
+ ]
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
new file mode 100644
index 0000000000..983fd6506a
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-namespace.ts
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import type { IJsonItem } from '../types'
+import { useI18n } from 'vue-i18n'
+import { onMounted, ref, VNodeChild } from 'vue'
+import { getAllNamespaces } from '@/service/modules/k8s-namespace'
+import { SelectOption } from 'naive-ui'
+
+export function useNamespace(): IJsonItem {
+ const { t } = useI18n()
+
+ const options = ref([])
+ const loading = ref(false)
+
+ const getNamespaceList = async () => {
+ if (loading.value) return
+ loading.value = true
+ const totalList = await getAllNamespaces()
+ options.value = (totalList || []).map(
+ (item: { id: string; namespace: string; k8s: string }) => ({
+ label: `${item.namespace}(${item.k8s})`,
+ value: JSON.stringify({
+ name: item.namespace,
+ cluster: item.k8s
+ })
+ })
+ )
+ loading.value = false
+ }
+
+ onMounted(() => {
+ getNamespaceList()
+ })
+
+ const renderLabel = (option: SelectOption): VNodeChild => {
+ if (option.type === 'group') return option.label as string
+ return [option.label as string]
+ }
+
+ return {
+ type: 'select',
+ field: 'namespace',
+ name: t('project.node.namespace_cluster'),
+ props: {
+ loading,
+ 'render-label': renderLabel
+ },
+ options: [
+ {
+ type: 'group',
+ label: t('project.node.namespace_cluster'),
+ key: t('project.node.namespace_cluster'),
+ children: options as any
+ }
+ ]
+ }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index 85e15a941c..a487d3802e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -310,6 +310,13 @@ export function formatParams(data: INodeData): {
taskParams.paragraphId = data.zeppelinParagraphId
}
+ if (data.taskType === 'K8S') {
+ taskParams.namespace = data.namespace
+ taskParams.minCpuCores = data.minCpuCores
+ taskParams.minMemorySpace = data.minMemorySpace
+ taskParams.image = data.image
+ }
+
if (data.taskType === 'JUPYTER') {
taskParams.condaEnvName = data.condaEnvName
taskParams.inputNotePath = data.inputNotePath
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
index 2e1eadcd7a..a9a0671dc0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
@@ -34,6 +34,7 @@ import { useDependent } from './use-dependent'
import { useDataQuality } from './use-data-quality'
import { useEmr } from './use-emr'
import { useZeppelin } from './use-zeppelin'
+import { useK8s } from './use-k8s'
import { useJupyter } from './use-jupyter'
export default {
@@ -56,5 +57,6 @@ export default {
DATA_QUALITY: useDataQuality,
EMR: useEmr,
ZEPPELIN: useZeppelin,
+ K8S: useK8s,
JUPYTER: useJupyter
}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
new file mode 100644
index 0000000000..a55425ffb5
--- /dev/null
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { reactive } from 'vue'
+import * as Fields from '../fields/index'
+import type { IJsonItem, INodeData } from '../types'
+import { ITaskData } from '../types'
+
+export function useK8s({
+ projectCode,
+ from = 0,
+ readonly,
+ data
+}: {
+ projectCode: number
+ from?: number
+ readonly?: boolean
+ data?: ITaskData
+}) {
+ const model = reactive({
+ name: '',
+ taskType: 'K8S',
+ flag: 'YES',
+ description: '',
+ timeoutFlag: false,
+ localParams: [],
+ environmentCode: null,
+ failRetryInterval: 1,
+ failRetryTimes: 0,
+ workerGroup: 'default',
+ delayTime: 0,
+ timeout: 30
+ } as INodeData)
+
+ let extra: IJsonItem[] = []
+ if (from === 1) {
+ extra = [
+ Fields.useTaskType(model, readonly),
+ Fields.useProcessName({
+ model,
+ projectCode,
+ isCreate: !data?.id,
+ from,
+ processName: data?.processName
+ })
+ ]
+ }
+
+ return {
+ json: [
+ Fields.useName(),
+ ...extra,
+ Fields.useRunFlag(),
+ Fields.useDescription(),
+ Fields.useTaskPriority(),
+ Fields.useWorkerGroup(),
+ Fields.useEnvironmentName(model, !model.id),
+ ...Fields.useTaskGroup(model, projectCode),
+ ...Fields.useFailed(),
+ Fields.useDelayTime(model),
+ ...Fields.useTimeoutAlarm(model),
+ ...Fields.useK8s(model),
+ Fields.usePreTasks()
+ ] as IJsonItem[],
+ model
+ }
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index b785d0f7a0..c98ae48a9a 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -306,6 +306,12 @@ interface ITaskParams {
udfs?: string
connParams?: string
targetJobName?: string
+ cluster?: string
+ namespace?: string
+ clusterNamespace?: string
+ minCpuCores?: string
+ minMemorySpace?: string
+ image?: string
}
interface INodeData
diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
index 25deefdef2..b3b68037c0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
@@ -34,6 +34,7 @@ export type TaskType =
| 'SEATUNNEL'
| 'EMR'
| 'ZEPPELIN'
+ | 'K8S'
| 'JUPYTER'
export const TASK_TYPES_MAP = {
@@ -103,5 +104,9 @@ export const TASK_TYPES_MAP = {
JUPYTER: {
alias: 'JUPYTER',
helperLinkDisable: true
+ },
+ K8S: {
+ alias: 'K8S',
+ helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
index 8cc075c901..3fbc70fa48 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
@@ -158,6 +158,9 @@ $bgLight: #ffffff;
&.icon-zeppelin {
background-image: url('/images/task-icons/zeppelin.png');
}
+ &.icon-k8s {
+ background-image: url('/images/task-icons/k8s.png');
+ }
&.icon-jupyter {
background-image: url('/images/task-icons/jupyter.png');
}
@@ -222,6 +225,9 @@ $bgLight: #ffffff;
&.icon-zeppelin {
background-image: url('/images/task-icons/zeppelin_hover.png');
}
+ &.icon-k8s {
+ background-image: url('/images/task-icons/k8s_hover.png');
+ }
&.icon-jupyter {
background-image: url('/images/task-icons/jupyter_hover.png');
}