From 4be3b877c3c143e2b5c58c017b641fd927c0dff5 Mon Sep 17 00:00:00 2001 From: Eric Gao Date: Sat, 18 Jun 2022 13:34:24 +0800 Subject: [PATCH] [feature][task] Enable zeppelin schedule a whole zeppelin note (#10434) --- docs/docs/en/guide/task/zeppelin.md | 2 +- docs/docs/zh/guide/task/zeppelin.md | 2 +- .../task/zeppelin/ZeppelinParameters.java | 2 +- .../plugin/task/zeppelin/ZeppelinTask.java | 65 +++++++++++++++---- .../task/zeppelin/ZeppelinTaskTest.java | 37 ++++++++++- .../components/node/fields/use-jupyter.ts | 54 --------------- .../components/node/fields/use-zeppelin.ts | 9 --- 7 files changed, 89 insertions(+), 82 deletions(-) diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md index ea2d203394..7dae3c4dca 100644 --- a/docs/docs/en/guide/task/zeppelin.md +++ b/docs/docs/en/guide/task/zeppelin.md @@ -21,7 +21,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click - Failed retry interval: The time interval for resubmitting the task after a failed task. It supports drop-down and hand-filling. - Timeout alarm: Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will send and the task execution will fail. - Zeppelin Note ID: The unique note id for a zeppelin notebook note. -- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. +- Zeppelin Paragraph ID: The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. - Zeppelin Parameters: Parameters in json format used for zeppelin dynamic form. ## Task Example diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md index fa2477a7dc..18d36dfb71 100644 --- a/docs/docs/zh/guide/task/zeppelin.md +++ b/docs/docs/zh/guide/task/zeppelin.md @@ -22,7 +22,7 @@ - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败. - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。 -- Zepplin Paragraph:Zeppelin Paragraph对应的唯一ID。 +- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。 ## Task Example diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index b379985f5d..4ae64b69de 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -36,7 +36,7 @@ public class ZeppelinParameters extends AbstractParameters { @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.paragraphId); + return StringUtils.isNotEmpty(this.noteId); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index eb32ac5996..062b56f503 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; import com.fasterxml.jackson.databind.ObjectMapper; +import kong.unirest.Unirest; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -25,11 +26,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.zeppelin.client.ClientConfig; +import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; import org.apache.zeppelin.client.Status; import org.apache.zeppelin.client.ZeppelinClient; - import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -85,11 +87,34 @@ public class ZeppelinTask extends AbstractTaskExecutor { } // Submit zeppelin task - ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); - String resultContent = paragraphResult.getResultInText(); - Status status = paragraphResult.getStatus(); - final int exitStatusCode = mapStatusToExitCode(status); + String resultContent; + Status status = Status.FINISHED; + if (paragraphId == null) { + NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap); + List paragraphResultList = noteResult.getParagraphResultList(); + StringBuilder resultContentBuilder = new StringBuilder(); + for (ParagraphResult paragraphResult : paragraphResultList) { + resultContentBuilder.append( + String.format( + "paragraph_id: %s, paragraph_result: %s\n", + paragraphResult.getParagraphId(), + paragraphResult.getResultInText())); + status = paragraphResult.getStatus(); + // we treat note execution as failure if any paragraph in the note fails + // status will be further processed in method mapStatusToExitCode below + if (status != Status.FINISHED) { + break; + } + } + resultContent = resultContentBuilder.toString(); + } else { + ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); + resultContent = paragraphResult.getResultInText(); + status = paragraphResult.getStatus(); + } + // Use noteId-paragraph-Id as app id + final int exitStatusCode = mapStatusToExitCode(status); setAppIds(String.format("%s-%s", noteId, paragraphId)); setExitStatusCode(exitStatusCode); logger.info("zeppelin task finished with results: {}", resultContent); @@ -146,15 +171,27 @@ public class ZeppelinTask extends AbstractTaskExecutor { super.cancelApplication(status); String noteId = this.zeppelinParameters.getNoteId(); String paragraphId = this.zeppelinParameters.getParagraphId(); - logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}", - this.taskExecutionContext.getTaskInstanceId(), - noteId, - paragraphId); - this.zClient.cancelParagraph(noteId, paragraphId); - logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}", - this.taskExecutionContext.getTaskInstanceId(), - noteId, - paragraphId); + if (paragraphId == null) { + logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}", + this.taskExecutionContext.getTaskInstanceId(), + noteId); + Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api"); + Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson(); + logger.info("zeppelin task terminated, taskId: {}, noteId: {}", + this.taskExecutionContext.getTaskInstanceId(), + noteId); + } else { + logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}", + this.taskExecutionContext.getTaskInstanceId(), + noteId, + paragraphId); + this.zClient.cancelParagraph(noteId, paragraphId); + logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}", + this.taskExecutionContext.getTaskInstanceId(), + noteId, + paragraphId); + } + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 273e5b09f1..96d7466343 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.zeppelin.client.ParagraphResult; +import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.Status; import org.apache.zeppelin.client.ZeppelinClient; import org.junit.Assert; @@ -45,7 +46,6 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import java.util.HashMap; import java.util.Map; @@ -66,6 +66,7 @@ public class ZeppelinTaskTest { private ZeppelinClient zClient; private ZeppelinTask zeppelinTask; private ParagraphResult paragraphResult; + private NoteResult noteResult; @Before public void before() throws Exception { @@ -86,7 +87,7 @@ public class ZeppelinTaskTest { } @Test - public void testHandleWithParagraphExecutionSucess() throws Exception { + public void testHandleWithParagraphExecutionSuccess() throws Exception { when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED); this.zeppelinTask.handle(); Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID, @@ -135,6 +136,30 @@ public class ZeppelinTaskTest { Assert.assertEquals(EXIT_CODE_FAILURE, this.zeppelinTask.getExitStatusCode()); } + @Test + public void testHandleWithNoteExecutionSuccess() throws Exception { + String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithNoParagraphId(); + TaskExecutionContext taskExecutionContext= PowerMockito.mock(TaskExecutionContext.class); + when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId); + this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext)); + + // mock zClient and note result + this.zClient = mock(ZeppelinClient.class); + this.noteResult = mock(NoteResult.class); + + // use mocked zClient in zeppelinTask + doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient"); + when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult); + when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result"); + this.zeppelinTask.init(); + when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED); + this.zeppelinTask.handle(); + Mockito.verify(this.zClient).executeNote(MOCK_NOTE_ID, + (Map) mapper.readValue(MOCK_PARAMETERS, Map.class)); + Mockito.verify(this.noteResult).getParagraphResultList(); + Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode()); + } + private String buildZeppelinTaskParameters() { ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); zeppelinParameters.setNoteId(MOCK_NOTE_ID); @@ -143,4 +168,12 @@ public class ZeppelinTaskTest { return JSONUtils.toJsonString(zeppelinParameters); } + + private String buildZeppelinTaskParametersWithNoParagraphId() { + ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); + zeppelinParameters.setNoteId(MOCK_NOTE_ID); + zeppelinParameters.setParameters(MOCK_PARAMETERS); + + return JSONUtils.toJsonString(zeppelinParameters); + } } \ No newline at end of file diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts index a35510840f..c9512bb673 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-jupyter.ts @@ -80,15 +80,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_parameters_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_parameters_tips')) - // } - // } - // } }, { type: 'input', @@ -97,15 +88,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_kernel_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_kernel_tips')) - // } - // } - // } }, { type: 'input', @@ -114,15 +96,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_engine_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_engine_tips')) - // } - // } - // } }, { type: 'input', @@ -131,15 +104,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_execution_timeout_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_execution_timeout_tips')) - // } - // } - // } }, { type: 'input', @@ -148,15 +112,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_start_timeout_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_start_timeout_tips')) - // } - // } - // } }, { type: 'input', @@ -165,15 +120,6 @@ export function useJupyter(model: { [field: string]: any }): IJsonItem[] { props: { placeholder: t('project.node.jupyter_others_tips') } - // validate: { - // trigger: ['input', 'blur'], - // required: false, - // validator(validate: any, value: string) { - // if (!value) { - // return new Error(t('project.node.jupyter_others_tips')) - // } - // } - // } }, ...useCustomParams({ model, field: 'localParams', isSimple: false }) ] diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts index ed7caf05df..5cbc1324f9 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts @@ -45,15 +45,6 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] { name: t('project.node.zeppelin_paragraph_id'), props: { placeholder: t('project.node.zeppelin_paragraph_id_tips') - }, - validate: { - trigger: ['input', 'blur'], - required: true, - validator(validate: any, value: string) { - if (!value) { - return new Error(t('project.node.zeppelin_paragraph_id_tips')) - } - } } }, {