From ed78aea2f8a473eac747993c1a9b4fcfa537a6e9 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 1 Aug 2020 16:48:23 +0800 Subject: [PATCH 1/7] fix bug: there exists some problems in checking task dependency --- .../master/runner/MasterExecThread.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index c0bc3aaea8..14b9a3aea6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; +import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -663,7 +664,6 @@ public class MasterExecThread implements Runnable { if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } - TaskNode taskNode = dag.getNode(taskName); List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ @@ -672,25 +672,30 @@ public class MasterExecThread implements Runnable { || skipTaskNodeList.containsKey(depsNode)){ continue; } - // dependencies must be fully completed + // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } + // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); - // conditions task would not return failed. - if(depTaskState.typeIsFailure() - && !DagHelper.haveConditionsAfterNode(depsNode, dag ) - && !dag.getNode(depsNode).isConditionsTask()){ - return DependResult.FAILED; - } - if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; } + // ignore task state if current task is condition + if(taskNode.isConditionsTask()){ + continue; + } + if(dag.getNode(depsNode).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(depsNode); + if(!nextTaskList.contains(taskName)){ + return DependResult.FAILED; + } + }else if(depTaskState.typeIsFailure()){ + return DependResult.FAILED; + } } - logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); - return DependResult.SUCCESS; } From 0e885cd0eecff4994bed2c2020c728ff48ee0fa1 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 1 Aug 2020 17:26:30 +0800 Subject: [PATCH 2/7] remove unused import --- .../dolphinscheduler/server/master/runner/MasterExecThread.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 14b9a3aea6..097e8d74d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; -import net.sf.jsqlparser.expression.NextValExpression; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; From cdd76d6b118a65f863f3581f0911807f6fa43fc8 Mon Sep 17 00:00:00 2001 From: lenboo Date: Sun, 2 Aug 2020 11:13:50 +0800 Subject: [PATCH 3/7] refactor depend result --- .../master/runner/MasterExecThread.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 097e8d74d2..479d6560a6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -675,7 +675,6 @@ public class MasterExecThread implements Runnable { if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } - // depend node has already complete. ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); if(depTaskState.typeIsPause() || depTaskState.typeIsCancel()){ return DependResult.WAITING; @@ -684,13 +683,7 @@ public class MasterExecThread implements Runnable { if(taskNode.isConditionsTask()){ continue; } - if(dag.getNode(depsNode).isConditionsTask()){ - //condition task need check the branch to run - List nextTaskList = parseConditionTask(depsNode); - if(!nextTaskList.contains(taskName)){ - return DependResult.FAILED; - } - }else if(depTaskState.typeIsFailure()){ + if(!dependTaskSuccess(depsNode, taskName)){ return DependResult.FAILED; } } @@ -698,6 +691,28 @@ public class MasterExecThread implements Runnable { return DependResult.SUCCESS; } + /** + * depend node is completed, but here need check the condition task branch is the next node + * @param dependNodeName + * @param nextNodeName + * @return + */ + private boolean dependTaskSuccess(String dependNodeName, String nextNodeName){ + if(dag.getNode(dependNodeName).isConditionsTask()){ + //condition task need check the branch to run + List nextTaskList = parseConditionTask(dependNodeName); + if(!nextTaskList.contains(nextNodeName)){ + return false; + } + }else { + ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + if(depTaskState.typeIsFailure()){ + return false; + } + } + return true; + } + /** * query task instance by complete state From 88bc30b32b9af8dc1e8b306fb7808175e50f8fab Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Tue, 4 Aug 2020 14:55:49 +0800 Subject: [PATCH 4/7] [Improvement-3327][api]support re-upload the resource file (#3395) --- .../api/controller/ResourcesController.java | 35 +++---- .../api/service/ResourcesService.java | 91 ++++++++++++++++--- .../api/service/ResourcesServiceTest.java | 19 ++-- 3 files changed, 102 insertions(+), 43 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index cc09b2d650..2af8ccd767 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -16,6 +16,11 @@ */ package org.apache.dolphinscheduler.api.controller; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ResourcesService; @@ -26,11 +31,6 @@ import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.User; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -63,23 +63,15 @@ public class ResourcesController extends BaseController { private UdfFuncService udfFuncService; /** - * create resource + * create directory * - * @param loginUser login user - * @param alias alias - * @param description description - * @param type type - * @return create result code - */ - - /** * @param loginUser login user * @param type type * @param alias alias * @param description description * @param pid parent id * @param currentDir current directory - * @return + * @return create result code */ @ApiOperation(value = "createDirctory", notes = "CREATE_RESOURCE_NOTES") @ApiImplicitParams({ @@ -140,6 +132,7 @@ public class ResourcesController extends BaseController { * @param resourceId resource id * @param type resource type * @param description description + * @param file resource file * @return update result code */ @ApiOperation(value = "updateResource", notes = "UPDATE_RESOURCE_NOTES") @@ -147,7 +140,8 @@ public class ResourcesController extends BaseController { @ApiImplicitParam(name = "id", value = "RESOURCE_ID", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "type", value = "RESOURCE_TYPE", required = true, dataType = "ResourceType"), @ApiImplicitParam(name = "name", value = "RESOURCE_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String") + @ApiImplicitParam(name = "description", value = "RESOURCE_DESC", dataType = "String"), + @ApiImplicitParam(name = "file", value = "RESOURCE_FILE", required = true, dataType = "MultipartFile") }) @PostMapping(value = "/update") @ApiException(UPDATE_RESOURCE_ERROR) @@ -155,10 +149,11 @@ public class ResourcesController extends BaseController { @RequestParam(value = "id") int resourceId, @RequestParam(value = "type") ResourceType type, @RequestParam(value = "name") String alias, - @RequestParam(value = "description", required = false) String description) { - logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}", - loginUser.getUserName(), type, alias, description); - return resourceService.updateResource(loginUser, resourceId, alias, description, type); + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "file" ,required = false) MultipartFile file) { + logger.info("login user {}, update resource, type: {}, resource alias: {}, desc: {}, file: {}", + loginUser.getUserName(), type, alias, description, file); + return resourceService.updateResource(loginUser, resourceId, alias, description, type, file); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index 15b106f63e..e0ee7117c4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -287,6 +287,7 @@ public class ResourcesService extends BaseService { * @param name name * @param desc description * @param type resource type + * @param file resource file * @return update result code */ @Transactional(rollbackFor = Exception.class) @@ -294,7 +295,8 @@ public class ResourcesService extends BaseService { int resourceId, String name, String desc, - ResourceType type) { + ResourceType type, + MultipartFile file) { Result result = new Result(); // if resource upload startup @@ -330,6 +332,42 @@ public class ResourcesService extends BaseService { return result; } + if (file != null) { + + // file is empty + if (file.isEmpty()) { + logger.error("file is empty: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_FILE_IS_EMPTY); + return result; + } + + // file suffix + String fileSuffix = FileUtils.suffix(file.getOriginalFilename()); + String nameSuffix = FileUtils.suffix(name); + + // determine file suffix + if (!(StringUtils.isNotEmpty(fileSuffix) && fileSuffix.equalsIgnoreCase(nameSuffix))) { + /** + * rename file suffix and original suffix must be consistent + */ + logger.error("rename file suffix and original suffix must be consistent: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE); + return result; + } + + //If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar + if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(FileUtils.suffix(originFullName))) { + logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg()); + putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR); + return result; + } + if (file.getSize() > Constants.MAX_FILE_SIZE) { + logger.error("file size is too large: {}", file.getOriginalFilename()); + putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT); + return result; + } + } + // query tenant by user id String tenantCode = getTenantCode(resource.getUserId(),result); if (StringUtils.isEmpty(tenantCode)){ @@ -379,26 +417,33 @@ public class ResourcesService extends BaseService { } // updateResource data - List childrenResource = listAllChildren(resource,false); + Date now = new Date(); resource.setAlias(name); resource.setFullName(fullName); resource.setDescription(desc); resource.setUpdateTime(now); + if (file != null) { + resource.setFileName(file.getOriginalFilename()); + resource.setSize(file.getSize()); + } try { resourcesMapper.updateById(resource); - if (resource.isDirectory() && CollectionUtils.isNotEmpty(childrenResource)) { - String matcherFullName = Matcher.quoteReplacement(fullName); - List childResourceList = new ArrayList<>(); - List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); - childResourceList = resourceList.stream().map(t -> { - t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); - t.setUpdateTime(now); - return t; - }).collect(Collectors.toList()); - resourcesMapper.batchUpdateResource(childResourceList); + if (resource.isDirectory()) { + List childrenResource = listAllChildren(resource,false); + if (CollectionUtils.isNotEmpty(childrenResource)) { + String matcherFullName = Matcher.quoteReplacement(fullName); + List childResourceList = new ArrayList<>(); + List resourceList = resourcesMapper.listResourceByIds(childrenResource.toArray(new Integer[childrenResource.size()])); + childResourceList = resourceList.stream().map(t -> { + t.setFullName(t.getFullName().replaceFirst(originFullName, matcherFullName)); + t.setUpdateTime(now); + return t; + }).collect(Collectors.toList()); + resourcesMapper.batchUpdateResource(childResourceList); + } } putMsg(result, Status.SUCCESS); @@ -414,11 +459,31 @@ public class ResourcesService extends BaseService { logger.error(Status.UPDATE_RESOURCE_ERROR.getMsg(), e); throw new ServiceException(Status.UPDATE_RESOURCE_ERROR); } + // if name unchanged, return directly without moving on HDFS - if (originResourceName.equals(name)) { + if (originResourceName.equals(name) && file == null) { return result; } + if (file != null) { + // fail upload + if (!upload(loginUser, fullName, file, type)) { + logger.error("upload resource: {} file: {} failed.", name, file.getOriginalFilename()); + putMsg(result, Status.HDFS_OPERATION_ERROR); + throw new RuntimeException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename())); + } + if (!fullName.equals(originFullName)) { + try { + HadoopUtils.getInstance().delete(originHdfsFileName,false); + } catch (IOException e) { + logger.error(e.getMessage(),e); + throw new RuntimeException(String.format("delete resource: %s failed.", originFullName)); + } + } + return result; + } + + // get the path of dest file in hdfs String destHdfsFileName = HadoopUtils.getHdfsFileName(resource.getType(),tenantCode,fullName); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 407f6b587f..1e3c95d775 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.api.service; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -159,19 +158,19 @@ public class ResourcesServiceTest { PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); User user = new User(); //HDFS_NOT_STARTUP - Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + Result result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(),result.getMsg()); //RESOURCE_NOT_EXIST Mockito.when(resourcesMapper.selectById(1)).thenReturn(getResource()); PowerMockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); - result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,0,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //USER_NO_OPERATION_PERM - result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(),result.getMsg()); @@ -186,7 +185,7 @@ public class ResourcesServiceTest { } catch (IOException e) { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF); + result = resourcesService.updateResource(user, 1, "ResourcesServiceTest1.jar", "ResourcesServiceTest", ResourceType.UDF,null); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(),result.getMsg()); //SUCCESS @@ -199,25 +198,25 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); //RESOURCE_EXIST Mockito.when(resourcesMapper.queryResourceList("/ResourcesServiceTest1.jar", 0, 0)).thenReturn(getResourceList()); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.FILE,null); logger.info(result.toString()); Assert.assertEquals(Status.RESOURCE_EXIST.getMsg(),result.getMsg()); //USER_NOT_EXIST Mockito.when(userMapper.selectById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertTrue(Status.USER_NOT_EXIST.getCode() == result.getCode()); //TENANT_NOT_EXIST Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null); - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.TENANT_NOT_EXIST.getMsg(),result.getMsg()); @@ -231,7 +230,7 @@ public class ResourcesServiceTest { logger.error(e.getMessage(),e); } - result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF); + result = resourcesService.updateResource(user,1,"ResourcesServiceTest1.jar","ResourcesServiceTest1.jar",ResourceType.UDF,null); logger.info(result.toString()); Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); From a3793be7dabb7aa84eb518b592d82860a1cc3003 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Tue, 4 Aug 2020 15:04:56 +0800 Subject: [PATCH 5/7] [Fix-3390][server]Running hive sql task need find the hdfs path correctly (#3396) * [Fix-3390][api]Running hive sql task need find the hdfs path correctly * [Fix-3390][api]Running hive sql task need find the hdfs path correctly * update soft version --- .../common/process/ResourceInfo.java | 18 ------------------ .../server/utils/UDFUtils.java | 6 ++++-- sql/soft_version | 2 +- 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java index c460354fe5..287f7267bc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java @@ -42,22 +42,4 @@ public class ResourceInfo { public void setRes(String res) { this.res = res; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ResourceInfo that = (ResourceInfo) o; - - if (id != that.id) return false; - return res.equals(that.res); - } - - @Override - public int hashCode() { - int result = id; - result = 31 * result + res.hashCode(); - return result; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java index 3a8c8fe7d6..1bc79ab94e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java @@ -70,14 +70,16 @@ public class UDFUtils { */ private static void buildJarSql(List sqls, Map udfFuncTenantCodeMap) { String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); - + String resourceFullName; Set> entries = udfFuncTenantCodeMap.entrySet(); for (Map.Entry entry:entries){ String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue()); if (!uploadPath.startsWith("hdfs:")) { uploadPath = defaultFS + uploadPath; } - sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName())); + resourceFullName = entry.getKey().getResourceName(); + resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName); + sqls.add(String.format("add jar %s%s", uploadPath, resourceFullName)); } } diff --git a/sql/soft_version b/sql/soft_version index 6261a05bb0..d5e98f723f 100644 --- a/sql/soft_version +++ b/sql/soft_version @@ -1 +1 @@ -1.3.1 \ No newline at end of file +1.3.2 \ No newline at end of file From f936b07fdf1a145147e58eb2dcf392465764607c Mon Sep 17 00:00:00 2001 From: lenboo Date: Tue, 4 Aug 2020 16:49:30 +0800 Subject: [PATCH 6/7] fix bug: Fix master task dependency check bug --- .../server/master/runner/MasterExecThread.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 479d6560a6..6692598ce6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -667,10 +667,12 @@ public class MasterExecThread implements Runnable { List depNameList = taskNode.getDepList(); for(String depsNode : depNameList ){ if(!dag.containsNode(depsNode) - || forbiddenTaskList.containsKey(depsNode) - || skipTaskNodeList.containsKey(depsNode)){ + || forbiddenTaskList.containsKey(depsNode)){ continue; } + if(skipTaskNodeList.containsKey(depsNode)){ + return DependResult.FAILED; + } // all the dependencies must be completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; From e4097a488cc03ee58713f3de28f6e8a77a840e44 Mon Sep 17 00:00:00 2001 From: Yelli Date: Wed, 5 Aug 2020 10:20:34 +0800 Subject: [PATCH 7/7] cancel spark task version check (#3406) Co-authored-by: Eights-Li --- .../dolphinscheduler/common/task/spark/SparkParameters.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 4e58201bf3..32a2a6b05d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -216,7 +216,7 @@ public class SparkParameters extends AbstractParameters { @Override public boolean checkParameters() { - return mainJar != null && programType != null && sparkVersion != null; + return mainJar != null && programType != null; } @Override