|
|
|
@ -88,6 +88,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
|
|
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; |
|
|
|
|
import org.apache.dolphinscheduler.service.utils.DagHelper; |
|
|
|
|
import org.apache.dolphinscheduler.service.utils.LoggerUtils; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
import org.apache.commons.lang3.math.NumberUtils; |
|
|
|
@ -291,19 +292,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} catch (StateEventHandleError stateEventHandleError) { |
|
|
|
|
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); |
|
|
|
|
this.stateEvents.remove(stateEvent); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); |
|
|
|
|
} catch (StateEventHandleException stateEventHandleException) { |
|
|
|
|
logger.error("State event handle error, will retry this event: {}", |
|
|
|
|
stateEvent, |
|
|
|
|
stateEventHandleException); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
// we catch the exception here, since if the state event handle failed, the state event will still keep
|
|
|
|
|
// in the stateEvents queue.
|
|
|
|
|
logger.error("State event handle error, get a unknown exception, will retry this event: {}", |
|
|
|
|
stateEvent, |
|
|
|
|
e); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); |
|
|
|
|
} finally { |
|
|
|
|
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); |
|
|
|
|
} |
|
|
|
@ -322,6 +323,18 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return key; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public boolean existStateEvent(StateEvent stateEvent) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(this.stateEvents)) { |
|
|
|
|
Optional<StateEvent> optional = this.stateEvents.stream() |
|
|
|
|
.filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId() |
|
|
|
|
&& Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId()) |
|
|
|
|
&& e.getType() == stateEvent.getType()) |
|
|
|
|
.findFirst(); |
|
|
|
|
return optional.isPresent(); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public boolean addStateEvent(StateEvent stateEvent) { |
|
|
|
|
if (processInstance.getId() != stateEvent.getProcessInstanceId()) { |
|
|
|
|
logger.info("state event would be abounded :{}", stateEvent); |
|
|
|
@ -606,7 +619,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
Date scheduleDate = processInstance.getScheduleTime(); |
|
|
|
|
if (scheduleDate == null) { |
|
|
|
|
if (CollectionUtils.isEmpty(complementListDate)) { |
|
|
|
|
logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId()); |
|
|
|
|
logger.info("complementListDate is empty, process complement end. process id:{}", |
|
|
|
|
processInstance.getId()); |
|
|
|
|
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
@ -831,7 +845,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
task.getTaskCode(), |
|
|
|
|
task.getState()); |
|
|
|
|
if (validTaskMap.containsKey(task.getTaskCode())) { |
|
|
|
|
logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", |
|
|
|
|
logger.warn( |
|
|
|
|
"Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", |
|
|
|
|
task.getTaskCode()); |
|
|
|
|
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); |
|
|
|
|
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); |
|
|
|
@ -980,7 +995,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
taskInstance.getProcessInstanceId(), |
|
|
|
|
taskInstance.getTaskGroupPriority()); |
|
|
|
|
if (!acquireTaskGroup) { |
|
|
|
|
logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" + |
|
|
|
|
logger.info( |
|
|
|
|
"Submitted task will not be dispatch right now because the first time to try to acquire" + |
|
|
|
|
" task group failed, taskInstanceName: {}, taskGroupId: {}", |
|
|
|
|
taskInstance.getName(), taskGroupId); |
|
|
|
|
return Optional.of(taskInstance); |
|
|
|
@ -989,7 +1005,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
|
|
|
|
|
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); |
|
|
|
|
if (!dispatchSuccess) { |
|
|
|
|
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName()); |
|
|
|
|
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), |
|
|
|
|
taskInstance.getName()); |
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
taskProcessor.action(TaskAction.RUN); |
|
|
|
@ -1446,9 +1463,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
List<String> nextTaskList = |
|
|
|
|
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); |
|
|
|
|
if (!nextTaskList.contains(nextNodeName)) { |
|
|
|
|
logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " + |
|
|
|
|
"dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName |
|
|
|
|
); |
|
|
|
|
logger.info( |
|
|
|
|
"DependTask is a condition task, and its next condition branch does not hava current task, " + |
|
|
|
|
"dependTaskCode: {}, currentTaskCode: {}", |
|
|
|
|
dependNodeName, nextNodeName); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
@ -1824,7 +1842,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); |
|
|
|
|
if (retryTask != null && retryTask.getState().isForceSuccess()) { |
|
|
|
|
task.setState(retryTask.getState()); |
|
|
|
|
logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", |
|
|
|
|
logger.info( |
|
|
|
|
"Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", |
|
|
|
|
task.getName(), task.getId()); |
|
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
|
completeTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
|