@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext ;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters ;
import org.apache.dolphinscheduler.spi.utils.JSONUtils ;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils ;
import org.apache.zeppelin.client.ClientConfig ;
import org.apache.zeppelin.client.NoteResult ;
import org.apache.zeppelin.client.ParagraphResult ;
@ -77,9 +76,9 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void handle ( ) throws Exception {
try {
String noteId = this . zeppelinParameters . getNoteId ( ) ;
String paragraphId = this . zeppelinParameters . getParagraphId ( ) ;
String parameters = this . zeppelinParameters . getParameters ( ) ;
final String noteId = this . zeppelinParameters . getNoteId ( ) ;
final String paragraphId = this . zeppelinParameters . getParagraphId ( ) ;
final String parameters = this . zeppelinParameters . getParameters ( ) ;
Map < String , String > zeppelinParamsMap = new HashMap < > ( ) ;
if ( parameters ! = null ) {
ObjectMapper mapper = new ObjectMapper ( ) ;
@ -90,8 +89,8 @@ public class ZeppelinTask extends AbstractTaskExecutor {
String resultContent ;
Status status = Status . FINISHED ;
if ( paragraphId = = null ) {
NoteResult noteResult = this . zClient . executeNote ( noteId , zeppelinParamsMap ) ;
List < ParagraphResult > paragraphResultList = noteResult . getParagraphResultList ( ) ;
final NoteResult noteResult = this . zClient . executeNote ( noteId , zeppelinParamsMap ) ;
final List < ParagraphResult > paragraphResultList = noteResult . getParagraphResultList ( ) ;
StringBuilder resultContentBuilder = new StringBuilder ( ) ;
for ( ParagraphResult paragraphResult : paragraphResultList ) {
resultContentBuilder . append (
@ -108,7 +107,7 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
resultContent = resultContentBuilder . toString ( ) ;
} else {
ParagraphResult paragraphResult = this . zClient . executeParagraph ( noteId , paragraphId , zeppelinParamsMap ) ;
final ParagraphResult paragraphResult = this . zClient . executeParagraph ( noteId , paragraphId , zeppelinParamsMap ) ;
resultContent = paragraphResult . getResultInText ( ) ;
status = paragraphResult . getStatus ( ) ;
}
@ -130,12 +129,12 @@ public class ZeppelinTask extends AbstractTaskExecutor {
* @return ZeppelinClient
* /
private ZeppelinClient getZeppelinClient ( ) {
final String zeppelinRestUrl = PropertyUtils . getString ( TaskConstants . ZEPPELIN_REST_URL ) ;
ClientConfig clientConfig = new ClientConfig ( zeppelinRestUrl ) ;
final String restEndpoint = zeppelinParameters . getRestEndpoint ( ) ;
final ClientConfig clientConfig = new ClientConfig ( restEndpoint ) ;
ZeppelinClient zClient = null ;
try {
zClient = new ZeppelinClient ( clientConfig ) ;
String zeppelinVersion = zClient . getVersion ( ) ;
final String zeppelinVersion = zClient . getVersion ( ) ;
logger . info ( "zeppelin version: {}" , zeppelinVersion ) ;
} catch ( Exception e ) {
// TODO: complete error handling
@ -168,14 +167,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
@Override
public void cancelApplication ( boolean status ) throws Exception {
final String restEndpoint = this . zeppelinParameters . getRestEndpoint ( ) ;
super . cancelApplication ( status ) ;
String noteId = this . zeppelinParameters . getNoteId ( ) ;
String paragraphId = this . zeppelinParameters . getParagraphId ( ) ;
final String noteId = this . zeppelinParameters . getNoteId ( ) ;
final String paragraphId = this . zeppelinParameters . getParagraphId ( ) ;
if ( paragraphId = = null ) {
logger . info ( "trying terminate zeppelin task, taskId: {}, noteId: {}" ,
this . taskExecutionContext . getTaskInstanceId ( ) ,
noteId ) ;
Unirest . config ( ) . defaultBaseUrl ( PropertyUtils . getString ( TaskConstants . ZEPPELIN_REST_URL ) + "/api" ) ;
Unirest . config ( ) . defaultBaseUrl ( restEndpoint + "/api" ) ;
Unirest . delete ( "/notebook/job/{noteId}" ) . routeParam ( "noteId" , noteId ) . asJson ( ) ;
logger . info ( "zeppelin task terminated, taskId: {}, noteId: {}" ,
this . taskExecutionContext . getTaskInstanceId ( ) ,