Browse Source

merge from 1.3.3-release

pull/3/MERGE
baoliang 4 years ago
parent
commit
d0c8dadc94
  1. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -121,15 +121,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete", logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
boolean checkTimeout = false;
TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter();
if(taskTimeoutParameter.getEnable()){
TaskTimeoutStrategy strategy = taskTimeoutParameter.getStrategy();
if(strategy == TaskTimeoutStrategy.WARN || strategy == TaskTimeoutStrategy.WARNFAILED){
checkTimeout = true;
}
}
while (Stopper.isRunning()){ while (Stopper.isRunning()){
try { try {
@ -150,18 +141,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId()); taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break; break;
} }
if(checkTimeout){ if (checkTaskTimeout()) {
long remainTime = DateUtils.getRemainTime(taskInstance.getStartTime(), taskTimeoutParameter.getInterval() * 60L); this.checkTimeoutFlag = !alertTimeout();
if (remainTime < 0) {
logger.warn("task id: {} execution time out",taskInstance.getId());
// process define
ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
// send warn mail
alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(),processDefine.getReceivers(),
processDefine.getReceiversCc(), processInstance.getId(), processInstance.getName(),
taskInstance.getId(),taskInstance.getName());
checkTimeout = false;
}
} }
// updateProcessInstance task instance // updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = processService.findTaskInstanceById(taskInstance.getId());
@ -248,13 +229,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
return true; return true;
} }
/**
* get task timeout parameter
* @return TaskTimeoutParameter
*/
private TaskTimeoutParameter getTaskTimeoutParameter(){
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
return taskNode.getTaskTimeoutParameter();
}
} }

Loading…
Cancel
Save