Browse Source

support custom datax configuration (#2224)

* fix #1441

* support custom datax config

* support datax custom config

* support datax custom config

* support datax custom config

Co-authored-by: 张世鸣 <zhangshiming@cvte.com>
pull/3/MERGE
Simon 5 years ago committed by gaojun2048
parent
commit
5c30205574
  1. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  2. 46
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
  3. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  4. 49
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@ -211,6 +211,7 @@ public class CheckUtilsTest {
// DataxParameters // DataxParameters
DataxParameters dataxParameters = new DataxParameters(); DataxParameters dataxParameters = new DataxParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString())); assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString()));
dataxParameters.setCustomConfig(0);
dataxParameters.setDataSource(111); dataxParameters.setDataSource(111);
dataxParameters.setDataTarget(333); dataxParameters.setDataTarget(333);
dataxParameters.setSql("sql"); dataxParameters.setSql("sql");

46
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@ -27,6 +27,16 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
*/ */
public class DataxParameters extends AbstractParameters { public class DataxParameters extends AbstractParameters {
/**
* if custom json configeg 0, 1
*/
private Integer customConfig;
/**
* if customConfig eq 1 ,then json is usable
*/
private String json;
/** /**
* data source typeeg MYSQL, POSTGRES ... * data source typeeg MYSQL, POSTGRES ...
*/ */
@ -77,6 +87,22 @@ public class DataxParameters extends AbstractParameters {
*/ */
private int jobSpeedRecord; private int jobSpeedRecord;
public Integer getCustomConfig() {
return customConfig;
}
public void setCustomConfig(Integer customConfig) {
this.customConfig = customConfig;
}
public String getJson() {
return json;
}
public void setJson(String json) {
this.json = json;
}
public String getDsType() { public String getDsType() {
return dsType; return dsType;
} }
@ -157,16 +183,18 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord; this.jobSpeedRecord = jobSpeedRecord;
} }
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
if (!(dataSource != 0 if (customConfig == null) return false;
&& dataTarget != 0 if (customConfig == 0) {
&& StringUtils.isNotEmpty(sql) return dataSource != 0
&& StringUtils.isNotEmpty(targetTable))) { && dataTarget != 0
return false; && StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable);
} else {
return StringUtils.isNotEmpty(json);
} }
return true;
} }
@Override @Override
@ -177,7 +205,9 @@ public class DataxParameters extends AbstractParameters {
@Override @Override
public String toString() { public String toString() {
return "DataxParameters{" + return "DataxParameters{" +
"dsType='" + dsType + '\'' + "customConfig=" + customConfig +
", json='" + json + '\'' +
", dsType='" + dsType + '\'' +
", dataSource=" + dataSource + ", dataSource=" + dataSource +
", dtType='" + dtType + '\'' + ", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget + ", dataTarget=" + dataTarget +

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -191,24 +191,47 @@ public class DataxTask extends AbstractTask {
throws Exception { throws Exception {
// generate json // generate json
String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId()); String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId());
String json;
Path path = new File(fileName).toPath(); Path path = new File(fileName).toPath();
if (Files.exists(path)) { if (Files.exists(path)) {
return fileName; return fileName;
} }
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
logger.debug("datax job json : {}", root.toString()); if (dataXParameters.getCustomConfig() == 1){
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
}
}else {
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
json = root.toString();
}
logger.debug("datax job json : {}", json);
// create datax json file // create datax json file
FileUtils.writeStringToFile(new File(fileName), root.toString(), StandardCharsets.UTF_8); FileUtils.writeStringToFile(new File(fileName), json, StandardCharsets.UTF_8);
return fileName; return fileName;
} }

49
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

@ -44,6 +44,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
/** /**
* DataxTask Tester. * DataxTask Tester.
*/ */
@ -59,6 +61,8 @@ public class DataxTaskTest {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private TaskProps props = new TaskProps();
@Before @Before
public void before() public void before()
throws Exception { throws Exception {
@ -70,7 +74,6 @@ public class DataxTaskTest {
springApplicationContext.setApplicationContext(applicationContext); springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp"); props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1); props.setTaskInstId(1);
@ -78,10 +81,8 @@ public class DataxTaskTest {
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
props.setTaskParams( props.setCmdTypeIfComplement(START_PROCESS);
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); setTaskParems(0);
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
@ -91,6 +92,22 @@ public class DataxTaskTest {
Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0);
} }
private void setTaskParems(Integer customConfig) {
if (customConfig == 1) {
props.setTaskParams(
"{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
} else {
props.setTaskParams(
"{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
}
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
}
private DataSource getDataSource() { private DataSource getDataSource() {
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL); dataSource.setType(DbType.MYSQL);
@ -102,7 +119,7 @@ public class DataxTaskTest {
private ProcessInstance getProcessInstance() { private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance(); ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.START_PROCESS); processInstance.setCommandType(START_PROCESS);
processInstance.setScheduleTime(new Date()); processInstance.setScheduleTime(new Date());
return processInstance; return processInstance;
} }
@ -229,18 +246,24 @@ public class DataxTaskTest {
*/ */
@Test @Test
public void testBuildDataxJsonFile() public void testBuildDataxJsonFile()
throws Exception { throws Exception {
try { try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); setTaskParems(1);
method.setAccessible(true); buildDataJson();
String filePath = (String) method.invoke(dataxTask, null); setTaskParems(0);
Assert.assertNotNull(filePath); buildDataJson();
} } catch (Exception e) {
catch (Exception e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
} }
public void buildDataJson() throws Exception {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null);
Assert.assertNotNull(filePath);
}
/** /**
* Method: buildDataxJobContentJson() * Method: buildDataxJobContentJson()
*/ */

Loading…
Cancel
Save