From 1513aae3ceb6f70633f64cc3a290b8f5ec6ab042 Mon Sep 17 00:00:00 2001 From: itbasketplayer <825621876@qq.com> Date: Sat, 30 May 2020 23:07:17 +0800 Subject: [PATCH] add job history to judge application status/2625 (#2848) * job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) * job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) * job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) Co-authored-by: yuhaibin@lizhi.fm <35716fc5847f6d154cf556296453ca91> Co-authored-by: dailidong --- .../dolphinscheduler/common/Constants.java | 5 ++ .../common/utils/HadoopUtils.java | 59 ++++++++++++------- .../src/main/resources/common.properties | 14 +++-- .../common/utils/HadoopUtilsTest.java | 6 ++ 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index effa4f0f8e..fc09960026 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -103,6 +103,11 @@ public final class Constants { */ public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address"; + /** + * yarn.job.history.status.address + */ + public static final String YARN_JOB_HISTORY_STATUS_ADDRESS = "yarn.job.history.status.address"; + /** * hdfs configuration * hdfs.root.user diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 963aff5f31..1544b449ff 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -16,16 +16,16 @@ */ package org.apache.dolphinscheduler.common.utils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; +import com.alibaba.fastjson.JSONObject; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import org.apache.commons.io.IOUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ResUploadType; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONException; -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.io.IOUtils; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; @@ -59,6 +59,7 @@ public class HadoopUtils implements Closeable { public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); + public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY"; @@ -114,11 +115,11 @@ public class HadoopUtils implements Closeable { String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE); ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType); - if (resUploadType == ResUploadType.HDFS){ - if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){ + if (resUploadType == ResUploadType.HDFS) { + if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) { System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)); - configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos"); + configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); hdfsUser = ""; UserGroupInformation.setConfiguration(configuration); UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME), @@ -195,7 +196,7 @@ public class HadoopUtils implements Closeable { */ String appUrl = ""; //not use resourcemanager - if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){ + if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) { yarnEnabled = false; logger.warn("should not step here"); @@ -212,6 +213,12 @@ public class HadoopUtils implements Closeable { return String.format(appUrl, applicationId); } + public String getJobHistoryUrl(String applicationId) { + //eg:application_1587475402360_712719 -> job_1587475402360_712719 + String jobId = applicationId.replace("application", "job"); + return String.format(jobHistoryAddress, jobId); + } + /** * cat file on hdfs * @@ -389,9 +396,10 @@ public class HadoopUtils implements Closeable { /** * hadoop resourcemanager enabled or not + * * @return result */ - public boolean isYarnEnabled() { + public boolean isYarnEnabled() { return yarnEnabled; } @@ -407,12 +415,22 @@ public class HadoopUtils implements Closeable { return null; } + String result = Constants.FAILED; String applicationUrl = getApplicationUrl(applicationId); + logger.info("applicationUrl={}", applicationUrl); String responseContent = HttpUtils.get(applicationUrl); - - JSONObject jsonObject = JSON.parseObject(responseContent); - String result = jsonObject.getJSONObject("app").getString("finalStatus"); + if (responseContent != null) { + JSONObject jsonObject = JSON.parseObject(responseContent); + result = jsonObject.getJSONObject("app").getString("finalStatus"); + } else { + //may be in job history + String jobHistoryUrl = getJobHistoryUrl(applicationId); + logger.info("jobHistoryUrl={}", jobHistoryUrl); + responseContent = HttpUtils.get(jobHistoryUrl); + JSONObject jsonObject = JSONObject.parseObject(responseContent); + result = jsonObject.getJSONObject("job").getString("state"); + } switch (result) { case Constants.ACCEPTED: @@ -435,6 +453,7 @@ public class HadoopUtils implements Closeable { /** * get data hdfs path + * * @return data hdfs path */ public static String getHdfsDataBasePath() { @@ -452,7 +471,7 @@ public class HadoopUtils implements Closeable { * @param tenantCode tenant code * @return hdfs resource dir */ - public static String getHdfsDir(ResourceType resourceType,String tenantCode) { + public static String getHdfsDir(ResourceType resourceType, String tenantCode) { String hdfsDir = ""; if (resourceType.equals(ResourceType.FILE)) { hdfsDir = getHdfsResDir(tenantCode); @@ -497,16 +516,16 @@ public class HadoopUtils implements Closeable { /** * get hdfs file name * - * @param resourceType resource type - * @param tenantCode tenant code - * @param fileName file name + * @param resourceType resource type + * @param tenantCode tenant code + * @param fileName file name * @return hdfs file name */ public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } - return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName); + return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName); } /** @@ -518,7 +537,7 @@ public class HadoopUtils implements Closeable { */ public static String getHdfsResourceFileName(String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } return String.format("%s/%s", getHdfsResDir(tenantCode), fileName); } @@ -532,7 +551,7 @@ public class HadoopUtils implements Closeable { */ public static String getHdfsUdfFileName(String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName); } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 3852c310b1..0cc118feb4 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -18,7 +18,7 @@ # resource storage type : HDFS,S3,NONE resource.storage.type=NONE -# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended +# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions."/dolphinscheduler" is recommended #resource.upload.path=/dolphinscheduler # user data local directory path, please make sure the directory exists and have read write permissions @@ -42,16 +42,16 @@ resource.storage.type=NONE # if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path hdfs.root.user=hdfs -# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir fs.defaultFS=hdfs://mycluster:8020 -# if resource.storage.type=S3,s3 endpoint +# if resource.storage.type=S3,s3 endpoint #fs.s3a.endpoint=http://192.168.199.91:9010 -# if resource.storage.type=S3,s3 access key +# if resource.storage.type=S3,s3 access key #fs.s3a.access.key=A3DXS30FO22544RE -# if resource.storage.type=S3,s3 secret key +# if resource.storage.type=S3,s3 secret key #fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK # if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty @@ -59,8 +59,10 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx # If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname. yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s +# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000) +yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s # system env path #dolphinscheduler.env.path=env/dolphinscheduler_env.sh development.state=false -kerberos.expire.time=7 \ No newline at end of file +kerberos.expire.time=7 diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java index e239fe7cb0..440f86395a 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java @@ -190,6 +190,12 @@ public class HadoopUtilsTest { logger.info(application_1516778421218_0042); } + @Test + public void getJobHistoryUrl(){ + String application_1516778421218_0042 = hadoopUtils.getJobHistoryUrl("application_1529051418016_0167"); + logger.info(application_1516778421218_0042); + } + @Test public void catFileWithLimitTest() { List stringList = new ArrayList<>();