@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.api ;
package org.apache.dolphinscheduler.plugin.task.api ;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING ;
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL ;
@ -40,6 +42,7 @@ import java.io.InputStreamReader;
import java.lang.reflect.Field ;
import java.lang.reflect.Field ;
import java.util.LinkedList ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.List ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Executors ;
@ -54,6 +57,7 @@ import java.util.regex.Pattern;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import com.google.common.util.concurrent.ThreadFactoryBuilder ;
import com.google.common.util.concurrent.ThreadFactoryBuilder ;
import io.fabric8.kubernetes.client.dsl.LogWatch ;
/ * *
/ * *
* abstract command executor
* abstract command executor
@ -86,7 +90,9 @@ public abstract class AbstractCommandExecutor {
* /
* /
protected LinkedBlockingQueue < String > logBuffer ;
protected LinkedBlockingQueue < String > logBuffer ;
protected boolean logOutputIsSuccess = false ;
protected boolean processLogOutputIsSuccess = false ;
protected boolean podLogOutputIsFinished = false ;
/ *
/ *
* SHELL result string
* SHELL result string
@ -100,6 +106,8 @@ public abstract class AbstractCommandExecutor {
protected Future < ? > taskOutputFuture ;
protected Future < ? > taskOutputFuture ;
protected Future < ? > podLogOutputFuture ;
public AbstractCommandExecutor ( Consumer < LinkedBlockingQueue < String > > logHandler ,
public AbstractCommandExecutor ( Consumer < LinkedBlockingQueue < String > > logHandler ,
TaskExecutionContext taskRequest ,
TaskExecutionContext taskRequest ,
Logger logger ) {
Logger logger ) {
@ -107,6 +115,7 @@ public abstract class AbstractCommandExecutor {
this . taskRequest = taskRequest ;
this . taskRequest = taskRequest ;
this . logger = logger ;
this . logger = logger ;
this . logBuffer = new LinkedBlockingQueue < > ( ) ;
this . logBuffer = new LinkedBlockingQueue < > ( ) ;
this . logBuffer . add ( EMPTY_STRING ) ;
if ( this . taskRequest ! = null ) {
if ( this . taskRequest ! = null ) {
// set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
// set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
@ -215,6 +224,9 @@ public abstract class AbstractCommandExecutor {
// parse process output
// parse process output
parseProcessOutput ( process ) ;
parseProcessOutput ( process ) ;
// collect pod log
collectPodLogIfNeeded ( ) ;
int processId = getProcessId ( process ) ;
int processId = getProcessId ( process ) ;
result . setProcessId ( processId ) ;
result . setProcessId ( processId ) ;
@ -251,6 +263,17 @@ public abstract class AbstractCommandExecutor {
}
}
}
}
if ( podLogOutputFuture ! = null ) {
try {
// Wait kubernetes pod log collection finished
podLogOutputFuture . get ( ) ;
// delete pod after successful execution and log collection
ProcessUtils . cancelApplication ( taskRequest ) ;
} catch ( ExecutionException e ) {
logger . error ( "Handle pod log error" , e ) ;
}
}
TaskExecutionStatus kubernetesStatus =
TaskExecutionStatus kubernetesStatus =
ProcessUtils . getApplicationStatus ( taskRequest . getK8sTaskExecutionContext ( ) , taskRequest . getTaskAppId ( ) ) ;
ProcessUtils . getApplicationStatus ( taskRequest . getK8sTaskExecutionContext ( ) , taskRequest . getTaskAppId ( ) ) ;
@ -296,6 +319,45 @@ public abstract class AbstractCommandExecutor {
logger . info ( "task run command: {}" , String . join ( " " , commands ) ) ;
logger . info ( "task run command: {}" , String . join ( " " , commands ) ) ;
}
}
private void collectPodLogIfNeeded ( ) {
if ( null = = taskRequest . getK8sTaskExecutionContext ( ) ) {
podLogOutputIsFinished = true ;
return ;
}
// wait for launching (driver) pod
ThreadUtils . sleep ( SLEEP_TIME_MILLIS * 5L ) ;
LogWatch watcher =
ProcessUtils . getPodLogWatcher ( taskRequest . getK8sTaskExecutionContext ( ) , taskRequest . getTaskAppId ( ) ) ;
if ( watcher ! = null ) {
ExecutorService collectPodLogExecutorService = ThreadUtils
. newSingleDaemonScheduledExecutorService ( "CollectPodLogOutput-thread-" + taskRequest . getTaskName ( ) ) ;
podLogOutputFuture = collectPodLogExecutorService . submit ( ( ) - > {
try {
String line ;
try ( BufferedReader reader = new BufferedReader ( new InputStreamReader ( watcher . getOutput ( ) ) ) ) {
while ( ( line = reader . readLine ( ) ) ! = null ) {
logBuffer . add ( String . format ( "[K8S-pod-log-%s]: %s" , taskRequest . getTaskName ( ) , line ) ) ;
}
}
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
} finally {
watcher . close ( ) ;
podLogOutputIsFinished = true ;
}
} ) ;
collectPodLogExecutorService . shutdown ( ) ;
} else {
CompletableFuture < String > exceptionalFuture = new CompletableFuture < > ( ) ;
exceptionalFuture . completeExceptionally ( new RuntimeException ( "The driver pod does not exist." ) ) ;
podLogOutputFuture = exceptionalFuture ;
podLogOutputIsFinished = true ;
}
}
private void parseProcessOutput ( Process process ) {
private void parseProcessOutput ( Process process ) {
// todo: remove this this thread pool.
// todo: remove this this thread pool.
ExecutorService getOutputLogService = ThreadUtils
ExecutorService getOutputLogService = ThreadUtils
@ -315,10 +377,10 @@ public abstract class AbstractCommandExecutor {
taskResultString = line ;
taskResultString = line ;
}
}
}
}
l ogOutputIsSuccess = true ;
processL ogOutputIsSuccess = true ;
} catch ( Exception e ) {
} catch ( Exception e ) {
logger . error ( "Parse var pool error" , e ) ;
logger . error ( "Parse var pool error" , e ) ;
l ogOutputIsSuccess = true ;
processL ogOutputIsSuccess = true ;
}
}
} ) ;
} ) ;
@ -330,10 +392,11 @@ public abstract class AbstractCommandExecutor {
try (
try (
final LogUtils . MDCAutoClosableContext mdcAutoClosableContext =
final LogUtils . MDCAutoClosableContext mdcAutoClosableContext =
LogUtils . setTaskInstanceLogFullPathMDC ( taskRequest . getLogPath ( ) ) ; ) {
LogUtils . setTaskInstanceLogFullPathMDC ( taskRequest . getLogPath ( ) ) ; ) {
while ( ! logBuffer . isEmpty ( ) | | ! logOutputIsSuccess ) {
while ( logBuffer . size ( ) > 1 | | ! processLogOutputIsSuccess | | ! podLogOutputIsFinished ) {
if ( ! logBuffer . isEmpty ( ) ) {
if ( logBuffer . size ( ) > 1 ) {
logHandler . accept ( logBuffer ) ;
logHandler . accept ( logBuffer ) ;
logBuffer . clear ( ) ;
logBuffer . clear ( ) ;
logBuffer . add ( EMPTY_STRING ) ;
} else {
} else {
Thread . sleep ( TaskConstants . DEFAULT_LOG_FLUSH_INTERVAL ) ;
Thread . sleep ( TaskConstants . DEFAULT_LOG_FLUSH_INTERVAL ) ;
}
}