Browse Source

refactor-worker merge to dev bug fix

pull/2/head
qiaozhanwei 4 years ago
parent
commit
b5546ddb68
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  2. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -378,7 +378,7 @@ public class TaskInstance implements Serializable {
public boolean isSubProcess(){ public boolean isSubProcess(){
return TaskType.SUB_PROCESS.getDescp().equals(this.taskType); return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
} }
public String getDependency(){ public String getDependency(){

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -28,10 +28,7 @@ import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.*; import org.apache.dolphinscheduler.server.entity.*;
@ -299,6 +296,10 @@ public class TaskPriorityQueueConsumer extends Thread{
} }
} }
if (CollectionUtils.isEmpty(resourceIdsSet)){
return null;
}
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]); Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds); List<Resource> resources = processService.listResourceByIds(resourceIds);

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -202,26 +202,6 @@ public class TaskExecuteThread implements Runnable {
} }
/**
* create project resource files
*/
private List<String> createProjectResFiles(TaskNode taskNode) throws Exception{
Set<String> projectFiles = new HashSet<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (projectResourceFiles != null) {
Stream<String> resourceInfotream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getRes());
projectFiles.addAll(resourceInfotream.collect(Collectors.toList()));
}
}
return new ArrayList<>(projectFiles);
}
/** /**
* download resource file * download resource file
* *

Loading…
Cancel
Save