|
|
@ -17,6 +17,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; |
|
|
|
package org.apache.dolphinscheduler.plugin.task.api.k8s.impl; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import static java.util.Collections.singletonList; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; |
|
|
|
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; |
|
|
@ -67,6 +68,7 @@ import org.slf4j.Logger; |
|
|
|
import io.fabric8.kubernetes.api.model.Affinity; |
|
|
|
import io.fabric8.kubernetes.api.model.Affinity; |
|
|
|
import io.fabric8.kubernetes.api.model.AffinityBuilder; |
|
|
|
import io.fabric8.kubernetes.api.model.AffinityBuilder; |
|
|
|
import io.fabric8.kubernetes.api.model.EnvVar; |
|
|
|
import io.fabric8.kubernetes.api.model.EnvVar; |
|
|
|
|
|
|
|
import io.fabric8.kubernetes.api.model.LocalObjectReference; |
|
|
|
import io.fabric8.kubernetes.api.model.NodeSelectorTerm; |
|
|
|
import io.fabric8.kubernetes.api.model.NodeSelectorTerm; |
|
|
|
import io.fabric8.kubernetes.api.model.Quantity; |
|
|
|
import io.fabric8.kubernetes.api.model.Quantity; |
|
|
|
import io.fabric8.kubernetes.api.model.ResourceRequirements; |
|
|
|
import io.fabric8.kubernetes.api.model.ResourceRequirements; |
|
|
@ -91,10 +93,11 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { |
|
|
|
super(logger, taskRequest); |
|
|
|
super(logger, taskRequest); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public Job buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { |
|
|
|
public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { |
|
|
|
String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId()); |
|
|
|
String taskInstanceId = String.valueOf(taskRequest.getTaskInstanceId()); |
|
|
|
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); |
|
|
|
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); |
|
|
|
String image = k8STaskMainParameters.getImage(); |
|
|
|
String image = k8STaskMainParameters.getImage(); |
|
|
|
|
|
|
|
String pullSecret = k8STaskMainParameters.getPullSecret(); |
|
|
|
String namespaceName = k8STaskMainParameters.getNamespaceName(); |
|
|
|
String namespaceName = k8STaskMainParameters.getNamespaceName(); |
|
|
|
String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy(); |
|
|
|
String imagePullPolicy = k8STaskMainParameters.getImagePullPolicy(); |
|
|
|
Map<String, String> otherParams = k8STaskMainParameters.getParamsMap(); |
|
|
|
Map<String, String> otherParams = k8STaskMainParameters.getParamsMap(); |
|
|
@ -155,7 +158,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { |
|
|
|
.endRequiredDuringSchedulingIgnoredDuringExecution() |
|
|
|
.endRequiredDuringSchedulingIgnoredDuringExecution() |
|
|
|
.endNodeAffinity().build(); |
|
|
|
.endNodeAffinity().build(); |
|
|
|
|
|
|
|
|
|
|
|
JobBuilder jobBuilder = new JobBuilder() |
|
|
|
job = new JobBuilder() |
|
|
|
.withApiVersion(API_VERSION) |
|
|
|
.withApiVersion(API_VERSION) |
|
|
|
.withNewMetadata() |
|
|
|
.withNewMetadata() |
|
|
|
.withName(k8sJobName) |
|
|
|
.withName(k8sJobName) |
|
|
@ -178,14 +181,16 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { |
|
|
|
.withResources(new ResourceRequirements(limitRes, reqRes)) |
|
|
|
.withResources(new ResourceRequirements(limitRes, reqRes)) |
|
|
|
.withEnv(envVars) |
|
|
|
.withEnv(envVars) |
|
|
|
.endContainer() |
|
|
|
.endContainer() |
|
|
|
|
|
|
|
.withImagePullSecrets( |
|
|
|
|
|
|
|
StringUtils.isEmpty(pullSecret) ? null : singletonList(new LocalObjectReference(pullSecret))) |
|
|
|
.withRestartPolicy(RESTART_POLICY) |
|
|
|
.withRestartPolicy(RESTART_POLICY) |
|
|
|
.withAffinity(affinity) |
|
|
|
.withAffinity(affinity) |
|
|
|
.endSpec() |
|
|
|
.endSpec() |
|
|
|
.endTemplate() |
|
|
|
.endTemplate() |
|
|
|
.withBackoffLimit(retryNum) |
|
|
|
.withBackoffLimit(retryNum) |
|
|
|
.endSpec(); |
|
|
|
.endSpec() |
|
|
|
|
|
|
|
.build(); |
|
|
|
|
|
|
|
|
|
|
|
return jobBuilder.build(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { |
|
|
|
public void registerBatchJobWatcher(Job job, String taskInstanceId, TaskResponse taskResponse) { |
|
|
@ -322,7 +327,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { |
|
|
|
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class); |
|
|
|
JSONUtils.parseObject(k8sParameterStr, K8sTaskMainParameters.class); |
|
|
|
try { |
|
|
|
try { |
|
|
|
log.info("[K8sJobExecutor-{}-{}] start to submit job", taskName, taskInstanceId); |
|
|
|
log.info("[K8sJobExecutor-{}-{}] start to submit job", taskName, taskInstanceId); |
|
|
|
job = buildK8sJob(k8STaskMainParameters); |
|
|
|
buildK8sJob(k8STaskMainParameters); |
|
|
|
stopJobOnK8s(k8sParameterStr); |
|
|
|
stopJobOnK8s(k8sParameterStr); |
|
|
|
String namespaceName = k8STaskMainParameters.getNamespaceName(); |
|
|
|
String namespaceName = k8STaskMainParameters.getNamespaceName(); |
|
|
|
k8sUtils.createJob(namespaceName, job); |
|
|
|
k8sUtils.createJob(namespaceName, job); |
|
|
|