Browse Source

[Improvement][Task Plugin] Improvement Kubeflow task plugin (#12928)

add example
check phase in status:conditions
3.2.0-release
JieguangZhou 2 years ago committed by GitHub
parent
commit
7c90bf01bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      docs/docs/en/guide/task/kubeflow.md
  2. 26
      docs/docs/zh/guide/task/kubeflow.md
  3. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java
  4. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
  5. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
  6. 4
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

26
docs/docs/en/guide/task/kubeflow.md

@ -26,7 +26,31 @@ The task plugin picture is as follows
### Here are some specific parameters for the Kubeflow plugin
- **Namespace**:The namespace parameter of the cluster
- **yamlContent**:CRD YAML file content
- **yamlContent**:CRD YAML file content, for example:
```yaml
apiVersion: "kubeflow.org/v1"
kind: TFJob
metadata:
name: tfjob-simple
namespace: kubeflow-user-example-com
spec:
tfReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
command:
- "python"
- "/var/tf_mnist/mnist_with_summaries.py"
```
## Environment Configuration

26
docs/docs/zh/guide/task/kubeflow.md

@ -24,7 +24,31 @@
### Kubeflow组件独有的参数
- **Namespace**:集群命名空间参数
- **yamlContent**:CRD YAML文件内容
- **yamlContent**:CRD YAML文件内容, 如:
```yaml
apiVersion: "kubeflow.org/v1"
kind: TFJob
metadata:
name: tfjob-simple
namespace: kubeflow-user-example-com
spec:
tfReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: tensorflow
image: gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0
command:
- "python"
- "/var/tf_mnist/mnist_with_summaries.py"
```
## 环境配置

7
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowHelper.java

@ -20,6 +20,8 @@ package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -75,6 +77,7 @@ public class KubeflowHelper {
}
JsonNode status = data.get("status");
String lastConditionType = "";
if (status.has("conditions")) {
JsonNode conditions = status.get("conditions");
for (int x = messageIndex; x < conditions.size(); x = x + 1) {
@ -83,10 +86,14 @@ public class KubeflowHelper {
logger.info(stepMessage);
}
messageIndex = conditions.size();
JsonNode lastCondition = conditions.get(conditions.size() - 1);
lastConditionType = lastCondition.has("type") ? lastCondition.get("type").asText() : "";
}
String phase;
if (status.has("phase")) {
phase = status.get("phase").asText();
} else if (StringUtils.isNotEmpty(lastConditionType)) {
phase = lastConditionType;
} else {
phase = "";
}

1
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java

@ -109,6 +109,7 @@ public class KubeflowTask extends AbstractRemoteTask {
logger.info("Kubeflow task delete command: \n{}", command);
String message = runCommand(command);
logger.info("Kubeflow task delete result: \n{}", message);
exitStatusCode = TaskConstants.EXIT_CODE_KILL;
}
protected String runCommand(String command) {

2
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java

@ -105,7 +105,7 @@ public class KubeflowTaskTest {
KubeflowTask task = Mockito.spy(createTask(kubeflowParameters));
Mockito.when(task.runCommand(Mockito.anyString())).thenReturn("delete_result");
task.cancelApplication();
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_SUCCESS);
Assertions.assertEquals(task.getExitStatusCode(), TaskConstants.EXIT_CODE_KILL);
}
public KubeflowTask createTask(KubeflowParameters kubeflowParameters) {

4
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -202,7 +202,7 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/linkis.png');
}
&.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow_hover.png');
background-image: url('/images/task-icons/kubeflow.png');
}
}
@ -309,7 +309,7 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/linkis_hover.png');
}
&.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow.png');
background-image: url('/images/task-icons/kubeflow_hover.png');
}
}
}

Loading…
Cancel
Save