diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index 6750b364f9..bfc3af2c58 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -196,7 +196,10 @@ public final class JSONUtils { * @return true if valid */ public static boolean checkJsonValid(String json) { + return checkJsonValid(json, true); + } + public static boolean checkJsonValid(String json, Boolean logFlag) { if (Strings.isNullOrEmpty(json)) { return false; } @@ -205,7 +208,8 @@ public final class JSONUtils { objectMapper.readTree(json); return true; } catch (IOException e) { - log.error("check json object valid exception!", e); + if (logFlag) + log.error("check json object valid exception!", e); } return false; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java index fb1c52ee18..5f0f22b0a1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/Constants.java @@ -29,5 +29,7 @@ public class Constants { public static final String STARTUP_SCRIPT_SPARK = "spark"; public static final String STARTUP_SCRIPT_FLINK = "flink"; public static final String STARTUP_SCRIPT_SEATUNNEL = "seatunnel"; + public static final String JSON_SUFFIX = "json"; + public static final String CONF_SUFFIX = "conf"; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 547e0158ef..2aadab8039 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -184,8 +184,13 @@ public class SeatunnelTask extends AbstractRemoteTask { } private String buildConfigFilePath() { - return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), - taskExecutionContext.getTaskAppId()); + return String.format("%s/seatunnel_%s.%s", taskExecutionContext.getExecutePath(), + taskExecutionContext.getTaskAppId(), formatDetector()); + } + + private String formatDetector() { + return JSONUtils.checkJsonValid(seatunnelParameters.getRawScript(), false) ? Constants.JSON_SUFFIX + : Constants.CONF_SUFFIX; } private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java new file mode 100644 index 0000000000..8f4c2a815a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.plugin.task.seatunnel; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class SeatunnelTaskTest { + private static final String EXECUTE_PATH = "/home"; + private static final String TASK_APPID = "9527"; + + @Test + public void formatDetector() throws Exception{ + SeatunnelParameters seatunnelParameters = new SeatunnelParameters(); + seatunnelParameters.setRawScript(RAW_SCRIPT); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setExecutePath(EXECUTE_PATH); + taskExecutionContext.setTaskAppId(TASK_APPID); + taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters)); + + SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext); + seatunnelTask.setSeatunnelParameters(seatunnelParameters); + Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand()); + + seatunnelParameters.setRawScript(RAW_SCRIPT_2); + seatunnelTask.setSeatunnelParameters(seatunnelParameters); + Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand()); + } + private static final String RAW_SCRIPT = "env {\n" + + " execution.parallelism = 2\n" + + " job.mode = \"BATCH\"\n" + + " checkpoint.interval = 10000\n" + + "}\n" + + "\n" + + "source {\n" + + " FakeSource {\n" + + " parallelism = 2\n" + + " result_table_name = \"fake\"\n" + + " row.num = 16\n" + + " schema = {\n" + + " fields {\n" + + " name = \"string\"\n" + + " age = \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n" + + "\n" + + "sink {\n" + + " Console {\n" + + " }\n" + + "}"; + private static final String RAW_SCRIPT_2 = "{\n" + + " \"env\": {\n" + + " \"execution.parallelism\": 2,\n" + + " \"job.mode\": \"BATCH\",\n" + + " \"checkpoint.interval\": 10000\n" + + " },\n" + + " \"source\": {\n" + + " \"FakeSource\": {\n" + + " \"parallelism\": 2,\n" + + " \"result_table_name\": \"fake\",\n" + + " \"row.num\": 16,\n" + + " \"schema\": {\n" + + " \"fields\": {\n" + + " \"name\": \"string\",\n" + + " \"age\": \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " \"sink\": {\n" + + " \"Console\": {}\n" + + " }\n" + + "}"; +} \ No newline at end of file