@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin ;
import com.fasterxml.jackson.databind.ObjectMapper ;
import kong.unirest.Unirest ;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor ;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants ;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext ;
@ -25,11 +26,12 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.spi.utils.JSONUtils ;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils ;
import org.apache.zeppelin.client.ClientConfig ;
import org.apache.zeppelin.client.NoteResult ;
import org.apache.zeppelin.client.ParagraphResult ;
import org.apache.zeppelin.client.Status ;
import org.apache.zeppelin.client.ZeppelinClient ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
@ -85,11 +87,34 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
// Submit zeppelin task
String resultContent ;
Status status = Status . FINISHED ;
if ( paragraphId = = null ) {
NoteResult noteResult = this . zClient . executeNote ( noteId , zeppelinParamsMap ) ;
List < ParagraphResult > paragraphResultList = noteResult . getParagraphResultList ( ) ;
StringBuilder resultContentBuilder = new StringBuilder ( ) ;
for ( ParagraphResult paragraphResult : paragraphResultList ) {
resultContentBuilder . append (
String . format (
"paragraph_id: %s, paragraph_result: %s\n" ,
paragraphResult . getParagraphId ( ) ,
paragraphResult . getResultInText ( ) ) ) ;
status = paragraphResult . getStatus ( ) ;
// we treat note execution as failure if any paragraph in the note fails
// status will be further processed in method mapStatusToExitCode below
if ( status ! = Status . FINISHED ) {
break ;
}
}
resultContent = resultContentBuilder . toString ( ) ;
} else {
ParagraphResult paragraphResult = this . zClient . executeParagraph ( noteId , paragraphId , zeppelinParamsMap ) ;
String resultContent = paragraphResult . getResultInText ( ) ;
Status status = paragraphResult . getStatus ( ) ;
final int exitStatusCode = mapStatusToExitCode ( status ) ;
resultContent = paragraphResult . getResultInText ( ) ;
status = paragraphResult . getStatus ( ) ;
}
// Use noteId-paragraph-Id as app id
final int exitStatusCode = mapStatusToExitCode ( status ) ;
setAppIds ( String . format ( "%s-%s" , noteId , paragraphId ) ) ;
setExitStatusCode ( exitStatusCode ) ;
logger . info ( "zeppelin task finished with results: {}" , resultContent ) ;
@ -146,6 +171,16 @@ public class ZeppelinTask extends AbstractTaskExecutor {
super . cancelApplication ( status ) ;
String noteId = this . zeppelinParameters . getNoteId ( ) ;
String paragraphId = this . zeppelinParameters . getParagraphId ( ) ;
if ( paragraphId = = null ) {
logger . info ( "trying terminate zeppelin task, taskId: {}, noteId: {}" ,
this . taskExecutionContext . getTaskInstanceId ( ) ,
noteId ) ;
Unirest . config ( ) . defaultBaseUrl ( PropertyUtils . getString ( TaskConstants . ZEPPELIN_REST_URL ) + "/api" ) ;
Unirest . delete ( "/notebook/job/{noteId}" ) . routeParam ( "noteId" , noteId ) . asJson ( ) ;
logger . info ( "zeppelin task terminated, taskId: {}, noteId: {}" ,
this . taskExecutionContext . getTaskInstanceId ( ) ,
noteId ) ;
} else {
logger . info ( "trying terminate zeppelin task, taskId: {}, noteId: {}, paragraphId: {}" ,
this . taskExecutionContext . getTaskInstanceId ( ) ,
noteId ,
@ -158,3 +193,5 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
}
}