Browse Source

fix MasterBaseTaskExecThread submit method bug (#1532)

* fix #1515

* sleep when resource in not satisfy. fix #1522

* add sleep 1s for no command

* fix MasterBaseTaskExecThread submit method bug

* updates

* add log
pull/2/head
Tboy 5 years ago committed by qiaozhanwei
parent
commit
26d98931cf
  1. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  2. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -114,30 +114,29 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1;
boolean taskDBFlag = false;
boolean taskQueueFlag = false;
boolean submitDB = false;
boolean submitQueue = false;
TaskInstance task = null;
while (true){
while (retryTimes <= commitRetryTimes){
try {
if(!taskDBFlag){
if(!submitDB){
// submit task to db
task = processDao.submitTask(taskInstance, processInstance);
if(task != null && task.getId() != 0){
taskDBFlag = true;
submitDB = true;
}
}
if(taskDBFlag && !taskQueueFlag){
if(submitDB && !submitQueue){
// submit task to queue
taskQueueFlag = processDao.submitTaskToQueue(task);
submitQueue = processDao.submitTaskToQueue(task);
}
if(taskDBFlag && taskQueueFlag){
if(submitDB && submitQueue){
return task;
}
if(!taskDBFlag){
logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes);
}else if(!taskQueueFlag){
logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes);
if(!submitDB){
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
}else if(!submitQueue){
logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
@ -145,6 +144,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
retryTimes += 1;
}
return task;
}
/**

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -74,6 +74,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public Boolean submitWaitComplete() {
Boolean result = false;
this.taskInstance = submit();
if(this.taskInstance == null){
logger.error("submit task instance to mysql and queue failed , please check and fix it");
return result;
}
if(!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit();
}

Loading…
Cancel
Save