Browse Source

fix MasterBaseTaskExecThread submit method bug

pull/2/head
dk.technoboy 5 years ago
parent
commit
7bb0bebffb
  1. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  2. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -114,30 +114,29 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1; int retryTimes = 1;
boolean taskDBFlag = false; boolean submitDB = false;
boolean taskQueueFlag = false; boolean submitQueue = false;
TaskInstance task = null; TaskInstance task = null;
while (true){ while (retryTimes <= commitRetryTimes){
try { try {
if(!taskDBFlag){ if(!submitDB){
// submit task to db // submit task to db
task = processDao.submitTask(taskInstance, processInstance); task = processDao.submitTask(taskInstance, processInstance);
if(task != null && task.getId() != 0){ if(task != null && task.getId() != 0){
taskDBFlag = true; submitDB = true;
} }
} }
if(taskDBFlag && !taskQueueFlag){ if(submitDB && !submitQueue){
// submit task to queue // submit task to queue
taskQueueFlag = processDao.submitTaskToQueue(task); submitQueue = processDao.submitTaskToQueue(task);
} }
if(taskDBFlag && taskQueueFlag){ if(submitDB && submitQueue){
return task; return task;
} }
if(!taskDBFlag){ if(!submitDB){
logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes);
}else if(!taskQueueFlag){ }else if(!submitDB){
logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes);
} }
Thread.sleep(commitRetryInterval); Thread.sleep(commitRetryInterval);
} catch (Exception e) { } catch (Exception e) {
@ -145,6 +144,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
retryTimes += 1; retryTimes += 1;
} }
return task;
} }
/** /**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -74,6 +74,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public Boolean submitWaitComplete() { public Boolean submitWaitComplete() {
Boolean result = false; Boolean result = false;
this.taskInstance = submit(); this.taskInstance = submit();
if(this.taskInstance == null){
return result;
}
if(!this.taskInstance.getState().typeIsFinished()) { if(!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit(); result = waitTaskQuit();
} }

Loading…
Cancel
Save