Browse Source

[Fix-15706] Seatunnel improvement (#15852)

* fix_seatunnel_15706

* CodeFormat

* Change to use JSONUtils

* Constants moved to constant code

* Delete empty lines

* Delete empty lines
3.2.2-prepare
XinXing 9 months ago committed by GitHub
parent
commit
3fe9fd45b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  2. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java
  3. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  4. 93
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -196,7 +196,10 @@ public final class JSONUtils {
* @return true if valid
*/
public static boolean checkJsonValid(String json) {
return checkJsonValid(json, true);
}
public static boolean checkJsonValid(String json, Boolean logFlag) {
if (Strings.isNullOrEmpty(json)) {
return false;
}
@ -205,7 +208,8 @@ public final class JSONUtils {
objectMapper.readTree(json);
return true;
} catch (IOException e) {
log.error("check json object valid exception!", e);
if (logFlag)
log.error("check json object valid exception!", e);
}
return false;

2
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java

@ -29,5 +29,7 @@ public class Constants {
public static final String STARTUP_SCRIPT_SPARK = "spark";
public static final String STARTUP_SCRIPT_FLINK = "flink";
public static final String STARTUP_SCRIPT_SEATUNNEL = "seatunnel";
public static final String JSON_SUFFIX = "json";
public static final String CONF_SUFFIX = "conf";
}

9
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -184,8 +184,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private String buildConfigFilePath() {
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
return String.format("%s/seatunnel_%s.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), formatDetector());
}
private String formatDetector() {
return JSONUtils.checkJsonValid(seatunnelParameters.getRawScript(), false) ? Constants.JSON_SUFFIX
: Constants.CONF_SUFFIX;
}
private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {

93
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
public class SeatunnelTaskTest {
private static final String EXECUTE_PATH = "/home";
private static final String TASK_APPID = "9527";
@Test
public void formatDetector() throws Exception{
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setRawScript(RAW_SCRIPT);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(TASK_APPID);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand());
seatunnelParameters.setRawScript(RAW_SCRIPT_2);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand());
}
private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +
" checkpoint.interval = 10000\n" +
"}\n" +
"\n" +
"source {\n" +
" FakeSource {\n" +
" parallelism = 2\n" +
" result_table_name = \"fake\"\n" +
" row.num = 16\n" +
" schema = {\n" +
" fields {\n" +
" name = \"string\"\n" +
" age = \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n" +
"\n" +
"sink {\n" +
" Console {\n" +
" }\n" +
"}";
private static final String RAW_SCRIPT_2 = "{\n" +
" \"env\": {\n" +
" \"execution.parallelism\": 2,\n" +
" \"job.mode\": \"BATCH\",\n" +
" \"checkpoint.interval\": 10000\n" +
" },\n" +
" \"source\": {\n" +
" \"FakeSource\": {\n" +
" \"parallelism\": 2,\n" +
" \"result_table_name\": \"fake\",\n" +
" \"row.num\": 16,\n" +
" \"schema\": {\n" +
" \"fields\": {\n" +
" \"name\": \"string\",\n" +
" \"age\": \"int\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"sink\": {\n" +
" \"Console\": {}\n" +
" }\n" +
"}";
}
Loading…
Cancel
Save