From 7c9f5960e5a0c2b446a5575d1703c8f644c10dce Mon Sep 17 00:00:00 2001 From: Rubik-W <39549317+Rubik-W@users.noreply.github.com> Date: Sat, 6 Jun 2020 21:12:41 +0800 Subject: [PATCH] Fixbug datax task (#2909) * fix: local param bug * fix: UT bug Co-authored-by: Rubik-W --- .../common/task/datax/DataxParameters.java | 11 ++--- .../common/utils/ParameterUtils.java | 2 +- .../server/worker/task/datax/DataxTask.java | 49 +++++++------------ .../worker/task/datax/DataxTaskTest.java | 11 +++-- .../dag/_source/formModel/tasks/datax.vue | 11 +++++ 5 files changed, 40 insertions(+), 44 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java index 872b3aa174..f54e107995 100755 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters { /** * if custom json configļ¼Œeg 0, 1 */ - private Integer customConfig; + private int customConfig; /** * if customConfig eq 1 ,then json is usable @@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; - public Integer getCustomConfig() { + public int getCustomConfig() { return customConfig; } - public void setCustomConfig(Integer customConfig) { + public void setCustomConfig(int customConfig) { this.customConfig = customConfig; } @@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } - @Override public boolean checkParameters() { - if (customConfig == null) return false; - if (customConfig == 0) { + if (customConfig == Flag.NO.ordinal()) { return dataSource != 0 && dataTarget != 0 && StringUtils.isNotEmpty(sql) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index bbc8955588..84c60db566 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -48,7 +48,7 @@ public class ParameterUtils { * @return convert parameters place holders */ public static String convertParameterPlaceholders(String parameterString, Map parameterMap) { - if (StringUtils.isEmpty(parameterString)) { + if (StringUtils.isEmpty(parameterString) || parameterMap == null) { return parameterString; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 9589a305c1..f636133bb4 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; @@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask { String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + dataXParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + // run datax process - String jsonFilePath = buildDataxJsonFile(); - String shellCommandFilePath = buildShellCommandFile(jsonFilePath); + String jsonFilePath = buildDataxJsonFile(paramsMap); + String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); setExitStatusCode(commandExecuteResult.getExitStatusCode()); @@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask { * @return datax json file name * @throws Exception if error throws Exception */ - private String buildDataxJsonFile() + private String buildDataxJsonFile(Map paramsMap) throws Exception { // generate json String fileName = String.format("%s/%s_job.json", @@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask { return fileName; } - - - if (dataXParameters.getCustomConfig() == 1){ - + if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){ json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); - - /** - * combining local and global parameters - */ - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - dataXParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - if (paramsMap != null){ - json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); - } - }else { - JSONObject job = new JSONObject(); job.put("content", buildDataxJobContentJson()); job.put("setting", buildDataxJobSettingJson()); @@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask { json = root.toString(); } + // replace placeholder + json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); + logger.debug("datax job json : {}", json); // create datax json file @@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask { * @return shell command file name * @throws Exception if error throws Exception */ - private String buildShellCommandFile(String jobConfigFilePath) + private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) throws Exception { // generate scripts String fileName = String.format("%s/%s_node.%s", @@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask { sbr.append(DATAX_HOME_EVN); sbr.append(" "); sbr.append(jobConfigFilePath); - String dataxCommand = sbr.toString(); - // combining local and global parameters // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - dataXParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - if (paramsMap != null) { - dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); - } + String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap)); logger.debug("raw script : {}", dataxCommand); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index a2a46ef5a5..041f81d62b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -21,9 +21,9 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.List; +import java.util.Map; import com.alibaba.fastjson.JSONObject; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -273,14 +273,15 @@ public class DataxTaskTest { setTaskParems(0); buildDataJson(); } catch (Exception e) { + e.printStackTrace(); Assert.fail(e.getMessage()); } } public void buildDataJson() throws Exception { - Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); + Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class}); method.setAccessible(true); - String filePath = (String) method.invoke(dataxTask, null); + String filePath = (String) method.invoke(dataxTask, new Object[]{null}); Assert.assertNotNull(filePath); } @@ -358,9 +359,9 @@ public class DataxTaskTest { public void testBuildShellCommandFile() throws Exception { try { - Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); + Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class); method.setAccessible(true); - Assert.assertNotNull(method.invoke(dataxTask, "test.json")); + Assert.assertNotNull(method.invoke(dataxTask, "test.json", null)); } catch (Exception e) { Assert.fail(e.getMessage()); diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue index f1c9b757bd..9347b34dd0 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -236,6 +236,12 @@ _onPostStatements (a) { this.postStatements = a }, + /** + * return localParams + */ + _onLocalParams (a) { + this.localParams = a + }, /** * verification */ @@ -246,6 +252,11 @@ return false } + // localParams Subcomponent verification + if (!this.$refs.refLocalParams._verifProp()) { + return false + } + // storage this.$emit('on-params', { customConfig: this.customConfig,