Browse Source

[BUG][Resource Center] Task can not select main JAR (#13536)

3.2.0-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
45586f71d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java
  2. 45
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java
  3. 78
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 24
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java
  5. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  6. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  7. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  8. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  9. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
  10. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java
  11. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  12. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
  13. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
  14. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
  15. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  16. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java
  17. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java
  18. 33
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  19. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java
  20. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java
  21. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  22. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  23. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  24. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java
  25. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
  26. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
  27. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts
  28. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
  29. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/IFilter.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.List;
@ -25,5 +25,5 @@ import java.util.List;
*/
public interface IFilter {
List<Resource> filter();
List<StorageEntity> filter();
}

45
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilter.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.ArrayList;
import java.util.HashSet;
@ -36,7 +36,7 @@ public class ResourceFilter implements IFilter {
/**
* resource list
*/
private List<Resource> resourceList;
private List<StorageEntity> resourceList;
/**
* parent list
@ -48,7 +48,7 @@ public class ResourceFilter implements IFilter {
* @param suffix resource suffix
* @param resourceList resource list
*/
public ResourceFilter(String suffix, List<Resource> resourceList) {
public ResourceFilter(String suffix, List<StorageEntity> resourceList) {
this.suffix = suffix;
this.resourceList = resourceList;
}
@ -57,44 +57,31 @@ public class ResourceFilter implements IFilter {
* file filter
* @return file filtered by suffix
*/
public Set<Resource> fileFilter() {
return resourceList.stream().filter(t -> {
String alias = t.getAlias();
return alias.endsWith(suffix);
}).collect(Collectors.toSet());
public Set<StorageEntity> fileFilter() {
return resourceList.stream().filter(t -> t.getFullName().endsWith(suffix)).collect(Collectors.toSet());
}
/**
* list all parent dir
* @return parent resource dir set
*/
Set<Resource> listAllParent() {
Set<Resource> parentList = new HashSet<>();
Set<Resource> filterFileList = fileFilter();
for (Resource file : filterFileList) {
parentList.add(file);
setAllParent(file, parentList);
Set<StorageEntity> listAllParent() {
Set<StorageEntity> parentList = new HashSet<>();
Set<StorageEntity> filterFileList = fileFilter();
for (StorageEntity file : filterFileList) {
String fullName = file.getFullName();
for (StorageEntity resource : resourceList) {
if (fullName.startsWith(resource.getFullName())) {
parentList.add(resource);
}
return parentList;
}
/**
* list all parent dir
* @param resource resource
* @return parent resource dir set
*/
private void setAllParent(Resource resource, Set<Resource> parentList) {
for (Resource resourceTemp : resourceList) {
if (resourceTemp.getId() == resource.getPid()) {
parentList.add(resourceTemp);
setAllParent(resourceTemp, parentList);
}
}
return parentList;
}
@Override
public List<Resource> filter() {
public List<StorageEntity> filter() {
return new ArrayList<>(listAllParent());
}
}

78
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -583,11 +583,36 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
List<StorageEntity> resourcesList = new ArrayList<>();
try {
resourcesList = queryStorageEntityList(loginUser, fullName, type, tenantCode, false);
} catch (ServiceException e) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
// remove leading and trailing spaces in searchVal
String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
// filter based on trimmed searchVal
List<StorageEntity> filteredResourceList = resourcesList.stream()
.filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
// inefficient pagination
List<StorageEntity> slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
.limit(pageSize).collect(Collectors.toList());
pageInfo.setTotal(filteredResourceList.size());
pageInfo.setTotalList(slicedResourcesList);
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
private List<StorageEntity> queryStorageEntityList(User loginUser, String fullName, ResourceType type,
String tenantCode, boolean recursive) {
String defaultPath = "";
List<StorageEntity> resourcesList = new ArrayList<>();
String resourceStorageType =
PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name());
if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) {
// list all tenants' resources to admin users in the root directory
List<User> userList = userMapper.selectList(null);
@ -600,13 +625,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
defaultPath = storageOperate.getUdfDir(tenantEntityCode);
}
try {
resourcesList.addAll(storageOperate.listFilesStatus(defaultPath, defaultPath,
resourcesList.addAll(recursive
? storageOperate.listFilesStatusRecursively(defaultPath, defaultPath,
tenantEntityCode, type)
: storageOperate.listFilesStatus(defaultPath, defaultPath,
tenantEntityCode, type));
visitedTenantEntityCode.add(tenantEntityCode);
} catch (Exception e) {
log.error(e.getMessage() + " Resource path: {}", defaultPath, e);
putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in %s", defaultPath, resourceStorageType));
}
@ -626,26 +653,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
} catch (Exception e) {
log.error(e.getMessage() + " Resource path: {}", fullName, e);
putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in %s", defaultPath, resourceStorageType));
}
}
// remove leading and trailing spaces in searchVal
String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
// filter based on trimmed searchVal
List<StorageEntity> filteredResourceList = resourcesList.stream()
.filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
// inefficient pagination
List<StorageEntity> slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
.limit(pageSize).collect(Collectors.toList());
pageInfo.setTotal(filteredResourceList.size());
pageInfo.setTotalList(slicedResourcesList);
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
return resourcesList;
}
/**
@ -799,14 +812,23 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
public Result<Object> queryResourceByProgramType(User loginUser, ResourceType type, ProgramType programType) {
Result<Object> result = new Result<>();
Set<Integer> resourceIds = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(checkResourceType(type), loginUser.getId(), log);
if (resourceIds.isEmpty()) {
result.setData(Collections.emptyList());
putMsg(result, Status.SUCCESS);
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
putMsg(result, Status.USER_NOT_EXIST, loginUser.getId());
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
List<Resource> allResourceList = resourcesMapper.selectBatchIds(resourceIds);
String tenantCode = tenant.getTenantCode();
List<StorageEntity> allResourceList = queryStorageEntityList(loginUser, "", type, tenantCode, true);
String suffix = ".jar";
if (programType != null) {
@ -820,12 +842,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
default:
}
}
List<Resource> resources = new ResourceFilter(suffix, new ArrayList<>(allResourceList)).filter();
// Transform into StorageEntity for compatibility
List<StorageEntity> transformedResourceList = resources.stream()
.map(this::createStorageEntityBasedOnResource)
.collect(Collectors.toList());
Visitor visitor = new ResourceTreeVisitor(transformedResourceList);
List<StorageEntity> resources = new ResourceFilter(suffix, new ArrayList<>(allResourceList)).filter();
Visitor visitor = new ResourceTreeVisitor(resources);
result.setData(visitor.visit("").getChildren());
putMsg(result, Status.SUCCESS);
return result;

24
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/dto/resources/filter/ResourceFilterTest.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.dto.resources.filter;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import java.util.ArrayList;
import java.util.List;
@ -34,25 +34,23 @@ public class ResourceFilterTest {
private static Logger logger = LoggerFactory.getLogger(ResourceFilterTest.class);
@Test
public void filterTest() {
List<Resource> allList = new ArrayList<>();
List<StorageEntity> allList = new ArrayList<>();
Resource resource1 = new Resource(3, -1, "b", "/b", true);
Resource resource2 = new Resource(4, 2, "a1.txt", "/a/a1.txt", false);
Resource resource3 = new Resource(5, 3, "b1.txt", "/b/b1.txt", false);
Resource resource4 = new Resource(6, 3, "b2.jar", "/b/b2.jar", false);
Resource resource5 = new Resource(7, -1, "b2", "/b2", true);
Resource resource6 = new Resource(8, -1, "b2", "/b/b2", true);
Resource resource7 = new Resource(9, 8, "c2.jar", "/b/b2/c2.jar", false);
StorageEntity resource1 = new StorageEntity();
resource1.setFullName("a1.txt");
StorageEntity resource2 = new StorageEntity();
resource2.setFullName("b1.txt");
StorageEntity resource3 = new StorageEntity();
resource3.setFullName("b2.jar");
StorageEntity resource4 = new StorageEntity();
resource4.setFullName("c2.jar");
allList.add(resource1);
allList.add(resource2);
allList.add(resource3);
allList.add(resource4);
allList.add(resource5);
allList.add(resource6);
allList.add(resource7);
ResourceFilter resourceFilter = new ResourceFilter(".jar", allList);
List<Resource> resourceList = resourceFilter.filter();
List<StorageEntity> resourceList = resourceFilter.filter();
Assertions.assertNotNull(resourceList);
resourceList.forEach(t -> logger.info(t.toString()));
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -606,9 +606,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// TODO: Modify this part to accommodate (migrate) old-version resources in the future.
projectResourceFiles.forEach(file -> resourcesMap.put(file.getResourceName(),
processService.queryTenantCodeByResName(file.getResourceName(), ResourceType.FILE)));
storageOperate.getResourceFileName(file.getResourceName())));
}
}

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1364,7 +1364,8 @@ public class ProcessServiceImpl implements ProcessService {
ResourceInfo mainJar = JSONUtils.parseObject(
JSONUtils.toJsonString(mainJarObj),
ResourceInfo.class);
ResourceInfo resourceInfo = updateResourceInfo(taskDefinition.getId(), mainJar);
ResourceInfo resourceInfo =
updateResourceInfo(taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(), mainJar);
if (resourceInfo != null) {
taskParameters.put("mainJar", resourceInfo);
}
@ -1375,7 +1376,8 @@ public class ProcessServiceImpl implements ProcessService {
List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
List<ResourceInfo> updatedResourceInfos = resourceInfos
.stream()
.map(resourceInfo -> updateResourceInfo(taskDefinition.getId(), resourceInfo))
.map(resourceInfo -> updateResourceInfo(
taskDefinitionMapper.queryByCode(taskDefinition.getCode()).getId(), resourceInfo))
.filter(Objects::nonNull)
.collect(Collectors.toList());
taskParameters.put("resourceList", updatedResourceInfos);
@ -1402,7 +1404,6 @@ public class ProcessServiceImpl implements ProcessService {
}
resourceInfo = new ResourceInfo();
resourceInfo.setId(-1);
resourceInfo.setRes(res.getRes());
resourceInfo.setResourceName(resourceFullName);
log.info("updated resource info {}",
JSONUtils.toJsonString(resourceInfo));

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -714,13 +714,11 @@ public class ProcessServiceTest {
// test normal situation
ResourceInfo resourceInfoNormal = new ResourceInfo();
resourceInfoNormal.setId(1);
resourceInfoNormal.setRes("test.txt");
resourceInfoNormal.setResourceName("/test.txt");
ResourceInfo updatedResourceInfo3 = processService.updateResourceInfo(0, resourceInfoNormal);
Assertions.assertEquals(-1, updatedResourceInfo3.getId().intValue());
Assertions.assertEquals("test.txt", updatedResourceInfo3.getRes());
Assertions.assertEquals("/test.txt", updatedResourceInfo3.getResourceName());
}

22
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.constants.Constants.APPID_COLLE
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COLLECT_WAY;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@ -122,25 +121,4 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
*/
protected abstract String buildCommand();
/**
* set main jar name
*/
protected abstract void setMainJarName();
/**
* Get name of jar resource.
*
* @param mainJar
* @return
*/
protected String getResourceNameOfMainJar(ResourceInfo mainJar) {
if (null == mainJar) {
throw new RuntimeException("The jar for the task is required.");
}
return mainJar.getId() == null
? mainJar.getRes()
// when update resource maybe has error
: mainJar.getResourceName().replaceFirst("/", "");
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java

@ -180,11 +180,11 @@ public class DataQualityTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
ResourceInfo mainJar = new ResourceInfo();
String basePath = System.getProperty("user.dir").replace(File.separator + "bin", "");
mainJar.setRes(basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName());
mainJar.setResourceName(
basePath + File.separator + "libs" + File.separator + CommonUtils.getDataQualityJarName());
dataQualityParameters.getSparkParameters().setMainJar(mainJar);
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/utils/SparkArgsUtils.java

@ -120,7 +120,7 @@ public class SparkArgsUtils {
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
args.add(mainJar.getResourceName());
}
String mainArgs = param.getMainArgs();

10
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
@ -58,7 +57,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@ -80,14 +78,6 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
return command;
}
@Override
protected void setMainJarName() {
ResourceInfo mainJar = flinkParameters.getMainJar();
String resourceName = getResourceNameOfMainJar(mainJar);
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Assertions;
@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest {
flinkParameters.setParallelism(4);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(1);
resourceInfo.setResourceName("job");
resourceInfo.setResourceName("/opt/job.jar");
resourceInfo.setRes("/opt/job.jar");
flinkParameters.setMainJar(resourceInfo);
flinkParameters.setMainClass("org.example.Main");
@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId("app-id");
taskExecutionContext.setExecutePath("/tmp/execution");
HashMap<String, String> map = new HashMap<>();
map.put("/opt/job.jar", "/opt/job.jar");
taskExecutionContext.setResources(map);
return taskExecutionContext;
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java

@ -33,12 +33,12 @@ public class FlinkParametersTest {
Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar");
flinkParameters.setMainJar(mainResource);
List<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
resourceInfo1.setRes("testFlinkParameters1.jar");
resourceInfo1.setResourceName("/testFlinkParameters1.jar");
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
@ -47,7 +47,7 @@ public class FlinkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
resourceInfo2.setRes("testFlinkParameters2.jar");
resourceInfo2.setResourceName("/testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);

2
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java

@ -297,7 +297,7 @@ public class FlinkArgsUtils {
if (ProgramType.PYTHON == programType) {
args.add(FlinkConstants.FLINK_PYTHON);
}
args.add(mainJar.getRes());
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
String mainArgs = flinkParameters.getMainArgs();

16
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
@ -61,7 +60,6 @@ public class FlinkTask extends AbstractYarnTask {
throw new RuntimeException("flink task params is not valid");
}
flinkParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
FileUtils.generateScriptFile(taskExecutionContext, flinkParameters);
}
@ -83,20 +81,6 @@ public class FlinkTask extends AbstractYarnTask {
return command;
}
@Override
protected void setMainJarName() {
if (flinkParameters.getProgramType() == ProgramType.SQL) {
log.info("The current flink job type is SQL, will no need to set main jar");
return;
}
ResourceInfo mainJar = flinkParameters.getMainJar();
String resourceName = getResourceNameOfMainJar(mainJar);
mainJar.setRes(resourceName);
flinkParameters.setMainJar(mainJar);
log.info("Success set flink jar: {}", resourceName);
}
@Override
public AbstractParameters getParameters() {
return flinkParameters;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtilsTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import java.util.HashMap;
import java.util.List;
import org.junit.jupiter.api.Assertions;
@ -38,7 +39,7 @@ public class FlinkArgsUtilsTest {
flinkParameters.setParallelism(4);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(1);
resourceInfo.setResourceName("job");
resourceInfo.setResourceName("/opt/job.jar");
resourceInfo.setRes("/opt/job.jar");
flinkParameters.setMainJar(resourceInfo);
flinkParameters.setMainClass("org.example.Main");
@ -53,6 +54,9 @@ public class FlinkArgsUtilsTest {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId("app-id");
taskExecutionContext.setExecutePath("/tmp/execution");
HashMap<String, String> map = new HashMap<>();
map.put("/opt/job.jar", "/opt/job.jar");
taskExecutionContext.setResources(map);
return taskExecutionContext;
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParametersTest.java

@ -33,12 +33,12 @@ public class FlinkParametersTest {
Assertions.assertTrue(flinkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
mainResource.setRes("testFlinkMain-1.0.0-SNAPSHOT.jar");
mainResource.setResourceName("/testFlinkMain-1.0.0-SNAPSHOT.jar");
flinkParameters.setMainJar(mainResource);
List<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
resourceInfo1.setRes("testFlinkParameters1.jar");
resourceInfo1.setResourceName("/testFlinkParameters1.jar");
resourceInfos.add(resourceInfo1);
flinkParameters.setResourceList(resourceInfos);
@ -47,7 +47,7 @@ public class FlinkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
resourceInfo2.setRes("testFlinkParameters2.jar");
resourceInfo2.setResourceName("/testFlinkParameters2.jar");
resourceInfos.add(resourceInfo2);
flinkParameters.setResourceList(resourceInfos);

33
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.java;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.JAVA_HOME_VAR;
import static org.apache.dolphinscheduler.plugin.task.java.JavaConstants.PUBLIC_CLASS_NAME_REGEX;
@ -93,9 +93,6 @@ public class JavaTask extends AbstractTask {
if (javaParameters == null || !javaParameters.checkParameters()) {
throw new TaskException("java task params is not valid");
}
if (javaParameters.getRunType().equals(JavaConstants.RUN_TYPE_JAR)) {
setMainJarName();
}
log.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters));
}
@ -169,44 +166,25 @@ public class JavaTask extends AbstractTask {
return builder.toString();
}
private void setMainJarName() {
ResourceInfo mainJar = javaParameters.getMainJar();
String resourceName = getResourceNameOfMainJar(mainJar);
mainJar.setRes(resourceName);
javaParameters.setMainJar(mainJar);
}
/**
* Construct a shell command for the java -jar Run mode
*
* @return String
**/
protected String buildJarCommand() {
String fullName = javaParameters.getMainJar().getResourceName();
String mainJarName = fullName.substring(0, fullName.lastIndexOf('.'));
mainJarName = mainJarName.substring(mainJarName.lastIndexOf('.') + 1) + ".jar";
String mainJarName = taskRequest.getResources().get(javaParameters.getMainJar().getResourceName());
StringBuilder builder = new StringBuilder();
builder.append(getJavaCommandPath())
.append("java").append(" ")
.append(buildResourcePath()).append(" ")
.append("-jar").append(" ")
.append(taskRequest.getExecutePath())
.append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR)
.append(mainJarName).append(" ")
.append(javaParameters.getMainArgs().trim()).append(" ")
.append(javaParameters.getJvmArgs().trim());
return builder.toString();
}
private String getResourceNameOfMainJar(ResourceInfo mainJar) {
if (null == mainJar) {
throw new RuntimeException("The jar for the task is required.");
}
return mainJar.getId() == 0
? mainJar.getRes()
// when update resource maybe has error
: mainJar.getResourceName().replaceFirst(SINGLE_SLASH, "");
}
@Override
public void cancel() throws TaskException {
// cancel process
@ -300,10 +278,11 @@ public class JavaTask extends AbstractTask {
builder.append(" ").append(JavaConstants.CLASSPATH_CURRENT_DIR)
.append(JavaConstants.PATH_SEPARATOR)
.append(taskRequest.getExecutePath());
Map<String, String> resourceMap = taskRequest.getResources();
for (ResourceInfo info : javaParameters.getResourceFilesList()) {
builder.append(JavaConstants.PATH_SEPARATOR);
builder.append(taskRequest.getExecutePath())
.append(info.getResourceName());
builder.append(taskRequest.getExecutePath()).append(FOLDER_SEPARATOR)
.append(resourceMap.get(info.getResourceName()));
}
return builder.toString();
}

10
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/test/java/org/apache/dolphinscheduler/plugin/task/java/JavaTaskTest.java

@ -39,6 +39,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -252,6 +253,11 @@ public class JavaTaskTest {
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAVA)));
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath");
taskExecutionContext.setTaskAppId("runJavaType");
HashMap<String, String> map = new HashMap<>();
map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar");
map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar");
map.put("/JavaTaskTest.java", "JavaTaskTest.java");
taskExecutionContext.setResources(map);
JavaTask javaTask = new JavaTask(taskExecutionContext);
javaTask.init();
return javaTask;
@ -267,6 +273,10 @@ public class JavaTaskTest {
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(createJavaParametersObject(RUN_TYPE_JAR)));
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/test/executepath");
taskExecutionContext.setTaskAppId("runJavaType");
HashMap<String, String> map = new HashMap<>();
map.put("/opt/share/jar/resource2.jar", "opt/share/jar/resource2.jar");
map.put("/opt/share/jar/main.jar", "opt/share/jar/main.jar");
taskExecutionContext.setResources(map);
JavaTask javaTask = new JavaTask(taskExecutionContext);
javaTask.init();
return javaTask;

5
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceArgsUtils.java

@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAR;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_NAME;
import static org.apache.dolphinscheduler.plugin.task.mr.MapReduceTaskConstants.MR_QUEUE;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
@ -45,13 +46,13 @@ public class MapReduceArgsUtils {
* @param param param
* @return argument list
*/
public static List<String> buildArgs(MapReduceParameters param) {
public static List<String> buildArgs(MapReduceParameters param, TaskExecutionContext taskExecutionContext) {
List<String> args = new ArrayList<>();
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(JAR);
args.add(mainJar.getRes());
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
ProgramType programType = param.getProgramType();

13
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
@ -73,7 +72,6 @@ public class MapReduceTask extends AbstractYarnTask {
}
mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName();
// replace placeholders, and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
@ -101,7 +99,7 @@ public class MapReduceTask extends AbstractYarnTask {
args.add(MAPREDUCE_COMMAND);
// other parameters
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters));
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext));
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
@ -110,15 +108,6 @@ public class MapReduceTask extends AbstractYarnTask {
return command;
}
/**
 * Rewrites the {@code res} value of the MapReduce main jar with the name computed by
 * {@code getResourceNameOfMainJar}, then stores the jar back on the task parameters.
 */
@Override
protected void setMainJarName() {
    ResourceInfo jar = mapreduceParameters.getMainJar();
    // The helper is evaluated before setRes is invoked, so a missing jar is reported by the helper.
    jar.setRes(getResourceNameOfMainJar(jar));
    mapreduceParameters.setMainJar(jar);
}
@Override
public AbstractParameters getParameters() {
return mapreduceParameters;

1
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -156,6 +156,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
} else {
seatunnelParameters.getResourceList().forEach(resourceInfo -> {
args.add(CONFIG_OPTIONS);
// TODO: Need further check for refactored resource center
// TODO: Currently resourceName looks like `/xxx.sh`; the extra leading `/` is stripped below, and this handling needs to be optimized
args.add(resourceInfo.getResourceName().substring(1));
});

14
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -84,9 +84,6 @@ public class SparkTask extends AbstractYarnTask {
}
sparkParameters.setQueue(taskExecutionContext.getQueue());
if (sparkParameters.getProgramType() != ProgramType.SQL) {
setMainJarName();
}
log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
}
@ -191,7 +188,7 @@ public class SparkTask extends AbstractYarnTask {
ResourceInfo mainJar = sparkParameters.getMainJar();
if (programType != ProgramType.SQL) {
args.add(mainJar.getRes());
args.add(taskExecutionContext.getResources().get(mainJar.getResourceName()));
}
String mainArgs = sparkParameters.getMainArgs();
@ -276,15 +273,6 @@ public class SparkTask extends AbstractYarnTask {
return script;
}
/**
 * Updates the Spark main jar so that its {@code res} value holds the name returned by
 * {@code getResourceNameOfMainJar}, and re-assigns the jar on the Spark parameters.
 */
@Override
protected void setMainJarName() {
    final ResourceInfo jar = sparkParameters.getMainJar();
    final String resolvedName = getResourceNameOfMainJar(jar);
    jar.setRes(resolvedName);
    sparkParameters.setMainJar(jar);
}
@Override
public AbstractParameters getParameters() {
return sparkParameters;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParametersTest.java

@ -33,12 +33,12 @@ public class SparkParametersTest {
Assertions.assertTrue(sparkParameters.getResourceFilesList().isEmpty());
ResourceInfo mainResource = new ResourceInfo();
mainResource.setRes("testSparkMain-1.0.0-SNAPSHOT.jar\"");
mainResource.setResourceName("testSparkMain-1.0.0-SNAPSHOT.jar\"");
sparkParameters.setMainJar(mainResource);
LinkedList<ResourceInfo> resourceInfos = new LinkedList<>();
ResourceInfo resourceInfo1 = new ResourceInfo();
resourceInfo1.setRes("testSparkParameters1.jar");
resourceInfo1.setResourceName("testSparkParameters1.jar");
resourceInfos.add(resourceInfo1);
sparkParameters.setResourceList(resourceInfos);
@ -47,7 +47,7 @@ public class SparkParametersTest {
Assertions.assertEquals(2, resourceFilesList.size());
ResourceInfo resourceInfo2 = new ResourceInfo();
resourceInfo2.setRes("testSparkParameters2.jar");
resourceInfo2.setResourceName("testSparkParameters2.jar");
resourceInfos.add(resourceInfo2);
sparkParameters.setResourceList(resourceInfos);

6
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import java.util.Collections;
import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -59,6 +60,9 @@ public class SparkTaskTest {
public void testBuildCommandWithSparkSubmit() {
String parameters = buildSparkParametersWithSparkSubmit();
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
HashMap<String, String> map = new HashMap<>();
map.put("/lib/dolphinscheduler-task-spark.jar", "/lib/dolphinscheduler-task-spark.jar");
Mockito.when(taskExecutionContext.getResources()).thenReturn(map);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
SparkTask sparkTask = Mockito.spy(new SparkTask(taskExecutionContext));
sparkTask.init();
@ -73,7 +77,7 @@ public class SparkTaskTest {
"--conf spark.executor.cores=2 " +
"--conf spark.executor.memory=1G " +
"--name spark " +
"lib/dolphinscheduler-task-spark.jar");
"/lib/dolphinscheduler-task-spark.jar");
}
private String buildSparkParametersWithSparkSql() {

4
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@ -87,10 +87,6 @@ public class SqoopTask extends AbstractYarnTask {
}
/**
 * No-op override: the body is empty, so calling this method changes nothing.
 */
@Override
protected void setMainJarName() {
    // nothing to do
}
@Override
public AbstractParameters getParameters() {
return sqoopParameters;

5
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-java-task-main-jar.ts

@ -59,12 +59,13 @@ export function useJavaTaskMainJar(model: { [field: string]: any }): IJsonItem {
name: t('project.node.main_package'),
span: mainJarSpan,
props: {
checkable: true,
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.main_package_tips'),
keyField: 'id',
labelField: 'fullName'
keyField: 'fullName',
labelField: 'name'
},
validate: {
trigger: ['input', 'blur'],

5
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts

@ -61,12 +61,13 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
name: t('project.node.main_package'),
span: mainJarSpan,
props: {
checkable: true,
cascade: true,
showPath: true,
checkStrategy: 'child',
placeholder: t('project.node.main_package_tips'),
keyField: 'id',
labelField: 'fullName'
keyField: 'fullName',
labelField: 'name'
},
validate: {
trigger: ['input', 'blur'],

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -133,14 +133,12 @@ public class TaskExecutionCheckerUtils {
for (Pair<String, String> fileDownload : downloadFiles) {
try {
String fullName = fileDownload.getLeft();
// we do not actually get & need tenantCode with this implementation right now.
String tenantCode = fileDownload.getRight();
// TODO: Need a better way to get fileName because this implementation is tricky.
String fileName = storageOperate.getResourceFileName(fullName);
String fileName = fileDownload.getRight();
logger.info("get resource file from path:{}", fullName);
long resourceDownloadStartTime = System.currentTimeMillis();
storageOperate.download(tenantCode, fullName, execLocalPath + File.separator + fileName, false,
storageOperate.download(taskExecutionContext.getTenantCode(), fullName,
execLocalPath + File.separator + fileName, false,
true);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);

Loading…
Cancel
Save