From a8a5da367ad37911ee2400df894cdf8ff16fb73e Mon Sep 17 00:00:00 2001 From: Eric Gao Date: Tue, 31 May 2022 11:19:41 +0800 Subject: [PATCH] [Feature][Task Plugin] Add support for dynamic form for zeppelin task plugin (#9977) (#10269) --- docs/docs/en/guide/task/zeppelin.md | 1 + docs/docs/zh/guide/task/zeppelin.md | 1 + .../task/zeppelin/ZeppelinParameters.java | 13 +++++-- .../plugin/task/zeppelin/ZeppelinTask.java | 12 ++++++- .../task/zeppelin/ZeppelinTaskTest.java | 34 +++++++++++++------ .../src/locales/en_US/project.ts | 3 ++ .../src/locales/zh_CN/project.ts | 3 ++ .../components/node/fields/use-zeppelin.ts | 8 +++++ .../task/components/node/format-data.ts | 1 + 9 files changed, 63 insertions(+), 13 deletions(-) diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md index 6f5478ee75..08414e664a 100644 --- a/docs/docs/en/guide/task/zeppelin.md +++ b/docs/docs/en/guide/task/zeppelin.md @@ -22,6 +22,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click - Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail. - Zeppelin Note ID: The unique note id for a zeppelin notebook note. - Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. +- Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form. ## Task Example diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md index 9461075c54..78285a0317 100644 --- a/docs/docs/zh/guide/task/zeppelin.md +++ b/docs/docs/zh/guide/task/zeppelin.md @@ -23,6 +23,7 @@ - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。 - Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。 +- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。 ## Task Example diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index 3617449755..b379985f5d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -32,17 +32,16 @@ public class ZeppelinParameters extends AbstractParameters { */ private String noteId; private String paragraphId; + private String parameters; @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId); } @Override public List getResourceFilesList() { return Collections.emptyList(); - } public String getNoteId() { @@ -61,11 +60,21 @@ public class ZeppelinParameters extends AbstractParameters { this.paragraphId = paragraphId; } + public String getParameters() { + return parameters; + } + + public void setParameters(String parameters) { + this.parameters = parameters; + } + @Override public String toString() { return "ZeppelinParameters{" + "noteId='" + noteId + '\'' + ", paragraphId='" + paragraphId + '\'' + + ", parameters='" + parameters + '\'' + '}'; } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index ad163bd8ab..eb32ac5996 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -28,6 +29,9 @@ import org.apache.zeppelin.client.ParagraphResult; import org.apache.zeppelin.client.Status; import org.apache.zeppelin.client.ZeppelinClient; +import java.util.HashMap; +import java.util.Map; + public class ZeppelinTask extends AbstractTaskExecutor { @@ -73,9 +77,15 @@ public class ZeppelinTask extends AbstractTaskExecutor { try { String noteId = this.zeppelinParameters.getNoteId(); String paragraphId = this.zeppelinParameters.getParagraphId(); + String parameters = this.zeppelinParameters.getParameters(); + Map zeppelinParamsMap = new HashMap<>(); + if (parameters != null) { + ObjectMapper mapper = new ObjectMapper(); + zeppelinParamsMap = mapper.readValue(parameters, Map.class); + } // Submit zeppelin task - ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId); + ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); String resultContent = paragraphResult.getResultInText(); Status status = paragraphResult.getStatus(); final int exitStatusCode = mapStatusToExitCode(status); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 30fb346f5d..273e5b09f1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -27,6 +27,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; import static org.powermock.api.mockito.PowerMockito.spy; import static org.powermock.api.mockito.PowerMockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.spi.utils.JSONUtils; @@ -44,17 +45,23 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.util.HashMap; +import java.util.Map; + @RunWith(PowerMockRunner.class) @PrepareForTest({ ZeppelinTask.class, ZeppelinClient.class, + ObjectMapper.class, }) @PowerMockIgnore({"javax.*"}) public class ZeppelinTaskTest { private static final String MOCK_NOTE_ID = "2GYJR92R7"; private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396"; + private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}"; + private final ObjectMapper mapper = new ObjectMapper(); private ZeppelinClient zClient; private ZeppelinTask zeppelinTask; @@ -73,7 +80,7 @@ public class ZeppelinTaskTest { // use mocked zClient in zeppelinTask doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient"); - when(this.zClient.executeParagraph(any(), any())).thenReturn(this.paragraphResult); + when(this.zClient.executeParagraph(any(), any(), any(Map.class))).thenReturn(this.paragraphResult); when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result"); this.zeppelinTask.init(); } @@ -82,7 +89,9 @@ public class ZeppelinTaskTest { public void testHandleWithParagraphExecutionSucess() throws Exception { when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED); this.zeppelinTask.handle(); - Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); + Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, + MOCK_PARAGRAPH_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getStatus(); Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode()); @@ -92,7 +101,9 @@ public class ZeppelinTaskTest { public void testHandleWithParagraphExecutionAborted() throws Exception { when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT); this.zeppelinTask.handle(); - Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); + Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, + MOCK_PARAGRAPH_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getStatus(); Assert.assertEquals(EXIT_CODE_KILL, this.zeppelinTask.getExitStatusCode()); @@ -102,7 +113,9 @@ public class ZeppelinTaskTest { public void testHandleWithParagraphExecutionError() throws Exception { when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR); this.zeppelinTask.handle(); - Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); + Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, + MOCK_PARAGRAPH_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); Mockito.verify(this.paragraphResult).getResultInText(); Mockito.verify(this.paragraphResult).getStatus(); Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); @@ -110,11 +123,13 @@ public class ZeppelinTaskTest { @Test public void testHandleWithParagraphExecutionException() throws Exception { - when(this.zClient.executeParagraph(any(), any())). + when(this.zClient.executeParagraph(any(), any(), any(Map.class))). thenThrow(new Exception("Something wrong happens from zeppelin side")); // when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR); this.zeppelinTask.handle(); - Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, MOCK_PARAGRAPH_ID); + Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, + MOCK_PARAGRAPH_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); Mockito.verify(this.paragraphResult, Mockito.times(0)).getResultInText(); Mockito.verify(this.paragraphResult, Mockito.times(0)).getStatus(); Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); @@ -122,10 +137,9 @@ public class ZeppelinTaskTest { private String buildZeppelinTaskParameters() { ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); - String noteId = MOCK_NOTE_ID; - String paragraphId = MOCK_PARAGRAPH_ID; - zeppelinParameters.setNoteId(noteId); - zeppelinParameters.setParagraphId(paragraphId); + zeppelinParameters.setNoteId(MOCK_NOTE_ID); + zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID); + zeppelinParameters.setParameters(MOCK_PARAMETERS); return JSONUtils.toJsonString(zeppelinParameters); } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 381de4232c..d0e7aa2bd7 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -606,6 +606,9 @@ export default { zeppelin_paragraph_id: 'zeppelinParagraphId', zeppelin_paragraph_id_tips: 'Please enter the paragraph id of your zeppelin paragraph', + zeppelin_parameters: 'parameters', + zeppelin_parameters_tips: + 'Please enter the parameters for zeppelin dynamic form', jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name_tips: 'Please enter the conda environment name of papermill', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index ccc93958a9..27e3119470 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -599,6 +599,9 @@ export default { zeppelin_note_id_tips: '请输入zeppelin note id', zeppelin_paragraph_id: 'zeppelin_paragraph_id', zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id', + zeppelin_parameters: 'parameters', + zeppelin_parameters_tips: + '请输入zeppelin dynamic form参数', jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名', jupyter_input_note_path: 'inputNotePath', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts index 5cbe70f15a..ed7caf05df 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts @@ -56,6 +56,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] { } } }, + { + type: 'input', + field: 'parameters', + name: t('project.node.zeppelin_parameters'), + props: { + placeholder: t('project.node.zeppelin_parameters_tips') + } + }, ...useCustomParams({ model, field: 'localParams', isSimple: false }) ] } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 2ea47c72cb..7c87c22a60 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -316,6 +316,7 @@ export function formatParams(data: INodeData): { if (data.taskType === 'ZEPPELIN') { taskParams.noteId = data.zeppelinNoteId taskParams.paragraphId = data.zeppelinParagraphId + taskParams.parameters = data.parameters } if (data.taskType === 'K8S') {