Browse Source

[Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269)

3.1.0-release
Eric Gao 3 years ago committed by GitHub
parent
commit
a8a5da367a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/docs/en/guide/task/zeppelin.md
  2. 1
      docs/docs/zh/guide/task/zeppelin.md
  3. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
  4. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  5. 34
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
  6. 3
      dolphinscheduler-ui/src/locales/en_US/project.ts
  7. 3
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  8. 8
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
  9. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

1
docs/docs/en/guide/task/zeppelin.md

@ -22,6 +22,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
- Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail. - Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail.
- Zeppelin Note ID: The unique note id for a zeppelin notebook note. - Zeppelin Note ID: The unique note id for a zeppelin notebook note.
- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. - Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph.
- Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form.
## Task Example ## Task Example

1
docs/docs/zh/guide/task/zeppelin.md

@ -23,6 +23,7 @@
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。 - Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
## Task Example ## Task Example

13
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java

@ -32,17 +32,16 @@ public class ZeppelinParameters extends AbstractParameters {
*/ */
private String noteId; private String noteId;
private String paragraphId; private String paragraphId;
private String parameters;
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId); return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId);
} }
@Override @Override
public List<ResourceInfo> getResourceFilesList() { public List<ResourceInfo> getResourceFilesList() {
return Collections.emptyList(); return Collections.emptyList();
} }
public String getNoteId() { public String getNoteId() {
@ -61,11 +60,21 @@ public class ZeppelinParameters extends AbstractParameters {
this.paragraphId = paragraphId; this.paragraphId = paragraphId;
} }
public String getParameters() {
return parameters;
}
public void setParameters(String parameters) {
this.parameters = parameters;
}
@Override @Override
public String toString() { public String toString() {
return "ZeppelinParameters{" + return "ZeppelinParameters{" +
"noteId='" + noteId + '\'' + "noteId='" + noteId + '\'' +
", paragraphId='" + paragraphId + '\'' + ", paragraphId='" + paragraphId + '\'' +
", parameters='" + parameters + '\'' +
'}'; '}';
} }
} }

