@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils ;
import org.apache.dolphinscheduler.common.utils.LoggerUtils ;
import org.apache.dolphinscheduler.common.utils.OSUtils ;
import org.apache.dolphinscheduler.common.utils.OSUtils ;
import org.apache.dolphinscheduler.common.utils.RetryerUtils ;
import org.apache.dolphinscheduler.common.utils.RetryerUtils ;
import org.apache.dolphinscheduler.dao.entity.TaskInstance ;
import org.apache.dolphinscheduler.remote.command.Command ;
import org.apache.dolphinscheduler.remote.command.Command ;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand ;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand ;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand ;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand ;
@ -41,8 +40,6 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager ;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager ;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService ;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService ;
import org.apache.dolphinscheduler.service.alert.AlertClientService ;
import org.apache.dolphinscheduler.service.alert.AlertClientService ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException ;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException ;
import org.apache.dolphinscheduler.spi.task.AbstractTask ;
import org.apache.dolphinscheduler.spi.task.AbstractTask ;
@ -113,11 +110,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
private TaskPluginManager taskPluginManager ;
private TaskPluginManager taskPluginManager ;
/ * *
* process database access
* /
protected ProcessService processService ;
/ * *
/ * *
* constructor
* constructor
*
*
@ -130,7 +122,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
this . taskExecutionContext = taskExecutionContext ;
this . taskExecutionContext = taskExecutionContext ;
this . taskCallbackService = taskCallbackService ;
this . taskCallbackService = taskCallbackService ;
this . alertClientService = alertClientService ;
this . alertClientService = alertClientService ;
this . processService = SpringApplicationContext . getBean ( ProcessService . class ) ;
}
}
public TaskExecuteThread ( TaskExecutionContext taskExecutionContext ,
public TaskExecuteThread ( TaskExecutionContext taskExecutionContext ,
@ -141,7 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
this . taskCallbackService = taskCallbackService ;
this . taskCallbackService = taskCallbackService ;
this . alertClientService = alertClientService ;
this . alertClientService = alertClientService ;
this . taskPluginManager = taskPluginManager ;
this . taskPluginManager = taskPluginManager ;
this . processService = SpringApplicationContext . getBean ( ProcessService . class ) ;
}
}
@Override
@Override
@ -167,8 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
logger . info ( "the task begins to execute. task instance id: {}" , taskExecutionContext . getTaskInstanceId ( ) ) ;
logger . info ( "the task begins to execute. task instance id: {}" , taskExecutionContext . getTaskInstanceId ( ) ) ;
TaskInstance taskInstance = processService . findTaskInstanceById ( taskExecutionContext . getTaskInstanceId ( ) ) ;
int dryRun = taskExecutionContext . getDryRun ( ) ;
int dryRun = taskInstance . getDryRun ( ) ;
// copy hdfs/minio file to local
// copy hdfs/minio file to local
if ( dryRun = = Constants . DRY_RUN_FLAG_NO ) {
if ( dryRun = = Constants . DRY_RUN_FLAG_NO ) {
downloadResource ( taskExecutionContext . getExecutePath ( ) ,
downloadResource ( taskExecutionContext . getExecutePath ( ) ,
@ -294,10 +283,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
if ( task ! = null ) {
if ( task ! = null ) {
try {
try {
task . cancelApplication ( true ) ;
task . cancelApplication ( true ) ;
TaskInstance taskInstance = processService . findTaskInstanceById ( taskExecutionContext . getTaskInstanceId ( ) ) ;
ProcessUtils . killYarnJob ( taskExecutionContext ) ;
if ( taskInstance ! = null ) {
ProcessUtils . killYarnJob ( taskExecutionContext ) ;
}
} catch ( Exception e ) {
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
logger . error ( e . getMessage ( ) , e ) ;
}
}