@ -16,6 +16,8 @@
* /
* /
package org.apache.dolphinscheduler.server.worker.task ;
package org.apache.dolphinscheduler.server.worker.task ;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
@ -23,27 +25,17 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType ;
import org.apache.dolphinscheduler.common.enums.TaskType ;
import org.apache.dolphinscheduler.common.process.Property ;
import org.apache.dolphinscheduler.common.process.Property ;
import org.apache.dolphinscheduler.common.task.AbstractParameters ;
import org.apache.dolphinscheduler.common.task.AbstractParameters ;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters ;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils ;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters ;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters ;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters ;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters ;
import org.apache.dolphinscheduler.common.task.python.PythonParameters ;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters ;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters ;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters ;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.dao.TaskRecordDao ;
import org.apache.dolphinscheduler.dao.TaskRecordDao ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.server.utils.ParamUtils ;
import org.apache.dolphinscheduler.server.utils.ParamUtils ;
import org.apache.commons.lang.StringUtils ;
import org.apache.commons.lang.StringUtils ;
import org.slf4j.Logger ;
import java.util.List ;
import java.util.List ;
import java.util.Map ;
import java.util.Map ;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER ;
import org.slf4j.Logger ;
/ * *
/ * *
* executive task
* executive task
@ -61,13 +53,13 @@ public abstract class AbstractTask {
TaskExecutionContext taskExecutionContext ;
TaskExecutionContext taskExecutionContext ;
/ * *
/ * *
* log record
* log record
* /
* /
protected Logger logger ;
protected Logger logger ;
/ * *
/ * *
* SHELL process pid
* SHELL process pid
* /
* /
protected int processId ;
protected int processId ;
@ -83,14 +75,15 @@ public abstract class AbstractTask {
protected volatile boolean cancel = false ;
protected volatile boolean cancel = false ;
/ * *
/ * *
* exit code
* exit code
* /
* /
protected volatile int exitStatusCode = - 1 ;
protected volatile int exitStatusCode = - 1 ;
/ * *
/ * *
* constructor
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param taskExecutionContext taskExecutionContext
* @param logger logger
* @param logger logger
* /
* /
protected AbstractTask ( TaskExecutionContext taskExecutionContext , Logger logger ) {
protected AbstractTask ( TaskExecutionContext taskExecutionContext , Logger logger ) {
this . taskExecutionContext = taskExecutionContext ;
this . taskExecutionContext = taskExecutionContext ;
@ -99,6 +92,7 @@ public abstract class AbstractTask {
/ * *
/ * *
* init task
* init task
*
* @throws Exception exception
* @throws Exception exception
* /
* /
public void init ( ) throws Exception {
public void init ( ) throws Exception {
@ -106,6 +100,7 @@ public abstract class AbstractTask {
/ * *
/ * *
* task handle
* task handle
*
* @throws Exception exception
* @throws Exception exception
* /
* /
public abstract void handle ( ) throws Exception ;
public abstract void handle ( ) throws Exception ;
@ -113,6 +108,7 @@ public abstract class AbstractTask {
/ * *
/ * *
* cancel application
* cancel application
*
* @param status status
* @param status status
* @throws Exception exception
* @throws Exception exception
* /
* /
@ -122,6 +118,7 @@ public abstract class AbstractTask {
/ * *
/ * *
* log handle
* log handle
*
* @param logs log list
* @param logs log list
* /
* /
public void logHandle ( List < String > logs ) {
public void logHandle ( List < String > logs ) {
@ -136,13 +133,15 @@ public abstract class AbstractTask {
public void setVarPool ( String varPool ) {
public void setVarPool ( String varPool ) {
this . varPool = varPool ;
this . varPool = varPool ;
}
}
public String getVarPool ( ) {
public String getVarPool ( ) {
return varPool ;
return varPool ;
}
}
/ * *
/ * *
* get exit status code
* get exit status code
* @return exit status code
*
* @return exit status code
* /
* /
public int getExitStatusCode ( ) {
public int getExitStatusCode ( ) {
return exitStatusCode ;
return exitStatusCode ;
@ -170,21 +169,21 @@ public abstract class AbstractTask {
/ * *
/ * *
* get task parameters
* get task parameters
*
* @return AbstractParameters
* @return AbstractParameters
* /
* /
public abstract AbstractParameters getParameters ( ) ;
public abstract AbstractParameters getParameters ( ) ;
/ * *
/ * *
* result processing
* result processing
* /
* /
public void after ( ) {
public void after ( ) {
if ( getExitStatusCode ( ) = = Constants . EXIT_CODE_SUCCESS ) {
if ( getExitStatusCode ( ) = = Constants . EXIT_CODE_SUCCESS ) {
// task recor flat : if true , start up qianfan
// task recor flat : if true , start up qianfan
if ( TaskRecordDao . getTaskRecordFlag ( )
if ( TaskRecordDao . getTaskRecordFlag ( )
& & TaskType . typeIsNormalTask ( taskExecutionContext . getTaskType ( ) ) ) {
& & TaskType . typeIsNormalTask ( taskExecutionContext . getTaskType ( ) ) ) {
AbstractParameters params = ( AbstractParameters ) JSONUtils . parseObject ( taskExecutionContext . getTaskParams ( ) , getCurTaskParamsClas s( ) ) ;
AbstractParameters params = TaskParametersUtils . getParameters ( taskExecutionContext . getTaskType ( ) , taskExecutionContext . getTaskParam s( ) ) ;
// replace placeholder
// replace placeholder
Map < String , Property > paramsMap = ParamUtils . convert ( ParamUtils . getUserDefParamsMap ( taskExecutionContext . getDefinedParams ( ) ) ,
Map < String , Property > paramsMap = ParamUtils . convert ( ParamUtils . getUserDefParamsMap ( taskExecutionContext . getDefinedParams ( ) ) ,
@ -193,81 +192,34 @@ public abstract class AbstractTask {
CommandType . of ( taskExecutionContext . getCmdTypeIfComplement ( ) ) ,
CommandType . of ( taskExecutionContext . getCmdTypeIfComplement ( ) ) ,
taskExecutionContext . getScheduleTime ( ) ) ;
taskExecutionContext . getScheduleTime ( ) ) ;
if ( paramsMap ! = null & & ! paramsMap . isEmpty ( )
if ( paramsMap ! = null & & ! paramsMap . isEmpty ( )
& & paramsMap . containsKey ( "v_proc_date" ) ) {
& & paramsMap . containsKey ( "v_proc_date" ) ) {
String vProcDate = paramsMap . get ( "v_proc_date" ) . getValue ( ) ;
String vProcDate = paramsMap . get ( "v_proc_date" ) . getValue ( ) ;
if ( ! StringUtils . isEmpty ( vProcDate ) ) {
if ( ! StringUtils . isEmpty ( vProcDate ) ) {
TaskRecordStatus taskRecordState = TaskRecordDao . getTaskRecordState ( taskExecutionContext . getTaskName ( ) , vProcDate ) ;
TaskRecordStatus taskRecordState = TaskRecordDao . getTaskRecordState ( taskExecutionContext . getTaskName ( ) , vProcDate ) ;
logger . info ( "task record status : {}" , taskRecordState ) ;
logger . info ( "task record status : {}" , taskRecordState ) ;
if ( taskRecordState = = TaskRecordStatus . FAILURE ) {
if ( taskRecordState = = TaskRecordStatus . FAILURE ) {
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
}
}
}
}
}
}
}
}
} else if ( getExitStatusCode ( ) = = Constants . EXIT_CODE_KILL ) {
} else if ( getExitStatusCode ( ) = = Constants . EXIT_CODE_KILL ) {
setExitStatusCode ( Constants . EXIT_CODE_KILL ) ;
setExitStatusCode ( Constants . EXIT_CODE_KILL ) ;
} else {
} else {
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
setExitStatusCode ( Constants . EXIT_CODE_FAILURE ) ;
}
}
}
}
/ * *
* get current task parameter class
* @return Task Params Class
* /
private Class getCurTaskParamsClass ( ) {
Class paramsClass = null ;
// get task type
TaskType taskType = TaskType . valueOf ( taskExecutionContext . getTaskType ( ) ) ;
switch ( taskType ) {
case SHELL :
paramsClass = ShellParameters . class ;
break ;
case SQL :
paramsClass = SqlParameters . class ;
break ;
case PROCEDURE :
paramsClass = ProcedureParameters . class ;
break ;
case MR :
paramsClass = MapreduceParameters . class ;
break ;
case SPARK :
paramsClass = SparkParameters . class ;
break ;
case FLINK :
paramsClass = FlinkParameters . class ;
break ;
case PYTHON :
paramsClass = PythonParameters . class ;
break ;
case DATAX :
paramsClass = DataxParameters . class ;
break ;
case SQOOP :
paramsClass = SqoopParameters . class ;
break ;
case CONDITIONS :
paramsClass = ConditionsParameters . class ;
break ;
default :
logger . error ( "not support this task type: {}" , taskType ) ;
throw new IllegalArgumentException ( "not support this task type" ) ;
}
return paramsClass ;
}
/ * *
/ * *
* get exit status according to exitCode
* get exit status according to exitCode
*
* @return exit status
* @return exit status
* /
* /
public ExecutionStatus getExitStatus ( ) {
public ExecutionStatus getExitStatus ( ) {
ExecutionStatus status ;
ExecutionStatus status ;
switch ( getExitStatusCode ( ) ) {
switch ( getExitStatusCode ( ) ) {
case Constants . EXIT_CODE_SUCCESS :
case Constants . EXIT_CODE_SUCCESS :
status = ExecutionStatus . SUCCESS ;
status = ExecutionStatus . SUCCESS ;
break ;
break ;