Browse Source

fix #3900 kill multi yarn app in one job

pull/3/MERGE
Eights-LI 4 years ago
parent
commit
8fdd6f547d
  1. 47
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

47
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -56,12 +59,13 @@ public class ProcessUtils {
/** /**
* build command line characters. * build command line characters.
*
* @param commandList command list * @param commandList command list
* @return command * @return command
*/ */
public static String buildCommandStr(List<String> commandList) { public static String buildCommandStr(List<String> commandList) {
String cmdstr; String cmdstr;
String[] cmd = commandList.toArray(new String[commandList.size()]); String[] cmd = (String[]) commandList.toArray();
SecurityManager security = System.getSecurityManager(); SecurityManager security = System.getSecurityManager();
boolean allowAmbiguousCommands = false; boolean allowAmbiguousCommands = false;
if (security == null) { if (security == null) {
@ -139,8 +143,7 @@ public class ProcessUtils {
* @return format arg * @return format arg
*/ */
private static String quoteString(String arg) { private static String quoteString(String arg) {
StringBuilder argbuf = new StringBuilder(arg.length() + 2); return '"' + arg + '"';
return argbuf.append('"').append(arg).append('"').toString();
} }
/** /**
@ -155,7 +158,7 @@ public class ProcessUtils {
while (regexMatcher.find()) { while (regexMatcher.find()) {
matchList.add(regexMatcher.group()); matchList.add(regexMatcher.group());
} }
return matchList.toArray(new String[matchList.size()]); return (String[]) matchList.toArray();
} }
/** /**
@ -191,6 +194,7 @@ public class ProcessUtils {
/** /**
* create command line. * create command line.
*
* @param verificationType verification type * @param verificationType verification type
* @param executablePath executable path * @param executablePath executable path
* @param cmd cmd * @param cmd cmd
@ -220,9 +224,10 @@ public class ProcessUtils {
/** /**
* whether is quoted. * whether is quoted.
* @param noQuotesInside *
* @param arg * @param noQuotesInside no quotes inside
* @param errorMessage * @param arg arg
* @param errorMessage error message
* @return boolean * @return boolean
*/ */
private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) { private static boolean isQuoted(boolean noQuotesInside, String arg, String errorMessage) {
@ -259,8 +264,8 @@ public class ProcessUtils {
if (!argIsQuoted) { if (!argIsQuoted) {
char[] testEscape = ESCAPE_VERIFICATION[verificationType]; char[] testEscape = ESCAPE_VERIFICATION[verificationType];
for (int i = 0; i < testEscape.length; ++i) { for (char c : testEscape) {
if (arg.indexOf(testEscape[i]) >= 0) { if (arg.indexOf(c) >= 0) {
return true; return true;
} }
} }
@ -277,18 +282,23 @@ public class ProcessUtils {
* @param executePath execute path * @param executePath execute path
*/ */
public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) { public static void cancelApplication(List<String> appIds, Logger logger, String tenantCode, String executePath) {
if (appIds.size() > 0) { if (CollectionUtils.isNotEmpty(appIds)) {
String appid = appIds.get(appIds.size() - 1);
for (String appId : appIds) {
try {
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
if (!applicationStatus.typeIsFinished()) {
String commandFile = String String commandFile = String
.format("%s/%s.kill", executePath, appid); .format("%s/%s.kill", executePath, appId);
String cmd = "yarn application -kill " + appid; String cmd = "yarn application -kill " + appId;
try { try {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n"); sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n"); sb.append("cd $BASEDIR\n");
if (CommonUtils.getSystemEnvPath() != null) { if (CommonUtils.getSystemEnvPath() != null) {
sb.append("source " + CommonUtils.getSystemEnvPath() + "\n"); sb.append("source ").append(CommonUtils.getSystemEnvPath()).append("\n");
} }
sb.append("\n\n"); sb.append("\n\n");
sb.append(cmd); sb.append(cmd);
@ -308,7 +318,12 @@ public class ProcessUtils {
Runtime.getRuntime().exec(runCmd); Runtime.getRuntime().exec(runCmd);
} catch (Exception e) { } catch (Exception e) {
logger.error("kill application error", e); logger.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage()));
}
}
} catch (Exception e) {
logger.error(String.format("Get yarn application app id [%s] status failed: [%s]", appId, e.getMessage()));
}
} }
} }
} }
@ -375,7 +390,7 @@ public class ProcessUtils {
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null; LogClientService logClient = null;
String log = null; String log;
try { try {
logClient = new LogClientService(); logClient = new LogClientService();
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(), log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),

Loading…
Cancel
Save