Browse Source

Merge pull request #4042 from Eights-Li/dev-kill-yarn-app

[FIX-3900][server] kill multi yarn app in one job
pull/3/MERGE
dailidong 4 years ago committed by GitHub
parent
commit
368d3e8968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 125
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  3. 108
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java

15
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -523,10 +523,9 @@ public final class Constants {
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
/** /**
* hadoop params constant * hadoop params
*/
/**
* jar * jar
*/ */
public static final String JAR = "jar"; public static final String JAR = "jar";
@ -1009,4 +1008,14 @@ public final class Constants {
* Network IP gets priority, default inner outer * Network IP gets priority, default inner outer
*/ */
public static final String NETWORK_PRIORITY_STRATEGY = "dolphin.scheduler.network.priority.strategy"; public static final String NETWORK_PRIORITY_STRATEGY = "dolphin.scheduler.network.priority.strategy";
/**
* exec shell scripts
*/
public static final String SH = "sh";
/**
* pstree, get pud and sub pid
*/
public static final String PSTREE = "pstree";
} }

125
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -54,23 +57,19 @@ public class ProcessUtils {
private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)"); private static final Pattern WINDOWSATTERN = Pattern.compile("(\\d+)");
private static final String LOCAL_PROCESS_EXEC = "jdk.lang.Process.allowAmbiguousCommands";
/** /**
* build command line characters. * build command line characters.
*
* @param commandList command list * @param commandList command list
* @return command * @return command
*/ */
public static String buildCommandStr(List<String> commandList) { public static String buildCommandStr(List<String> commandList) {
String cmdstr; String cmdstr;
String[] cmd = commandList.toArray(new String[commandList.size()]); String[] cmd = commandList.toArray(new String[0]);
SecurityManager security = System.getSecurityManager(); SecurityManager security = System.getSecurityManager();
boolean allowAmbiguousCommands = false; boolean allowAmbiguousCommands = isAllowAmbiguousCommands(security);
if (security == null) {
allowAmbiguousCommands = true;
String value = System.getProperty("jdk.lang.Process.allowAmbiguousCommands");
if (value != null) {
allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
}
}
if (allowAmbiguousCommands) { if (allowAmbiguousCommands) {
String executablePath = new File(cmd[0]).getPath(); String executablePath = new File(cmd[0]).getPath();
@ -108,6 +107,24 @@ public class ProcessUtils {
return cmdstr; return cmdstr;
} }
/**
* check is allow ambiguous commands
*
* @param security security manager
* @return allow ambiguous command flag
*/
private static boolean isAllowAmbiguousCommands(SecurityManager security) {
boolean allowAmbiguousCommands = false;
if (security == null) {
allowAmbiguousCommands = true;
String value = System.getProperty(LOCAL_PROCESS_EXEC);
if (value != null) {
allowAmbiguousCommands = !Constants.STRING_FALSE.equalsIgnoreCase(value);
}
}
return allowAmbiguousCommands;
}
/** /**
* get executable path. * get executable path.
* *
@ -139,8 +156,7 @@ public class ProcessUtils {
* @return format arg * @return format arg
*/ */
private static String quoteString(String arg) { private static String quoteString(String arg) {
StringBuilder argbuf = new StringBuilder(arg.length() + 2); return '"' + arg + '"';
return argbuf.append('"').append(arg).append('"').toString();
} }
/** /**
@ -155,15 +171,17 @@ public class ProcessUtils {
while (regexMatcher.find()) { while (regexMatcher.find()) {
matchList.add(regexMatcher.group()); matchList.add(regexMatcher.group());
} }
return matchList.toArray(new String[matchList.size()]); return matchList.toArray(new String[0]);
} }
/** /**
* Lazy Pattern. * Lazy Pattern.
*/ */
private static class LazyPattern { private static class LazyPattern {
// Escape-support version: /**
// "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)"; * Escape-support version:
* "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
*/
private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\""); private static final Pattern PATTERN = Pattern.compile("[^\\s\"]+|\"[^\"]*\"");
} }
@ -191,6 +209,7 @@ public class ProcessUtils {
/** /**
* create command line. * create command line.
*
* @param verificationType verification type * @param verificationType verification type
* @param executablePath executable path * @param executablePath executable path
* @param cmd cmd * @param cmd cmd
@ -220,29 +239,26 @@ public class ProcessUtils {
/** /**
* whether is quoted. * whether is quoted.
* @param noQuotesInside *
* @param arg * @param noQuotesInside no quotes inside
* @param errorMessage * @param arg arg
* @param errorMessage error message
* @return boolean * @return boolean
*/ */
private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) { private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) {
int lastPos = arg.length() - 1; int lastPos = arg.length() - 1;
if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') { if (lastPos >= 1 && arg.charAt(0) == '"' && arg.charAt(lastPos) == '"') {
// The argument has already been quoted. // The argument has already been quoted.
if (noQuotesInside) { if (noQuotesInside && arg.indexOf('"', 1) != lastPos) {
if (arg.indexOf('"', 1) != lastPos) {
// There is ["] inside. // There is ["] inside.
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
}
return true; return true;
} }
if (noQuotesInside) { if (noQuotesInside && arg.indexOf('"') >= 0) {
if (arg.indexOf('"') >= 0) {
// There is ["] inside. // There is ["] inside.
throw new IllegalArgumentException(errorMessage); throw new IllegalArgumentException(errorMessage);
} }
}
return false; return false;
} }
@ -259,8 +275,8 @@ public class ProcessUtils {
if (!argIsQuoted) { if (!argIsQuoted) {
char[] testEscape = ESCAPE_VERIFICATION[verificationType]; char[] testEscape = ESCAPE_VERIFICATION[verificationType];
for (int i = 0; i < testEscape.length; ++i) { for (char c : testEscape) {
if (arg.indexOf(testEscape[i]) >= 0) { if (arg.indexOf(c) >= 0) {
return true; return true;
} }
} }
@ -277,18 +293,42 @@ public class ProcessUtils {
* @param executePath execute path * @param executePath execute path
*/ */
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) { public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
if (appIds.size() > 0) { if (CollectionUtils.isNotEmpty(appIds)) {
String appid = appIds.get(appIds.size() - 1);
for (String appId : appIds) {
try {
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
if (!applicationStatus.typeIsFinished()) {
String commandFile = String String commandFile = String
.format("%s/%s.kill", executePath, appid); .format("%s/%s.kill", executePath, appId);
String cmd = "yarn application -kill " + appid; String cmd = "yarn application -kill " + appId;
execYarnKillCommand(logger, tenantCode, appId, commandFile, cmd);
}
} catch (Exception e) {
logger.error(String.format("Get yarn application app id [%s] status failed: [%s]", appId, e.getMessage()));
}
}
}
}
/**
* build kill command for yarn application
*
* @param logger logger
* @param tenantCode tenant code
* @param appId app id
* @param commandFile command file
* @param cmd cmd
*/
private static void execYarnKillCommand(Logger logger, String tenantCode, String appId, String commandFile, String cmd) {
try { try {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n"); sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n"); sb.append("cd $BASEDIR\n");
if (CommonUtils.getSystemEnvPath() != null) { if (CommonUtils.getSystemEnvPath() != null) {
sb.append("source " + CommonUtils.getSystemEnvPath() + "\n"); sb.append("source ").append(CommonUtils.getSystemEnvPath()).append("\n");
} }
sb.append("\n\n"); sb.append("\n\n");
sb.append(cmd); sb.append(cmd);
@ -299,17 +339,15 @@ public class ProcessUtils {
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8); FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
} }
String runCmd = "sh " + commandFile; String runCmd = String.format("%s %s", Constants.SH, commandFile);
if (StringUtils.isNotEmpty(tenantCode)) { if (StringUtils.isNotEmpty(tenantCode)) {
runCmd = "sudo -u " + tenantCode + " " + runCmd; runCmd = "sudo -u " + tenantCode + " " + runCmd;
} }
logger.info("kill cmd:{}", runCmd); logger.info("kill cmd:{}", runCmd);
OSUtils.exeCmd(runCmd);
Runtime.getRuntime().exec(runCmd);
} catch (Exception e) { } catch (Exception e) {
logger.error("kill application error", e); logger.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage()));
}
} }
} }
@ -345,24 +383,29 @@ public class ProcessUtils {
* get pids str. * get pids str.
* *
* @param processId process id * @param processId process id
* @return pids * @return pids pid String
* @throws Exception exception * @throws Exception exception
*/ */
public static String getPidsStr(int processId) throws Exception { public static String getPidsStr(int processId) throws Exception {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Matcher mat; Matcher mat = null;
// pstree pid get sub pids // pstree pid get sub pids
if (OSUtils.isMacOS()) { if (OSUtils.isMacOS()) {
String pids = OSUtils.exeCmd("pstree -sp " + processId); String pids = OSUtils.exeCmd(String.format("%s -sp %d", Constants.PSTREE, processId));
if (null != pids) {
mat = MACPATTERN.matcher(pids); mat = MACPATTERN.matcher(pids);
}
} else { } else {
String pids = OSUtils.exeCmd("pstree -p " + processId); String pids = OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, processId));
mat = WINDOWSATTERN.matcher(pids); mat = WINDOWSATTERN.matcher(pids);
} }
if (null != mat) {
while (mat.find()) { while (mat.find()) {
sb.append(mat.group(1)).append(" "); sb.append(mat.group(1)).append(" ");
} }
}
return sb.toString().trim(); return sb.toString().trim();
} }
@ -375,7 +418,7 @@ public class ProcessUtils {
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null; LogClientService logClient = null;
String log = null; String log;
try { try {
logClient = new LogClientService(); logClient = new LogClientService();
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(), log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
@ -393,7 +436,7 @@ public class ProcessUtils {
logger.error("task instance work dir is empty"); logger.error("task instance work dir is empty");
throw new RuntimeException("task instance work dir is empty"); throw new RuntimeException("task instance work dir is empty");
} }
if (appIds.size() > 0) { if (CollectionUtils.isNotEmpty(appIds)) {
cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
} }
} }

