Browse Source

[Feature][Task Plugin] Increase zeppelin task stability in production (#11010)

* [Feature][Task Plugin] Increase zeppelin task stability in production (#10584)

* Add front-end and update docs for the production mode of zeppelin task

* Fix minor front-end bug of zeppelin task plugin

* Refactor ZeppelinParameters with lombok

* Fix formatting

* Replace @Data with @Getter, @Setter, @ToString to avoid decrease in test coverage
3.1.0-release
Eric Gao 2 years ago committed by GitHub
parent
commit
f689220290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      docs/docs/en/guide/task/zeppelin.md
  2. 11
      docs/docs/zh/guide/task/zeppelin.md
  3. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java
  4. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  5. 52
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
  6. 2
      dolphinscheduler-ui/src/locales/en_US/project.ts
  7. 2
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  8. 8
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts
  9. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  10. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

12
docs/docs/en/guide/task/zeppelin.md

@ -26,9 +26,21 @@ it will call `Zeppelin Client API` to trigger zeppelin notebook paragraph. Click
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
| Zeppelin Note ID | The unique note id for a zeppelin notebook note. |
| Zeppelin Paragraph ID | The unique paragraph id for a zeppelin notebook paragraph. If you want to schedule a whole note at a time, leave this field blank. |
| Zeppelin Production Note Directory | The directory for cloned note in production mode. |
| Zeppelin Rest Endpoint | The REST endpoint of your zeppelin server |
| Zeppelin Parameters | Parameters in json format used for zeppelin dynamic form. |
## Production (Clone) Mode
- Fill in the optional `Zeppelin Production Note Directory` parameter to enable `Production Mode`.
- In `Production Mode`, the target note gets copied to the `Zeppelin Production Note Directory` you choose.
`Zeppelin Task Plugin` will execute the cloned note instead of the original one. Once execution done,
`Zeppelin Task Plugin` will delete the cloned note automatically.
Therefore, it increases the stability as the modification to a running note triggered by `Dolphin Scheduler`
will not affect the production task.
- If you leave the `Zeppelin Production Note Directory` empty, `Zeppelin Task Plugin` will execute the original note.
- 'Zeppelin Production Note Directory' should both start and end with a `slash`. e.g. `/production_note_directory/`
## Task Example
### Zeppelin Paragraph Task Example

11
docs/docs/zh/guide/task/zeppelin.md

@ -24,8 +24,19 @@
- Zeppelin Note ID:Zeppelin Note对应的唯一ID。
- Zeppelin Paragraph ID:Zeppelin Paragraph对应的唯一ID。如果你想一次性调度整个note,这一栏不填即可。
- Zeppelin Rest Endpoint:您的Zeppelin服务的REST Endpoint。
- Zeppelin Production Note Directory:生产模式下存放克隆note的目录。
- Zeppelin Parameters: 用于传入Zeppelin Dynamic Form的参数。
## 生产(克隆)模式
- 填上`Zeppelin Production Note Directory`参数以启动`生产模式`。
- 在`生产模式`下,目标note会被克隆到您所填的`Zeppelin Production Note Directory`目录下。
`Zeppelin任务插件`将会执行克隆出来的note并在执行成功后自动清除它。
因为在此模式下,如果您不小心修改了正在被`Dolphin Scheduler`调度的note,也不会影响到生产任务的执行,
从而提高了稳定性。
- 如果您选择不填`Zeppelin Production Note Directory`这个参数,`Zeppelin任务插件`将会执行您的原始note。
'Zeppelin Production Note Directory'参数在格式上应该以`斜杠`开头和结尾,例如 `/production_note_directory/`
## Task Example
### Zeppelin Paragraph Task Example

49
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinParameters.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -24,6 +27,9 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
@Getter
@Setter
@ToString
public class ZeppelinParameters extends AbstractParameters {
/**
@ -33,6 +39,7 @@ public class ZeppelinParameters extends AbstractParameters {
private String noteId;
private String paragraphId;
private String restEndpoint;
private String productionNoteDirectory;
private String parameters;
@Override
@ -45,46 +52,4 @@ public class ZeppelinParameters extends AbstractParameters {
return Collections.emptyList();
}
public String getNoteId() {
return noteId;
}
public void setNoteId(String noteId) {
this.noteId = noteId;
}
public String getParagraphId() {
return paragraphId;
}
public void setParagraphId(String paragraphId) {
this.paragraphId = paragraphId;
}
public String getParameters() {
return parameters;
}
public void setParameters(String parameters) {
this.parameters = parameters;
}
public String getRestEndpoint() {
return restEndpoint;
}
public void setRestEndpoint(String restEndpoint) {
this.restEndpoint = restEndpoint;
}
@Override
public String toString() {
return "ZeppelinParameters{"
+ "noteId='" + noteId + '\''
+ ", paragraphId='" + paragraphId + '\''
+ ", restEndpoint='" + restEndpoint + '\''
+ ", parameters='" + parameters + '\''
+ '}';
}
}

22
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
@ -76,9 +77,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void handle() throws Exception {
try {
final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
final String parameters = this.zeppelinParameters.getParameters();
// noteId may be replaced with cloned noteId
String noteId = this.zeppelinParameters.getNoteId();
Map<String, String> zeppelinParamsMap = new HashMap<>();
if (parameters != null) {
ObjectMapper mapper = new ObjectMapper();
@ -88,6 +91,16 @@ public class ZeppelinTask extends AbstractTaskExecutor {
// Submit zeppelin task
String resultContent;
Status status = Status.FINISHED;
// If in production, clone the note and run the cloned one for stability
if (productionNoteDirectory != null) {
final String cloneNotePath = String.format(
"%s%s_%s",
productionNoteDirectory,
noteId,
DateUtils.getTimestampString());
noteId = this.zClient.cloneNote(noteId, cloneNotePath);
}
if (paragraphId == null) {
final NoteResult noteResult = this.zClient.executeNote(noteId, zeppelinParamsMap);
final List<ParagraphResult> paragraphResultList = noteResult.getParagraphResultList();
@ -105,6 +118,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
break;
}
}
resultContent = resultContentBuilder.toString();
} else {
final ParagraphResult paragraphResult = this.zClient.executeParagraph(noteId, paragraphId, zeppelinParamsMap);
@ -112,6 +126,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
status = paragraphResult.getStatus();
}
// Delete cloned note
if (productionNoteDirectory != null) {
this.zClient.deleteNote(noteId);
}
// Use noteId-paragraph-Id as app id
final int exitStatusCode = mapStatusToExitCode(status);
setAppIds(String.format("%s-%s", noteId, paragraphId));
@ -121,6 +140,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("zeppelin task submit failed with error", e);
}
}
/**

52
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java

@ -29,6 +29,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@ -51,9 +52,10 @@ import java.util.Map;
@RunWith(PowerMockRunner.class)
@PrepareForTest({
ZeppelinTask.class,
ZeppelinClient.class,
ObjectMapper.class,
ZeppelinTask.class,
ZeppelinClient.class,
ObjectMapper.class,
DateUtils.class
})
@PowerMockIgnore({"javax.*"})
public class ZeppelinTaskTest {
@ -62,6 +64,8 @@ public class ZeppelinTaskTest {
private static final String MOCK_PARAGRAPH_ID = "paragraph_1648793472526_1771221396";
private static final String MOCK_PARAMETERS = "{\"key1\": \"value1\", \"key2\": \"value2\"}";
private static final String MOCK_REST_ENDPOINT = "localhost:8080";
private static final String MOCK_CLONE_NOTE_ID = "3GYJR92R8";
private static final String MOCK_PRODUCTION_DIRECTORY = "/prod/";
private final ObjectMapper mapper = new ObjectMapper();
private ZeppelinClient zClient;
@ -161,6 +165,37 @@ public class ZeppelinTaskTest {
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
}
@Test
public void testHandleWithNoteExecutionSuccessWithProductionSetting() throws Exception {
String zeppelinParametersWithNoParagraphId = buildZeppelinTaskParametersWithProductionSetting();
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.mockStatic(DateUtils.class);
when(taskExecutionContext.getTaskParams()).thenReturn(zeppelinParametersWithNoParagraphId);
this.zeppelinTask = spy(new ZeppelinTask(taskExecutionContext));
// mock zClient and note result
this.zClient = mock(ZeppelinClient.class);
this.noteResult = mock(NoteResult.class);
// use mocked zClient in zeppelinTask
doReturn(this.zClient).when(this.zeppelinTask, "getZeppelinClient");
when(this.zClient.cloneNote(any(String.class), any(String.class))).thenReturn(MOCK_CLONE_NOTE_ID);
when(this.zClient.executeNote(any(), any(Map.class))).thenReturn(this.noteResult);
when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
this.zeppelinTask.init();
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
when(DateUtils.getTimestampString()).thenReturn("123456789");
this.zeppelinTask.handle();
Mockito.verify(this.zClient).cloneNote(
MOCK_NOTE_ID,
String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789"));
Mockito.verify(this.zClient).executeNote(MOCK_CLONE_NOTE_ID,
(Map<String, String>) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.noteResult).getParagraphResultList();
Mockito.verify(this.zClient).deleteNote(MOCK_CLONE_NOTE_ID);
Assert.assertEquals(EXIT_CODE_SUCCESS, this.zeppelinTask.getExitStatusCode());
}
private String buildZeppelinTaskParameters() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
@ -179,4 +214,15 @@ public class ZeppelinTaskTest {
return JSONUtils.toJsonString(zeppelinParameters);
}
private String buildZeppelinTaskParametersWithProductionSetting() {
ZeppelinParameters zeppelinParameters = new ZeppelinParameters();
zeppelinParameters.setNoteId(MOCK_NOTE_ID);
zeppelinParameters.setParameters(MOCK_PARAMETERS);
zeppelinParameters.setRestEndpoint(MOCK_REST_ENDPOINT);
zeppelinParameters.setProductionNoteDirectory(MOCK_PRODUCTION_DIRECTORY);
return JSONUtils.toJsonString(zeppelinParameters);
}
}

2
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -621,6 +621,8 @@ export default {
zeppelin_parameters_tips: 'Please enter the parameters for zeppelin dynamic form',
zeppelin_rest_endpoint: 'zeppelinRestEndpoint',
zeppelin_rest_endpoint_tips: 'Please enter the rest endpoint of your Zeppelin server',
zeppelin_production_note_directory: 'Directory for cloned zeppelin note in production mode',
zeppelin_production_note_directory_tips: 'Please enter the production note directory to enable production mode',
jupyter_conda_env_name: 'condaEnvName',
jupyter_conda_env_name_tips:
'Please enter the conda environment name of papermill',

2
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -608,6 +608,8 @@ export default {
zeppelin_note_id: 'zeppelinNoteId',
zeppelin_note_id_tips: '请输入zeppelin note id',
zeppelin_paragraph_id: 'zeppelinParagraphId',
zeppelin_production_note_directory: '生产模式下存放克隆note的目录',
zeppelin_production_note_directory_tips: '请输入生产环境note目录以启用生产模式',
zeppelin_paragraph_id_tips: '请输入zeppelin paragraph id',
zeppelin_parameters: 'parameters',
zeppelin_parameters_tips: '请输入zeppelin dynamic form参数',

8
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-zeppelin.ts

@ -64,6 +64,14 @@ export function useZeppelin(model: { [field: string]: any }): IJsonItem[] {
}
}
},
{
type: 'input',
field: 'zeppelinProductionNoteDirectory',
name: t('project.node.zeppelin_production_note_directory'),
props: {
placeholder: t('project.node.zeppelin_production_note_directory_tips')
}
},
{
type: 'input',
field: 'parameters',

1
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -330,6 +330,7 @@ export function formatParams(data: INodeData): {
taskParams.noteId = data.zeppelinNoteId
taskParams.paragraphId = data.zeppelinParagraphId
taskParams.restEndpoint = data.zeppelinRestEndpoint
taskParams.productionNoteDirectory = data.zeppelinProductionNoteDirectory
taskParams.parameters = data.parameters
}

2
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -300,6 +300,8 @@ interface ITaskParams {
zeppelinParagraphId?: string
zeppelinRestEndpoint?: string
restEndpoint?: string
zeppelinProductionNoteDirectory?: string
productionNoteDirectory?: string
noteId?: string
paragraphId?: string
condaEnvName?: string

Loading…
Cancel
Save