Browse Source

[Fix-16740][SeaTunnel-Task] fix can't submit resource center config file issue (#16741)

dev
Jarvis 4 weeks ago committed by GitHub
parent
commit
071994933b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 28
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  2. 70
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java
  3. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

28
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@ -154,26 +155,21 @@ public class SeatunnelTask extends AbstractRemoteTask {
protected List<String> buildOptions() throws Exception { protected List<String> buildOptions() throws Exception {
List<String> args = new ArrayList<>(); List<String> args = new ArrayList<>();
args.add(CONFIG_OPTIONS);
String scriptContent;
if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) { if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
args.add(CONFIG_OPTIONS); scriptContent = buildCustomConfigContent();
args.add(buildCustomConfigCommand());
} else { } else {
seatunnelParameters.getResourceList().forEach(resourceInfo -> { String resourceFileName = seatunnelParameters.getResourceList().get(0).getResourceName();
args.add(CONFIG_OPTIONS); ResourceContext resourceContext = taskExecutionContext.getResourceContext();
// TODO: Need further check for refactored resource center scriptContent = FileUtils.readFileToString(
// TODO Currently resourceName is `/xxx.sh`, it has more `/` and needs to be optimized new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
args.add(resourceInfo.getResourceName().replaceFirst(".*:", "")); StandardCharsets.UTF_8);
});
} }
return args;
}
protected String buildCustomConfigCommand() throws Exception {
String config = buildCustomConfigContent();
String filePath = buildConfigFilePath(); String filePath = buildConfigFilePath();
createConfigFileIfNotExists(config, filePath); createConfigFileIfNotExists(scriptContent, filePath);
args.add(filePath);
return filePath; return args;
} }
private String buildCustomConfigContent() { private String buildCustomConfigContent() {

70
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/test/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java → dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/test/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskTest.java

@ -14,35 +14,91 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.plugin.task.seatunnel; package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.junit.Test; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.commons.io.FileUtils;
import java.util.Collections;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
public class SeatunnelTaskTest { public class SeatunnelTaskTest {
private static final String EXECUTE_PATH = "/home";
private static final String TASK_APPID = "9527"; private static final String EXECUTE_PATH = "/tmp";
private static final String RESOURCE_SCRIPT_PATH = "/tmp/demo.conf";
private MockedStatic<FileUtils> mockedStaticFileUtils;
@BeforeEach
public void setUp() {
mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
}
@AfterEach
public void after() {
mockedStaticFileUtils.close();
}
@Test @Test
public void formatDetector() throws Exception{ public void formatDetector() throws Exception {
String taskId = "1234";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters(); SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setUseCustom(true);
seatunnelParameters.setRawScript(RAW_SCRIPT); seatunnelParameters.setRawScript(RAW_SCRIPT);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH); taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(TASK_APPID); taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters)); taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext); SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters); seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand()); String command1 = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand1 = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand1, command1);
seatunnelParameters.setRawScript(RAW_SCRIPT_2); seatunnelParameters.setRawScript(RAW_SCRIPT_2);
seatunnelTask.setSeatunnelParameters(seatunnelParameters); seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand()); String command2 = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand2 = String.format("--config %s/seatunnel_%s.json", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand2, command2);
}
@Test
public void testReadConfigFromResourceCenter() throws Exception {
String taskId = "2345";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setUseCustom(false);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH);
seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo));
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
ResourceContext resourceContext = new ResourceContext();
resourceContext.addResourceItem(new ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
taskExecutionContext.setResourceContext(resourceContext);
SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
String command = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand, command);
} }
private static final String RAW_SCRIPT = "env {\n" + private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" + " execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" + " job.mode = \"BATCH\"\n" +

4
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -205,7 +205,9 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SEATUNNEL') { if (data.taskType === 'SEATUNNEL') {
taskParams.startupScript = data.startupScript taskParams.startupScript = data.startupScript
taskParams.useCustom = data.useCustom taskParams.useCustom = data.useCustom
taskParams.rawScript = data.rawScript if (!data.useCustom) {
taskParams.rawScript = ''
}
if (data.startupScript?.includes('flink')) { if (data.startupScript?.includes('flink')) {
taskParams.runMode = data.runMode taskParams.runMode = data.runMode
taskParams.others = data.others taskParams.others = data.others

Loading…
Cancel
Save