|
|
|
@ -35,15 +35,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
|
|
|
|
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.Flag; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.Priority; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.StateEvent; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.StateEventType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskDependType; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; |
|
|
|
|
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
|
|
|
|
import org.apache.dolphinscheduler.common.graph.DAG; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNode; |
|
|
|
|
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; |
|
|
|
|
import org.apache.dolphinscheduler.common.process.ProcessDag; |
|
|
|
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.DateUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
|
|
|
import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
|
|
|
@ -63,14 +62,16 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
|
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
|
|
|
|
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; |
|
|
|
|
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; |
|
|
|
|
import org.apache.dolphinscheduler.remote.utils.Host; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.config.MasterConfig; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.event.StateEvent; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.event.StateEventHandler; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; |
|
|
|
|
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; |
|
|
|
@ -275,16 +276,37 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
if (!isStart) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
StateEvent stateEvent = null; |
|
|
|
|
while (!this.stateEvents.isEmpty()) { |
|
|
|
|
try { |
|
|
|
|
StateEvent stateEvent = this.stateEvents.peek(); |
|
|
|
|
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); |
|
|
|
|
if (stateEventHandler(stateEvent)) { |
|
|
|
|
stateEvent = this.stateEvents.peek(); |
|
|
|
|
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), |
|
|
|
|
stateEvent.getTaskInstanceId()); |
|
|
|
|
// if state handle success then will remove this state, otherwise will retry this state next time.
|
|
|
|
|
// The state should always handle success except database error.
|
|
|
|
|
checkProcessInstance(stateEvent); |
|
|
|
|
|
|
|
|
|
StateEventHandler stateEventHandler |
|
|
|
|
= StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) |
|
|
|
|
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); |
|
|
|
|
if (stateEventHandler.handleStateEvent(this, stateEvent)) { |
|
|
|
|
this.stateEvents.remove(stateEvent); |
|
|
|
|
} |
|
|
|
|
} catch (StateEventHandleError stateEventHandleError) { |
|
|
|
|
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); |
|
|
|
|
this.stateEvents.remove(stateEvent); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
} catch (StateEventHandleException stateEventHandleException) { |
|
|
|
|
logger.error("State event handle error, will retry this event: {}", |
|
|
|
|
stateEvent, |
|
|
|
|
stateEventHandleException); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
|
|
|
|
|
logger.error("state handle error:", e); |
|
|
|
|
logger.error("State event handle error, get a unknown exception, will retry this event: {}", |
|
|
|
|
stateEvent, |
|
|
|
|
e); |
|
|
|
|
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
|
|
|
|
} finally { |
|
|
|
|
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); |
|
|
|
|
} |
|
|
|
@ -321,58 +343,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return this.processInstance; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean stateEventHandler(StateEvent stateEvent) { |
|
|
|
|
logger.info("process event: {}", stateEvent); |
|
|
|
|
|
|
|
|
|
if (!checkProcessInstance(stateEvent)) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
boolean result = false; |
|
|
|
|
switch (stateEvent.getType()) { |
|
|
|
|
case PROCESS_STATE_CHANGE: |
|
|
|
|
measureProcessState(stateEvent); |
|
|
|
|
result = processStateChangeHandler(stateEvent); |
|
|
|
|
break; |
|
|
|
|
case TASK_STATE_CHANGE: |
|
|
|
|
measureTaskState(stateEvent); |
|
|
|
|
result = taskStateChangeHandler(stateEvent); |
|
|
|
|
break; |
|
|
|
|
case PROCESS_TIMEOUT: |
|
|
|
|
ProcessInstanceMetrics.incProcessInstanceTimeout(); |
|
|
|
|
result = processTimeout(); |
|
|
|
|
break; |
|
|
|
|
case TASK_TIMEOUT: |
|
|
|
|
TaskMetrics.incTaskTimeout(); |
|
|
|
|
result = taskTimeout(stateEvent); |
|
|
|
|
break; |
|
|
|
|
case WAIT_TASK_GROUP: |
|
|
|
|
result = checkForceStartAndWakeUp(stateEvent); |
|
|
|
|
break; |
|
|
|
|
case TASK_RETRY: |
|
|
|
|
TaskMetrics.incTaskRetry(); |
|
|
|
|
result = taskRetryEventHandler(stateEvent); |
|
|
|
|
break; |
|
|
|
|
case PROCESS_BLOCKED: |
|
|
|
|
result = processBlockHandler(stateEvent); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (result) { |
|
|
|
|
this.stateEvents.remove(stateEvent); |
|
|
|
|
} |
|
|
|
|
return result; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean checkForceStartAndWakeUp(StateEvent stateEvent) { |
|
|
|
|
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { |
|
|
|
|
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); |
|
|
|
|
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { |
|
|
|
|
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); |
|
|
|
|
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); |
|
|
|
|
taskProcessor.action(TaskAction.DISPATCH); |
|
|
|
|
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); |
|
|
|
|
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), |
|
|
|
|
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { |
|
|
|
@ -387,76 +365,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean taskTimeout(StateEvent stateEvent) { |
|
|
|
|
if (!checkTaskInstanceByStateEvent(stateEvent)) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId()); |
|
|
|
|
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); |
|
|
|
|
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { |
|
|
|
|
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); |
|
|
|
|
taskProcessor.action(TaskAction.TIMEOUT); |
|
|
|
|
} |
|
|
|
|
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { |
|
|
|
|
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); |
|
|
|
|
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean processTimeout() { |
|
|
|
|
public void processTimeout() { |
|
|
|
|
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); |
|
|
|
|
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean taskStateChangeHandler(StateEvent stateEvent) { |
|
|
|
|
if (!checkTaskInstanceByStateEvent(stateEvent)) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); |
|
|
|
|
TaskInstance task = taskInstanceOptional.orElseThrow( |
|
|
|
|
() -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId())); |
|
|
|
|
|
|
|
|
|
if (task.getState() == null) { |
|
|
|
|
logger.error("task state is null, state handler error: {}", stateEvent); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (task.getState().typeIsFinished()) { |
|
|
|
|
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) { |
|
|
|
|
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
taskFinished(task); |
|
|
|
|
if (task.getTaskGroupId() > 0) { |
|
|
|
|
releaseTaskGroup(task); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) { |
|
|
|
|
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode()); |
|
|
|
|
iTaskProcessor.action(TaskAction.RUN); |
|
|
|
|
|
|
|
|
|
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { |
|
|
|
|
if (iTaskProcessor.taskInstance().getState() != task.getState()) { |
|
|
|
|
task.setState(iTaskProcessor.taskInstance().getState()); |
|
|
|
|
} |
|
|
|
|
taskFinished(task); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
logger.error("state handler error: {}", stateEvent); |
|
|
|
|
|
|
|
|
|
return true; |
|
|
|
|
public void taskTimeout(TaskInstance taskInstance) { |
|
|
|
|
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); |
|
|
|
|
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void taskFinished(TaskInstance taskInstance) { |
|
|
|
|
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException { |
|
|
|
|
logger.info("TaskInstance finished task code:{} state:{} ", |
|
|
|
|
taskInstance.getTaskCode(), |
|
|
|
|
taskInstance.getState()); |
|
|
|
@ -503,7 +422,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
* |
|
|
|
|
* @param taskInstance |
|
|
|
|
*/ |
|
|
|
|
private void releaseTaskGroup(TaskInstance taskInstance) { |
|
|
|
|
public void releaseTaskGroup(TaskInstance taskInstance) { |
|
|
|
|
if (taskInstance.getTaskGroupId() > 0) { |
|
|
|
|
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); |
|
|
|
|
if (nextTaskInstance != null) { |
|
|
|
@ -528,13 +447,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
* |
|
|
|
|
* @param taskInstance |
|
|
|
|
*/ |
|
|
|
|
private void retryTaskInstance(TaskInstance taskInstance) { |
|
|
|
|
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { |
|
|
|
|
if (!taskInstance.taskCanRetry()) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); |
|
|
|
|
if (newTaskInstance == null) { |
|
|
|
|
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); |
|
|
|
|
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", |
|
|
|
|
taskInstance.getTaskCode(), |
|
|
|
|
taskInstance.getId()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); |
|
|
|
@ -556,20 +477,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* handle task retry event |
|
|
|
|
* |
|
|
|
|
* @param stateEvent |
|
|
|
|
* @return |
|
|
|
|
*/ |
|
|
|
|
private boolean taskRetryEventHandler(StateEvent stateEvent) { |
|
|
|
|
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode()); |
|
|
|
|
addTaskToStandByList(taskInstance); |
|
|
|
|
submitStandByTask(); |
|
|
|
|
waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode()); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* update process instance |
|
|
|
|
*/ |
|
|
|
@ -603,43 +510,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
/** |
|
|
|
|
* check process instance by state event |
|
|
|
|
*/ |
|
|
|
|
public boolean checkProcessInstance(StateEvent stateEvent) { |
|
|
|
|
public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError { |
|
|
|
|
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { |
|
|
|
|
logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent); |
|
|
|
|
return false; |
|
|
|
|
throw new StateEventHandleError("The event doesn't contains process instance id"); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* check if task instance exist by state event |
|
|
|
|
*/ |
|
|
|
|
public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) { |
|
|
|
|
public void checkTaskInstanceByStateEvent(StateEvent stateEvent) throws StateEventHandleError { |
|
|
|
|
if (stateEvent.getTaskInstanceId() == 0) { |
|
|
|
|
logger.error("task instance id null, state event:{}", stateEvent); |
|
|
|
|
return false; |
|
|
|
|
throw new StateEventHandleError("The taskInstanceId is 0"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) { |
|
|
|
|
logger.error("mismatch task instance id, event:{}", stateEvent); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* check if task instance exist by task code |
|
|
|
|
*/ |
|
|
|
|
public boolean checkTaskInstanceByCode(long taskCode) { |
|
|
|
|
if (taskInstanceMap.isEmpty()) { |
|
|
|
|
return false; |
|
|
|
|
throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap"); |
|
|
|
|
} |
|
|
|
|
for (TaskInstance taskInstance : taskInstanceMap.values()) { |
|
|
|
|
if (taskInstance.getTaskCode() == taskCode) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -688,59 +575,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return Optional.empty(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean processStateChangeHandler(StateEvent stateEvent) { |
|
|
|
|
try { |
|
|
|
|
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus()); |
|
|
|
|
|
|
|
|
|
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { |
|
|
|
|
// serial wait execution type needs to wake up the waiting process
|
|
|
|
|
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { |
|
|
|
|
endProcess(); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
this.updateProcessInstanceState(stateEvent); |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (processComplementData()) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
if (stateEvent.getExecutionStatus().typeIsFinished()) { |
|
|
|
|
endProcess(); |
|
|
|
|
} |
|
|
|
|
if (processInstance.getState() == ExecutionStatus.READY_STOP) { |
|
|
|
|
killAllTasks(); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("process state change error:", e); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean processBlockHandler(StateEvent stateEvent) { |
|
|
|
|
try { |
|
|
|
|
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); |
|
|
|
|
TaskInstance task = taskInstanceOptional.orElseThrow( |
|
|
|
|
() -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId())); |
|
|
|
|
if (!checkTaskInstanceByStateEvent(stateEvent)) { |
|
|
|
|
logger.error("task {} is not a blocking task", task.getTaskCode()); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class); |
|
|
|
|
if (parameters.isAlertWhenBlocking()) { |
|
|
|
|
public void processBlock() { |
|
|
|
|
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); |
|
|
|
|
processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); |
|
|
|
|
logger.info("processInstance {} block alert send successful!", processInstance.getId()); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("sending blocking message error:", e); |
|
|
|
|
} |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean processComplementData() throws Exception { |
|
|
|
|
public boolean processComplementData() { |
|
|
|
|
if (!needComplementProcess()) { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
@ -937,7 +778,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
/** |
|
|
|
|
* init task queue |
|
|
|
|
*/ |
|
|
|
|
private void initTaskQueue() { |
|
|
|
|
private void initTaskQueue() throws StateEventHandleException { |
|
|
|
|
|
|
|
|
|
taskFailedSubmit = false; |
|
|
|
|
activeTaskProcessorMaps.clear(); |
|
|
|
@ -946,7 +787,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
errorTaskMap.clear(); |
|
|
|
|
|
|
|
|
|
if (!isNewProcessInstance()) { |
|
|
|
|
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); |
|
|
|
|
List<TaskInstance> validTaskInstanceList |
|
|
|
|
= processService.findValidTaskListByProcessId(processInstance.getId()); |
|
|
|
|
for (TaskInstance task : validTaskInstanceList) { |
|
|
|
|
if (validTaskMap.containsKey(task.getTaskCode())) { |
|
|
|
|
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); |
|
|
|
@ -956,7 +798,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
processService.updateTaskInstance(task); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode()); |
|
|
|
|
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", |
|
|
|
|
task.getTaskCode()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
validTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
@ -1099,6 +942,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
Host host = new Host(taskInstance.getHost()); |
|
|
|
|
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command()); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
// Do we need to catch this exception?
|
|
|
|
|
logger.error("notify process host update", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1348,8 +1192,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return validTaskInstanceList; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void submitPostNode(String parentNodeCode) { |
|
|
|
|
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); |
|
|
|
|
private void submitPostNode(String parentNodeCode) throws StateEventHandleException { |
|
|
|
|
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, |
|
|
|
|
skipTaskNodeMap, |
|
|
|
|
dag, |
|
|
|
|
getCompleteTaskInstanceMap()); |
|
|
|
|
List<TaskInstance> taskInstances = new ArrayList<>(); |
|
|
|
|
for (String taskNode : submitTaskNodeList) { |
|
|
|
|
TaskNode taskNodeObject = dag.getNode(taskNode); |
|
|
|
@ -1692,35 +1539,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); |
|
|
|
|
Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); |
|
|
|
|
return processInstance.getScheduleTime().equals(endTime); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("complement end failed ", e); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* updateProcessInstance process instance state |
|
|
|
|
* after each batch of tasks is executed, the status of the process instance is updated |
|
|
|
|
*/ |
|
|
|
|
private void updateProcessInstanceState() { |
|
|
|
|
private void updateProcessInstanceState() throws StateEventHandleException { |
|
|
|
|
ExecutionStatus state = getProcessInstanceState(processInstance); |
|
|
|
|
if (processInstance.getState() != state) { |
|
|
|
|
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", |
|
|
|
|
processInstance.getId(), |
|
|
|
|
processInstance.getName(), |
|
|
|
|
processInstance.getState(), |
|
|
|
|
state, |
|
|
|
|
processInstance.getCommandType()); |
|
|
|
|
|
|
|
|
|
processInstance.setState(state); |
|
|
|
|
if (state.typeIsFinished()) { |
|
|
|
|
processInstance.setEndTime(new Date()); |
|
|
|
|
} |
|
|
|
|
processService.updateProcessInstance(processInstance); |
|
|
|
|
updateWorkflowInstanceStatesToDB(state); |
|
|
|
|
|
|
|
|
|
StateEvent stateEvent = new StateEvent(); |
|
|
|
|
stateEvent.setExecutionStatus(processInstance.getState()); |
|
|
|
@ -1735,21 +1566,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
/** |
|
|
|
|
* stateEvent's execution status as process instance state |
|
|
|
|
*/ |
|
|
|
|
private void updateProcessInstanceState(StateEvent stateEvent) { |
|
|
|
|
public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventHandleException { |
|
|
|
|
ExecutionStatus state = stateEvent.getExecutionStatus(); |
|
|
|
|
if (processInstance.getState() != state) { |
|
|
|
|
updateWorkflowInstanceStatesToDB(state); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException { |
|
|
|
|
ExecutionStatus originStates = processInstance.getState(); |
|
|
|
|
if (originStates != newStates) { |
|
|
|
|
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", |
|
|
|
|
processInstance.getId(), |
|
|
|
|
processInstance.getName(), |
|
|
|
|
processInstance.getState(), |
|
|
|
|
state, |
|
|
|
|
originStates, |
|
|
|
|
newStates, |
|
|
|
|
processInstance.getCommandType()); |
|
|
|
|
|
|
|
|
|
processInstance.setState(state); |
|
|
|
|
if (state.typeIsFinished()) { |
|
|
|
|
processInstance.setState(newStates); |
|
|
|
|
if (newStates.typeIsFinished()) { |
|
|
|
|
processInstance.setEndTime(new Date()); |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
processService.updateProcessInstance(processInstance); |
|
|
|
|
} catch (Exception ex) { |
|
|
|
|
// recover the status
|
|
|
|
|
processInstance.setState(originStates); |
|
|
|
|
processInstance.setEndTime(null); |
|
|
|
|
throw new StateEventHandleException("Update process instance status to DB error", ex); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1768,18 +1611,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
* |
|
|
|
|
* @param taskInstance task instance |
|
|
|
|
*/ |
|
|
|
|
private void addTaskToStandByList(TaskInstance taskInstance) { |
|
|
|
|
try { |
|
|
|
|
public void addTaskToStandByList(TaskInstance taskInstance) { |
|
|
|
|
if (readyToSubmitTaskQueue.contains(taskInstance)) { |
|
|
|
|
logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", |
|
|
|
|
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); |
|
|
|
|
taskInstance.getName(), |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
taskInstance.getTaskCode()); |
|
|
|
|
TaskMetrics.incTaskSubmit(); |
|
|
|
|
readyToSubmitTaskQueue.put(taskInstance); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -1788,15 +1630,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
* @param taskInstance task instance |
|
|
|
|
*/ |
|
|
|
|
private void removeTaskFromStandbyList(TaskInstance taskInstance) { |
|
|
|
|
logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName()); |
|
|
|
|
try { |
|
|
|
|
readyToSubmitTaskQueue.remove(taskInstance); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}", |
|
|
|
|
taskInstance.getId(), |
|
|
|
|
taskInstance.getName(), |
|
|
|
|
e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -1816,7 +1650,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
/** |
|
|
|
|
* close the on going tasks |
|
|
|
|
*/ |
|
|
|
|
private void killAllTasks() { |
|
|
|
|
public void killAllTasks() { |
|
|
|
|
logger.info("kill called on process instance id: {}, num: {}", |
|
|
|
|
processInstance.getId(), |
|
|
|
|
activeTaskProcessorMaps.size()); |
|
|
|
@ -1854,7 +1688,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
/** |
|
|
|
|
* handling the list of tasks to be submitted |
|
|
|
|
*/ |
|
|
|
|
private void submitStandByTask() { |
|
|
|
|
public void submitStandByTask() throws StateEventHandleException { |
|
|
|
|
int length = readyToSubmitTaskQueue.size(); |
|
|
|
|
for (int i = 0; i < length; i++) { |
|
|
|
|
TaskInstance task = readyToSubmitTaskQueue.peek(); |
|
|
|
@ -2013,6 +1847,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Map<Long, Integer> getCompleteTaskMap() { |
|
|
|
|
return completeTaskMap; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Map<Long, ITaskProcessor> getActiveTaskProcessMap() { |
|
|
|
|
return activeTaskProcessorMaps; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() { |
|
|
|
|
return waitToRetryTaskInstanceMap; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) { |
|
|
|
|
// get start params from command param
|
|
|
|
|
Map<String, String> startParamMap = new HashMap<>(); |
|
|
|
@ -2047,46 +1894,4 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void measureProcessState(StateEvent processStateEvent) { |
|
|
|
|
if (processStateEvent.getExecutionStatus().typeIsFinished()) { |
|
|
|
|
ProcessInstanceMetrics.incProcessInstanceFinish(); |
|
|
|
|
} |
|
|
|
|
switch (processStateEvent.getExecutionStatus()) { |
|
|
|
|
case STOP: |
|
|
|
|
ProcessInstanceMetrics.incProcessInstanceStop(); |
|
|
|
|
break; |
|
|
|
|
case SUCCESS: |
|
|
|
|
ProcessInstanceMetrics.incProcessInstanceSuccess(); |
|
|
|
|
break; |
|
|
|
|
case FAILURE: |
|
|
|
|
ProcessInstanceMetrics.incProcessInstanceFailure(); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void measureTaskState(StateEvent taskStateEvent) { |
|
|
|
|
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { |
|
|
|
|
// the event is broken
|
|
|
|
|
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (taskStateEvent.getExecutionStatus().typeIsFinished()) { |
|
|
|
|
TaskMetrics.incTaskFinish(); |
|
|
|
|
} |
|
|
|
|
switch (taskStateEvent.getExecutionStatus()) { |
|
|
|
|
case STOP: |
|
|
|
|
TaskMetrics.incTaskStop(); |
|
|
|
|
break; |
|
|
|
|
case SUCCESS: |
|
|
|
|
TaskMetrics.incTaskSuccess(); |
|
|
|
|
break; |
|
|
|
|
case FAILURE: |
|
|
|
|
TaskMetrics.incTaskFailure(); |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |