From 45586f71d548cf22195bcc60c63d8bc84884974f Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Tue, 18 Apr 2023 18:38:16 +0800 Subject: [PATCH] [BUG][Resource Center] Task can not select main JAR (#13536) --- .../api/dto/resources/filter/IFilter.java | 4 +- .../dto/resources/filter/ResourceFilter.java | 47 ++++------- .../service/impl/ResourcesServiceImpl.java | 80 ++++++++++++------- .../resources/filter/ResourceFilterTest.java | 24 +++--- .../master/runner/task/BaseTaskProcessor.java | 3 +- .../service/process/ProcessServiceImpl.java | 7 +- .../service/process/ProcessServiceTest.java | 2 - .../plugin/task/api/AbstractYarnTask.java | 22 ----- .../plugin/task/dq/DataQualityTask.java | 4 +- .../plugin/task/dq/utils/SparkArgsUtils.java | 2 +- .../plugin/task/flink/FlinkStreamTask.java | 10 --- .../plugin/task/flink/FlinkArgsUtilsTest.java | 6 +- .../task/flink/FlinkParametersTest.java | 6 +- .../plugin/task/flink/FlinkArgsUtils.java | 2 +- .../plugin/task/flink/FlinkTask.java | 16 ---- .../plugin/task/flink/FlinkArgsUtilsTest.java | 6 +- .../task/flink/FlinkParametersTest.java | 6 +- .../plugin/task/java/JavaTask.java | 33 ++------ .../plugin/task/java/JavaTaskTest.java | 10 +++ .../plugin/task/mr/MapReduceArgsUtils.java | 5 +- .../plugin/task/mr/MapReduceTask.java | 13 +-- .../plugin/task/seatunnel/SeatunnelTask.java | 1 + .../plugin/task/spark/SparkTask.java | 14 +--- .../task/spark/SparkParametersTest.java | 6 +- .../plugin/task/spark/SparkTaskTest.java | 6 +- .../plugin/task/sqoop/SqoopTask.java | 4 - .../node/fields/use-java-task-main-jar.ts | 5 +- .../components/node/fields/use-main-jar.ts | 5 +- .../utils/TaskExecutionCheckerUtils.java | 8 +- 29 files changed, 143 insertions(+), 214 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java index 2bcc2822a8..39c81b5880 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.dto.resources.filter; -import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import java.util.List; @@ -25,5 +25,5 @@ import java.util.List; */ public interface IFilter { - List filter(); + List filter(); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java index 6206769f7f..14f4a6292d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.dto.resources.filter; -import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import java.util.ArrayList; import java.util.HashSet; @@ -36,7 +36,7 @@ public class ResourceFilter implements IFilter { /** * resource list */ - private List resourceList; + private List resourceList; /** * parent list @@ -48,7 +48,7 @@ public class ResourceFilter implements IFilter { * @param suffix resource suffix * @param resourceList resource list */ - public ResourceFilter(String suffix, List resourceList) { + public ResourceFilter(String suffix, List resourceList) { this.suffix = suffix; this.resourceList = resourceList; } @@ -57,44 +57,31 @@ public class ResourceFilter implements IFilter { * file filter * @return file filtered by suffix */ - public Set fileFilter() { - return resourceList.stream().filter(t -> { - String alias = t.getAlias(); - return alias.endsWith(suffix); - }).collect(Collectors.toSet()); + public Set fileFilter() { + return resourceList.stream().filter(t -> t.getFullName().endsWith(suffix)).collect(Collectors.toSet()); } /** * list all parent dir * @return parent resource dir set */ - Set listAllParent() { - Set parentList = new HashSet<>(); - Set filterFileList = fileFilter(); - for (Resource file : filterFileList) { - parentList.add(file); - setAllParent(file, parentList); - } - return parentList; - - } - - /** - * list all parent dir - * @param resource resource - * @return parent resource dir set - */ - private void setAllParent(Resource resource, Set parentList) { - for (Resource resourceTemp : resourceList) { - if (resourceTemp.getId() == resource.getPid()) { - parentList.add(resourceTemp); - setAllParent(resourceTemp, parentList); + Set listAllParent() { + Set parentList = new HashSet<>(); + Set filterFileList = fileFilter(); + for (StorageEntity file : filterFileList) { + String fullName = file.getFullName(); + for (StorageEntity resource : resourceList) { + if (fullName.startsWith(resource.getFullName())) { + parentList.add(resource); + } } } + + return parentList; } @Override - public List filter() { + public List filter() { return new ArrayList<>(listAllParent()); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 6f83c4f914..cd8d2cd1ea 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -583,11 +583,36 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } + List resourcesList = new ArrayList<>(); + try { + resourcesList = queryStorageEntityList(loginUser, fullName, type, tenantCode, false); + } catch (ServiceException e) { + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + + // remove leading and trailing spaces in searchVal + String trimmedSearchVal = searchVal != null ? searchVal.trim() : ""; + // filter based on trimmed searchVal + List filteredResourceList = resourcesList.stream() + .filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList()); + // inefficient pagination + List slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize) + .limit(pageSize).collect(Collectors.toList()); + + pageInfo.setTotal(filteredResourceList.size()); + pageInfo.setTotalList(slicedResourcesList); + result.setData(pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + + private List queryStorageEntityList(User loginUser, String fullName, ResourceType type, + String tenantCode, boolean recursive) { String defaultPath = ""; List resourcesList = new ArrayList<>(); String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name()); - if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) { // list all tenants' resources to admin users in the root directory List userList = userMapper.selectList(null); @@ -600,13 +625,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe defaultPath = storageOperate.getUdfDir(tenantEntityCode); } try { - resourcesList.addAll(storageOperate.listFilesStatus(defaultPath, defaultPath, - tenantEntityCode, type)); + resourcesList.addAll(recursive + ? storageOperate.listFilesStatusRecursively(defaultPath, defaultPath, + tenantEntityCode, type) + : storageOperate.listFilesStatus(defaultPath, defaultPath, + tenantEntityCode, type)); visitedTenantEntityCode.add(tenantEntityCode); } catch (Exception e) { log.error(e.getMessage() + " Resource path: {}", defaultPath, e); - putMsg(result, Status.RESOURCE_NOT_EXIST); throw new ServiceException(String.format(e.getMessage() + " make sure resource path: %s exists in %s", defaultPath, resourceStorageType)); } @@ -626,26 +653,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe } } catch (Exception e) { log.error(e.getMessage() + " Resource path: {}", fullName, e); - putMsg(result, Status.RESOURCE_NOT_EXIST); throw new ServiceException(String.format(e.getMessage() + " make sure resource path: %s exists in %s", defaultPath, resourceStorageType)); } } - // remove leading and trailing spaces in searchVal - String trimmedSearchVal = searchVal != null ? searchVal.trim() : ""; - // filter based on trimmed searchVal - List filteredResourceList = resourcesList.stream() - .filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList()); - // inefficient pagination - List slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize) - .limit(pageSize).collect(Collectors.toList()); - - pageInfo.setTotal(filteredResourceList.size()); - pageInfo.setTotalList(slicedResourcesList); - result.setData(pageInfo); - putMsg(result, Status.SUCCESS); - return result; + return resourcesList; } /** @@ -799,14 +812,23 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe public Result queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) { Result result = new Result<>(); - Set resourceIds = resourcePermissionCheckService - .userOwnedResourceIdsAcquisition(checkResourceType(type), loginUser.getId(), log); - if (resourceIds.isEmpty()) { - result.setData(Collections.emptyList()); - putMsg(result, Status.SUCCESS); + User user = userMapper.selectById(loginUser.getId()); + if (user == null) { + log.error("user {} not exists", loginUser.getId()); + putMsg(result, Status.USER_NOT_EXIST, loginUser.getId()); + return result; + } + + Tenant tenant = tenantMapper.queryById(user.getTenantId()); + if (tenant == null) { + log.error("tenant not exists"); + putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST); return result; } - List allResourceList = resourcesMapper.selectBatchIds(resourceIds); + + String tenantCode = tenant.getTenantCode(); + + List allResourceList = queryStorageEntityList(loginUser, "", type, tenantCode, true); String suffix = ".jar"; if (programType != null) { @@ -820,12 +842,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe default: } } - List resources = new ResourceFilter(suffix, new ArrayList<>(allResourceList)).filter(); - // Transform into StorageEntity for compatibility - List transformedResourceList = resources.stream() - .map(this::createStorageEntityBasedOnResource) - .collect(Collectors.toList()); - Visitor visitor = new ResourceTreeVisitor(transformedResourceList); + List resources = new ResourceFilter(suffix, new ArrayList<>(allResourceList)).filter(); + Visitor visitor = new ResourceTreeVisitor(resources); result.setData(visitor.visit("").getChildren()); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java index ba0c796f2e..c84574a7e9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.dto.resources.filter; -import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; import java.util.ArrayList; import java.util.List; @@ -34,25 +34,23 @@ public class ResourceFilterTest { private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class); @Test public void filterTest() { - List allList = new ArrayList<>(); + List allList = new ArrayList<>(); - Resource resource1 = new Resource(3, -1, "b", "/b", true); - Resource resource2 = new Resource(4, 2, "a1.txt", "/a/a1.txt", false); - Resource resource3 = new Resource(5, 3, "b1.txt", "/b/b1.txt", false); - Resource resource4 = new Resource(6, 3, "b2.jar", "/b/b2.jar", false); - Resource resource5 = new Resource(7, -1, "b2", "/b2", true); - Resource resource6 = new Resource(8, -1, "b2", "/b/b2", true); - Resource resource7 = new Resource(9, 8, "c2.jar", "/b/b2/c2.jar", false); + StorageEntity resource1 = new StorageEntity(); + resource1.setFullName("a1.txt"); + StorageEntity resource2 = new StorageEntity(); + resource2.setFullName("b1.txt"); + StorageEntity resource3 = new StorageEntity(); + resource3.setFullName("b2.jar"); + StorageEntity resource4 = new StorageEntity(); + resource4.setFullName("c2.jar"); allList.add(resource1); allList.add(resource2); allList.add(resource3); allList.add(resource4); - allList.add(resource5); - allList.add(resource6); - allList.add(resource7); ResourceFilter resourceFilter = new ResourceFilter(".jar", allList); - List resourceList = resourceFilter.filter(); + List resourceList = resourceFilter.filter(); Assertions.assertNotNull(resourceList); resourceList.forEach(t -> logger.info(t.toString())); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 6045e2609e..9925e36b5e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -606,9 +606,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { if (baseParam != null) { List projectResourceFiles = baseParam.getResourceFilesList(); if (CollectionUtils.isNotEmpty(projectResourceFiles)) { - // TODO: Modify this part to accomodate(migrate) oldversionresources in the future. projectResourceFiles.forEach(file -> resourcesMap.put(file.getResourceName(), - processService.queryTenantCodeByResName(file.getResourceName(), ResourceType.FILE))); + storageOperate.getResourceFileName(file.getResourceName()))); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 43196ad569..be997c934a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1364,7 +1364,8 @@ public class ProcessServiceImpl implements ProcessService { ResourceInfo mainJar = JSONUtils.parseObject( JSONUtils.toJsonString(mainJarObj), ResourceInfo.class); - ResourceInfo resourceInfo = updateResourceInfo(taskDefinition.getId(), mainJar); + ResourceInfo resourceInfo = + updateResourceInfo(taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(), mainJar); if (resourceInfo != null) { taskParameters.put("mainJar", resourceInfo); } @@ -1375,7 +1376,8 @@ public class ProcessServiceImpl implements ProcessService { List resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class); List updatedResourceInfos = resourceInfos .stream() - .map(resourceInfo -> updateResourceInfo(taskDefinition.getId(), resourceInfo)) + .map(resourceInfo -> updateResourceInfo( + taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(), resourceInfo)) .filter(Objects::nonNull) .collect(Collectors.toList()); taskParameters.put("resourceList", updatedResourceInfos); @@ -1402,7 +1404,6 @@ public class ProcessServiceImpl implements ProcessService { } resourceInfo = new ResourceInfo(); resourceInfo.setId(-1); - resourceInfo.setRes(res.getRes()); resourceInfo.setResourceName(resourceFullName); log.info("updated resource info {}", JSONUtils.toJsonString(resourceInfo)); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index f2b7dbe489..81ef7a214f 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -714,13 +714,11 @@ public class ProcessServiceTest { // test normal situation ResourceInfo resourceInfoNormal = new ResourceInfo(); resourceInfoNormal.setId(1); - resourceInfoNormal.setRes("test.txt"); resourceInfoNormal.setResourceName("/test.txt"); ResourceInfo updatedResourceInfo3 = processService.updateResourceInfo(0, resourceInfoNormal); Assertions.assertEquals(-1, updatedResourceInfo3.getId().intValue()); - Assertions.assertEquals("test.txt", updatedResourceInfo3.getRes()); Assertions.assertEquals("/test.txt", updatedResourceInfo3.getResourceName()); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 85ab9a610f..406e78a80b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLE import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY; import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; @@ -122,25 +121,4 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask { */ protected abstract String buildCommand(); - /** - * set main jar name - */ - protected abstract void setMainJarName(); - - /** - * Get name of jar resource. - * - * @param mainJar - * @return - */ - protected String getResourceNameOfMainJar(ResourceInfo mainJar) { - if (null == mainJar) { - throw new RuntimeException("The jar for the task is required."); - } - - return mainJar.getId() == null - ? mainJar.getRes() - // when update resource maybe has error - : mainJar.getResourceName().replaceFirst("/", ""); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 54b88c5977..b270b9a5f5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -180,11 +180,11 @@ public class DataQualityTask extends AbstractYarnTask { return command; } - @Override protected void setMainJarName() { ResourceInfo mainJar = new ResourceInfo(); String basePath = System.getProperty("user.dir").replace(File.separator + "bin", ""); - mainJar.setRes(basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName()); + mainJar.setResourceName( + basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName()); dataQualityParameters.getSparkParameters().setMainJar(mainJar); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java index 66c157e82b..d576abe6d0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java @@ -120,7 +120,7 @@ public class SparkArgsUtils { ResourceInfo mainJar = param.getMainJar(); if (mainJar != null) { - args.add(mainJar.getRes()); + args.add(mainJar.getResourceName()); } String mainArgs = param.getMainArgs(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index 750711502f..4da841b0d3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask; @@ -58,7 +57,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { throw new RuntimeException("flink task params is not valid"); } flinkParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); } @@ -80,14 +78,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { return command; } - @Override - protected void setMainJarName() { - ResourceInfo mainJar = flinkParameters.getMainJar(); - String resourceName = getResourceNameOfMainJar(mainJar); - mainJar.setRes(resourceName); - flinkParameters.setMainJar(mainJar); - } - @Override public AbstractParameters getParameters() { return flinkParameters; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index e8d29df4be..c274ee2594 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import java.util.HashMap; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest { flinkParameters.setParallelism(4); ResourceInfo resourceInfo = new ResourceInfo(); resourceInfo.setId(1); - resourceInfo.setResourceName("job"); + resourceInfo.setResourceName("/opt/job.jar"); resourceInfo.setRes("/opt/job.jar"); flinkParameters.setMainJar(resourceInfo); flinkParameters.setMainClass("org.example.Main"); @@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest { TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskAppId("app-id"); taskExecutionContext.setExecutePath("/tmp/execution"); + HashMap map = new HashMap<>(); + map.put("/opt/job.jar", "/opt/job.jar"); + taskExecutionContext.setResources(map); return taskExecutionContext; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java index cb39748fcc..7118d40667 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java @@ -33,12 +33,12 @@ public class FlinkParametersTest { Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty()); ResourceInfo mainResource = new ResourceInfo(); - mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar"); + mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar"); flinkParameters.setMainJar(mainResource); List resourceInfos = new LinkedList<>(); ResourceInfo resourceInfo1 = new ResourceInfo(); - resourceInfo1.setRes("testFlinkParameters1.jar"); + resourceInfo1.setResourceName("/testFlinkParameters1.jar"); resourceInfos.add(resourceInfo1); flinkParameters.setResourceList(resourceInfos); @@ -47,7 +47,7 @@ public class FlinkParametersTest { Assertions.assertEquals(2, resourceFilesList.size()); ResourceInfo resourceInfo2 = new ResourceInfo(); - resourceInfo2.setRes("testFlinkParameters2.jar"); + resourceInfo2.setResourceName("/testFlinkParameters2.jar"); resourceInfos.add(resourceInfo2); flinkParameters.setResourceList(resourceInfos); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java index 45b10c4128..20f460857c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java @@ -297,7 +297,7 @@ public class FlinkArgsUtils { if (ProgramType.PYTHON == programType) { args.add(FlinkConstants.FLINK_PYTHON); } - args.add(mainJar.getRes()); + args.add(taskExecutionContext.getResources().get(mainJar.getResourceName())); } String mainArgs = flinkParameters.getMainArgs(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 2e58ae0534..01862816b6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; @@ -61,7 +60,6 @@ public class FlinkTask extends AbstractYarnTask { throw new RuntimeException("flink task params is not valid"); } flinkParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); FileUtils.generateScriptFile(taskExecutionContext, flinkParameters); } @@ -83,20 +81,6 @@ public class FlinkTask extends AbstractYarnTask { return command; } - @Override - protected void setMainJarName() { - if (flinkParameters.getProgramType() == ProgramType.SQL) { - log.info("The current flink job type is SQL, will no need to set main jar"); - return; - } - - ResourceInfo mainJar = flinkParameters.getMainJar(); - String resourceName = getResourceNameOfMainJar(mainJar); - mainJar.setRes(resourceName); - flinkParameters.setMainJar(mainJar); - log.info("Success set flink jar: {}", resourceName); - } - @Override public AbstractParameters getParameters() { return flinkParameters; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java index 952e0c42ed..64d9931f22 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import java.util.HashMap; import java.util.List; import org.junit.jupiter.api.Assertions; @@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest { flinkParameters.setParallelism(4); ResourceInfo resourceInfo = new ResourceInfo(); resourceInfo.setId(1); - resourceInfo.setResourceName("job"); + resourceInfo.setResourceName("/opt/job.jar"); resourceInfo.setRes("/opt/job.jar"); flinkParameters.setMainJar(resourceInfo); flinkParameters.setMainClass("org.example.Main"); @@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest { TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskAppId("app-id"); taskExecutionContext.setExecutePath("/tmp/execution"); + HashMap map = new HashMap<>(); + map.put("/opt/job.jar", "/opt/job.jar"); + taskExecutionContext.setResources(map); return taskExecutionContext; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java index 3e4a614acb..3f9594241b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java @@ -33,12 +33,12 @@ public class FlinkParametersTest { Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty()); ResourceInfo mainResource = new ResourceInfo(); - mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar"); + mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar"); flinkParameters.setMainJar(mainResource); List resourceInfos = new LinkedList<>(); ResourceInfo resourceInfo1 = new ResourceInfo(); - resourceInfo1.setRes("testFlinkParameters1.jar"); + resourceInfo1.setResourceName("/testFlinkParameters1.jar"); resourceInfos.add(resourceInfo1); flinkParameters.setResourceList(resourceInfos); @@ -47,7 +47,7 @@ public class FlinkParametersTest { Assertions.assertEquals(2, resourceFilesList.size()); ResourceInfo resourceInfo2 = new ResourceInfo(); - resourceInfo2.setRes("testFlinkParameters2.jar"); + resourceInfo2.setResourceName("/testFlinkParameters2.jar"); resourceInfos.add(resourceInfo2); flinkParameters.setResourceList(resourceInfos); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index 280ede0d7e..d05cbec1bd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.java; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.JAVA_HOME_VAR; import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.PUBLIC_CLASS_NAME_REGEX; @@ -93,9 +93,6 @@ public class JavaTask extends AbstractTask { if (javaParameters == null || !javaParameters.checkParameters()) { throw new TaskException("java task params is not valid"); } - if (javaParameters.getRunType().equals(JavaConstants.RUN_TYPE_JAR)) { - setMainJarName(); - } log.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters)); } @@ -169,44 +166,25 @@ public class JavaTask extends AbstractTask { return builder.toString(); } - private void setMainJarName() { - ResourceInfo mainJar = javaParameters.getMainJar(); - String resourceName = getResourceNameOfMainJar(mainJar); - mainJar.setRes(resourceName); - javaParameters.setMainJar(mainJar); - } - /** * Construct a shell command for the java -jar Run mode * * @return String **/ protected String buildJarCommand() { - String fullName = javaParameters.getMainJar().getResourceName(); - String mainJarName = fullName.substring(0, fullName.lastIndexOf('.')); - mainJarName = mainJarName.substring(mainJarName.lastIndexOf('.') + 1) + ".jar"; + String mainJarName = taskRequest.getResources().get(javaParameters.getMainJar().getResourceName()); StringBuilder builder = new StringBuilder(); builder.append(getJavaCommandPath()) .append("java").append(" ") .append(buildResourcePath()).append(" ") .append("-jar").append(" ") - .append(taskRequest.getExecutePath()) + .append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR) .append(mainJarName).append(" ") .append(javaParameters.getMainArgs().trim()).append(" ") .append(javaParameters.getJvmArgs().trim()); return builder.toString(); } - private String getResourceNameOfMainJar(ResourceInfo mainJar) { - if (null == mainJar) { - throw new RuntimeException("The jar for the task is required."); - } - return mainJar.getId() == 0 - ? mainJar.getRes() - // when update resource maybe has error - : mainJar.getResourceName().replaceFirst(SINGLE_SLASH, ""); - } - @Override public void cancel() throws TaskException { // cancel process @@ -300,10 +278,11 @@ public class JavaTask extends AbstractTask { builder.append(" ").append(JavaConstants.CLASSPATH_CURRENT_DIR) .append(JavaConstants.PATH_SEPARATOR) .append(taskRequest.getExecutePath()); + Map resourceMap = taskRequest.getResources(); for (ResourceInfo info : javaParameters.getResourceFilesList()) { builder.append(JavaConstants.PATH_SEPARATOR); - builder.append(taskRequest.getExecutePath()) - .append(info.getResourceName()); + builder.append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR) + .append(resourceMap.get(info.getResourceName())); } return builder.toString(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java index c9193c397a..4e6bc30e1f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java @@ -39,6 +39,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -252,6 +253,11 @@ public class JavaTaskTest { taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAVA))); taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath"); taskExecutionContext.setTaskAppId("runJavaType"); + HashMap map = new HashMap<>(); + map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar"); + map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar"); + map.put("/JavaTaskTest.java", "JavaTaskTest.java"); + taskExecutionContext.setResources(map); JavaTask javaTask = new JavaTask(taskExecutionContext); javaTask.init(); return javaTask; @@ -267,6 +273,10 @@ public class JavaTaskTest { taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAR))); taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath"); taskExecutionContext.setTaskAppId("runJavaType"); + HashMap map = new HashMap<>(); + map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar"); + map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar"); + taskExecutionContext.setResources(map); JavaTask javaTask = new JavaTask(taskExecutionContext); javaTask.init(); return javaTask; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java index 9147761068..070774adef 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java @@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR; import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME; import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils; @@ -45,13 +46,13 @@ public class MapReduceArgsUtils { * @param param param * @return argument list */ - public static List buildArgs(MapReduceParameters param) { + public static List buildArgs(MapReduceParameters param, TaskExecutionContext taskExecutionContext) { List args = new ArrayList<>(); ResourceInfo mainJar = param.getMainJar(); if (mainJar != null) { args.add(JAR); - args.add(mainJar.getRes()); + args.add(taskExecutionContext.getResources().get(mainJar.getResourceName())); } ProgramType programType = param.getProgramType(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 8d9befd2d0..988711370f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; @@ -73,7 +72,6 @@ public class MapReduceTask extends AbstractYarnTask { } mapreduceParameters.setQueue(taskExecutionContext.getQueue()); - setMainJarName(); // replace placeholder,and combine local and global parameters Map paramsMap = taskExecutionContext.getPrepareParamsMap(); @@ -101,7 +99,7 @@ public class MapReduceTask extends AbstractYarnTask { args.add(MAPREDUCE_COMMAND); // other parameters - args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters)); + args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext)); String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); @@ -110,15 +108,6 @@ public class MapReduceTask extends AbstractYarnTask { return command; } - @Override - protected void setMainJarName() { - // main jar - ResourceInfo mainJar = mapreduceParameters.getMainJar(); - String resourceName = getResourceNameOfMainJar(mainJar); - mainJar.setRes(resourceName); - mapreduceParameters.setMainJar(mainJar); - } - @Override public AbstractParameters getParameters() { return mapreduceParameters; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 975f8d8106..d9ef25b860 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -156,6 +156,7 @@ public class SeatunnelTask extends AbstractRemoteTask { } else { seatunnelParameters.getResourceList().forEach(resourceInfo -> { args.add(CONFIG_OPTIONS); + // TODO: Need further check for refactored resource center // TODO Currently resourceName is `/xxx.sh`, it has more `/` and needs to be optimized args.add(resourceInfo.getResourceName().substring(1)); }); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 16533397da..00a12430a3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -84,9 +84,6 @@ public class SparkTask extends AbstractYarnTask { } sparkParameters.setQueue(taskExecutionContext.getQueue()); - if (sparkParameters.getProgramType() != ProgramType.SQL) { - setMainJarName(); - } log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters)); } @@ -191,7 +188,7 @@ public class SparkTask extends AbstractYarnTask { ResourceInfo mainJar = sparkParameters.getMainJar(); if (programType != ProgramType.SQL) { - args.add(mainJar.getRes()); + args.add(taskExecutionContext.getResources().get(mainJar.getResourceName())); } String mainArgs = sparkParameters.getMainArgs(); @@ -276,15 +273,6 @@ public class SparkTask extends AbstractYarnTask { return script; } - @Override - protected void setMainJarName() { - // main jar - ResourceInfo mainJar = sparkParameters.getMainJar(); - String resourceName = getResourceNameOfMainJar(mainJar); - mainJar.setRes(resourceName); - sparkParameters.setMainJar(mainJar); - } - @Override public AbstractParameters getParameters() { return sparkParameters; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java index 42c1f5ccf1..ab164f2eb5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java @@ -33,12 +33,12 @@ public class SparkParametersTest { Assertions.assertTrue(sparkParameters.getResourceFilesList().isEmpty()); ResourceInfo mainResource = new ResourceInfo(); - mainResource.setRes("testSparkMain-1.0.0-SNAPSHOT.jar\""); + mainResource.setResourceName("testSparkMain-1.0.0-SNAPSHOT.jar\""); sparkParameters.setMainJar(mainResource); LinkedList resourceInfos = new LinkedList<>(); ResourceInfo resourceInfo1 = new ResourceInfo(); - resourceInfo1.setRes("testSparkParameters1.jar"); + resourceInfo1.setResourceName("testSparkParameters1.jar"); resourceInfos.add(resourceInfo1); sparkParameters.setResourceList(resourceInfos); @@ -47,7 +47,7 @@ public class SparkParametersTest { Assertions.assertEquals(2, resourceFilesList.size()); ResourceInfo resourceInfo2 = new ResourceInfo(); - resourceInfo2.setRes("testSparkParameters2.jar"); + resourceInfo2.setResourceName("testSparkParameters2.jar"); resourceInfos.add(resourceInfo2); sparkParameters.setResourceList(resourceInfos); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java index 3c2c41e787..3bc9be5959 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import java.util.Collections; +import java.util.HashMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -59,6 +60,9 @@ public class SparkTaskTest { public void testBuildCommandWithSparkSubmit() { String parameters = buildSparkParametersWithSparkSubmit(); TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + HashMap map = new HashMap<>(); + map.put("/lib/dolphinscheduler-task-spark.jar", "/lib/dolphinscheduler-task-spark.jar"); + Mockito.when(taskExecutionContext.getResources()).thenReturn(map); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext)); sparkTask.init(); @@ -73,7 +77,7 @@ public class SparkTaskTest { "--conf spark.executor.cores=2 " + "--conf spark.executor.memory=1G " + "--name spark " + - "lib/dolphinscheduler-task-spark.jar"); + "/lib/dolphinscheduler-task-spark.jar"); } private String buildSparkParametersWithSparkSql() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index 72e2eca02d..d5a5d87c8b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -87,10 +87,6 @@ public class SqoopTask extends AbstractYarnTask { } - @Override - protected void setMainJarName() { - } - @Override public AbstractParameters getParameters() { return sqoopParameters; diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts index fed6ef3bc1..826d709fad 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts @@ -59,12 +59,13 @@ export function useJavaTaskMainJar(model: { [field: string]: any }): IJsonItem { name: t('project.node.main_package'), span: mainJarSpan, props: { + checkable: true, cascade: true, showPath: true, checkStrategy: 'child', placeholder: t('project.node.main_package_tips'), - keyField: 'id', - labelField: 'fullName' + keyField: 'fullName', + labelField: 'name' }, validate: { trigger: ['input', 'blur'], diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts index b617329c82..79ef7a2825 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts @@ -61,12 +61,13 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem { name: t('project.node.main_package'), span: mainJarSpan, props: { + checkable: true, cascade: true, showPath: true, checkStrategy: 'child', placeholder: t('project.node.main_package_tips'), - keyField: 'id', - labelField: 'fullName' + keyField: 'fullName', + labelField: 'name' }, validate: { trigger: ['input', 'blur'], diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java index 44143d0f9a..8b66a7e893 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java @@ -133,14 +133,12 @@ public class TaskExecutionCheckerUtils { for (Pair fileDownload : downloadFiles) { try { String fullName = fileDownload.getLeft(); - // we do not actually get & need tenantCode with this implementation right now. - String tenantCode = fileDownload.getRight(); - // TODO: Need a better way to get fileName because this implementation is tricky. - String fileName = storageOperate.getResourceFileName(fullName); + String fileName = fileDownload.getRight(); logger.info("get resource file from path:{}", fullName); long resourceDownloadStartTime = System.currentTimeMillis(); - storageOperate.download(tenantCode, fullName, execLocalPath + File.separator + fileName, false, + storageOperate.download(taskExecutionContext.getTenantCode(), fullName, + execLocalPath + File.separator + fileName, false, true); WorkerServerMetrics .recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);