Browse Source

Fix k8sTaskExecutionContext setting configYaml (#15116)

* fixed the issue of obtaining kubeConfig of k8s tasks

* removed excess files

* removed excess files

* formatted the code

---------

Co-authored-by: xiangzihao <460888207@qq.com>
Co-authored-by: Aaron Wang <wangweirao16@gmail.com>
augit-log
chenrj 5 months ago committed by GitHub
parent
commit
ce11674668
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
  2. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
  3. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
  4. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java
  5. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java

@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_L
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -132,8 +131,6 @@ public class KubernetesApplicationManager implements ApplicationManager {
private KubernetesClient getClient(KubernetesApplicationManagerContext kubernetesApplicationManagerContext) {
K8sTaskExecutionContext k8sTaskExecutionContext =
kubernetesApplicationManagerContext.getK8sTaskExecutionContext();
k8sTaskExecutionContext
.setConfigYaml(JSONUtils.getNodeString(k8sTaskExecutionContext.getConnectionParams(), "kubeConfig"));
return cacheClientMap.computeIfAbsent(kubernetesApplicationManagerContext.getLabelValue(),
key -> new KubernetesClientBuilder()
.withConfig(Config.fromKubeconfig(k8sTaskExecutionContext.getConfigYaml())).build());

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@ -293,9 +293,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
return result;
}
K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
String connectionParams = k8sTaskExecutionContext.getConnectionParams();
String kubeConfig = JSONUtils.getNodeString(connectionParams, "kubeConfig");
String configYaml = kubeConfig;
String configYaml = k8sTaskExecutionContext.getConfigYaml();
k8sUtils.buildClient(configYaml);
submitJob2k8s(k8sParameterStr);
parsePodLogOutput();

13
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

@ -18,9 +18,12 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.collections4.CollectionUtils;
@ -86,6 +89,16 @@ public abstract class AbstractParameters implements IParameters {
return localParametersMaps;
}
public K8sTaskExecutionContext generateK8sTaskExecutionContext(ResourceParametersHelper parametersHelper,
int datasource) {
DataSourceParameters dataSourceParameters =
(DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
k8sTaskExecutionContext.setConnectionParams(
Objects.nonNull(dataSourceParameters) ? dataSourceParameters.getConnectionParams() : null);
return k8sTaskExecutionContext;
}
/**
* get input local parameters map if the param direct is IN
* @return parameters map

13
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/K8sTaskParameters.java

@ -17,19 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.api.parameters;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.Label;
import org.apache.dolphinscheduler.plugin.task.api.model.NodeSelectorExpression;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -58,18 +55,12 @@ public class K8sTaskParameters extends AbstractParameters {
public boolean checkParameters() {
return StringUtils.isNotEmpty(image);
}
public K8sTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) {
DataSourceParameters dataSourceParameters =
(DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, datasource);
K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
k8sTaskExecutionContext.setConnectionParams(
Objects.nonNull(dataSourceParameters) ? dataSourceParameters.getConnectionParams() : null);
return k8sTaskExecutionContext;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();

6
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -70,14 +70,16 @@ public class K8sTask extends AbstractK8sTask {
}
k8sTaskExecutionContext =
k8sTaskParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
taskRequest.setK8sTaskExecutionContext(k8sTaskExecutionContext);
k8sTaskParameters.generateK8sTaskExecutionContext(taskExecutionContext.getResourceParametersHelper(),
k8sTaskParameters.getDatasource());
k8sConnectionParam =
(K8sConnectionParam) DataSourceUtils.buildConnectionParams(DbType.valueOf(k8sTaskParameters.getType()),
k8sTaskExecutionContext.getConnectionParams());
String kubeConfig = k8sConnectionParam.getKubeConfig();
k8sTaskParameters.setNamespace(k8sConnectionParam.getNamespace());
k8sTaskParameters.setKubeConfig(kubeConfig);
k8sTaskExecutionContext.setConfigYaml(kubeConfig);
taskRequest.setK8sTaskExecutionContext(k8sTaskExecutionContext);
log.info("Initialize k8s task params:{}", JSONUtils.toPrettyJsonString(k8sTaskParameters));
}

Loading…
Cancel
Save