diff --git a/docs/docs/en/guide/task/kubernetes.md b/docs/docs/en/guide/task/kubernetes.md index 316a82adf7..332669a60c 100644 --- a/docs/docs/en/guide/task/kubernetes.md +++ b/docs/docs/en/guide/task/kubernetes.md @@ -16,17 +16,18 @@ K8S task type used to execute a batch task. In this task, the worker submits the - Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. -| **Parameter** | **Description** | -|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Namespace | The namespace for running k8s task. | -| Min CPU | Minimum CPU requirement for running k8s task. | -| Min Memory | Minimum memory requirement for running k8s task. | -| Image | The registry url for image. | -| Command | The container execution command (yaml-style array), for example: ["printenv"] | -| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] | -| Custom label | The customized labels for k8s Job. | -| Node selector | The label selectors for running k8s pod. Different value in value set should be seperated by comma, for example: `value1,value2`. You can refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ for configuration of different operators. | -| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. | +| **Parameter** | **Description** | +|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Namespace | The namespace for running k8s task. | +| Min CPU | Minimum CPU requirement for running k8s task. | +| Min Memory | Minimum memory requirement for running k8s task. | +| Image | The registry url for image. | +| Image Pull Policy | The image pull policy for image. | +| Command | The container execution command (yaml-style array), for example: ["printenv"] | +| Args | The args of execution command (yaml-style array), for example: ["HOSTNAME", "KUBERNETES_PORT"] | +| Custom label | The customized labels for k8s Job. | +| Node selector | The label selectors for running k8s pod. Different value in value set should be seperated by comma, for example: `value1,value2`. You can refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/node-selector-requirement/ for configuration of different operators. | +| Custom parameter | It is a local user-defined parameter for K8S task, these params will pass to container as environment variables. | ## Task Example diff --git a/docs/docs/zh/guide/task/kubernetes.md b/docs/docs/zh/guide/task/kubernetes.md index 0409f116eb..dbfbcd20d2 100644 --- a/docs/docs/zh/guide/task/kubernetes.md +++ b/docs/docs/zh/guide/task/kubernetes.md @@ -22,6 +22,7 @@ kubernetes任务类型,用于在kubernetes上执行一个短时和批处理的 | 最小CPU | 任务在kubernetes上运行所需的最小CPU | | 最小内存 | 任务在kubernetes上运行所需的最小内存 | | 镜像 | 镜像地址 | +| 镜像拉取策略 | 镜像的拉取策略 | | 容器执行命令 | 容器执行命令(yaml格式数组),例如:["printenv"] | | 执行命令参数 | 执行命令参数(yaml格式数组),例如:["HOSTNAME", "KUBERNETES_PORT"] | | 自定义标签 | 作业自定义标签 | diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java index ec2b1cab38..d70cf11a22 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskMainParameters.java @@ -34,6 +34,7 @@ public class K8sTaskMainParameters { private String args; private String namespaceName; private String clusterName; + private String imagePullPolicy; private double minCpuCores; private double minMemorySpace; private Map paramsMap; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 109304675e..f357713d39 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.IMAGE_PULL_POLICY; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL_VALUE; @@ -56,6 +55,8 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; +import io.fabric8.kubernetes.api.model.Affinity; +import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.NodeSelectorTerm; import io.fabric8.kubernetes.api.model.Quantity; @@ -82,6 +83,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); String image = k8STaskMainParameters.getImage(); String namespaceName = k8STaskMainParameters.getNamespaceName(); + String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy(); Map otherParams = k8STaskMainParameters.getParamsMap(); Double podMem = k8STaskMainParameters.getMinMemorySpace(); Double podCpu = k8STaskMainParameters.getMinCpuCores(); @@ -129,7 +131,16 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { NodeSelectorTerm nodeSelectorTerm = new NodeSelectorTerm(); nodeSelectorTerm.setMatchExpressions(k8STaskMainParameters.getNodeSelectorRequirements()); - return new JobBuilder() + Affinity affinity = k8STaskMainParameters.getNodeSelectorRequirements().size() == 0 ? null + : new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .addNewNodeSelectorTermLike(nodeSelectorTerm) + .endNodeSelectorTerm() + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity().build(); + + JobBuilder jobBuilder = new JobBuilder() .withApiVersion(API_VERSION) .withNewMetadata() .withName(k8sJobName) @@ -145,24 +156,18 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { .withImage(image) .withCommand(commands.size() == 0 ? null : commands) .withArgs(args.size() == 0 ? null : args) - .withImagePullPolicy(IMAGE_PULL_POLICY) + .withImagePullPolicy(imagePullPolicy) .withResources(new ResourceRequirements(limitRes, reqRes)) .withEnv(envVars) .endContainer() .withRestartPolicy(RESTART_POLICY) - .withNewAffinity() - .withNewNodeAffinity() - .withNewRequiredDuringSchedulingIgnoredDuringExecution() - .addNewNodeSelectorTermLike(nodeSelectorTerm) - .endNodeSelectorTerm() - .endRequiredDuringSchedulingIgnoredDuringExecution() - .endNodeAffinity() - .endAffinity() + .withAffinity(affinity) .endSpec() .endTemplate() .withBackoffLimit(retryNum) - .endSpec() - .build(); + .endSpec(); + + return jobBuilder.build(); } public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse, diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java index be1c5d0c62..fd738fe736 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java @@ -37,11 +37,12 @@ public class K8sTaskParameters extends AbstractParameters { private String image; private String namespace; private String command; - private List