Browse Source

conflict reslove

pull/2/head
qiaozhanwei 4 years ago
parent
commit
4bda3cd92c
  1. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  3. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
@ -211,9 +213,11 @@ public class TaskExecuteThread implements Runnable {
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) { if (baseParam != null) {
List<String> projectResourceFiles = baseParam.getResourceFilesList(); List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) { if (projectResourceFiles != null) {
projectFiles.addAll(projectResourceFiles); Stream<String> resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getRes());
projectFiles.addAll(resourceInfotream.collect(Collectors.toList()));
} }
} }
@ -236,7 +240,7 @@ public class TaskExecuteThread implements Runnable {
if (!resFile.exists()) { if (!resFile.exists()) {
try { try {
// query the tenant code of the resource according to the name of the resource // query the tenant code of the resource according to the name of the resource
String resHdfsPath = HadoopUtils.getHdfsFilename(tenantCode, resource); String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource);
logger.info("get resource file from hdfs :{}", resHdfsPath); logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true);

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -78,7 +78,7 @@ public class ZKMasterClient extends AbstractZKClient {
} }
}catch (Exception e){ }catch (Exception e){
logger.error("master start up exception",e); logger.error("master start up exception",e);
}finally { }finally {
releaseMutex(mutex); releaseMutex(mutex);
} }
@ -97,13 +97,13 @@ public class ZKMasterClient extends AbstractZKClient {
*/ */
@Override @Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ //monitor master //monitor master
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
handleMasterEvent(event,path); handleMasterEvent(event,path);
}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ //monitor worker //monitor worker
handleWorkerEvent(event,path); handleWorkerEvent(event,path);
} }
//other path event, ignore
} }
/** /**

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -126,7 +126,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
int i = 0; int i = 0;
for (Map.Entry<String, String> entry : masterMap.entrySet()) { for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
masterServer.setZkDirectory( parentPath + "/"+ entry.getKey()); masterServer.setZkDirectory(parentPath + "/"+ entry.getKey());
masterServer.setId(i); masterServer.setId(i);
i ++; i ++;
masterServers.add(masterServer); masterServers.add(masterServer);

Loading…
Cancel
Save