diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java index d7f3ffcf10..9ab19d3e24 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/am/YarnApplicationManager.java @@ -19,14 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.am; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ResourceManagerType; -import org.apache.dolphinscheduler.common.utils.HttpUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.KerberosHttpClient; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskException; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; - -import org.apache.commons.lang3.StringUtils; import java.io.File; import java.nio.charset.StandardCharsets; @@ -34,20 +28,12 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auto.service.AutoService; @Slf4j @AutoService(ApplicationManager.class) public class YarnApplicationManager implements ApplicationManager { - private static final String RM_HA_IDS = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); - private static final String APP_ADDRESS = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); - private static final String JOB_HISTORY_ADDRESS = - PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); - private static final int HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE = - PropertyUtils.getInt(Constants.HADOOP_RESOURCE_MANAGER_HTTPADDRESS_PORT, 8088); - @Override public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException { YarnApplicationManagerContext yarnApplicationManagerContext = @@ -55,20 +41,16 @@ public class YarnApplicationManager implements ApplicationManager { String executePath = yarnApplicationManagerContext.getExecutePath(); String tenantCode = yarnApplicationManagerContext.getTenantCode(); List appIds = yarnApplicationManagerContext.getAppIds(); - for (String appId : appIds) { - try { - TaskExecutionStatus applicationStatus = getApplicationStatus(appId); - if (!applicationStatus.isFinished()) { - String commandFile = String.format("%s/%s.kill", executePath, appId); - String cmd = getKerberosInitCommand() + "yarn application -kill " + appId; - execYarnKillCommand(tenantCode, appId, commandFile, cmd); - } - } catch (Exception e) { - log.error("Get yarn application app id [{}}] status failed", appId, e); - throw new TaskException(e.getMessage()); - } + try { + String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds)); + String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds); + execYarnKillCommand(tenantCode, commandFile, cmd); + } catch (Exception e) { + log.error("Kill yarn application [{}] failed", appIds, e); + throw new TaskException(e.getMessage()); } + return true; } @@ -77,168 +59,34 @@ public class YarnApplicationManager implements ApplicationManager { return ResourceManagerType.YARN; } - /** - * get the state of an application - * - * @param applicationId application id - * @return the return may be null or there may be other parse exceptions - */ - public TaskExecutionStatus getApplicationStatus(String applicationId) throws TaskException { - if (StringUtils.isEmpty(applicationId)) { - return null; - } - - String result; - String applicationUrl = getApplicationUrl(applicationId); - log.debug("generate yarn application url, applicationUrl={}", applicationUrl); - - String responseContent = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(applicationUrl) - : HttpUtils.get(applicationUrl); - if (responseContent != null) { - ObjectNode jsonObject = JSONUtils.parseObject(responseContent); - if (!jsonObject.has("app")) { - return TaskExecutionStatus.FAILURE; - } - result = jsonObject.path("app").path("finalStatus").asText(); - - } else { - // may be in job history - String jobHistoryUrl = getJobHistoryUrl(applicationId); - log.debug("generate yarn job history application url, jobHistoryUrl={}", jobHistoryUrl); - responseContent = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(jobHistoryUrl) - : HttpUtils.get(jobHistoryUrl); - - if (null != responseContent) { - ObjectNode jsonObject = JSONUtils.parseObject(responseContent); - if (!jsonObject.has("job")) { - return TaskExecutionStatus.FAILURE; - } - result = jsonObject.path("job").path("state").asText(); - } else { - return TaskExecutionStatus.FAILURE; - } - } - - return getExecutionStatus(result); - } - - /** - * get application url - * if rmHaIds contains xx, it signs not use resourcemanager - * otherwise: - * if rmHaIds is empty, single resourcemanager enabled - * if rmHaIds not empty: resourcemanager HA enabled - * - * @param applicationId application id - * @return url of application - */ - private String getApplicationUrl(String applicationId) throws TaskException { - - String appUrl = StringUtils.isEmpty(RM_HA_IDS) ? APP_ADDRESS : getAppAddress(APP_ADDRESS, RM_HA_IDS); - if (StringUtils.isBlank(appUrl)) { - throw new TaskException("yarn application url generation failed"); - } - log.debug("yarn application url:{}, applicationId:{}", appUrl, applicationId); - return String.format(appUrl, HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE, applicationId); - } - - private String getJobHistoryUrl(String applicationId) { - // eg:application_1587475402360_712719 -> job_1587475402360_712719 - String jobId = applicationId.replace("application", "job"); - return String.format(JOB_HISTORY_ADDRESS, jobId); - } - /** * build kill command for yarn application * * @param tenantCode tenant code - * @param appId app id * @param commandFile command file * @param cmd cmd */ - private void execYarnKillCommand(String tenantCode, String appId, String commandFile, - String cmd) { - try { - StringBuilder sb = new StringBuilder(); - sb.append("#!/bin/sh\n"); - sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); - sb.append("cd $BASEDIR\n"); + private void execYarnKillCommand(String tenantCode, String commandFile, + String cmd) throws Exception { + StringBuilder sb = new StringBuilder(); + sb.append("#!/bin/sh\n"); + sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); + sb.append("cd $BASEDIR\n"); - sb.append("\n\n"); - sb.append(cmd); + sb.append("\n\n"); + sb.append(cmd); - File f = new File(commandFile); + File f = new File(commandFile); - if (!f.exists()) { - org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(), - StandardCharsets.UTF_8); - } - - String runCmd = String.format("%s %s", Constants.SH, commandFile); - runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd); - log.info("kill cmd:{}", runCmd); - org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd); - } catch (Exception e) { - log.error(String.format("Kill yarn application app id [%s] failed: [%s]", appId, e.getMessage())); + if (!f.exists()) { + org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(), + StandardCharsets.UTF_8); } - } - private TaskExecutionStatus getExecutionStatus(String result) { - switch (result) { - case Constants.ACCEPTED: - return TaskExecutionStatus.SUBMITTED_SUCCESS; - case Constants.SUCCEEDED: - case Constants.ENDED: - return TaskExecutionStatus.SUCCESS; - case Constants.NEW: - case Constants.NEW_SAVING: - case Constants.SUBMITTED: - case Constants.FAILED: - return TaskExecutionStatus.FAILURE; - case Constants.KILLED: - return TaskExecutionStatus.KILL; - case Constants.RUNNING: - default: - return TaskExecutionStatus.RUNNING_EXECUTION; - } - } - - /** - * getAppAddress - * - * @param appAddress app address - * @param rmHa resource manager ha - * @return app address - */ - private String getAppAddress(String appAddress, String rmHa) { - - String[] split1 = appAddress.split(Constants.DOUBLE_SLASH); - - if (split1.length != 2) { - return null; - } - - String start = split1[0] + Constants.DOUBLE_SLASH; - String[] split2 = split1[1].split(Constants.COLON); - - if (split2.length != 2) { - return null; - } - - String end = Constants.COLON + split2[1]; - - // get active ResourceManager - String activeRM = YarnHAAdminUtils.getActiveRMName(start, rmHa); - - if (StringUtils.isEmpty(activeRM)) { - return null; - } - - return start + activeRM + end; + String runCmd = String.format("%s %s", Constants.SH, commandFile); + runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd); + log.info("kill cmd:{}", runCmd); + org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd); } /** @@ -261,66 +109,4 @@ public class YarnApplicationManager implements ApplicationManager { } return kerberosCommandBuilder.toString(); } - - /** - * yarn ha admin utils - */ - private static final class YarnHAAdminUtils { - - /** - * get active resourcemanager node - * - * @param protocol http protocol - * @param rmIds yarn ha ids - * @return yarn active node - */ - public static String getActiveRMName(String protocol, String rmIds) { - - String[] rmIdArr = rmIds.split(Constants.COMMA); - - String yarnUrl = protocol + "%s:" + HADOOP_RESOURCE_MANAGER_HTTP_ADDRESS_PORT_VALUE + "/ws/v1/cluster/info"; - - try { - - /** - * send http get request to rm - */ - - for (String rmId : rmIdArr) { - String state = getRMState(String.format(yarnUrl, rmId)); - if (Constants.HADOOP_RM_STATE_ACTIVE.equals(state)) { - return rmId; - } - } - - } catch (Exception e) { - log.error("yarn ha application url generation failed, message:{}", e.getMessage()); - } - return null; - } - - /** - * get ResourceManager state - */ - public static String getRMState(String url) { - - String retStr = Boolean.TRUE - .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) - ? KerberosHttpClient.get(url) - : HttpUtils.get(url); - - if (StringUtils.isEmpty(retStr)) { - return null; - } - // to json - ObjectNode jsonObject = JSONUtils.parseObject(retStr); - - // get ResourceManager state - if (!jsonObject.has("clusterInfo")) { - return null; - } - return jsonObject.get("clusterInfo").path("haState").asText(); - } - } - } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java index e2f4d74ced..01bd5c85e0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerTaskKillProcessor.java @@ -85,9 +85,10 @@ public class WorkerTaskKillProcessor implements WorkerRpcProcessor { return; } + this.cancelApplication(taskInstanceId); + int processId = taskExecutionContext.getProcessId(); if (processId == 0) { - this.cancelApplication(taskInstanceId); workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); @@ -96,8 +97,6 @@ public class WorkerTaskKillProcessor implements WorkerRpcProcessor { return; } - // if processId > 0, it should call cancelApplication to cancel remote application too. - this.cancelApplication(taskInstanceId); boolean result = doKill(taskExecutionContext); taskExecutionContext.setCurrentExecutionStatus(