12
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin; package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -28,6 +29,9 @@ import org.apache.zeppelin.client.ParagraphResult;
import org.apache.zeppelin.client.Status; import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient; import org.apache.zeppelin.client.ZeppelinClient;
import java.util.HashMap;
import java.util.Map;
public class ZeppelinTask extends AbstractTaskExecutor { public class ZeppelinTask extends AbstractTaskExecutor {
@ -73,9 +77,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
try { try {
String noteId = this.zeppelinParameters.getNoteId(); String noteId = this.zeppelinParameters.getNoteId();
String paragraphId = this.zeppelinParameters.getParagraphId(); String paragraphId = this.zeppelinParameters.getParagraphId();
String parameters = this.zeppelinParameters.getParameters();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
zeppelinParamsMap = mapper.readValue(parameters, Map.class);
}
// Submit zeppelin task // Submit zeppelin task
ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId); ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
String resultContent = paragraphResult.getResultInText(); String resultContent = paragraphResult.getResultInText();
Status status = paragraphResult.getStatus(); Status status = paragraphResult.getStatus();
final int exitStatusCode = mapStatusToExitCode(status); final int exitStatusCode = mapStatusToExitCode(status);

34
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java

@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when; import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@ -44,17 +45,23 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import java.util.HashMap;
import java.util.Map;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({ @PrepareForTest({
ZeppelinTask.class, ZeppelinTask.class,
ZeppelinClient.class, ZeppelinClient.class,
ObjectMapper.class,
}) })
@PowerMockIgnore({"javax.*"}) @PowerMockIgnore({"javax.*"})
public class ZeppelinTaskTest { public class ZeppelinTaskTest {
private static final String MOCK_NOTE_ID = "2GYJR92R7"; private static final String MOCK_NOTE_ID = "2GYJR92R7";
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396"; private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient; private ZeppelinClient zClient;
private ZeppelinTask zeppelinTask; private ZeppelinTask zeppelinTask;
@ -73,7 +80,7 @@ public class ZeppelinTaskTest {
// use mocked zClient in zeppelinTask // use mocked zClient in zeppelinTask
doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient"); doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
when(this.zClient.executeParagraph(any(), any())).thenReturn(this.paragraphResult); when(this.zClient.executeParagraph(any(), any(), any(Map.class))).thenReturn(this.paragraphResult);
when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result"); when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
this.zeppelinTask.init(); this.zeppelinTask.init();
} }
@ -82,7 +89,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionSucess() throws Exception { public void testHandleWithParagraphExecutionSucess() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED); when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
this.zeppelinTask.handle(); this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus(); Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode()); Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
@ -92,7 +101,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionAborted() throws Exception { public void testHandleWithParagraphExecutionAborted() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT); when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT);
this.zeppelinTask.handle(); this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus(); Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_KILL, this.zeppelinTask.getExitStatusCode()); Assert.assertEquals(EXIT_CODE_KILL, this.zeppelinTask.getExitStatusCode());
@ -102,7 +113,9 @@ public class ZeppelinTaskTest {
public void testHandleWithParagraphExecutionError() throws Exception { public void testHandleWithParagraphExecutionError() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR); when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle(); this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getResultInText();
Mockito.verify(this.paragraphResult).getStatus(); Mockito.verify(this.paragraphResult).getStatus();
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@ -110,11 +123,13 @@ public class ZeppelinTaskTest {
@Test @Test
public void testHandleWithParagraphExecutionException() throws Exception { public void testHandleWithParagraphExecutionException() throws Exception {
when(this.zClient.executeParagraph(any(), any())). when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
thenThrow(new Exception("Something wrong happens from zeppelin side")); thenThrow(new Exception("Something wrong happens from zeppelin side"));
// when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR); // when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
this.zeppelinTask.handle(); this.zeppelinTask.handle();
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.paragraphResult, Mockito.times(0)).getResultInText(); Mockito.verify(this.paragraphResult, Mockito.times(0)).getResultInText();
Mockito.verify(this.paragraphResult, Mockito.times(0)).getStatus(); Mockito.verify(this.paragraphResult, Mockito.times(0)).getStatus();
Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode());
@ -122,10 +137,9 @@ public class ZeppelinTaskTest {
private String buildZeppelinTaskParameters() { private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
String noteId = MOCK_NOTE_ID; zeppelinParameters.setNoteId(MOCK_NOTE_ID);
String paragraphId = MOCK_PARAGRAPH_ID; zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID);
zeppelinParameters.setNoteId(noteId); zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setParagraphId(paragraphId);
return JSONUtils.toJsonString(zeppelinParameters); return JSONUtils.toJsonString(zeppelinParameters);
} }

3
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -606,6 +606,9 @@ export default {
zeppelin_paragraph_id: 'zeppelinParagraphId', zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_paragraph_id_tips: zeppelin_paragraph_id_tips:
'Please enter the paragraph id of your zeppelin paragraph', 'Please enter the paragraph id of your zeppelin paragraph',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips:
'Please enter the parameters for zeppelin dynamic form',
jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips: jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill', 'Please enter the conda environment name of papermill',

3
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -599,6 +599,9 @@ export default {
zeppelin_note_id_tips: '请输入zeppelin note id', zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelin_paragraph_id', zeppelin_paragraph_id: 'zeppelin_paragraph_id',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id', zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips:
'请输入zeppelin dynamic form参数',
jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名', jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名',
jupyter_input_note_path: 'inputNotePath', jupyter_input_note_path: 'inputNotePath',

8
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts

@ -56,6 +56,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
} }
} }
}, },
{
type: 'input',
field: 'parameters',
name: t('project.node.zeppelin_parameters'),
props: {
placeholder: t('project.node.zeppelin_parameters_tips')
}
},
...useCustomParams({ model, field: 'localParams', isSimple: false }) ...useCustomParams({ model, field: 'localParams', isSimple: false })
] ]
} }

1
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -316,6 +316,7 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'ZEPPELIN') { if (data.taskType === 'ZEPPELIN') {
taskParams.noteId = data.zeppelinNoteId taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId taskParams.paragraphId = data.zeppelinParagraphId
taskParams.parameters = data.parameters
} }
if (data.taskType === 'K8S') { if (data.taskType === 'K8S') {

Loading…
Cancel
Save