@ -15,7 +15,7 @@
* limitations under the License .
* /
package org.apache.dolphinscheduler.plugin.task.tis ;
package org.apache.dolphinscheduler.plugin.task.pigeon ;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor ;
import org.apache.dolphinscheduler.spi.task.AbstractParameters ;
@ -48,51 +48,51 @@ import org.java_websocket.handshake.ServerHandshake;
/ * *
* TIS DataX Task
* * /
public class TIS Task extends AbstractTaskExecutor {
public class Pigeon Task extends AbstractTaskExecutor {
public static final String KEY_POOL_VAR_TIS_HOST = "tisH ost" ;
public static final String KEY_POOL_VAR_PIGEON_HOST = "p_h ost" ;
private final TaskRequest taskExecutionContext ;
private TISParameters tisP arameters;
private PigeonParameters p arameters;
private BizResult triggerResult ;
private final TISConfig tisC onfig;
private final PigeonConfig c onfig;
public TIS Task( TaskRequest taskExecutionContext ) {
public Pigeon Task( TaskRequest taskExecutionContext ) {
super ( taskExecutionContext ) ;
this . taskExecutionContext = taskExecutionContext ;
this . tisConfig = TIS Config. getInstance ( ) ;
this . config = Pigeon Config. getInstance ( ) ;
}
@Override
public void init ( ) {
super . init ( ) ;
logger . info ( "tis task params {}" , taskExecutionContext . getTaskParams ( ) ) ;
tisP arameters = JSONUtils . parseObject ( taskExecutionContext . getTaskParams ( ) , TIS Parameters. class ) ;
if ( ! tisP arameters. checkParameters ( ) ) {
logger . info ( "PIGEON task params {}" , taskExecutionContext . getTaskParams ( ) ) ;
p arameters = JSONUtils . parseObject ( taskExecutionContext . getTaskParams ( ) , Pigeon Parameters. class ) ;
if ( ! p arameters. checkParameters ( ) ) {
throw new RuntimeException ( "datax task params is not valid" ) ;
}
}
@Override
public void handle ( ) throws Exception {
// Trigger TIS DataX pipeline
logger . info ( "start execute TIS task" ) ;
// Trigger PIGEON DataX pipeline
logger . info ( "start execute PIGEON task" ) ;
long startTime = System . currentTimeMillis ( ) ;
String targetJobName = this . tisP arameters. getTargetJobName ( ) ;
String tisH ost = getTis Host ( ) ;
String targetJobName = this . p arameters. getTargetJobName ( ) ;
String h ost = getHost ( ) ;
try {
final String triggerUrl = getTriggerUrl ( ) ;
final String getStatusUrl = tisC onfig. getJobStatusUrl ( tisH ost) ;
final String getStatusUrl = c onfig. getJobStatusUrl ( h ost) ;
HttpPost post = new HttpPost ( triggerUrl ) ;
post . addHeader ( "appname" , targetJobName ) ;
addFormUrlencoded ( post ) ;
StringEntity entity = new StringEntity ( tisC onfig. getJobTriggerPostBody ( ) , StandardCharsets . UTF_8 ) ;
StringEntity entity = new StringEntity ( c onfig. getJobTriggerPostBody ( ) , StandardCharsets . UTF_8 ) ;
post . setEntity ( entity ) ;
ExecResult execState = null ;
int taskId ;
WebSocketClient webSocket = null ;
try ( CloseableHttpClient client = HttpClients . createDefault ( ) ;
// trigger to start TIS dataX task
// trigger to start PIGEON dataX task
CloseableHttpResponse response = client . execute ( post ) ) {
triggerResult = processResponse ( triggerUrl , response , BizResult . class ) ;
if ( ! triggerResult . isSuccess ( ) ) {
@ -101,11 +101,11 @@ public class TISTask extends AbstractTaskExecutor {
if ( CollectionUtils . isNotEmpty ( errormsg ) ) {
errs . append ( ",errs:" ) . append ( errormsg . stream ( ) . collect ( Collectors . joining ( "," ) ) ) ;
}
throw new Exception ( "trigger TIS job faild taskName:" + targetJobName + errs . toString ( ) ) ;
throw new Exception ( "trigger PIGEON job faild taskName:" + targetJobName + errs . toString ( ) ) ;
}
taskId = triggerResult . getBizresult ( ) . getTaskid ( ) ;
webSocket = receiveRealtimeLog ( tisH ost, targetJobName , taskId ) ;
webSocket = receiveRealtimeLog ( h ost, targetJobName , taskId ) ;
setAppIds ( String . valueOf ( taskId ) ) ;
@ -141,11 +141,11 @@ public class TISTask extends AbstractTaskExecutor {
}
long costTime = System . currentTimeMillis ( ) - startTime ;
logger . info ( "TIS task: {},taskId:{} costTime : {} milliseconds, statusCode : {}" ,
logger . info ( "PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}" ,
targetJobName , taskId , costTime , ( execState = = ExecResult . SUCCESS ) ? "'success'" : "'failure'" ) ;
setExitStatusCode ( ( execState = = ExecResult . SUCCESS ) ? TaskConstants . EXIT_CODE_SUCCESS : TaskConstants . EXIT_CODE_FAILURE ) ;
} catch ( Exception e ) {
logger . error ( "execute TIS dataX faild,TIS task name:" + targetJobName , e ) ;
logger . error ( "execute PIGEON dataX faild,PIGEON task name:" + targetJobName , e ) ;
setExitStatusCode ( TaskConstants . EXIT_CODE_FAILURE ) ;
if ( e instanceof InterruptedException ) {
Thread . currentThread ( ) . interrupt ( ) ;
@ -165,7 +165,7 @@ public class TISTask extends AbstractTaskExecutor {
logger . info ( "start to cancelApplication taskId:{}" , triggerResult . getTaskId ( ) ) ;
final String triggerUrl = getTriggerUrl ( ) ;
StringEntity entity = new StringEntity ( tisC onfig. getJobCancelPostBody ( triggerResult . getTaskId ( ) ) , StandardCharsets . UTF_8 ) ;
StringEntity entity = new StringEntity ( c onfig. getJobCancelPostBody ( triggerResult . getTaskId ( ) ) , StandardCharsets . UTF_8 ) ;
CancelResult cancelResult = null ;
HttpPost post = new HttpPost ( triggerUrl ) ;
@ -181,26 +181,26 @@ public class TISTask extends AbstractTaskExecutor {
if ( org . apache . dolphinscheduler . spi . utils . CollectionUtils . isNotEmpty ( errormsg ) ) {
errs . append ( ",errs:" ) . append ( errormsg . stream ( ) . collect ( Collectors . joining ( "," ) ) ) ;
}
throw new Exception ( "cancel TIS job faild taskId:" + triggerResult . getTaskId ( ) + errs . toString ( ) ) ;
throw new Exception ( "cancel PIGEON job faild taskId:" + triggerResult . getTaskId ( ) + errs . toString ( ) ) ;
}
}
}
private String getTriggerUrl ( ) {
final String tisHost = getTis Host ( ) ;
return tisC onfig. getJobTriggerUrl ( tisHost ) ;
final String tisHost = getHost ( ) ;
return c onfig. getJobTriggerUrl ( tisHost ) ;
}
private String getTis Host ( ) {
final String tisH ost = taskExecutionContext . getDefinedParams ( ) . get ( KEY_POOL_VAR_TIS _HOST ) ;
if ( StringUtils . isEmpty ( tisH ost) ) {
throw new IllegalStateException ( "global var '" + KEY_POOL_VAR_TIS _HOST + "' can not be empty" ) ;
private String getHost ( ) {
final String h ost = taskExecutionContext . getDefinedParams ( ) . get ( KEY_POOL_VAR_PIGEON _HOST ) ;
if ( StringUtils . isEmpty ( h ost) ) {
throw new IllegalStateException ( "global var '" + KEY_POOL_VAR_PIGEON _HOST + "' can not be empty" ) ;
}
return tisH ost;
return h ost;
}
private WebSocketClient receiveRealtimeLog ( final String tisHost , String dataXName , int taskId ) throws Exception {
final String applyURI = tisC onfig. getJobLogsFetchUrl ( tisHost , dataXName , taskId ) ;
final String applyURI = c onfig. getJobLogsFetchUrl ( tisHost , dataXName , taskId ) ;
logger . info ( "apply ws connection,uri:{}" , applyURI ) ;
WebSocketClient webSocketClient = new WebSocketClient ( new URI ( applyURI ) ) {
@Override
@ -241,8 +241,8 @@ public class TISTask extends AbstractTaskExecutor {
@Override
public AbstractParameters getParameters ( ) {
Objects . requireNonNull ( this . tisP arameters, "tisParameters can not be null" ) ;
return this . tisP arameters;
Objects . requireNonNull ( this . p arameters, "tisParameters can not be null" ) ;
return this . p arameters;
}
private static class CancelResult extends AjaxResult < Object > {