@ -30,7 +30,9 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.MI;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAME_LABEL ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RESTART_POLICY ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_INSTANCE_ID ;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.UNIQUE_LABEL_NAME ;
import org.apache.dolphinscheduler.common.thread.ThreadUtils ;
import org.apache.dolphinscheduler.common.utils.JSONUtils ;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext ;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants ;
@ -41,16 +43,23 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor ;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters ;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse ;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils ;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils ;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils ;
import org.apache.commons.lang3.StringUtils ;
import java.io.BufferedReader ;
import java.io.InputStreamReader ;
import java.util.ArrayList ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Locale ;
import java.util.Map ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Future ;
import java.util.concurrent.TimeUnit ;
import org.slf4j.Logger ;
@ -67,6 +76,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.JobStatus;
import io.fabric8.kubernetes.client.Watch ;
import io.fabric8.kubernetes.client.Watcher ;
import io.fabric8.kubernetes.client.WatcherException ;
import io.fabric8.kubernetes.client.dsl.LogWatch ;
/ * *
* K8sTaskExecutor used to submit k8s task to K8S
@ -74,6 +84,9 @@ import io.fabric8.kubernetes.client.WatcherException;
public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
private Job job ;
protected boolean podLogOutputIsFinished = false ;
protected Future < ? > podLogOutputFuture ;
public K8sTaskExecutor ( Logger logger , TaskExecutionContext taskRequest ) {
super ( logger , taskRequest ) ;
}
@ -100,6 +113,8 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
Map < String , String > labelMap = k8STaskMainParameters . getLabelMap ( ) ;
labelMap . put ( LAYER_LABEL , LAYER_LABEL_VALUE ) ;
labelMap . put ( NAME_LABEL , k8sJobName ) ;
Map < String , String > podLabelMap = new HashMap < > ( ) ;
podLabelMap . put ( UNIQUE_LABEL_NAME , taskRequest . getTaskAppId ( ) ) ;
EnvVar taskInstanceIdVar = new EnvVar ( TASK_INSTANCE_ID , taskInstanceId , null ) ;
List < EnvVar > envVars = new ArrayList < > ( ) ;
envVars . add ( taskInstanceIdVar ) ;
@ -150,6 +165,9 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
. withNewSpec ( )
. withTtlSecondsAfterFinished ( JOB_TTL_SECONDS )
. withNewTemplate ( )
. withNewMetadata ( )
. withLabels ( podLabelMap )
. endMetadata ( )
. withNewSpec ( )
. addNewContainer ( )
. withName ( k8sJobName )
@ -170,36 +188,36 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
return jobBuilder . build ( ) ;
}
public void registerBatchJobWatcher ( Job job , String taskInstanceId , TaskResponse taskResponse ,
K8sTaskMainParameters k8STaskMainParameters ) {
public void registerBatchJobWatcher ( Job job , String taskInstanceId , TaskResponse taskResponse ) {
CountDownLatch countDownLatch = new CountDownLatch ( 1 ) ;
Watcher < Job > watcher = new Watcher < Job > ( ) {
@Override
public void eventReceived ( Action action , Job job ) {
log . info ( "event received : job:{} action:{}" , job . getMetadata ( ) . getName ( ) , action ) ;
if ( action ! = Action . ADDED ) {
int jobStatus = getK8sJobStatus ( job ) ;
log . info ( "job {} status {}" , job . getMetadata ( ) . getName ( ) , jobStatus ) ;
if ( jobStatus = = TaskConstants . RUNNING_CODE ) {
return ;
try (
final LogUtils . MDCAutoClosableContext mdcAutoClosableContext =
LogUtils . setTaskInstanceLogFullPathMDC ( taskRequest . getLogPath ( ) ) ) {
log . info ( "event received : job:{} action:{}" , job . getMetadata ( ) . getName ( ) , action ) ;
if ( action ! = Action . ADDED ) {
int jobStatus = getK8sJobStatus ( job ) ;
log . info ( "job {} status {}" , job . getMetadata ( ) . getName ( ) , jobStatus ) ;
if ( jobStatus = = TaskConstants . RUNNING_CODE ) {
return ;
}
setTaskStatus ( jobStatus , taskInstanceId , taskResponse ) ;
countDownLatch . countDown ( ) ;
}
setTaskStatus ( jobStatus , taskInstanceId , taskResponse , k8STaskMainParameters ) ;
countDownLatch . countDown ( ) ;
}
}
@Override
public void onClose ( WatcherException e ) {
logStringBuffer . append ( String . format ( "[K8sJobExecutor-%s] fail in k8s: %s" , job . getMetadata ( ) . getName ( ) ,
e . getMessage ( ) ) ) ;
log . error ( "[K8sJobExecutor-{}] fail in k8s: {}" , job . getMetadata ( ) . getName ( ) , e . getMessage ( ) ) ;
taskResponse . setExitStatusCode ( EXIT_CODE_FAILURE ) ;
countDownLatch . countDown ( ) ;
}
} ;
Watch watch = null ;
try {
watch = k8sUtils . createBatchJobWatcher ( job . getMetadata ( ) . getName ( ) , watcher ) ;
try ( Watch watch = k8sUtils . createBatchJobWatcher ( job . getMetadata ( ) . getName ( ) , watcher ) ) {
boolean timeoutFlag = taskRequest . getTaskTimeoutStrategy ( ) = = TaskTimeoutStrategy . FAILED
| | taskRequest . getTaskTimeoutStrategy ( ) = = TaskTimeoutStrategy . WARNFAILED ;
if ( timeoutFlag ) {
@ -208,7 +226,6 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
} else {
countDownLatch . await ( ) ;
}
flushLog ( taskResponse ) ;
} catch ( InterruptedException e ) {
log . error ( "job failed in k8s: {}" , e . getMessage ( ) , e ) ;
Thread . currentThread ( ) . interrupt ( ) ;
@ -216,19 +233,42 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
} catch ( Exception e ) {
log . error ( "job failed in k8s: {}" , e . getMessage ( ) , e ) ;
taskResponse . setExitStatusCode ( EXIT_CODE_FAILURE ) ;
} finally {
if ( watch ! = null ) {
watch . close ( ) ;
}
}
}
private void parsePodLogOutput ( ) {
ExecutorService collectPodLogExecutorService = ThreadUtils
. newSingleDaemonScheduledExecutorService ( "CollectPodLogOutput-thread-" + taskRequest . getTaskName ( ) ) ;
String taskInstanceId = String . valueOf ( taskRequest . getTaskInstanceId ( ) ) ;
String taskName = taskRequest . getTaskName ( ) . toLowerCase ( Locale . ROOT ) ;
String containerName = String . format ( "%s-%s" , taskName , taskInstanceId ) ;
podLogOutputFuture = collectPodLogExecutorService . submit ( ( ) - > {
try (
final LogUtils . MDCAutoClosableContext mdcAutoClosableContext =
LogUtils . setTaskInstanceLogFullPathMDC ( taskRequest . getLogPath ( ) ) ;
LogWatch watcher = ProcessUtils . getPodLogWatcher ( taskRequest . getK8sTaskExecutionContext ( ) ,
taskRequest . getTaskAppId ( ) , containerName ) ) {
String line ;
try ( BufferedReader reader = new BufferedReader ( new InputStreamReader ( watcher . getOutput ( ) ) ) ) {
while ( ( line = reader . readLine ( ) ) ! = null ) {
log . info ( "[K8S-pod-log] {}" , line ) ;
}
}
} catch ( Exception e ) {
throw new RuntimeException ( e ) ;
} finally {
podLogOutputIsFinished = true ;
}
} ) ;
collectPodLogExecutorService . shutdown ( ) ;
}
@Override
public TaskResponse run ( String k8sParameterStr ) throws Exception {
TaskResponse result = new TaskResponse ( ) ;
int taskInstanceId = taskRequest . getTaskInstanceId ( ) ;
K8sTaskMainParameters k8STaskMainParameters =
JSONUtils . parseObject ( k8sParameterStr , K8sTaskMainParameters . class ) ;
try {
if ( null = = TaskExecutionContextCacheManager . getByTaskInstanceId ( taskInstanceId ) ) {
result . setExitStatusCode ( EXIT_CODE_KILL ) ;
@ -242,9 +282,20 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
String configYaml = k8sTaskExecutionContext . getConfigYaml ( ) ;
k8sUtils . buildClient ( configYaml ) ;
submitJob2k8s ( k8sParameterStr ) ;
registerBatchJobWatcher ( job , Integer . toString ( taskInstanceId ) , result , k8STaskMainParameters ) ;
parsePodLogOutput ( ) ;
registerBatchJobWatcher ( job , Integer . toString ( taskInstanceId ) , result ) ;
if ( podLogOutputFuture ! = null ) {
try {
// Wait kubernetes pod log collection finished
podLogOutputFuture . get ( ) ;
} catch ( ExecutionException e ) {
log . error ( "Handle pod log error" , e ) ;
}
}
} catch ( Exception e ) {
cancelApplication ( k8sParameterStr ) ;
Thread . currentThread ( ) . interrupt ( ) ;
result . setExitStatusCode ( EXIT_CODE_FAILURE ) ;
throw e ;
}
@ -270,9 +321,9 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
stopJobOnK8s ( k8sParameterStr ) ;
String namespaceName = k8STaskMainParameters . getNamespaceName ( ) ;
k8sUtils . createJob ( namespaceName , job ) ;
log . info ( "[K8sJobExecutor-{}-{}] submitted job successfully" , taskName , taskInstanceId ) ;
log . info ( "[K8sJobExecutor-{}-{}] submitted job successfully" , taskName , taskInstanceId ) ;
} catch ( Exception e ) {
log . error ( "[K8sJobExecutor-{}-{}] fail to submit job" , taskName , taskInstanceId ) ;
log . error ( "[K8sJobExecutor-{}-{}] fail to submit job" , taskName , taskInstanceId ) ;
throw new TaskException ( "K8sJobExecutor fail to submit job" , e ) ;
}
}
@ -288,7 +339,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
k8sUtils . deleteJob ( jobName , namespaceName ) ;
}
} catch ( Exception e ) {
log . error ( "[K8sJobExecutor-{}] fail to stop job" , jobName ) ;
log . error ( "[K8sJobExecutor-{}] fail to stop job" , jobName ) ;
throw new TaskException ( "K8sJobExecutor fail to stop job" , e ) ;
}
}
@ -304,21 +355,16 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
}
}
public void setTaskStatus ( int jobStatus , String taskInstanceId , TaskResponse taskResponse ,
K8sTaskMainParameters k8STaskMainParameters ) {
public void setTaskStatus ( int jobStatus , String taskInstanceId , TaskResponse taskResponse ) {
if ( jobStatus = = EXIT_CODE_SUCCESS | | jobStatus = = EXIT_CODE_FAILURE ) {
if ( null = = TaskExecutionContextCacheManager . getByTaskInstanceId ( Integer . valueOf ( taskInstanceId ) ) ) {
logStringBuffer . append ( Str ing . format ( "[K8sJobExecutor-%s ] killed" , job . getMetadata ( ) . getName ( ) ) ) ;
log . info ( "[K8sJobExecutor-{} ] killed" , job . getMetadata ( ) . getName ( ) ) ;
taskResponse . setExitStatusCode ( EXIT_CODE_KILL ) ;
} else if ( jobStatus = = EXIT_CODE_SUCCESS ) {
logStringBuffer
. append ( String . format ( "[K8sJobExecutor-%s] succeed in k8s" , job . getMetadata ( ) . getName ( ) ) ) ;
log . info ( "[K8sJobExecutor-{}] succeed in k8s" , job . getMetadata ( ) . getName ( ) ) ;
taskResponse . setExitStatusCode ( EXIT_CODE_SUCCESS ) ;
} else {
String errorMessage =
k8sUtils . getPodLog ( job . getMetadata ( ) . getName ( ) , k8STaskMainParameters . getNamespaceName ( ) ) ;
logStringBuffer . append ( String . format ( "[K8sJobExecutor-%s] fail in k8s: %s" , job . getMetadata ( ) . getName ( ) ,
errorMessage ) ) ;
log . error ( "[K8sJobExecutor-{}] fail in k8s" , job . getMetadata ( ) . getName ( ) ) ;
taskResponse . setExitStatusCode ( EXIT_CODE_FAILURE ) ;
}
}