@ -16,54 +16,61 @@
* /
package org.apache.dolphinscheduler.server.utils ;
import java.nio.charset.StandardCharsets ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.CommonUtils ;
import org.apache.dolphinscheduler.common.utils.FileUtils ;
import org.apache.dolphinscheduler.common.utils.HadoopUtils ;
import org.apache.dolphinscheduler.common.utils.LoggerUtils ;
import org.apache.dolphinscheduler.common.utils.OSUtils ;
import org.apache.dolphinscheduler.common.utils.StringUtils ;
import org.apache.commons.io.FileUtils ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.service.log.LogClientService ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.io.File ;
import java.io.IOException ;
import java.nio.charset.StandardCharsets ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.regex.Matcher ;
import java.util.regex.Pattern ;
/ * *
* mainly used to get the start command line of a process
* mainly used to get the start command line of a process .
* /
public class ProcessUtils {
/ * *
* logger
* /
private final static Logger logger = LoggerFactory . getLogger ( ProcessUtils . class ) ;
private static final Logger logger = LoggerFactory . getLogger ( ProcessUtils . class ) ;
/ * *
* build command line characters
* Initialization regularization , solve the problem of pre - compilation performance ,
* avoid the thread safety problem of multi - thread operation .
* /
private static final Pattern MACPATTERN = Pattern . compile ( "-[+|-]-\\s(\\d+)" ) ;
private static final Pattern LINUXPATTERN = Pattern . compile ( "(\\d+)" ) ;
private static final String LOCAL_PROCESS_EXEC = "jdk.lang.Process.allowAmbiguousCommands" ;
/ * *
* build command line characters .
*
* @param commandList command list
* @return command
* @throws IOException io exception
* /
public static String buildCommandStr ( List < String > commandList ) throws IOException {
String cmdstr ;
String [ ] cmd = commandList . toArray ( new String [ commandList . size ( ) ] ) ;
String [ ] cmd = commandList . toArray ( new String [ 0 ] ) ;
SecurityManager security = System . getSecurityManager ( ) ;
boolean allowAmbiguousCommands = false ;
if ( security = = null ) {
allowAmbiguousCommands = true ;
String value = System . getProperty ( "jdk.lang.Process.allowAmbiguousCommands" ) ;
if ( value ! = null ) {
allowAmbiguousCommands = ! "false" . equalsIgnoreCase ( value ) ;
}
}
boolean allowAmbiguousCommands = isAllowAmbiguousCommands ( security ) ;
if ( allowAmbiguousCommands ) {
String executablePath = new File ( cmd [ 0 ] ) . getPath ( ) ;
@ -94,7 +101,6 @@ public class ProcessUtils {
}
}
cmdstr = createCommandLine (
isShellFile ( executablePath ) ? VERIFICATION_CMD_BAT : VERIFICATION_WIN32 , quoteString ( executablePath ) , cmd ) ;
@ -103,7 +109,25 @@ public class ProcessUtils {
}
/ * *
* get executable path
* check is allow ambiguous commands
*
* @param security security manager
* @return allow ambiguous command flag
* /
private static boolean isAllowAmbiguousCommands ( SecurityManager security ) {
boolean allowAmbiguousCommands = false ;
if ( security = = null ) {
allowAmbiguousCommands = true ;
String value = System . getProperty ( LOCAL_PROCESS_EXEC ) ;
if ( value ! = null ) {
allowAmbiguousCommands = ! Constants . STRING_FALSE . equalsIgnoreCase ( value ) ;
}
}
return allowAmbiguousCommands ;
}
/ * *
* get executable path .
*
* @param path path
* @return executable path
@ -117,7 +141,7 @@ public class ProcessUtils {
}
/ * *
* whether is shell file
* whether is shell file .
*
* @param executablePath executable path
* @return true if endsWith . CMD or . BAT
@ -139,7 +163,7 @@ public class ProcessUtils {
}
/ * *
* get tokens from command
* get tokens from command .
*
* @param command command
* @return token string array
@ -150,15 +174,17 @@ public class ProcessUtils {
while ( regexMatcher . find ( ) ) {
matchList . add ( regexMatcher . group ( ) ) ;
}
return matchList . toArray ( new String [ matchList . size ( ) ] ) ;
return matchList . toArray ( new String [ 0 ] ) ;
}
/ * *
* Lazy Pattern
* /
private static class LazyPattern {
// Escape-support version:
// "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
/ * *
* Escape - support version :
* "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)" ;
* /
private static final Pattern PATTERN = Pattern . compile ( "[^\\s\"]+|\"[^\"]*\"" ) ;
}
@ -184,13 +210,9 @@ public class ProcessUtils {
{ ' ' , '\t' , '<' , '>' } , { ' ' , '\t' } } ;
/ * *
* matcher
* /
private static Matcher matcher ;
/ * *
* create command line
*
* @param verificationType verification type
* @param executablePath executable path
* @param cmd cmd
@ -220,29 +242,26 @@ public class ProcessUtils {
/ * *
* whether is quoted
* @param noQuotesInside
* @param arg
* @param errorMessage
*
* @param noQuotesInside no quotes inside
* @param arg arg
* @param errorMessage error message
* @return boolean
* /
private static boolean isQuoted ( boolean noQuotesInside , String arg , String errorMessage ) {
int lastPos = arg . length ( ) - 1 ;
if ( lastPos > = 1 & & arg . charAt ( 0 ) = = '"' & & arg . charAt ( lastPos ) = = '"' ) {
// The argument has already been quoted.
if ( noQuotesInside ) {
if ( arg . indexOf ( '"' , 1 ) ! = lastPos ) {
if ( noQuotesInside & & arg . indexOf ( '"' , 1 ) ! = lastPos ) {
// There is ["] inside.
throw new IllegalArgumentException ( errorMessage ) ;
}
}
return true ;
}
if ( noQuotesInside ) {
if ( arg . indexOf ( '"' ) > = 0 ) {
if ( noQuotesInside & & arg . indexOf ( '"' ) > = 0 ) {
// There is ["] inside.
throw new IllegalArgumentException ( errorMessage ) ;
}
}
return false ;
}
@ -259,8 +278,8 @@ public class ProcessUtils {
if ( ! argIsQuoted ) {
char [ ] testEscape = ESCAPE_VERIFICATION [ verificationType ] ;
for ( int i = 0 ; i < testEscape . length ; + + i ) {
if ( arg . indexOf ( testEs cape [ i ] ) > = 0 ) {
for ( char c : testEscape ) {
if ( arg . indexOf ( c ) > = 0 ) {
return true ;
}
}
@ -275,22 +294,44 @@ public class ProcessUtils {
* @param logger logger
* @param tenantCode tenant code
* @param executePath execute path
* @throws IOException io exception
* /
public static void cancelApplication ( List < String > appIds , Logger logger , String tenantCode , String executePath )
throws IOException {
if ( appIds . size ( ) > 0 ) {
String appid = appIds . get ( appIds . size ( ) - 1 ) ;
public static void cancelApplication ( List < String > appIds , Logger logger , String tenantCode , String executePath ) {
if ( CollectionUtils . isNotEmpty ( appIds ) ) {
for ( String appId : appIds ) {
try {
ExecutionStatus applicationStatus = HadoopUtils . getInstance ( ) . getApplicationStatus ( appId ) ;
if ( ! applicationStatus . typeIsFinished ( ) ) {
String commandFile = String
. format ( "%s/%s.kill" , executePath , appid ) ;
String cmd = "yarn application -kill " + appid ;
. format ( "%s/%s.kill" , executePath , appId ) ;
String cmd = "yarn application -kill " + appId ;
execYarnKillCommand ( logger , tenantCode , appId , commandFile , cmd ) ;
}
} catch ( Exception e ) {
logger . error ( String . format ( "Get yarn application app id [%s] status failed: [%s]" , appId , e . getMessage ( ) ) ) ;
}
}
}
}
/ * *
* build kill command for yarn application
*
* @param logger logger
* @param tenantCode tenant code
* @param appId app id
* @param commandFile command file
* @param cmd cmd
* /
private static void execYarnKillCommand ( Logger logger , String tenantCode , String appId , String commandFile , String cmd ) {
try {
StringBuilder sb = new StringBuilder ( ) ;
sb . append ( "#!/bin/sh\n" ) ;
sb . append ( "BASEDIR=$(cd `dirname $0`; pwd)\n" ) ;
sb . append ( "cd $BASEDIR\n" ) ;
if ( CommonUtils . getSystemEnvPath ( ) ! = null ) {
sb . append ( "source " + CommonUtils . getSystemEnvPath ( ) + "\n" ) ;
sb . append ( "source " ) . append ( CommonUtils . getSystemEnvPath ( ) ) . append ( "\n" ) ;
}
sb . append ( "\n\n" ) ;
sb . append ( cmd ) ;
@ -301,17 +342,15 @@ public class ProcessUtils {
FileUtils . writeStringToFile ( new File ( commandFile ) , sb . toString ( ) , StandardCharsets . UTF_8 ) ;
}
String runCmd = "sh " + commandFile ;
String runCmd = String . format ( "%s %s" , Constants . SH , commandFile ) ;
if ( StringUtils . isNotEmpty ( tenantCode ) ) {
runCmd = "sudo -u " + tenantCode + " " + runCmd ;
}
logger . info ( "kill cmd:{}" , runCmd ) ;
Runtime . getRuntime ( ) . exec ( runCmd ) ;
OSUtils . exeCmd ( runCmd ) ;
} catch ( Exception e ) {
logger . error ( "kill application error" , e ) ;
}
logger . error ( String . format ( "Kill yarn application app id [%s] failed: [%s]" , appId , e . getMessage ( ) ) ) ;
}
}
@ -323,10 +362,10 @@ public class ProcessUtils {
public static void kill ( TaskExecutionContext taskExecutionContext ) {
try {
int processId = taskExecutionContext . getProcessId ( ) ;
if ( processId = = 0 ) {
if ( processId = = 0 ) {
logger . error ( "process kill failed, process id :{}, task id:{}" ,
processId , taskExecutionContext . getTaskInstanceId ( ) ) ;
return ;
return ;
}
String cmd = String . format ( "sudo kill -9 %s" , getPidsStr ( processId ) ) ;
@ -347,24 +386,29 @@ public class ProcessUtils {
* get pids str
*
* @param processId process id
* @return pids
* @return pids pid String
* @throws Exception exception
* /
public static String getPidsStr ( int processId ) throws Exception {
public static String getPidsStr ( int processId ) throws Exception {
StringBuilder sb = new StringBuilder ( ) ;
Matcher mat ;
Matcher mat = null ;
// pstree pid get sub pids
if ( OSUtils . isMacOS ( ) ) {
String pids = OSUtils . exeCmd ( "pstree -sp " + processId ) ;
mat = Pattern . compile ( "-[+|-]-\\s(\\d+)" ) . matcher ( pids ) ;
String pids = OSUtils . exeCmd ( String . format ( "%s -sp %d" , Constants . PSTREE , processId ) ) ;
if ( null ! = pids ) {
mat = MACPATTERN . matcher ( pids ) ;
}
} else {
String pids = OSUtils . exeCmd ( "pstree -p " + processId ) ;
mat = Pattern . compile ( "(\\d+)" ) . matcher ( pids ) ;
String pids = OSUtils . exeCmd ( String . format ( "%s -p %d" , Constants . PSTREE , processId ) ) ;
mat = LINUXPATTERN . matcher ( pids ) ;
}
while ( mat . find ( ) ) {
if ( null ! = mat ) {
while ( mat . find ( ) ) {
sb . append ( mat . group ( 1 ) ) . append ( " " ) ;
}
}
return sb . toString ( ) . trim ( ) ;
}
@ -377,14 +421,14 @@ public class ProcessUtils {
try {
Thread . sleep ( Constants . SLEEP_TIME_MILLIS ) ;
LogClientService logClient = null ;
String log = null ;
String log ;
try {
logClient = new LogClientService ( ) ;
log = logClient . viewLog ( Host . of ( taskExecutionContext . getHost ( ) ) . getIp ( ) ,
Constants . RPC_PORT ,
taskExecutionContext . getLogPath ( ) ) ;
} finally {
if ( logClient ! = null ) {
if ( logClient ! = null ) {
logClient . close ( ) ;
}
}
@ -395,13 +439,13 @@ public class ProcessUtils {
logger . error ( "task instance work dir is empty" ) ;
throw new RuntimeException ( "task instance work dir is empty" ) ;
}
if ( appIds . size ( ) > 0 ) {
if ( CollectionUtils . isNotEmpty ( appIds ) ) {
cancelApplication ( appIds , logger , taskExecutionContext . getTenantCode ( ) , taskExecutionContext . getExecutePath ( ) ) ;
}
}
} catch ( Exception e ) {
logger . error ( "kill yarn job failure" , e ) ;
logger . error ( "kill yarn job failure" , e ) ;
}
}
}