From 4bda3cd92c0228fb66f48785fd34e491775d4e08 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Wed, 1 Apr 2020 18:29:47 +0800 Subject: [PATCH] conflict reslove --- .../server/worker/runner/TaskExecuteThread.java | 10 +++++++--- .../dolphinscheduler/server/zk/ZKMasterClient.java | 10 +++++----- .../dolphinscheduler/service/zk/AbstractZKClient.java | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 347dfb620a..4d8600d652 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; /** @@ -211,9 +213,11 @@ public class TaskExecuteThread implements Runnable { AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { - List projectResourceFiles = baseParam.getResourceFilesList(); + List projectResourceFiles = baseParam.getResourceFilesList(); if (projectResourceFiles != null) { - projectFiles.addAll(projectResourceFiles); + Stream resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getRes()); + projectFiles.addAll(resourceInfotream.collect(Collectors.toList())); + } } @@ -236,7 +240,7 @@ public class TaskExecuteThread implements Runnable { if (!resFile.exists()) { try { // query the tenant code of the resource according to the name of the resource - String resHdfsPath = HadoopUtils.getHdfsFilename(tenantCode, resource); + String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource); logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 0e9a83944d..be8ca3f298 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -78,7 +78,7 @@ public class ZKMasterClient extends AbstractZKClient { } }catch (Exception e){ - logger.error("master start up exception",e); + logger.error("master start up exception",e); }finally { releaseMutex(mutex); } @@ -97,13 +97,13 @@ public class ZKMasterClient extends AbstractZKClient { */ @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ //monitor master + //monitor master + if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ handleMasterEvent(event,path); - - }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ //monitor worker + }else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ + //monitor worker handleWorkerEvent(event,path); } - //other path event, ignore } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index a83ff9e6ae..4689a88fe0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -126,7 +126,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { int i = 0; for (Map.Entry entry : masterMap.entrySet()) { Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - masterServer.setZkDirectory( parentPath + "/"+ entry.getKey()); + masterServer.setZkDirectory(parentPath + "/"+ entry.getKey()); masterServer.setId(i); i ++; masterServers.add(masterServer);