From 00ff88ef1c4a8af8d2dc1324543338629dd3ca6b Mon Sep 17 00:00:00 2001 From: Simon <3656562@qq.com> Date: Thu, 19 Mar 2020 18:56:58 +0800 Subject: [PATCH] support custom datax configuration (#2224) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix #1441 * support custom datax config * support datax custom config * support datax custom config * support datax custom config Co-authored-by: 张世鸣 --- .../api/utils/CheckUtilsTest.java | 1 + .../common/task/datax/DataxParameters.java | 46 ++++++++++++++--- .../server/worker/task/datax/DataxTask.java | 39 ++++++++++++--- .../worker/task/datax/DataxTaskTest.java | 49 ++++++++++++++----- 4 files changed, 106 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index 24a0ed31d6..308ed8e9b6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -211,6 +211,7 @@ public class CheckUtilsTest { // DataxParameters DataxParameters dataxParameters = new DataxParameters(); assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString())); + dataxParameters.setCustomConfig(0); dataxParameters.setDataSource(111); dataxParameters.setDataTarget(333); dataxParameters.setSql("sql"); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java index 95dd505c02..f153360d63 100755 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -27,6 +27,16 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; */ public class DataxParameters extends AbstractParameters { + /** + * if custom json config,eg 0, 1 + */ + private Integer customConfig; + + /** + * if customConfig eq 1 ,then json is usable + */ + private String json; + /** * data source type,eg MYSQL, POSTGRES ... */ @@ -77,6 +87,22 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; + public Integer getCustomConfig() { + return customConfig; + } + + public void setCustomConfig(Integer customConfig) { + this.customConfig = customConfig; + } + + public String getJson() { + return json; + } + + public void setJson(String json) { + this.json = json; + } + public String getDsType() { return dsType; } @@ -157,16 +183,18 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } + @Override public boolean checkParameters() { - if (!(dataSource != 0 - && dataTarget != 0 - && StringUtils.isNotEmpty(sql) - && StringUtils.isNotEmpty(targetTable))) { - return false; + if (customConfig == null) return false; + if (customConfig == 0) { + return dataSource != 0 + && dataTarget != 0 + && StringUtils.isNotEmpty(sql) + && StringUtils.isNotEmpty(targetTable); + } else { + return StringUtils.isNotEmpty(json); } - - return true; } @Override @@ -177,7 +205,9 @@ public class DataxParameters extends AbstractParameters { @Override public String toString() { return "DataxParameters{" + - "dsType='" + dsType + '\'' + + "customConfig=" + customConfig + + ", json='" + json + '\'' + + ", dsType='" + dsType + '\'' + ", dataSource=" + dataSource + ", dtType='" + dtType + '\'' + ", dataTarget=" + dataTarget + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 952030ea62..8083bb6869 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -192,24 +192,47 @@ public class DataxTask extends AbstractTask { throws Exception { // generate json String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId()); + String json; Path path = new File(fileName).toPath(); if (Files.exists(path)) { return fileName; } - JSONObject job = new JSONObject(); - job.put("content", buildDataxJobContentJson()); - job.put("setting", buildDataxJobSettingJson()); - JSONObject root = new JSONObject(); - root.put("job", job); - root.put("core", buildDataxCoreJson()); - logger.debug("datax job json : {}", root.toString()); + if (dataXParameters.getCustomConfig() == 1){ + + json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); + + /** + * combining local and global parameters + */ + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + dataXParameters.getLocalParametersMap(), + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); + if (paramsMap != null){ + json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap)); + } + + }else { + + JSONObject job = new JSONObject(); + job.put("content", buildDataxJobContentJson()); + job.put("setting", buildDataxJobSettingJson()); + + JSONObject root = new JSONObject(); + root.put("job", job); + root.put("core", buildDataxCoreJson()); + json = root.toString(); + } + + logger.debug("datax job json : {}", json); // create datax json file - FileUtils.writeStringToFile(new File(fileName), root.toString(), StandardCharsets.UTF_8); + FileUtils.writeStringToFile(new File(fileName), json, StandardCharsets.UTF_8); return fileName; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index bd7f27530a..c2dbd268e6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -44,6 +44,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; +import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; + /** * DataxTask Tester. */ @@ -59,6 +61,8 @@ public class DataxTaskTest { private ApplicationContext applicationContext; + private TaskProps props = new TaskProps(); + @Before public void before() throws Exception { @@ -70,7 +74,6 @@ public class DataxTaskTest { springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - TaskProps props = new TaskProps(); props.setTaskDir("/tmp"); props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskInstId(1); @@ -78,10 +81,8 @@ public class DataxTaskTest { props.setEnvFile(".dolphinscheduler_env.sh"); props.setTaskStartTime(new Date()); props.setTaskTimeout(0); - props.setTaskParams( - "{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); - dataxTask = PowerMockito.spy(new DataxTask(props, logger)); - dataxTask.init(); + props.setCmdTypeIfComplement(START_PROCESS); + setTaskParems(0); Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); @@ -91,6 +92,22 @@ public class DataxTaskTest { Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); } + private void setTaskParems(Integer customConfig) { + if (customConfig == 1) { + props.setTaskParams( + "{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); + +// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); + } else { + props.setTaskParams( + "{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + + } + + dataxTask = PowerMockito.spy(new DataxTask(props, logger)); + dataxTask.init(); + } + private DataSource getDataSource() { DataSource dataSource = new DataSource(); dataSource.setType(DbType.MYSQL); @@ -102,7 +119,7 @@ public class DataxTaskTest { private ProcessInstance getProcessInstance() { ProcessInstance processInstance = new ProcessInstance(); - processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setCommandType(START_PROCESS); processInstance.setScheduleTime(new Date()); return processInstance; } @@ -229,18 +246,24 @@ public class DataxTaskTest { */ @Test public void testBuildDataxJsonFile() - throws Exception { + throws Exception { try { - Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); - method.setAccessible(true); - String filePath = (String) method.invoke(dataxTask, null); - Assert.assertNotNull(filePath); - } - catch (Exception e) { + setTaskParems(1); + buildDataJson(); + setTaskParems(0); + buildDataJson(); + } catch (Exception e) { Assert.fail(e.getMessage()); } } + public void buildDataJson() throws Exception { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); + method.setAccessible(true); + String filePath = (String) method.invoke(dataxTask, null); + Assert.assertNotNull(filePath); + } + /** * Method: buildDataxJobContentJson() */