From e53369318bdf61f169dcbf2644caf8521b3dd536 Mon Sep 17 00:00:00 2001 From: zhuangchong <37063904+zhuangchong@users.noreply.github.com> Date: Tue, 30 Mar 2021 22:33:49 +0800 Subject: [PATCH] [Fix-4721][worker]The shell background starts the YARN task scenario, and the kill function is abnormal (#4722) * fix the shell starts the yarn task in the background * update StringUtils code style. * solve code smell. * add method comment in StringUtils class. * update AlertGroupMapper code style. * update AlertGroupMapperTest * update sql script test. --- .../common/utils/HadoopUtils.java | 9 +- .../common/utils/StringUtils.java | 141 ++++++++++++++++-- .../common/utils/StringUtilsTest.java | 13 ++ .../dao/mapper/AlertGroupMapper.java | 10 +- .../server/utils/ProcessUtils.java | 19 ++- .../worker/processor/TaskKillProcessor.java | 11 +- .../worker/task/AbstractCommandExecutor.java | 7 +- 7 files changed, 181 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 6a53c000fe..6dc3335f17 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -205,6 +205,7 @@ public class HadoopUtils implements Closeable { * if rmHaIds is empty, single resourcemanager enabled * if rmHaIds not empty: resourcemanager HA enabled */ + yarnEnabled = true; String appUrl = StringUtils.isEmpty(rmHaIds) ? appAddress : getAppAddress(appAddress, rmHaIds); if (StringUtils.isBlank(appUrl)) { @@ -419,7 +420,9 @@ public class HadoopUtils implements Closeable { String result = Constants.FAILED; String applicationUrl = getApplicationUrl(applicationId); - logger.info("applicationUrl={}", applicationUrl); + if (logger.isDebugEnabled()) { + logger.debug("generate yarn application url, applicationUrl={}", applicationUrl); + } String responseContent = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false) ? KerberosHttpClient.get(applicationUrl) : HttpUtils.get(applicationUrl); if (responseContent != null) { @@ -432,7 +435,9 @@ public class HadoopUtils implements Closeable { } else { //may be in job history String jobHistoryUrl = getJobHistoryUrl(applicationId); - logger.info("jobHistoryUrl={}", jobHistoryUrl); + if (logger.isDebugEnabled()) { + logger.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl); + } responseContent = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false) ? KerberosHttpClient.get(jobHistoryUrl) : HttpUtils.get(jobHistoryUrl); if (null != responseContent) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java index f506f76627..ffa783316b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java @@ -17,48 +17,169 @@ package org.apache.dolphinscheduler.common.utils; +import java.util.Collection; +import java.util.Iterator; + +/** + * java.lang.String utils class + */ public class StringUtils { + /** + * The empty String {@code ""}. + */ public static final String EMPTY = ""; private StringUtils() { throw new UnsupportedOperationException("Construct StringUtils"); } + /** + *

Checks if a CharSequence is empty ("") or null.

+ * + * @param cs the CharSequence to check, may be null + * @return {@code true} if the CharSequence is empty or null + */ public static boolean isEmpty(final CharSequence cs) { return cs == null || cs.length() == 0; } + /** + *

Checks if a CharSequence is not empty ("") and not null.

+ * + * @param cs the CharSequence to check, may be null + * @return {@code true} if the CharSequence is not empty and not null + */ public static boolean isNotEmpty(final CharSequence cs) { return !isEmpty(cs); } - public static boolean isBlank(String str) { + /** + *

Checks if a CharSequence is empty (""), null or whitespace only.

+ * + * @param cs the CharSequence to check, may be null + * @return {@code true} if the CharSequence is null, empty or whitespace only + */ + public static boolean isBlank(final CharSequence cs) { int strLen; - if (str != null && (strLen = str.length()) != 0) { - for (int i = 0; i < strLen; ++i) { - if (!Character.isWhitespace(str.charAt(i))) { - return false; - } + if (cs == null || (strLen = cs.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(cs.charAt(i))) { + return false; } } return true; + } + /** + *

Checks if a CharSequence is not empty (""), not null and not whitespace only.

+ * + * @param cs the CharSequence to check, may be null + * @return {@code true} if the CharSequence is not empty and not null and not whitespace only + */ + public static boolean isNotBlank(final CharSequence cs) { + return !isBlank(cs); } - public static boolean isNotBlank(String s) { - return !isBlank(s); + /** + *

Replace all strings matching the regular expression \t \n \r with _

+ * + * @param src the String , may be null + * @return the string that has been replaced + */ + public static String replaceNRTtoUnderline(String src) { + return isBlank(src) ? src : src.replaceAll("[\n|\r|\t]", "_"); } - public static String trim(String str) { + /** + *

Removes control characters (char <= 32) from both + * ends of this String, handling {@code null} by returning + * {@code null}.

+ * + * @param str the String to be trimmed, may be null + * @return the trimmed string, {@code null} if null String input + */ + public static String trim(final String str) { return str == null ? null : str.trim(); } - public static String defaultIfBlank(String str, String defaultStr) { + /** + *

Returns either the passed in CharSequence, or if the CharSequence is + * whitespace, empty ("") or {@code null}, the value of {@code defaultStr}.

+ * + * @param the specific kind of CharSequence + * @param str the CharSequence to check, may be null + * @param defaultStr the default CharSequence to return + * if the input is whitespace, empty ("") or {@code null}, may be null + * @return the passed in CharSequence, or the default + */ + public static T defaultIfBlank(final T str, final T defaultStr) { return isBlank(str) ? defaultStr : str; } + /** + *

Compares two String, returning {@code true} if they represent + * equal string, ignoring case.

+ * + * @param str1 the first String, may be null + * @param str2 the second String, may be null + * @return {@code true} if the String are equal, case insensitive, or + * both {@code null} + */ public static boolean equalsIgnoreCase(String str1, String str2) { return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2); } + + /** + *

Joins the elements of the provided Collection into a single String + * containing the provided Collection of elements.

+ * + * @param collection the collection, may be null + * @param separator the separator + * @return a single String + */ + public static String join(Collection collection, String separator) { + return collection == null ? null : join(collection.iterator(), separator); + } + + /** + *

Joins the elements of the provided Iterator into a single String + * containing the provided Iterator of elements.

+ * + * @param iterator the iterator, may be null + * @param separator the separator + * @return a single String + */ + public static String join(Iterator iterator, String separator) { + if (iterator == null) { + return null; + } else if (!iterator.hasNext()) { + return ""; + } else { + Object first = iterator.next(); + if (!iterator.hasNext()) { + return first == null ? "" : first.toString(); + } else { + StringBuilder buf = new StringBuilder(256); + if (first != null) { + buf.append(first); + } + + while (iterator.hasNext()) { + if (separator != null) { + buf.append(separator); + } + + Object obj = iterator.next(); + if (obj != null) { + buf.append(obj); + } + } + return buf.toString(); + } + } + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java index 3f5aeda3f9..2e59750fe2 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringUtilsTest.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.common.utils; +import java.util.ArrayList; +import java.util.List; + import org.junit.Assert; import org.junit.Test; @@ -80,4 +83,14 @@ public class StringUtilsTest { defaultStr = StringUtils.defaultIfBlank("test", "defaultStr"); Assert.assertEquals("test", defaultStr); } + + @Test + public void testJoin() { + List list = new ArrayList<>(); + list.add("1"); + list.add("3"); + list.add("4"); + String join = StringUtils.join(list, ","); + Assert.assertEquals("1,3,4", join); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertGroupMapper.java index 8026b9a89f..b8f4188fc7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertGroupMapper.java @@ -14,17 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; -import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.dao.entity.AlertGroup; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + import org.apache.ibatis.annotations.Param; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + /** * alertgroup mapper interface */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index aca2afedab..9e5743eaf0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -385,11 +386,13 @@ public class ProcessUtils { return; } - String cmd = String.format("kill -9 %s", getPidsStr(processId)); - cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd); - logger.info("process id:{}, cmd:{}", processId, cmd); - - OSUtils.exeCmd(cmd); + String pidsStr = getPidsStr(processId); + if (StringUtils.isNotEmpty(pidsStr)) { + String cmd = String.format("kill -9 %s", pidsStr); + cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd); + logger.info("process id:{}, cmd:{}", processId, cmd); + OSUtils.exeCmd(cmd); + } } catch (Exception e) { logger.error("kill task failed", e); @@ -430,10 +433,10 @@ public class ProcessUtils { /** * find logs and kill yarn tasks. - * * @param taskExecutionContext taskExecutionContext + * @return yarn application ids */ - public static void killYarnJob(TaskExecutionContext taskExecutionContext) { + public static List killYarnJob(TaskExecutionContext taskExecutionContext) { try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); LogClientService logClient = null; @@ -457,11 +460,13 @@ public class ProcessUtils { } if (CollectionUtils.isNotEmpty(appIds)) { cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); + return appIds; } } } catch (Exception e) { logger.error("kill yarn job failure", e); } + return Collections.emptyList(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 84109ccbc8..8cbf0471b2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -124,11 +124,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { return Pair.of(true, appIds); } - String cmd = String.format("kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId())); - cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd); - logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd); + String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()); + if (StringUtils.isNotEmpty(pidsStr)) { + String cmd = String.format("kill -9 %s", pidsStr); + cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd); + logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd); + OSUtils.exeCmd(cmd); + } - OSUtils.exeCmd(cmd); } catch (Exception e) { processFlag = false; logger.error("kill task error", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 88af2d7f2c..788d5441df 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -408,9 +408,12 @@ public abstract class AbstractCommandExecutor { boolean result = true; try { for (String appId : appIds) { + logger.info("check yarn application status, appId:{}", appId); while (Stopper.isRunning()) { ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId); - logger.info("appId:{}, final state:{}", appId, applicationStatus.name()); + if (logger.isDebugEnabled()) { + logger.debug("check yarn application status, appId:{}, final state:{}", appId, applicationStatus.name()); + } if (applicationStatus.equals(ExecutionStatus.FAILURE) || applicationStatus.equals(ExecutionStatus.KILL)) { return false; @@ -423,7 +426,7 @@ public abstract class AbstractCommandExecutor { } } } catch (Exception e) { - logger.error(String.format("yarn applications: %s status failed ", appIds.toString()), e); + logger.error("yarn applications: {} , query status failed, exception:{}", StringUtils.join(appIds, ","), e); result = false; } return result;