108
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java

@ -14,34 +14,126 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import static org.powermock.api.mockito.PowerMockito.when;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; @RunWith(PowerMockRunner.class)
import java.util.ArrayList; @PrepareForTest({System.class, OSUtils.class, HadoopUtils.class})
import java.util.List;
public class ProcessUtilsTest { public class ProcessUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class); private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test @Test
public void getPidsStr() throws Exception { public void getPidsStr() throws Exception {
String pidList = ProcessUtils.getPidsStr(1); int processId = 1;
String pidList = ProcessUtils.getPidsStr(processId);
Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, ""); Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, "");
logger.info("Sub process list : {}", pidList);
PowerMockito.mockStatic(OSUtils.class);
when(OSUtils.isMacOS()).thenReturn(true);
when(OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, processId))).thenReturn(null);
String pidListMac = ProcessUtils.getPidsStr(processId);
Assert.assertEquals("", pidListMac);
} }
@Test @Test
public void testBuildCommandStr() { public void testBuildCommandStr() {
List<String> commands = new ArrayList<>(); List<String> commands = new ArrayList<>();
commands.add("sudo"); commands.add("sudo");
Assert.assertEquals(ProcessUtils.buildCommandStr(commands), "sudo"); commands.add("-u");
commands.add("tenantCode");
//allowAmbiguousCommands false
Assert.assertEquals("sudo -u tenantCode", ProcessUtils.buildCommandStr(commands));
//quota
commands.clear();
commands.add("\"sudo\"");
Assert.assertEquals("\"sudo\"", ProcessUtils.buildCommandStr(commands));
//allowAmbiguousCommands true
commands.clear();
commands.add("sudo");
System.setProperty("jdk.lang.Process.allowAmbiguousCommands", "false");
Assert.assertEquals("\"sudo\"", ProcessUtils.buildCommandStr(commands));
} }
@Test
public void testKill() {
//get taskExecutionContext
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
//process id eq 0
taskExecutionContext.setProcessId(0);
ProcessUtils.kill(taskExecutionContext);
//process id not eq 0
taskExecutionContext.setProcessId(1);
PowerMockito.mockStatic(OSUtils.class);
try {
when(OSUtils.exeCmd(String.format("%s -sp %d", Constants.PSTREE, 1))).thenReturn("1111");
when(OSUtils.exeCmd(String.format("%s -p %d", Constants.PSTREE, 1))).thenReturn("1111");
when(OSUtils.exeCmd("sudo kill -9")).thenReturn("1111");
} catch (Exception e) {
e.printStackTrace();
}
taskExecutionContext.setHost("127.0.0.1:8888");
taskExecutionContext.setLogPath("/log/1.log");
ProcessUtils.kill(taskExecutionContext);
Assert.assertEquals(1, taskExecutionContext.getProcessId());
}
@Test
public void testCancelApplication() {
List<String> appIds = new ArrayList<>();
appIds.add("application_1585532379175_228491");
appIds.add("application_1598885606600_3677");
String tenantCode = "dev";
String executePath = "/ds-exec/1/1/1";
ExecutionStatus running = ExecutionStatus.RUNNING_EXECUTION;
PowerMockito.mockStatic(HadoopUtils.class);
HadoopUtils hadoop = HadoopUtils.getInstance();
try {
PowerMockito.whenNew(HadoopUtils.class).withAnyArguments().thenReturn(hadoop);
} catch (Exception e) {
e.printStackTrace();
}
try {
when(hadoop.getApplicationStatus("application_1585532379175_228491")).thenReturn(running);
when(hadoop.getApplicationStatus("application_1598885606600_3677")).thenReturn(running);
} catch (Exception e) {
e.printStackTrace();
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
}
Assert.assertNotNull(appIds);
}
} }

Loading…
Cancel
Save