Browse Source

Set master's task running status in `runTask` to avoid the task group acquire failed, but the task status is in running (#11451) (#12011)

(cherry picked from commit 05589606a2)
3.0.1-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
041e1fd3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  2. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  3. 43
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -110,19 +110,21 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
initTaskParameters();
logger.info("blocking task start");
logger.info("blocking task submit success");
return true;
}
@Override
protected boolean runTask() {
logger.info("blocking task starting");
initTaskParameters();
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
logger.info("blocking task finished");
return true;
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -67,19 +67,21 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
initTaskParameters();
logger.info("condition task start");
logger.info("condition task submit success");
return true;
}
@Override
public boolean runTask() {
initTaskParameters();
logger.info("condition task start");
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
logger.info("condition task finished");
return true;
}

43
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -30,9 +31,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -41,7 +39,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.auto.service.AutoService;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
/**
* switch task processor
@ -64,29 +62,28 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return false;
}
this.setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("switch task submit success");
return true;
}
@Override
public boolean runTask() {
try {
if (!this.taskInstance().getState().typeIsFinished() && setSwitchResult()) {
endTaskState();
}
} catch (Exception e) {
logger.error("update work flow {} switch task {} state error:",
this.processInstance.getId(),
this.taskInstance.getId(),
e);
logger.info("switch task starting");
taskInstance.setLogPath(
LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
if (!this.taskInstance().getState().typeIsFinished()) {
setSwitchResult();
}
endTaskState();
logger.info("switch task finished");
return true;
}

Loading…
Cancel
Save