Browse Source

The shell background starts the YARN task scenario, and the kill function is abnormal. (#5916)

zhuangchong 3 years ago committed by GitHub
parent
commit
b99eca5b59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 149
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  2. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  3. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  4. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

149
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -17,42 +17,169 @@
package org.apache.dolphinscheduler.common.utils;
import java.util.Collection;
import java.util.Iterator;
/**
* java.lang.String utils class
*/
public class StringUtils {
/**
* The empty String {@code ""}.
*/
public static final String EMPTY = "";
private StringUtils() {
throw new UnsupportedOperationException("Construct StringUtils");
}
/**
* <p>Checks if a CharSequence is empty ("") or null.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is empty or null
*/
public static boolean isEmpty(final CharSequence cs) {
return cs == null || cs.length() == 0;
}
/**
* <p>Checks if a CharSequence is not empty ("") and not null.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is not empty and not null
*/
public static boolean isNotEmpty(final CharSequence cs) {
return !isEmpty(cs);
}
public static boolean isBlank(String s) {
if (isEmpty(s)) {
/**
* <p>Checks if a CharSequence is empty (""), null or whitespace only.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is null, empty or whitespace only
*/
public static boolean isBlank(final CharSequence cs) {
int strLen;
if (cs == null || (strLen = cs.length()) == 0) {
return true;
}
return s.trim().length() == 0;
for (int i = 0; i < strLen; i++) {
if (!Character.isWhitespace(cs.charAt(i))) {
return false;
}
}
return true;
}
public static boolean isNotBlank(String s) {
return !isBlank(s);
/**
* <p>Checks if a CharSequence is not empty (""), not null and not whitespace only.</p>
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is not empty and not null and not whitespace only
*/
public static boolean isNotBlank(final CharSequence cs) {
return !isBlank(cs);
}
/**
* <p>Replace all strings matching the regular expression \t \n \r with _</p>
*
* @param src the String , may be null
* @return the string that has been replaced
*/
public static String replaceNRTtoUnderline(String src) {
if (isBlank(src)) {
return src;
} else {
return src.replaceAll("[\n|\r|\t]", "_");
}
return isBlank(src) ? src : src.replaceAll("[\n|\r|\t]", "_");
}
public static String trim(String str) {
/**
* <p>Removes control characters (char &lt;= 32) from both
* ends of this String, handling {@code null} by returning
* {@code null}.</p>
*
* @param str the String to be trimmed, may be null
* @return the trimmed string, {@code null} if null String input
*/
public static String trim(final String str) {
return str == null ? null : str.trim();
}
/**
* <p>Returns either the passed in CharSequence, or if the CharSequence is
* whitespace, empty ("") or {@code null}, the value of {@code defaultStr}.</p>
*
* @param <T> the specific kind of CharSequence
* @param str the CharSequence to check, may be null
* @param defaultStr the default CharSequence to return
* if the input is whitespace, empty ("") or {@code null}, may be null
* @return the passed in CharSequence, or the default
*/
public static <T extends CharSequence> T defaultIfBlank(final T str, final T defaultStr) {
return isBlank(str) ? defaultStr : str;
}
/**
* <p>Compares two String, returning {@code true} if they represent
* equal string, ignoring case.</p>
*
* @param str1 the first String, may be null
* @param str2 the second String, may be null
* @return {@code true} if the String are equal, case insensitive, or
* both {@code null}
*/
public static boolean equalsIgnoreCase(String str1, String str2) {
return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
}
/**
* <p>Joins the elements of the provided Collection into a single String
* containing the provided Collection of elements.</p>
*
* @param collection the collection, may be null
* @param separator the separator
* @return a single String
*/
public static String join(Collection<?> collection, String separator) {
return collection == null ? null : join(collection.iterator(), separator);
}
/**
* <p>Joins the elements of the provided Iterator into a single String
* containing the provided Iterator of elements.</p>
*
* @param iterator the iterator, may be null
* @param separator the separator
* @return a single String
*/
public static String join(Iterator<?> iterator, String separator) {
if (iterator == null) {
return null;
} else if (!iterator.hasNext()) {
return "";
} else {
Object first = iterator.next();
if (!iterator.hasNext()) {
return first == null ? "" : first.toString();
} else {
StringBuilder buf = new StringBuilder(256);
if (first != null) {
buf.append(first);
}
while (iterator.hasNext()) {
if (separator != null) {
buf.append(separator);
}
Object obj = iterator.next();
if (obj != null) {
buf.append(obj);
}
}
return buf.toString();
}
}
}
}

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -18,17 +18,10 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -368,11 +362,12 @@ public class ProcessUtils {
return;
}
String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
String pidsStr = getPidsStr(processId);
if (StringUtils.isNotEmpty(pidsStr)) {
String cmd = String.format("sudo kill -9 %s", pidsStr);
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
logger.error("kill task failed", e);
@ -417,8 +412,9 @@ public class ProcessUtils {
* find logs and kill yarn tasks
*
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static void killYarnJob(TaskExecutionContext taskExecutionContext) {
public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null;
@ -442,11 +438,13 @@ public class ProcessUtils {
}
if (CollectionUtils.isNotEmpty(appIds)) {
cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
return appIds;
}
}
} catch (Exception e) {
logger.error("kill yarn job failure", e);
}
return Collections.emptyList();
}
}

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -115,11 +115,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return Pair.of(true, appIds);
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
if (StringUtils.isNotEmpty(pidsStr)) {
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
processFlag = false;

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -392,9 +392,10 @@ public abstract class AbstractCommandExecutor {
boolean result = true;
try {
for (String appId : appIds) {
logger.info("check yarn application status, appId:{}", appId);
while(Stopper.isRunning()){
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
logger.debug("check yarn application status, appId:{}, final state:{}", appId, applicationStatus.name());
if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
applicationStatus.equals(ExecutionStatus.KILL)) {
return false;
@ -407,7 +408,7 @@ public abstract class AbstractCommandExecutor {
}
}
} catch (Exception e) {
logger.error(String.format("yarn applications: %s status failed ", appIds.toString()),e);
logger.error("yarn applications: {} , query status failed, exception:{}", StringUtils.join(appIds, ","), e);
result = false;
}
return result;

Loading…
Cancel
Save