diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md index 4263354197..5614d3c359 100644 --- a/docs/docs/en/guide/resource/configuration.md +++ b/docs/docs/en/guide/resource/configuration.md @@ -121,9 +121,6 @@ development.state=false # rpc port alert.rpc.port=50052 -# Url endpoint for zeppelin RESTful API -zeppelin.rest.url=http://localhost:8080 - # set path of conda.sh conda.path=/opt/anaconda3/etc/profile.d/conda.sh diff --git a/docs/docs/en/guide/task/zeppelin.md b/docs/docs/en/guide/task/zeppelin.md index fd40b2fc49..c88c0b92b8 100644 --- a/docs/docs/en/guide/task/zeppelin.md +++ b/docs/docs/en/guide/task/zeppelin.md @@ -26,6 +26,7 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click | Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. | | Zeppelin Note ID | The unique note id for a zeppelin notebook note. | | Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. | +| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server | | Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. | ## Task Example diff --git a/docs/docs/zh/guide/task/zeppelin.md b/docs/docs/zh/guide/task/zeppelin.md index 18d36dfb71..eb802d5cfb 100644 --- a/docs/docs/zh/guide/task/zeppelin.md +++ b/docs/docs/zh/guide/task/zeppelin.md @@ -23,6 +23,7 @@ - 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 - Zeppelin Note ID:Zeppelin Note对应的唯一ID。 - Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。 +- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。 - Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。 ## Task Example diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 97355f8811..3b12d7c606 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -101,9 +101,6 @@ development.state=false # rpc port alert.rpc.port=50052 -# Url endpoint for zeppelin RESTful API -zeppelin.rest.url=http://localhost:8080 - # set path of conda.sh conda.path=/opt/anaconda3/etc/profile.d/conda.sh diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 1df63f8a27..03010825fa 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -421,10 +421,6 @@ public class TaskConstants { public static final int LOG_LINES = 500; public static final String NAMESPACE_NAME = "name"; public static final String CLUSTER = "cluster"; - /** - * zeppelin config - */ - public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url"; /** * conda config used by jupyter task plugin diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java index 4ae64b69de..0c67e4f69b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java @@ -32,11 +32,12 @@ public class ZeppelinParameters extends AbstractParameters { */ private String noteId; private String paragraphId; + private String restEndpoint; private String parameters; @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(this.noteId); + return StringUtils.isNotEmpty(this.noteId) && StringUtils.isNotEmpty(this.restEndpoint); } @Override @@ -68,13 +69,22 @@ public class ZeppelinParameters extends AbstractParameters { this.parameters = parameters; } + public String getRestEndpoint() { + return restEndpoint; + } + + public void setRestEndpoint(String restEndpoint) { + this.restEndpoint = restEndpoint; + } + @Override public String toString() { - return "ZeppelinParameters{" + - "noteId='" + noteId + '\'' + - ", paragraphId='" + paragraphId + '\'' + - ", parameters='" + parameters + '\'' + - '}'; + return "ZeppelinParameters{" + + "noteId='" + noteId + '\'' + + ", paragraphId='" + paragraphId + '\'' + + ", restEndpoint='" + restEndpoint + '\'' + + ", parameters='" + parameters + '\'' + + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index 062b56f503..ca850dcbbc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; @@ -77,9 +76,9 @@ public class ZeppelinTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { try { - String noteId = this.zeppelinParameters.getNoteId(); - String paragraphId = this.zeppelinParameters.getParagraphId(); - String parameters = this.zeppelinParameters.getParameters(); + final String noteId = this.zeppelinParameters.getNoteId(); + final String paragraphId = this.zeppelinParameters.getParagraphId(); + final String parameters = this.zeppelinParameters.getParameters(); Map zeppelinParamsMap = new HashMap<>(); if (parameters != null) { ObjectMapper mapper = new ObjectMapper(); @@ -90,8 +89,8 @@ public class ZeppelinTask extends AbstractTaskExecutor { String resultContent; Status status = Status.FINISHED; if (paragraphId == null) { - NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap); - List paragraphResultList = noteResult.getParagraphResultList(); + final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap); + final List paragraphResultList = noteResult.getParagraphResultList(); StringBuilder resultContentBuilder = new StringBuilder(); for (ParagraphResult paragraphResult : paragraphResultList) { resultContentBuilder.append( @@ -108,7 +107,7 @@ public class ZeppelinTask extends AbstractTaskExecutor { } resultContent = resultContentBuilder.toString(); } else { - ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); + final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap); resultContent = paragraphResult.getResultInText(); status = paragraphResult.getStatus(); } @@ -130,12 +129,12 @@ public class ZeppelinTask extends AbstractTaskExecutor { * @return ZeppelinClient */ private ZeppelinClient getZeppelinClient() { - final String zeppelinRestUrl = PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL); - ClientConfig clientConfig = new ClientConfig(zeppelinRestUrl); + final String restEndpoint = zeppelinParameters.getRestEndpoint(); + final ClientConfig clientConfig = new ClientConfig(restEndpoint); ZeppelinClient zClient = null; try { zClient = new ZeppelinClient(clientConfig); - String zeppelinVersion = zClient.getVersion(); + final String zeppelinVersion = zClient.getVersion(); logger.info("zeppelin version: {}", zeppelinVersion); } catch (Exception e) { // TODO: complete error handling @@ -168,14 +167,15 @@ public class ZeppelinTask extends AbstractTaskExecutor { @Override public void cancelApplication(boolean status) throws Exception { + final String restEndpoint = this.zeppelinParameters.getRestEndpoint(); super.cancelApplication(status); - String noteId = this.zeppelinParameters.getNoteId(); - String paragraphId = this.zeppelinParameters.getParagraphId(); + final String noteId = this.zeppelinParameters.getNoteId(); + final String paragraphId = this.zeppelinParameters.getParagraphId(); if (paragraphId == null) { logger.info("trying terminate zeppelin task, taskId: {}, noteId: {}", this.taskExecutionContext.getTaskInstanceId(), noteId); - Unirest.config().defaultBaseUrl(PropertyUtils.getString(TaskConstants.ZEPPELIN_REST_URL) + "/api"); + Unirest.config().defaultBaseUrl(restEndpoint + "/api"); Unirest.delete("/notebook/job/{noteId}").routeParam("noteId", noteId).asJson(); logger.info("zeppelin task terminated, taskId: {}, noteId: {}", this.taskExecutionContext.getTaskInstanceId(), diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java index 96d7466343..4670e87872 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java @@ -61,6 +61,7 @@ public class ZeppelinTaskTest { private static final String MOCK_NOTE_ID = "2GYJR92R7"; private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396"; private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}"; + private static final String MOCK_REST_ENDPOINT = "localhost:8080"; private final ObjectMapper mapper = new ObjectMapper(); private ZeppelinClient zClient; @@ -164,6 +165,7 @@ public class ZeppelinTaskTest { ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); zeppelinParameters.setNoteId(MOCK_NOTE_ID); zeppelinParameters.setParagraphId(MOCK_PARAGRAPH_ID); + zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); zeppelinParameters.setParameters(MOCK_PARAMETERS); return JSONUtils.toJsonString(zeppelinParameters); @@ -173,6 +175,7 @@ public class ZeppelinTaskTest { ZeppelinParameters zeppelinParameters = new ZeppelinParameters(); zeppelinParameters.setNoteId(MOCK_NOTE_ID); zeppelinParameters.setParameters(MOCK_PARAMETERS); + zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT); return JSONUtils.toJsonString(zeppelinParameters); } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 3f7bc898ed..2ab410ed2d 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -617,6 +617,10 @@ export default { zeppelin_paragraph_id: 'zeppelinParagraphId', zeppelin_paragraph_id_tips: 'Please enter the paragraph id of your zeppelin paragraph', + zeppelin_parameters: 'parameters', + zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form', + zeppelin_rest_endpoint: 'zeppelinRestEndpoint', + zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server', jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name_tips: 'Please enter the conda environment name of papermill', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 8e918a6ec1..d6a4758f77 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -605,12 +605,14 @@ export default { emr_steps_define_json_tips: '请输入EMR步骤定义', segment_separator: '分段执行符号', segment_separator_tips: '请输入分段执行符号', - zeppelin_note_id: 'zeppelin_note_id', + zeppelin_note_id: 'zeppelinNoteId', zeppelin_note_id_tips: '请输入zeppelin note id', - zeppelin_paragraph_id: 'zeppelin_paragraph_id', + zeppelin_paragraph_id: 'zeppelinParagraphId', zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id', zeppelin_parameters: 'parameters', zeppelin_parameters_tips: '请输入zeppelin dynamic form参数', + zeppelin_rest_endpoint: 'zeppelinRestEndpoint', + zeppelin_rest_endpoint_tips: '请输入zeppelin server的rest endpoint', jupyter_conda_env_name: 'condaEnvName', jupyter_conda_env_name_tips: '请输入papermill所在的conda环境名', jupyter_input_note_path: 'inputNotePath', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts index 5cbc1324f9..a3077dbd2f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts @@ -47,6 +47,23 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] { placeholder: t('project.node.zeppelin_paragraph_id_tips') } }, + { + type: 'input', + field: 'zeppelinRestEndpoint', + name: t('project.node.zeppelin_rest_endpoint'), + props: { + placeholder: t('project.node.zeppelin_rest_endpoint_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.zeppelin_rest_endpoint_tips')) + } + } + } + }, { type: 'input', field: 'parameters', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 4ab18ccd05..20209639e5 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -329,6 +329,7 @@ export function formatParams(data: INodeData): { if (data.taskType === 'ZEPPELIN') { taskParams.noteId = data.zeppelinNoteId taskParams.paragraphId = data.zeppelinParagraphId + taskParams.restEndpoint = data.zeppelinRestEndpoint taskParams.parameters = data.parameters } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 187ff7532f..ced5d43759 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -298,6 +298,8 @@ interface ITaskParams { stepsDefineJson?: string zeppelinNoteId?: string zeppelinParagraphId?: string + zeppelinRestEndpoint?: string + restEndpoint?: string noteId?: string paragraphId?: string condaEnvName?: string