From 7500e9968296f62269daca95b270fc65870fce97 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 6 Jul 2022 14:53:28 +0800 Subject: [PATCH] [Fix-10785] Fix state event handle error will not retry (#10786) * Fix state event handle error will not retry * Use state event handler to deal with the event (cherry picked from commit 67d14fb7b3d941af613a96a0b9c3a2928e5c201c) --- .../api/service/impl/ExecutorServiceImpl.java | 12 +- .../consumer/TaskPriorityQueueConsumer.java | 2 + .../master/dispatch/host/assign/Selector.java | 2 +- .../server/master/event}/StateEvent.java | 3 +- .../master/event/StateEventHandleError.java | 33 ++ .../event/StateEventHandleException.java | 32 ++ .../master/event/StateEventHandler.java | 36 ++ .../event/StateEventHandlerManager.java | 41 ++ .../event/TaskRetryStateEventHandler.java | 47 ++ .../master/event/TaskStateEventHandler.java | 115 +++++ .../event/TaskTimeoutStateEventHandler.java | 64 +++ .../event/TaskWaitTaskGroupStateHandler.java | 36 ++ .../event/WorkflowBlockStateEventHandler.java | 60 +++ .../event/WorkflowStateEventHandler.java | 95 ++++ .../WorkflowTimeoutStateEventHandler.java | 39 ++ .../master/processor/StateEventProcessor.java | 5 +- .../master/processor/TaskEventProcessor.java | 2 +- .../queue/StateEventResponseService.java | 3 +- .../processor/queue/TaskEventService.java | 17 +- .../processor/queue/TaskExecuteRunnable.java | 3 +- .../queue/TaskExecuteThreadPool.java | 21 +- .../master/runner/MasterSchedulerService.java | 1 + .../runner/StateWheelExecuteThread.java | 2 +- .../runner/WorkflowExecuteRunnable.java | 441 +++++------------- .../runner/WorkflowExecuteThreadPool.java | 3 +- .../master/service/FailoverService.java | 2 +- .../master/service/FailoverServiceTest.java | 2 +- .../remote/processor/NettyRemoteChannel.java | 7 +- .../processor/StateEventCallbackService.java | 29 +- .../queue/PeerTaskInstancePriorityQueue.java | 3 +- .../service/queue/TaskPriorityQueue.java | 2 +- 
.../service/queue/TaskPriorityQueueImpl.java | 3 +- .../TaskExecuteResponseAckProcessor.java | 8 +- 33 files changed, 798 insertions(+), 373 deletions(-) rename {dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums => dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event}/StateEvent.java (91%) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 5f72be37ed..b58cc9dbcd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -116,7 +117,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ private ProcessService processService; @Autowired - StateEventCallbackService stateEventCallbackService; + private StateEventCallbackService stateEventCallbackService; @Autowired private TaskDefinitionMapper taskDefinitionMapper; @@ -467,13 +468,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // determine whether the process is normal if (update > 0) { - String host = processInstance.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); + // directly send the process instance state change event to target master, not guarantee the event send success StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0 ); - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + Host host = new Host(processInstance.getHost()); + stateEventCallbackService.sendResult(host, 
stateEventChangeCommand.convert2Command()); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); @@ -737,7 +737,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } } logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount); - + // Distribute the number of tasks equally to each command. // The last command with insufficient quantity will be assigned to the remaining tasks. int itemsPerCommand = (listDateSize / createCount); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index e8cf4b2919..f04ae15dd8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -116,7 +116,9 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread { @PostConstruct public void init() { this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber()); + logger.info("Task priority queue consume thread staring"); super.start(); + logger.info("Task priority queue consume thread started"); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java index 8eed9d991e..49a4053e5e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java @@ -27,7 +27,7 @@ public interface Selector { /** * select - * @param source source + * @param source source, the given source should not be empty. * @return T */ T select(Collection source); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java similarity index 91% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java index 7f4be924dd..68cd582994 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.enums; +package org.apache.dolphinscheduler.server.master.event; +import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import io.netty.channel.Channel; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java new file mode 100644 index 0000000000..a8dce04f03 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +/** + * This exception represents an error that cannot be recovered from; this happens when the event is broken. + * When we get this exception, we will drop the event. + */ +public class StateEventHandleError extends Exception { + + public StateEventHandleError(String message) { + super(message); + } + + public StateEventHandleError(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java new file mode 100644 index 0000000000..dc9456a397 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +/** + * This exception represents an error that can be recovered from; when we get this exception, we will retry the event. + */ +public class StateEventHandleException extends Exception { + + public StateEventHandleException(String message) { + super(message); + } + + public StateEventHandleException(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java new file mode 100644 index 0000000000..00808b2e29 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +public interface StateEventHandler { + + /** + * Handle an event; returns true if the event is handled successfully, otherwise returns false. + * + * @param stateEvent given state event. + * @throws StateEventHandleException this exception means it can be recovered. + * @throws StateEventHandleError this exception means it cannot be recovered, so the event needs to be dropped. + */ + boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleException, StateEventHandleError; + + StateEventType getEventType(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java new file mode 100644 index 0000000000..b436b55890 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; + +public class StateEventHandlerManager { + + private static final Map stateEventHandlerMap = new HashMap<>(); + + static { + ServiceLoader.load(StateEventHandler.class) + .forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(), + stateEventHandler)); + } + + public static Optional getStateEventHandler(StateEventType stateEventType) { + return Optional.ofNullable(stateEventHandlerMap.get(stateEventType)); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java new file mode 100644 index 0000000000..ee8168856a --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import java.util.Map; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class TaskRetryStateEventHandler implements StateEventHandler { + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleException { + TaskMetrics.incTaskRetry(); + Map waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap(); + TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode()); + workflowExecuteRunnable.addTaskToStandByList(taskInstance); + workflowExecuteRunnable.submitStandByTask(); + waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode()); + return true; + } + + @Override + public StateEventType getEventType() { + return StateEventType.TASK_RETRY; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java new file mode 100644 index 0000000000..e3ad268f97 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; + +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class TaskStateEventHandler implements StateEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(TaskStateEventHandler.class); + + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleException, StateEventHandleError { + measureTaskState(stateEvent); + workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent); + + Optional taskInstanceOptional = + workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()); + + TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError( + "Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId())); + + if (task.getState() == null) { + throw new StateEventHandleError("Task state event handle error due to task state is null"); + } + + Map completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap(); + + if (task.getState().typeIsFinished()) { + if (completeTaskMap.containsKey(task.getTaskCode()) + && completeTaskMap.get(task.getTaskCode()) == task.getId()) { + logger.warn("The task instance is already complete, stateEvent: {}", stateEvent); + return true; + } + workflowExecuteRunnable.taskFinished(task); + if (task.getTaskGroupId() > 0) { + 
workflowExecuteRunnable.releaseTaskGroup(task); + } + return true; + } + Map activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); + if (activeTaskProcessMap.containsKey(task.getTaskCode())) { + ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode()); + iTaskProcessor.action(TaskAction.RUN); + + if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { + if (iTaskProcessor.taskInstance().getState() != task.getState()) { + task.setState(iTaskProcessor.taskInstance().getState()); + } + workflowExecuteRunnable.taskFinished(task); + } + return true; + } + throw new StateEventHandleException( + "Task state event handle error, due to the task is not in activeTaskProcessorMaps"); + } + + @Override + public StateEventType getEventType() { + return StateEventType.TASK_STATE_CHANGE; + } + + private void measureTaskState(StateEvent taskStateEvent) { + if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { + // the event is broken + logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); + return; + } + if (taskStateEvent.getExecutionStatus().typeIsFinished()) { + TaskMetrics.incTaskFinish(); + } + switch (taskStateEvent.getExecutionStatus()) { + case STOP: + TaskMetrics.incTaskStop(); + break; + case SUCCESS: + TaskMetrics.incTaskSuccess(); + break; + case FAILURE: + TaskMetrics.incTaskFailure(); + break; + default: + break; + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java new file mode 100644 index 0000000000..240f10ff2c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; + +import java.util.Map; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class TaskTimeoutStateEventHandler implements StateEventHandler { + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleError { + TaskMetrics.incTaskTimeout(); + workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent); + + TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get(); + + if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { + return true; + } + 
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); + Map activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap(); + if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy + || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode()); + taskProcessor.action(TaskAction.TIMEOUT); + } + if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { + workflowExecuteRunnable.processTimeout(); + workflowExecuteRunnable.taskTimeout(taskInstance); + } + return true; + } + + @Override + public StateEventType getEventType() { + return StateEventType.TASK_TIMEOUT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java new file mode 100644 index 0000000000..9a3c59a949 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class TaskWaitTaskGroupStateHandler implements StateEventHandler { + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) { + return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent); + } + + @Override + public StateEventType getEventType() { + return StateEventType.WAIT_TASK_GROUP; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java new file mode 100644 index 0000000000..f7349fcbd1 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class WorkflowBlockStateEventHandler implements StateEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowBlockStateEventHandler.class); + + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleError { + Optional taskInstanceOptional = + workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()); + if (!taskInstanceOptional.isPresent()) { + throw new StateEventHandleError("Cannot find taskInstance from taskMap by taskInstanceId: " + + stateEvent.getTaskInstanceId()); + } + TaskInstance task = taskInstanceOptional.get(); + + BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class); + if (parameters != null && parameters.isAlertWhenBlocking()) { + workflowExecuteRunnable.processBlock(); + } + return true; + } + + @Override + public StateEventType getEventType() { + return StateEventType.PROCESS_BLOCKED; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java new file mode 100644 index 
0000000000..37d8ceb1da --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class WorkflowStateEventHandler implements StateEventHandler { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowStateEventHandler.class); + + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) + throws StateEventHandleException { + measureProcessState(stateEvent); + 
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessDefinition processDefinition = processInstance.getProcessDefinition(); + + logger.info("process:{} state {} change to {}", + processInstance.getId(), + processInstance.getState(), + stateEvent.getExecutionStatus()); + + if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { + // serial wait execution type needs to wake up the waiting process + if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType() + .typeIsSerialPriority()) { + workflowExecuteRunnable.endProcess(); + return true; + } + workflowExecuteRunnable.updateProcessInstanceState(stateEvent); + return true; + } + if (workflowExecuteRunnable.processComplementData()) { + return true; + } + if (stateEvent.getExecutionStatus().typeIsFinished()) { + workflowExecuteRunnable.endProcess(); + } + if (processInstance.getState() == ExecutionStatus.READY_STOP) { + workflowExecuteRunnable.killAllTasks(); + } + + return true; + } + + @Override + public StateEventType getEventType() { + return StateEventType.PROCESS_STATE_CHANGE; + } + + private void measureProcessState(StateEvent processStateEvent) { + if (processStateEvent.getExecutionStatus().typeIsFinished()) { + ProcessInstanceMetrics.incProcessInstanceFinish(); + } + switch (processStateEvent.getExecutionStatus()) { + case STOP: + ProcessInstanceMetrics.incProcessInstanceStop(); + break; + case SUCCESS: + ProcessInstanceMetrics.incProcessInstanceSuccess(); + break; + case FAILURE: + ProcessInstanceMetrics.incProcessInstanceFailure(); + break; + default: + break; + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java new file mode 100644 index 0000000000..c2fc873bdc --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import com.google.auto.service.AutoService; + +@AutoService(StateEventHandler.class) +public class WorkflowTimeoutStateEventHandler implements StateEventHandler { + @Override + public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) { + ProcessInstanceMetrics.incProcessInstanceTimeout(); + workflowExecuteRunnable.processTimeout(); + return true; + } + + @Override + public StateEventType getEventType() { + return StateEventType.PROCESS_TIMEOUT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java 
index b2c47e4112..f90277d0eb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -62,7 +62,8 @@ public class StateEventProcessor implements NettyRequestProcessor { } stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); - StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; + StateEventType + type = stateEvent.getTaskInstanceId() == 0 ? 
StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; stateEvent.setType(type); try { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java index e597f1cae5..42d4ab4db7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 4ca6b9eccb..bdf80bbee9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -98,6 
+98,7 @@ public class StateEventResponseService { */ public void addResponse(StateEvent stateEvent) { try { + // check the event is validated eventQueue.put(stateEvent); } catch (InterruptedException e) { logger.error("Put state event : {} error", stateEvent, e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java index bed3b3d9ed..d4b97c09c7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java @@ -63,11 +63,15 @@ public class TaskEventService { @PostConstruct public void start() { - this.taskEventThread = new TaskEventThread(); + this.taskEventThread = new TaskEventDispatchThread(); + logger.info("TaskEvent dispatch thread starting"); this.taskEventThread.start(); + logger.info("TaskEvent dispatch thread started"); this.taskEventHandlerThread = new TaskEventHandlerThread(); + logger.info("TaskEvent handle thread staring"); this.taskEventHandlerThread.start(); + logger.info("TaskEvent handle thread started"); } @PreDestroy @@ -94,14 +98,14 @@ public class TaskEventService { * @param taskEvent taskEvent */ public void addEvent(TaskEvent taskEvent) { - taskExecuteThreadPool.submitTaskEvent(taskEvent); + eventQueue.add(taskEvent); } /** - * task worker thread + * Dispatch event to target task runnable. 
*/ - class TaskEventThread extends BaseDaemonThread { - protected TaskEventThread() { + class TaskEventDispatchThread extends BaseDaemonThread { + protected TaskEventDispatchThread() { super("TaskEventLoopThread"); } @@ -109,7 +113,7 @@ public class TaskEventService { public void run() { while (Stopper.isRunning()) { try { - // if not task , blocking here + // if not task event, blocking here TaskEvent taskEvent = eventQueue.take(); taskExecuteThreadPool.submitTaskEvent(taskEvent); } catch (InterruptedException e) { @@ -141,6 +145,7 @@ public class TaskEventService { TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + logger.warn("TaskEvent handle thread interrupted, will return this loop"); break; } catch (Exception e) { logger.error("event handler thread error", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index 3bf249ed3e..f488fc3de1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.Event; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -189,6 +189,7 @@ public class TaskExecuteRunnable implements Runnable { } } // if taskInstance is null (maybe deleted) or finish. 
retry will be meaningless . so ack success + // send ack to worker TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); } catch (Exception e) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 68e4511c42..ae578d4c03 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -82,18 +82,13 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent); return; } - if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { - TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable( - taskEvent.getProcessInstanceId(), - processService, workflowExecuteThreadPool, - processInstanceExecCacheManager, - dataQualityResultOperator); - taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread); - } - TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId()); - if (taskExecuteRunnable != null) { - taskExecuteRunnable.addEvent(taskEvent); - } + TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(), + (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, + processService, + workflowExecuteThreadPool, + processInstanceExecCacheManager, + dataQualityResultOperator)); + taskExecuteRunnable.addEvent(taskEvent); } public void eventHandler() { @@ 
-103,7 +98,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { } public void executeEvent(TaskExecuteRunnable taskExecuteThread) { - if (taskExecuteThread.eventSize() == 0) { + if (taskExecuteThread.isEmpty()) { return; } if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index ae7b4bdeb0..35d3155f43 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index a56a7d8c5a..c70fb1f1d4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import 
org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 88d7391dde..7a15a43d2b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -35,15 +35,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; -import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -63,14 +62,16 @@ import 
org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.event.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEventHandleError; +import org.apache.dolphinscheduler.server.master.event.StateEventHandleException; +import org.apache.dolphinscheduler.server.master.event.StateEventHandler; +import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; @@ -275,16 +276,37 @@ public class WorkflowExecuteRunnable implements Callable { if (!isStart) { return; } + StateEvent stateEvent = null; while (!this.stateEvents.isEmpty()) { try { - StateEvent stateEvent = this.stateEvents.peek(); - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); - if (stateEventHandler(stateEvent)) { + stateEvent = this.stateEvents.peek(); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), + 
stateEvent.getTaskInstanceId()); + // if state handle success then will remove this state, otherwise will retry this state next time. + // The state should always handle success except database error. + checkProcessInstance(stateEvent); + + StateEventHandler stateEventHandler + = StateEventHandlerManager.getStateEventHandler(stateEvent.getType()) + .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event")); + if (stateEventHandler.handleStateEvent(this, stateEvent)) { this.stateEvents.remove(stateEvent); } + } catch (StateEventHandleError stateEventHandleError) { + logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); + this.stateEvents.remove(stateEvent); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } catch (StateEventHandleException stateEventHandleException) { + logger.error("State event handle error, will retry this event: {}", + stateEvent, + stateEventHandleException); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. 
- logger.error("state handle error:", e); + logger.error("State event handle error, get a unknown exception, will retry this event: {}", + stateEvent, + e); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } @@ -321,58 +343,14 @@ public class WorkflowExecuteRunnable implements Callable { return this.processInstance; } - private boolean stateEventHandler(StateEvent stateEvent) { - logger.info("process event: {}", stateEvent); - - if (!checkProcessInstance(stateEvent)) { - return false; - } - - boolean result = false; - switch (stateEvent.getType()) { - case PROCESS_STATE_CHANGE: - measureProcessState(stateEvent); - result = processStateChangeHandler(stateEvent); - break; - case TASK_STATE_CHANGE: - measureTaskState(stateEvent); - result = taskStateChangeHandler(stateEvent); - break; - case PROCESS_TIMEOUT: - ProcessInstanceMetrics.incProcessInstanceTimeout(); - result = processTimeout(); - break; - case TASK_TIMEOUT: - TaskMetrics.incTaskTimeout(); - result = taskTimeout(stateEvent); - break; - case WAIT_TASK_GROUP: - result = checkForceStartAndWakeUp(stateEvent); - break; - case TASK_RETRY: - TaskMetrics.incTaskRetry(); - result = taskRetryEventHandler(stateEvent); - break; - case PROCESS_BLOCKED: - result = processBlockHandler(stateEvent); - break; - default: - break; - } - - if (result) { - this.stateEvents.remove(stateEvent); - } - return result; - } - - private boolean checkForceStartAndWakeUp(StateEvent stateEvent) { + public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); - 
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { @@ -387,79 +365,20 @@ public class WorkflowExecuteRunnable implements Callable { return false; } - private boolean taskTimeout(StateEvent stateEvent) { - if (!checkTaskInstanceByStateEvent(stateEvent)) { - return true; - } - - TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId()); - if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { - return true; - } - TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy(); - if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { - ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); - taskProcessor.action(TaskAction.TIMEOUT); - } - if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); - } - return true; - } - - private boolean processTimeout() { + public void processTimeout() { ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser); - return true; } - private boolean taskStateChangeHandler(StateEvent stateEvent) { - if (!checkTaskInstanceByStateEvent(stateEvent)) { - return true; - } - - Optional taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); - TaskInstance task = taskInstanceOptional.orElseThrow( - () -> new 
RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId())); - - if (task.getState() == null) { - logger.error("task state is null, state handler error: {}", stateEvent); - return true; - } - - if (task.getState().typeIsFinished()) { - if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) { - logger.warn("The task instance is already complete, stateEvent: {}", stateEvent); - return true; - } - taskFinished(task); - if (task.getTaskGroupId() > 0) { - releaseTaskGroup(task); - } - return true; - } - if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) { - ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode()); - iTaskProcessor.action(TaskAction.RUN); - - if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { - if (iTaskProcessor.taskInstance().getState() != task.getState()) { - task.setState(iTaskProcessor.taskInstance().getState()); - } - taskFinished(task); - } - return true; - } - logger.error("state handler error: {}", stateEvent); - - return true; + public void taskTimeout(TaskInstance taskInstance) { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); } - private void taskFinished(TaskInstance taskInstance) { + public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException { logger.info("TaskInstance finished task code:{} state:{} ", - taskInstance.getTaskCode(), - taskInstance.getState()); + taskInstance.getTaskCode(), + taskInstance.getState()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); @@ -503,7 +422,7 @@ public class WorkflowExecuteRunnable implements Callable { * * @param taskInstance */ - private void releaseTaskGroup(TaskInstance taskInstance) { + public 
void releaseTaskGroup(TaskInstance taskInstance) { if (taskInstance.getTaskGroupId() > 0) { TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); if (nextTaskInstance != null) { @@ -528,13 +447,15 @@ public class WorkflowExecuteRunnable implements Callable { * * @param taskInstance */ - private void retryTaskInstance(TaskInstance taskInstance) { + private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { if (!taskInstance.taskCanRetry()) { return; } TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); if (newTaskInstance == null) { - logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); + logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", + taskInstance.getTaskCode(), + taskInstance.getId()); return; } waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); @@ -556,20 +477,6 @@ public class WorkflowExecuteRunnable implements Callable { } } - /** - * handle task retry event - * - * @param stateEvent - * @return - */ - private boolean taskRetryEventHandler(StateEvent stateEvent) { - TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode()); - addTaskToStandByList(taskInstance); - submitStandByTask(); - waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode()); - return true; - } - /** * update process instance */ @@ -603,43 +510,23 @@ public class WorkflowExecuteRunnable implements Callable { /** * check process instance by state event */ - public boolean checkProcessInstance(StateEvent stateEvent) { + public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { - logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), stateEvent); - return false; + throw new StateEventHandleError("The event 
doesn't contains process instance id"); } - return true; } /** * check if task instance exist by state event */ - public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) { + public void checkTaskInstanceByStateEvent(StateEvent stateEvent) throws StateEventHandleError { if (stateEvent.getTaskInstanceId() == 0) { - logger.error("task instance id null, state event:{}", stateEvent); - return false; + throw new StateEventHandleError("The taskInstanceId is 0"); } if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) { - logger.error("mismatch task instance id, event:{}", stateEvent); - return false; - } - return true; - } - - /** - * check if task instance exist by task code - */ - public boolean checkTaskInstanceByCode(long taskCode) { - if (taskInstanceMap.isEmpty()) { - return false; - } - for (TaskInstance taskInstance : taskInstanceMap.values()) { - if (taskInstance.getTaskCode() == taskCode) { - return true; - } + throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap"); } - return false; } /** @@ -688,59 +575,13 @@ public class WorkflowExecuteRunnable implements Callable { return Optional.empty(); } - private boolean processStateChangeHandler(StateEvent stateEvent) { - try { - logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus()); - - if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { - // serial wait execution type needs to wake up the waiting process - if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) { - endProcess(); - return true; - } - this.updateProcessInstanceState(stateEvent); - return true; - } - - if (processComplementData()) { - return true; - } - if (stateEvent.getExecutionStatus().typeIsFinished()) { - endProcess(); - } - if (processInstance.getState() == ExecutionStatus.READY_STOP) { - killAllTasks(); - } - return true; - } catch 
(Exception e) { - logger.error("process state change error:", e); - } - return true; - } - - private boolean processBlockHandler(StateEvent stateEvent) { - try { - Optional taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); - TaskInstance task = taskInstanceOptional.orElseThrow( - () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId())); - if (!checkTaskInstanceByStateEvent(stateEvent)) { - logger.error("task {} is not a blocking task", task.getTaskCode()); - return false; - } - - BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class); - if (parameters.isAlertWhenBlocking()) { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); - logger.info("processInstance {} block alert send successful!", processInstance.getId()); - } - } catch (Exception e) { - logger.error("sending blocking message error:", e); - } - return true; + public void processBlock() { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); + logger.info("processInstance {} block alert send successful!", processInstance.getId()); } - private boolean processComplementData() throws Exception { + public boolean processComplementData() { if (!needComplementProcess()) { return false; } @@ -937,7 +778,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * init task queue */ - private void initTaskQueue() { + private void initTaskQueue() throws StateEventHandleException { taskFailedSubmit = false; activeTaskProcessorMaps.clear(); @@ -946,7 +787,8 @@ public class WorkflowExecuteRunnable implements Callable { errorTaskMap.clear(); if (!isNewProcessInstance()) { - List validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId()); + List validTaskInstanceList + = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { if (validTaskMap.containsKey(task.getTaskCode())) { int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); @@ -956,7 +798,8 @@ public class WorkflowExecuteRunnable implements Callable { processService.updateTaskInstance(task); continue; } - logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode()); + logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", + task.getTaskCode()); } validTaskMap.put(task.getTaskCode(), task.getId()); @@ -1099,6 +942,7 @@ public class WorkflowExecuteRunnable implements Callable { Host host = new Host(taskInstance.getHost()); nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command()); } catch (Exception e) { + // Do we need to catch this exception? logger.error("notify process host update", e); } } @@ -1348,8 +1192,11 @@ public class WorkflowExecuteRunnable implements Callable { return validTaskInstanceList; } - private void submitPostNode(String parentNodeCode) { - Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); + private void submitPostNode(String parentNodeCode) throws StateEventHandleException { + Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, + skipTaskNodeMap, + dag, + getCompleteTaskInstanceMap()); List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); @@ -1692,35 +1539,19 @@ public class WorkflowExecuteRunnable implements Callable { return true; } - try { - Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); - Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); - return 
processInstance.getScheduleTime().equals(endTime); - } catch (Exception e) { - logger.error("complement end failed ", e); - return false; - } + Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + return processInstance.getScheduleTime().equals(endTime); } /** * updateProcessInstance process instance state * after each batch of tasks is executed, the status of the process instance is updated */ - private void updateProcessInstanceState() { + private void updateProcessInstanceState() throws StateEventHandleException { ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { - logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), - processInstance.getName(), - processInstance.getState(), - state, - processInstance.getCommandType()); - - processInstance.setState(state); - if (state.typeIsFinished()) { - processInstance.setEndTime(new Date()); - } - processService.updateProcessInstance(processInstance); + updateWorkflowInstanceStatesToDB(state); StateEvent stateEvent = new StateEvent(); stateEvent.setExecutionStatus(processInstance.getState()); @@ -1735,21 +1566,33 @@ public class WorkflowExecuteRunnable implements Callable { /** * stateEvent's execution status as process instance state */ - private void updateProcessInstanceState(StateEvent stateEvent) { + public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventHandleException { ExecutionStatus state = stateEvent.getExecutionStatus(); - if (processInstance.getState() != state) { + updateWorkflowInstanceStatesToDB(state); + } + + private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException { + ExecutionStatus originStates = processInstance.getState(); + if (originStates != newStates) { logger.info("work flow process 
instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), - processInstance.getName(), - processInstance.getState(), - state, - processInstance.getCommandType()); - - processInstance.setState(state); - if (state.typeIsFinished()) { + processInstance.getId(), + processInstance.getName(), + originStates, + newStates, + processInstance.getCommandType()); + + processInstance.setState(newStates); + if (newStates.typeIsFinished()) { processInstance.setEndTime(new Date()); } - processService.updateProcessInstance(processInstance); + try { + processService.updateProcessInstance(processInstance); + } catch (Exception ex) { + // recover the status + processInstance.setState(originStates); + processInstance.setEndTime(null); + throw new StateEventHandleException("Update process instance status to DB error", ex); + } } } @@ -1768,18 +1611,17 @@ public class WorkflowExecuteRunnable implements Callable { * * @param taskInstance task instance */ - private void addTaskToStandByList(TaskInstance taskInstance) { - try { - if (readyToSubmitTaskQueue.contains(taskInstance)) { - logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); - return; - } - logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", - taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); - readyToSubmitTaskQueue.put(taskInstance); - } catch (Exception e) { - logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); + public void addTaskToStandByList(TaskInstance taskInstance) { + if (readyToSubmitTaskQueue.contains(taskInstance)) { + logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); + return; } + logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", + taskInstance.getName(), + taskInstance.getId(), + taskInstance.getTaskCode()); + 
TaskMetrics.incTaskSubmit(); + readyToSubmitTaskQueue.put(taskInstance); } /** @@ -1788,15 +1630,7 @@ public class WorkflowExecuteRunnable implements Callable { * @param taskInstance task instance */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { - logger.info("remove task from stand by list, id: {} name:{}", taskInstance.getId(), taskInstance.getName()); - try { - readyToSubmitTaskQueue.remove(taskInstance); - } catch (Exception e) { - logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}", - taskInstance.getId(), - taskInstance.getName(), - e); - } + readyToSubmitTaskQueue.remove(taskInstance); } /** @@ -1816,10 +1650,10 @@ public class WorkflowExecuteRunnable implements Callable { /** * close the on going tasks */ - private void killAllTasks() { + public void killAllTasks() { logger.info("kill called on process instance id: {}, num: {}", - processInstance.getId(), - activeTaskProcessorMaps.size()); + processInstance.getId(), + activeTaskProcessorMaps.size()); if (readyToSubmitTaskQueue.size() > 0) { readyToSubmitTaskQueue.clear(); @@ -1854,7 +1688,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * handling the list of tasks to be submitted */ - private void submitStandByTask() { + public void submitStandByTask() throws StateEventHandleException { int length = readyToSubmitTaskQueue.size(); for (int i = 0; i < length; i++) { TaskInstance task = readyToSubmitTaskQueue.peek(); @@ -2013,6 +1847,19 @@ public class WorkflowExecuteRunnable implements Callable { } } + public Map getCompleteTaskMap() { + return completeTaskMap; + } + + public Map getActiveTaskProcessMap() { + return activeTaskProcessorMaps; + } + + public Map getWaitToRetryTaskInstanceMap() { + return waitToRetryTaskInstanceMap; + } + + private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map cmdParam) { // get start params from command param Map startParamMap = new HashMap<>(); @@ -2047,46 +1894,4 @@ 
public class WorkflowExecuteRunnable implements Callable { } } - private void measureProcessState(StateEvent processStateEvent) { - if (processStateEvent.getExecutionStatus().typeIsFinished()) { - ProcessInstanceMetrics.incProcessInstanceFinish(); - } - switch (processStateEvent.getExecutionStatus()) { - case STOP: - ProcessInstanceMetrics.incProcessInstanceStop(); - break; - case SUCCESS: - ProcessInstanceMetrics.incProcessInstanceSuccess(); - break; - case FAILURE: - ProcessInstanceMetrics.incProcessInstanceFailure(); - break; - default: - break; - } - } - - private void measureTaskState(StateEvent taskStateEvent) { - if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) { - // the event is broken - logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent); - return; - } - if (taskStateEvent.getExecutionStatus().typeIsFinished()) { - TaskMetrics.incTaskFinish(); - } - switch (taskStateEvent.getExecutionStatus()) { - case STOP: - TaskMetrics.incTaskStop(); - break; - case SUCCESS: - TaskMetrics.incTaskSuccess(); - break; - case FAILURE: - TaskMetrics.incTaskFailure(); - break; - default: - break; - } - } } \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index ba097109be..bb49d9dea1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import 
org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang.StringUtils; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 663cba4d90..313a43c0e0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.service; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.LoggerUtils; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index fc9bcca3aa..98bf514730 100644 --- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -26,7 +26,7 @@ import static org.mockito.Mockito.doNothing; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java index 247e4066f8..0d623e398a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java @@ -17,12 +17,13 @@ package org.apache.dolphinscheduler.remote.processor; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Host; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + /** * callback channel */ @@ -72,7 +73,7 @@ public class NettyRemoteChannel { return this.channel.isActive(); } - public ChannelFuture writeAndFlush(Command command){ + public ChannelFuture writeAndFlush(Command command) { return this.channel.writeAndFlush(command); } diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java index af51831068..be564261fb 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -71,19 +72,19 @@ public class StateEventCallbackService { * @param host * @return callback channel */ - private NettyRemoteChannel newRemoteChannel(Host host) { + private Optional newRemoteChannel(Host host) { Channel newChannel; NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress()); if (nettyRemoteChannel != null) { if (nettyRemoteChannel.isActive()) { - return nettyRemoteChannel; + return Optional.of(nettyRemoteChannel); } } newChannel = nettyRemotingClient.getChannel(host); if (newChannel != null) { - return newRemoteChannel(newChannel, host.getAddress()); + return Optional.of(newRemoteChannel(newChannel, host.getAddress())); } - return null; + return Optional.empty(); } public long pause(int ntries) { @@ -110,16 +111,26 @@ public class StateEventCallbackService { } /** - * send result + * Send the command to target address, this method doesn't guarantee the command send success. 
* * @param command command need to send */ public void sendResult(String address, int port, Command command) { logger.info("send result, host:{}, command:{}", address, command.toString()); Host host = new Host(address, port); - NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); - if (nettyRemoteChannel != null) { + sendResult(host, command); + } + + /** + * Send the command to target host, this method doesn't guarantee the command send success. + * + * @param host target host + * @param command command need to send + */ + public void sendResult(Host host, Command command) { + logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString()); + newRemoteChannel(host).ifPresent(nettyRemoteChannel -> { nettyRemoteChannel.writeAndFlush(command); - } + }); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index cc7f39e402..0cf03abe8d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -51,10 +51,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { * @param taskInfo taskInfo * @throws TaskPriorityQueueException */ - void put(T taskInfo) throws TaskPriorityQueueException; + void put(T taskInfo); /** * take taskInfo diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index fef5f8ff79..0d0eebe03d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -40,10 +40,9 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * put task takePriorityInfo * * @param taskPriorityInfo takePriorityInfo - * @throws TaskPriorityQueueException */ @Override - public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException { + public void put(TaskPriority taskPriorityInfo) { queue.put(taskPriorityInfo); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java index e080842c34..b62770984a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java @@ -58,7 +58,13 @@ public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor { if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId()); TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId()); - logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId()); + logger.debug("remove REMOTE_CHANNELS, task instance id:{}", + taskExecuteResponseAckCommand.getTaskInstanceId()); + } else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) { + // master handle worker response error, will still retry + } else { + throw new IllegalArgumentException("Invalid task execute response ack status: " + + taskExecuteResponseAckCommand.getStatus()); } }