From 880e995654e148511ff94dcf439ef0d591fa5796 Mon Sep 17 00:00:00 2001 From: ruson <648380139@qq.com> Date: Wed, 18 Mar 2020 23:26:07 +0800 Subject: [PATCH] add repace time when run history job and batch complement job ,not current time (#2196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add release notes in 1.2.1 (#1654) * fix Monitor bug (#1656) * fix Monitor bug * fix zk monitor bug * fix api url (#1673) * get root path from zookeeper config * Fixed DAG zoom in and zoom out nodes separated from arrows #1679 (#1685) * fix api url * Fixed DAG zoom in and zoom out nodes separated from arrows #1679 * check license and update (#1725) * Fixed tasks_queue and tasks_kill did not exist in zookeeper #1696 (#1734) Co-authored-by: elonlo * upgrade jackson from 2.9.8 to 2.10.1 (#1767) * add out put log when master/worker server start (#1769) * merge hadoop.properties into common.properties * merge hadoop,zookeeper.properties into common.properties remove combined.properties/master.properties/worker.properties * change db user/pwd to test/test * rename .dolphinscheduler_en.sh to dolphinscheduler_env.sh remove some unused in install.sh * add out put log when master/worker server start... * add start log when servers start * add check download resource permission in order to fix issues 1770 (#1788) * merge dev branch front-end code (#1786) * fix #1775 bug,delete process definition when process instance is running (#1790) * fix #1775 bug,delete process definition when process instance is running * revert CONTRIBUTING.md * fix udfs assignment and task instance page jump #1789 (#1791) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * update method checkDownloadPermission in order to fix issues 1770 (#1794) * add check download resource permission in order to fix issues 1770 * update method checkDownloadPermission in order to fix issues 1770 * update method listUnauthorizedResource in order to fix issues 1770 * update method listUnauthorizedResource in order to fix issues 1770 (#1797) * if login user is admin,it will has permission to query all udf function (#1799) * Fixed space and icon display issues before and after the input box (#1798) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * Fixed space and icon display issues before and after the input box * add license * add license * donot submit task to queue if sub process (#1793) * dockerfile modify (#1800) * fix #1775 bug,delete process definition when process instance is running * revert CONTRIBUTING.md * dockerfile modify * dockerfile modify * dockerfile modify * Fix data echo, style and popup cannot be closed (#1801) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * Fixed space and icon display issues before and after the input box * add license * add license * Fix data echo, style and popup cannot be closed * Fix page number loading issue and dag not getting value(#1810) (#1815) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * Fixed space and icon display issues before and after the input box * add license * add license * Fix data echo, style and popup cannot be closed * Fix page number loading issue and dag not getting value * [Fix issue #1770]check udf and data source in order to fix issue 1770 (#1817) * if login user is admin,it will has permission to query all udf function * check udf and data source in order to fix issue 1770 * check udf and data source in order to fix issue 1770 * check udf and data source in order to fix issue 1770 * check udf and data source in order to fix issue 1770 * check udf and data source in order to fix issue 1770 * revert common.properties * update the test method name * remove useless code and import in unit test * refactor has permission and check permission * #1813 remove "_001" after the master/server register path in zookeepe (#1824) * donot submit task to queue if sub process * [feature] #1813 remove "_001" after the master/server register path in zookeeper (#1820) * change master/worker register path. * remove "_" from register path. * remove install.sh server.servlet.context-path replace (#1823) * fix #1775 bug,delete process definition when process instance is running * revert CONTRIBUTING.md * dockerfile modify * dockerfile modify * dockerfile modify * remove install.sh server.servlet.context-path replace * Fix DAG add dependent project value exception # 1816 and keep requesting task interface (#1827) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * Fixed space and icon display issues before and after the input box * add license * add license * Fix data echo, style and popup cannot be closed * Fix page number loading issue and dag not getting value * Fix DAG add dependent project value exception # 1816 and keep requesting task interface * rpm package add UI (#1846) * [fix #1828] when the executor of process instance is not the owner of udf resouce, the path of the read resource file is incorrect (#1847) * fix issue 1828:get the udf resource path error when create udf function * update grantResources * first verify whether udf resource is bound by udf function * update grantResources * update testListAuthorizedUdfFunc * update getUserInfo in order to run success * add LoggerServer RPC PORT modified. #1848 (#1849) * fix #1775 bug,delete process definition when process instance is running * revert CONTRIBUTING.md * dockerfile modify * dockerfile modify * dockerfile modify * remove install.sh server.servlet.context-path replace * add LoggerServer RPC PORT modified. #1848 * LoggerService Logger RPC PORT get Error modify . #1848 (#1857) * fix #1775 bug,delete process definition when process instance is running * revert CONTRIBUTING.md * dockerfile modify * dockerfile modify * dockerfile modify * remove install.sh server.servlet.context-path replace * add LoggerServer RPC PORT modified. #1848 * LoggerService Logger RPC PORT get Error modify . #1848 * Fix UDF function list delete data without refresh and page data display widened #1851 (#1852) * merge dev branch front-end code * fix udfs assignment * Fix task instance page jump * fix udfs assignment and task instance page jump #1789 * Fixed space and icon display issues before and after the input box * add license * add license * Fix data echo, style and popup cannot be closed * Fix page number loading issue and dag not getting value * Fix DAG add dependent project value exception # 1816 and keep requesting task interface * Fix UDF function list delete data without refresh and page data display widened * [Fix #1828]check whether has permission to download udf file or delete udf function (#1858) * fix issue 1828:get the udf resource path error when create udf function * update grantResources * first verify whether udf resource is bound by udf function * update grantResources * update testListAuthorizedUdfFunc * update getUserInfo in order to run success * check whether has permission to download udf file or delete udf file * update listAuthorizedResourceById in ResourceMapper.xml * add log, don't swallow exception info (#1877) * Added hints in Resource Center (#1891) * update jackson version from 2.9.8 to 2.10.1 (#1892) * update release notes and modify the plugin of rat (#1899) * modify how to build (#1902) * update release notes and modify the plugin of rat * modify how to build * [maven-release-plugin] prepare release 1.2.1 * [maven-release-plugin] prepare for next development iteration * get authorized udf resources need exclude all created by self (#1905) * get authorized udf resources need exclude all created by self * reset pom version to 1.2.1-SNAPSHOT * Update ReleaseNotes.md add detail info * Update ReleaseNotes.md * update notice year form 2019 to 2019-2020 (#1907) * [maven-release-plugin] prepare release 1.2.1 * [maven-release-plugin] prepare for next development iteration * 当手动重跑历史活着补数的时候对shell和sql脚本中的[YYYYmmddd...]变量赋值与传递的日期,而不是当前日期 Co-authored-by: lgcareer <18610854716@163.com> Co-authored-by: bao liang <29528966+lenboo@users.noreply.github.com> Co-authored-by: break60 <790061044@qq.com> Co-authored-by: Tboy Co-authored-by: elonlo Co-authored-by: qiaozhanwei Co-authored-by: zhangchunyang <18910529250@163.com> Co-authored-by: dailidong Co-authored-by: 谷雨 --- .../dolphinscheduler/common/Constants.java | 7 ++ .../common/utils/ParameterUtils.java | 79 +++++++++++++++++++ .../server/worker/task/shell/ShellTask.java | 17 +++- .../server/worker/task/sql/SqlTask.java | 12 +-- 4 files changed, 107 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index b0a7b74d39..73655e7a9d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -999,4 +999,11 @@ public final class Constants { * dataSource sensitive param */ public static final String DATASOURCE_PASSWORD_REGEX = "(?<=(\"password\":\")).*?(?=(\"))"; + + + /** + * new + * schedule time + */ + public static final String PARAMETER_SHECDULE_TIME = "schedule.time"; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index 7a4553aaf5..270e0c4696 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -78,6 +78,45 @@ public class ParameterUtils { return parameterString; } + /** + * new + * convert parameters place holders + * + * @param parameterString parameter + * @param parameterMap parameter map + * @return convert parameters place holders + */ + public static String convertParameterPlaceholders2(String parameterString, Map parameterMap) { + if (StringUtils.isEmpty(parameterString)) { + return parameterString; + } + //Get current time, schedule execute time + String cronTimeStr = parameterMap.get(Constants.PARAMETER_SHECDULE_TIME); + Date cronTime = null; + + if (StringUtils.isNotEmpty(cronTimeStr)) { + try { + cronTime = DateUtils.parseDate(cronTimeStr, new String[]{Constants.PARAMETER_FORMAT_TIME}); + + } catch (ParseException e) { + logger.error(String.format("parse %s exception", cronTimeStr), e); + } + } else { + cronTime = new Date(); + } + + // replace variable ${} form,refers to the replacement of system variables and custom variables + parameterString = PlaceholderUtils.replacePlaceholders(parameterString, parameterMap, true); + + // replace time $[...] form, eg. $[yyyyMMdd] + if (cronTime != null) { + parameterString = TimePlaceholderUtils.replacePlaceholders(parameterString, cronTime, true); + + } + return parameterString; + } + + /** * set in parameter * @param index index @@ -173,4 +212,44 @@ public class ParameterUtils { } return inputString; } + + /** + * new + * $[yyyyMMdd] replace scheduler time + * @param text + * @param paramsMap + * @return + */ + public static String replaceScheduleTime(String text, Date scheduleTime, Map paramsMap) { + if (paramsMap != null) { + //if getScheduleTime null ,is current date + if (null == scheduleTime) { + scheduleTime = new Date(); + } + String dateTime = org.apache.dolphinscheduler.common.utils.DateUtils.format(scheduleTime, Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_SHECDULE_TIME); + paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p); + text = ParameterUtils.convertParameterPlaceholders2(text, convert(paramsMap)); + } + return text; + } + + + /** + * format convert + * @param paramsMap params map + * @return Map of converted + * see org.apache.dolphinscheduler.server.utils.ParamUtils.convert + */ + public static Map convert(Map paramsMap){ + Map map = new HashMap<>(); + Iterator> iter = paramsMap.entrySet().iterator(); + while (iter.hasNext()){ + Map.Entry en = iter.next(); + map.put(en.getKey(),en.getValue().getValue()); + } + return map; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 90661a690a..2c8433aa01 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; @@ -142,11 +143,21 @@ public class ShellTask extends AbstractTask { shellParameters.getLocalParametersMap(), taskProps.getCmdTypeIfComplement(), taskProps.getScheduleTime()); - if (paramsMap != null){ - script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); +// if (paramsMap != null){ +// script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); +// } + + //new +// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (paramsMap != null) { + String dateTime = DateUtils.format(taskProps.getScheduleTime(), Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_SHECDULE_TIME); + paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p); + script = ParameterUtils.convertParameterPlaceholders2(script, ParamUtils.convert(paramsMap)); } - shellParameters.setRawScript(script); logger.info("raw script : {}", shellParameters.getRawScript()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 82034740fc..71ab56b528 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -222,7 +222,9 @@ public class SqlTask extends AbstractTask { logger.info("SQL title : {}",title); sqlParameters.setTitle(title); } - + //new + //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job + sql = ParameterUtils.replaceScheduleTime(sql, taskProps.getScheduleTime(), paramsMap); // special characters need to be escaped, ${} needs to be escaped String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); @@ -341,10 +343,10 @@ public class SqlTask extends AbstractTask { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); } finally { - try { - connection.close(); - } catch (Exception e) { - logger.error(e.getMessage(), e); + try { + connection.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } return connection;