diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index 292202501c..bc021e5e08 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -21,7 +21,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue; public enum StateEventType { - PROCESS_STATE_CHANGE(0, "process statechange"), + PROCESS_STATE_CHANGE(0, "process state change"), TASK_STATE_CHANGE(1, "task state change"), PROCESS_TIMEOUT(2, "process timeout"), TASK_TIMEOUT(3, "task timeout"), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java similarity index 97% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java index 26c3a3beab..09f85d3f17 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.common.enums; -public enum Event { +public enum TaskEventType { DISPATCH, DELAY, RUNNING, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java new file mode 100644 index 0000000000..fa3bfec0ca --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +public class TaskInstanceUtils { + + /** + * Copy the properties of the given source {@link TaskInstance} to the target. + * Each value is read from a getter on source and passed to the matching setter on target. + * + * @param source the task instance to copy from + * @param target the task instance to copy to; it is modified in place and nothing is returned 
+ */ + public static void copyTaskInstance(TaskInstance source, TaskInstance target) { + target.setId(source.getId()); + target.setName(source.getName()); + target.setTaskType(source.getTaskType()); + target.setProcessInstanceId(source.getProcessInstanceId()); + target.setTaskCode(source.getTaskCode()); + target.setTaskDefinitionVersion(source.getTaskDefinitionVersion()); + target.setProcessInstanceName(source.getProcessInstanceName()); + target.setTaskGroupPriority(source.getTaskGroupPriority()); + target.setState(source.getState()); + target.setFirstSubmitTime(source.getFirstSubmitTime()); + target.setSubmitTime(source.getSubmitTime()); + target.setStartTime(source.getStartTime()); + target.setEndTime(source.getEndTime()); + target.setHost(source.getHost()); + target.setExecutePath(source.getExecutePath()); + target.setLogPath(source.getLogPath()); + target.setRetryTimes(source.getRetryTimes()); + target.setAlertFlag(source.getAlertFlag()); + target.setProcessInstance(source.getProcessInstance()); + target.setProcessDefine(source.getProcessDefine()); + target.setTaskDefine(source.getTaskDefine()); + target.setPid(source.getPid()); + target.setAppLink(source.getAppLink()); + target.setFlag(source.getFlag()); + target.setDependency(source.getDependency()); + // todo: we need to copy the task params before switchDependency, since setSwitchDependency relies on task params; this ordering dependency is really a very bad practice. 
+ target.setTaskParams(source.getTaskParams()); + target.setSwitchDependency(source.getSwitchDependency()); + target.setDuration(source.getDuration()); + target.setMaxRetryTimes(source.getMaxRetryTimes()); + target.setRetryInterval(source.getRetryInterval()); + target.setTaskInstancePriority(source.getTaskInstancePriority()); + target.setDependentResult(source.getDependentResult()); + target.setWorkerGroup(source.getWorkerGroup()); + target.setEnvironmentCode(source.getEnvironmentCode()); + target.setEnvironmentConfig(source.getEnvironmentConfig()); + target.setExecutorId(source.getExecutorId()); + target.setVarPool(source.getVarPool()); + target.setExecutorName(source.getExecutorName()); + target.setResources(source.getResources()); + target.setDelayTime(source.getDelayTime()); + target.setDryRun(source.getDryRun()); + target.setTaskGroupId(source.getTaskGroupId()); + target.setCpuQuota(source.getCpuQuota()); + target.setMemoryMax(source.getMemoryMax()); + } + +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java new file mode 100644 index 0000000000..497c2fb881 --- /dev/null +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.utils; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.Date; +import java.util.HashMap; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TaskInstanceUtilsTest { + + @Test + void copyTaskInstance() { + TaskInstance source = new TaskInstance(); + source.setId(1); + source.setName("source"); + source.setSubmitTime(new Date()); + source.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); + TaskInstance target = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(source, target); + Assertions.assertEquals(source.getId(), target.getId()); + Assertions.assertEquals(source.getName(), target.getName()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index d26d33c062..3469cd4852 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import 
org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; -import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; +import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.task.TaskPluginManager; @@ -59,7 +59,7 @@ public class MasterServer implements IStoppable { private TaskPluginManager taskPluginManager; @Autowired - private MasterSchedulerService masterSchedulerService; + private MasterSchedulerBootstrap masterSchedulerBootstrap; @Autowired private SchedulerApi schedulerApi; @@ -94,8 +94,8 @@ public class MasterServer implements IStoppable { this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); - this.masterSchedulerService.init(); - this.masterSchedulerService.start(); + this.masterSchedulerBootstrap.init(); + this.masterSchedulerBootstrap.start(); this.eventExecuteService.start(); this.failoverExecuteThread.start(); @@ -130,7 +130,7 @@ public class MasterServer implements IStoppable { ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); // close this.schedulerApi.close(); - this.masterSchedulerService.close(); + this.masterSchedulerBootstrap.close(); this.masterRPCServer.close(); this.masterRegistryClient.closeRegistry(); // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. 
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java new file mode 100644 index 0000000000..8e2dcced23 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskDelayEventHandler implements TaskEventHandler { + + private final Logger logger = LoggerFactory.getLogger(TaskDelayEventHandler.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private ProcessService processService; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError("Cannot find related workflow instance from cache"); + } + Optional 
taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); + if (!taskInstanceOptional.isPresent()) { + sendAckToWorker(taskEvent); + return; + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + logger.warn( + "The current task status is: {}, will not handle the running event, this event is delay, will discard this event: {}", + taskInstance.getState(), + taskEvent); + sendAckToWorker(taskEvent); + return; + } + + TaskInstance oldTaskInstance = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); + try { + taskInstance.setState(taskEvent.getState()); + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + if (!processService.updateTaskInstance(taskInstance)) { + throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed"); + } + sendAckToWorker(taskEvent); + } catch (Exception ex) { + TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); + if (ex instanceof TaskEventHandleError) { + throw ex; + } + throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed", ex); + } + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + + } + + private void sendAckToWorker(TaskEvent taskEvent) { + // If event handle success, send ack to worker to otherwise the worker will retry this event + TaskExecuteRunningAckCommand 
taskExecuteRunningAckCommand = + new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.DELAY; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java new file mode 100644 index 0000000000..9378f0c36e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskDispatchEventHandler implements TaskEventHandler { + + private final Logger logger = LoggerFactory.getLogger(TaskDispatchEventHandler.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private ProcessService processService; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + throw new TaskEventHandleError("Cannot find related workflow instance from cache"); + } + TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId) + .orElseThrow(() -> new TaskEventHandleError("Cannot find related taskInstance from cache")); + if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { + logger.warn( + "The current taskInstance status is not SUBMITTED_SUCCESS, so the dispatch event will be discarded, the current 
is a delay event, event: {}", + taskEvent); + return; + } + + // todo: we need to just log the old status and rollback these two fields, no need to copy all fields + TaskInstance oldTaskInstance = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); + // update the taskInstance status + taskInstance.setState(ExecutionStatus.DISPATCH); + taskInstance.setHost(taskEvent.getWorkerAddress()); + try { + if (!processService.updateTaskInstance(taskInstance)) { + throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed"); + } + } catch (Exception ex) { + // rollback status + TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); + if (ex instanceof TaskEventHandleError) { + throw ex; + } + throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed", ex); + } + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.DISPATCH; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java new file mode 100644 index 0000000000..deae719f4b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +public class TaskEventHandleError extends Exception { + + public TaskEventHandleError(String message) { + super(message); + } + + public TaskEventHandleError(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java new file mode 100644 index 0000000000..e53a2bbcc3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +public class TaskEventHandleException extends Exception { + + public TaskEventHandleException(String message) { + super(message); + } + + public TaskEventHandleException(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java new file mode 100644 index 0000000000..5d214cbb5a --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; + +public interface TaskEventHandler { + + /** + * Handle the task event. + * + * @throws TaskEventHandleError this exception means we will discard this event. 
+ * @throws TaskEventHandleException this exception means we need to retry this event. + */ + void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException; + + TaskEventType getHandleEventType(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java new file mode 100644 index 0000000000..d09a8364ca --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskRejectByWorkerEventHandler implements TaskEventHandler { + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task reject event error, cannot find related workflow instance from cache, will discard this event"); + } + TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> { + sendAckToWorker(taskEvent); + return new TaskEventHandleError( + "Handle task reject event error, cannot find the taskInstance from cache, will discord this event"); + }); + try { + // todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple, + // we need to control the worker overload by master rather than worker + // if the task resubmit and the worker failover, this task may be dispatch 
twice? + // todo: we need to clear the taskInstance host and rollback the status to submit. + workflowExecuteRunnable.resubmit(taskInstance.getTaskCode()); + sendAckToWorker(taskEvent); + } catch (Exception ex) { + throw new TaskEventHandleError("Handle task reject event error", ex); + } + + } + + public void sendAckToWorker(TaskEvent taskEvent) { + TaskRecallAckCommand taskRecallAckCommand = + new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command()); + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.WORKER_REJECT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java new file mode 100644 index 0000000000..67df03682f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Optional; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskResultEventHandler implements TaskEventHandler { + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Autowired + private DataQualityResultOperator dataQualityResultOperator; + + @Autowired + private ProcessService processService; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task result event error, 
cannot find related workflow instance from cache, will discard this event"); + } + Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); + if (!taskInstanceOptional.isPresent()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task result event error, cannot find the taskInstance from cache, will discard this event"); + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task result event error, the task instance is already finished, will discard this event"); + } + dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); + + TaskInstance oldTaskInstance = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); + try { + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + taskInstance.setState(taskEvent.getState()); + taskInstance.setEndTime(taskEvent.getEndTime()); + taskInstance.setVarPool(taskEvent.getVarPool()); + processService.changeOutParam(taskInstance); + processService.updateTaskInstance(taskInstance); + sendAckToWorker(taskEvent); + } catch (Exception ex) { + TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); + throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex); + } + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + 
workflowExecuteThreadPool.submitStateEvent(stateEvent); + + } + + public void sendAckToWorker(TaskEvent taskEvent) { + TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = + new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.RESULT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java new file mode 100644 index 0000000000..31152973a2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TaskEventType; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class TaskRunningEventHandler implements TaskEventHandler { + private final Logger logger = LoggerFactory.getLogger(TaskRunningEventHandler.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Autowired + private ProcessService processService; + + @Override + public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { + int taskInstanceId = taskEvent.getTaskInstanceId(); + int processInstanceId = taskEvent.getProcessInstanceId(); + + WorkflowExecuteRunnable workflowExecuteRunnable = + this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task running event error, cannot find related workflow 
instance from cache, will discard this event"); + } + Optional taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); + if (!taskInstanceOptional.isPresent()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle running event error, cannot find the taskInstance from cache, will discord this event"); + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + sendAckToWorker(taskEvent); + throw new TaskEventHandleError( + "Handle task running event error, this task instance is already finished, this event is delay, will discard this event"); + } + + TaskInstance oldTaskInstance = new TaskInstance(); + TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); + try { + taskInstance.setState(taskEvent.getState()); + taskInstance.setStartTime(taskEvent.getStartTime()); + taskInstance.setHost(taskEvent.getWorkerAddress()); + taskInstance.setLogPath(taskEvent.getLogPath()); + taskInstance.setExecutePath(taskEvent.getExecutePath()); + taskInstance.setPid(taskEvent.getProcessId()); + taskInstance.setAppLink(taskEvent.getAppIds()); + if (!processService.updateTaskInstance(taskInstance)) { + throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); + } + sendAckToWorker(taskEvent); + } catch (Exception ex) { + TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); + if (ex instanceof TaskEventHandleError) { + throw ex; + } + throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex); + } + + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } + + private void 
sendAckToWorker(TaskEvent taskEvent) { + // If event handle success, send ack to worker to otherwise the worker will retry this event + TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = + new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); + taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); + } + + @Override + public TaskEventType getHandleEventType() { + return TaskEventType.RUNNING; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java new file mode 100644 index 0000000000..5d5bc2eb32 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+/**
+ * A workflow-level event: an event type paired with the id of the workflow instance it concerns.
+ * Accessors and the all-arguments constructor are generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class WorkflowEvent {
+    /** Kind of workflow event. */
+    private WorkflowEventType workflowEventType;
+    /** Id of the workflow instance this event refers to. */
+    private int workflowInstanceId;
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
new file mode 100644
index 0000000000..651f714037
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+/**
+ * Checked exception thrown by a workflow event handler when the event itself is broken and has to be
+ * dropped instead of retried.
+ */
+public class WorkflowEventHandleError extends Exception {
+    /** Create the error with a description of why the event cannot be handled. */
+    public WorkflowEventHandleError(String message) {
+        super(message);
+    }
+    /** Create the error with a description and the underlying cause. */
+    public WorkflowEventHandleError(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
new file mode 100644
index 0000000000..c9b84fd10d
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+/**
+ * Checked exception thrown by a workflow event handler when handling failed in a way that means the
+ * event needs to be retried.
+ */
+public class WorkflowEventHandleException extends Exception {
+    /** Create the exception with a description of the failure. */
+    public WorkflowEventHandleException(String message) {
+        super(message);
+    }
+    /** Create the exception with a description and the underlying cause. */
+    public WorkflowEventHandleException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
new file mode 100644
index 0000000000..600d321566
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+/**
+ * Handler for one {@link WorkflowEventType} of {@link WorkflowEvent}.
+ */
+public interface WorkflowEventHandler {
+
+    /**
+     * Handle a workflow event.
+     *
+     * @param workflowEvent the workflow event to handle
+     * @throws WorkflowEventHandleError if the event is broken and has to be dropped.
+     * @throws WorkflowEventHandleException if handling failed and the event needs to be retried.
+ */ + void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError, WorkflowEventHandleException; + + WorkflowEventType getHandleWorkflowEventType(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java new file mode 100644 index 0000000000..86c8b90cfa --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.event; + +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class WorkflowEventQueue { + + private final Logger logger = LoggerFactory.getLogger(WorkflowEventQueue.class); + + private static final LinkedBlockingQueue workflowEventQueue = new LinkedBlockingQueue<>(); + + /** + * Add a workflow event. 
+ */ + public void addEvent(WorkflowEvent workflowEvent) { + workflowEventQueue.add(workflowEvent); + logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent); + } + + /** + * Pool the head of the workflow event queue and wait an workflow event. + */ + public WorkflowEvent poolEvent() throws InterruptedException { + return workflowEventQueue.take(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java new file mode 100644 index 0000000000..b0f5f09e30 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+/**
+ * Kinds of {@link WorkflowEvent}.
+ */
+public enum WorkflowEventType {
+    /** Request to start the workflow instance named by the event. */
+    START_WORKFLOW,
+    ;
+
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
new file mode 100644
index 0000000000..b4d9fc1f85
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.server.master.event; + +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; +import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue; + +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class WorkflowStartEventHandler implements WorkflowEventHandler { + + private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private StateWheelExecuteThread stateWheelExecuteThread; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Autowired + private WorkflowEventQueue workflowEventQueue; + + @Override + public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError { + logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent); + WorkflowExecuteRunnable workflowExecuteRunnable = + processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId()); + if (workflowExecuteRunnable == null) { + throw new WorkflowEventHandleError( + "The workflow start event is invalid, cannot find the workflow instance from cache"); + } + ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + + 
ProcessInstanceMetrics.incProcessInstanceSubmit(); + CompletableFuture workflowSubmitFuture = + CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool); + workflowSubmitFuture.thenAccept(workflowSubmitStatue -> { + if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { + // submit failed will resend the event to workflow event queue + logger.info("Success submit the workflow instance"); + if (processInstance.getTimeout() > 0) { + stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); + } + } else { + logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}", + workflowEvent); + workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, + processInstance.getId())); + } + }); + } + + @Override + public WorkflowEventType getHandleWorkflowEventType() { + return WorkflowEventType.START_WORKFLOW; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java index 37d8ceb1da..3abdd879bb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java @@ -41,15 +41,14 @@ public class WorkflowStateEventHandler implements StateEventHandler { ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); ProcessDefinition processDefinition = processInstance.getProcessDefinition(); - logger.info("process:{} state {} change to {}", - processInstance.getId(), - processInstance.getState(), - stateEvent.getExecutionStatus()); + logger.info( + "Handle workflow instance state event, the current workflow instance state {} will be changed to {}", + processInstance.getState(), 
stateEvent.getExecutionStatus()); if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) { // serial wait execution type needs to wake up the waiting process if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType() - .typeIsSerialPriority()) { + .typeIsSerialPriority()) { workflowExecuteRunnable.endProcess(); return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index f90277d0eb..18afd11ae5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.processor; -import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; import org.slf4j.Logger; @@ -67,11 +67,12 @@ public class StateEventProcessor implements NettyRequestProcessor { stateEvent.setType(type); try { - LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); + 
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), + stateEvent.getTaskInstanceId()); - logger.info("Received state event change command, event: {}", stateEvent); - stateEventResponseService.addResponse(stateEvent); - }finally { + logger.info("Received state change command, event: {}", stateEvent); + stateEventResponseService.addStateChangeEvent(stateEvent); + } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index bdf80bbee9..80c90ee1b7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -42,9 +42,6 @@ import org.springframework.stereotype.Component; import io.netty.channel.Channel; -/** - * task manager - */ @Component public class StateEventResponseService { @@ -96,7 +93,7 @@ public class StateEventResponseService { /** * put task to attemptQueue */ - public void addResponse(StateEvent stateEvent) { + public void addStateChangeEvent(StateEvent stateEvent) { try { // check the event is validated eventQueue.put(stateEvent); @@ -154,6 +151,7 @@ public class StateEventResponseService { } WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + // We will refresh the task instance status first, if the refresh failed the event will not be removed switch (stateEvent.getType()) { case TASK_STATE_CHANGE: workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId()); diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 842bcaf333..248d253739 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; @@ -83,7 +83,7 @@ public class TaskEvent { /** * ack / response */ - private Event event; + private TaskEventType event; /** * varPool @@ -102,7 +102,7 @@ public class TaskEvent { event.setProcessInstanceId(processInstanceId); event.setTaskInstanceId(taskInstanceId); event.setWorkerAddress(workerAddress); - event.setEvent(Event.DISPATCH); + event.setEvent(TaskEventType.DISPATCH); return event; } @@ -116,7 +116,7 @@ public class TaskEvent { event.setLogPath(command.getLogPath()); event.setChannel(channel); event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); - event.setEvent(Event.RUNNING); + event.setEvent(TaskEventType.RUNNING); return event; } @@ -134,7 +134,7 @@ public class TaskEvent { event.setVarPool(command.getVarPool()); event.setChannel(channel); event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress()); - event.setEvent(Event.RESULT); + event.setEvent(TaskEventType.RESULT); return event; } @@ -143,7 +143,7 @@ public class TaskEvent { event.setTaskInstanceId(command.getTaskInstanceId()); 
event.setProcessInstanceId(command.getProcessInstanceId()); event.setChannel(channel); - event.setEvent(Event.WORKER_REJECT); + event.setEvent(TaskEventType.WORKER_REJECT); return event; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index 9fc96e7564..3c05671a10 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -17,29 +17,18 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import org.apache.dolphinscheduler.common.enums.Event; -import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.event.StateEvent; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; -import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError; +import 
org.apache.dolphinscheduler.server.master.event.TaskEventHandleException; +import org.apache.dolphinscheduler.server.master.event.TaskEventHandler; -import java.util.Optional; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.Channel; - /** * task execute thread */ @@ -51,34 +40,37 @@ public class TaskExecuteRunnable implements Runnable { private final ConcurrentLinkedQueue events = new ConcurrentLinkedQueue<>(); - private ProcessService processService; - - private WorkflowExecuteThreadPool workflowExecuteThreadPool; - - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - - private DataQualityResultOperator dataQualityResultOperator; + private final Map taskEventHandlerMap; - public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, - ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) { + public TaskExecuteRunnable(int processInstanceId, Map taskEventHandlerMap) { this.processInstanceId = processInstanceId; - this.processService = processService; - this.workflowExecuteThreadPool = workflowExecuteThreadPool; - this.processInstanceExecCacheManager = processInstanceExecCacheManager; - this.dataQualityResultOperator = dataQualityResultOperator; + this.taskEventHandlerMap = taskEventHandlerMap; } @Override public void run() { while (!this.events.isEmpty()) { + // we handle the task event belongs to one task serial, so if the event comes in wrong order, TaskEvent event = this.events.peek(); try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId()); - persist(event); - } catch (Exception e) { - logger.error("persist task event error, event:{}", event, e); + logger.info("Handle task event begin: {}", event); + 
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event); + events.remove(event); + logger.info("Handle task event finished: {}", event); + } catch (TaskEventHandleException taskEventHandleException) { + // we don't need to resubmit this event, since the worker will resubmit this event + logger.error("Handle task event failed, this event will be retry later, event: {}", event, + taskEventHandleException); + } catch (TaskEventHandleError taskEventHandleError) { + logger.error("Handle task event error, this event will be removed, event: {}", event, + taskEventHandleError); + events.remove(event); + } catch (Exception unknownException) { + logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}", + event, unknownException); + events.remove(event); } finally { - this.events.remove(event); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } @@ -109,156 +101,4 @@ public class TaskExecuteRunnable implements Runnable { return this.events.add(event); } - /** - * persist task event - * - * @param taskEvent taskEvent - */ - private void persist(TaskEvent taskEvent) throws Exception { - Event event = taskEvent.getEvent(); - int taskInstanceId = taskEvent.getTaskInstanceId(); - int processInstanceId = taskEvent.getProcessInstanceId(); - - Optional taskInstance; - WorkflowExecuteRunnable workflowExecuteRunnable = - this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) { - taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId); - } else { - taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId)); - } - - boolean needToSendEvent = true; - switch (event) { - case DISPATCH: - needToSendEvent = handleDispatchEvent(taskEvent, taskInstance); - // dispatch event do not need to submit state event - break; - case DELAY: - case RUNNING: - needToSendEvent = 
handleRunningEvent(taskEvent, taskInstance); - break; - case RESULT: - needToSendEvent = handleResultEvent(taskEvent, taskInstance); - break; - case WORKER_REJECT: - needToSendEvent = - handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable); - break; - default: - throw new IllegalArgumentException("invalid event type : " + event); - } - if (!needToSendEvent) { - logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent); - return; - } - - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); - stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); - stateEvent.setExecutionStatus(taskEvent.getState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } - - /** - * handle dispatch event - */ - private boolean handleDispatchEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { - if (!taskInstanceOptional.isPresent()) { - logger.error("taskInstance is null"); - return false; - } - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { - return false; - } - taskInstance.setState(ExecutionStatus.DISPATCH); - taskInstance.setHost(taskEvent.getWorkerAddress()); - processService.saveTaskInstance(taskInstance); - return true; - } - - /** - * handle running event - */ - private boolean handleRunningEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { - Channel channel = taskEvent.getChannel(); - if (taskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState().typeIsFinished()) { - logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", - taskInstance.getId(), - taskInstance.getState()); - return false; - } else { - taskInstance.setState(taskEvent.getState()); - 
taskInstance.setStartTime(taskEvent.getStartTime()); - taskInstance.setHost(taskEvent.getWorkerAddress()); - taskInstance.setLogPath(taskEvent.getLogPath()); - taskInstance.setExecutePath(taskEvent.getExecutePath()); - taskInstance.setPid(taskEvent.getProcessId()); - taskInstance.setAppLink(taskEvent.getAppIds()); - processService.saveTaskInstance(taskInstance); - } - } - // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success - // send ack to worker - TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = - new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); - return true; - } - - /** - * handle result event - */ - private boolean handleResultEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { - Channel channel = taskEvent.getChannel(); - if (taskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState().typeIsFinished()) { - logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent); - return false; - } - - dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); - - taskInstance.setStartTime(taskEvent.getStartTime()); - taskInstance.setHost(taskEvent.getWorkerAddress()); - taskInstance.setLogPath(taskEvent.getLogPath()); - taskInstance.setExecutePath(taskEvent.getExecutePath()); - taskInstance.setPid(taskEvent.getProcessId()); - taskInstance.setAppLink(taskEvent.getAppIds()); - taskInstance.setState(taskEvent.getState()); - taskInstance.setEndTime(taskEvent.getEndTime()); - taskInstance.setVarPool(taskEvent.getVarPool()); - processService.changeOutParam(taskInstance); - processService.saveTaskInstance(taskInstance); - } - // if taskInstance is null (maybe deleted) . retry will be meaningless . 
so response success - TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = - new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); - channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); - return true; - } - - /** - * handle result event - */ - private boolean handleWorkerRejectEvent(Channel channel, - Optional taskInstanceOptional, - WorkflowExecuteRunnable executeThread) throws Exception { - TaskInstance taskInstance = - taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null")); - if (executeThread != null) { - executeThread.resubmit(taskInstance.getTaskCode()); - } - if (channel != null) { - TaskRecallAckCommand taskRecallAckCommand = - new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId()); - channel.writeAndFlush(taskRecallAckCommand.convert2Command()); - } - return true; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index ae578d4c03..8d5f2fd3db 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -17,12 +17,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue; +import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; -import 
org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.server.master.event.TaskEventHandler; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PostConstruct; @@ -48,21 +50,10 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - /** - * process service - */ - @Autowired - private ProcessService processService; - - /** - * data quality result operator - */ @Autowired - private DataQualityResultOperator dataQualityResultOperator; + private List taskEventHandlerList; - - @Autowired - private WorkflowExecuteThreadPool workflowExecuteThreadPool; + private Map taskEventHandlerMap = new HashMap<>(); /** * task event thread map @@ -75,6 +66,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { this.setThreadNamePrefix("Task-Execute-Thread-"); this.setMaxPoolSize(masterConfig.getExecThreads()); this.setCorePoolSize(masterConfig.getExecThreads()); + taskEventHandlerList.forEach( + taskEventHandler -> taskEventHandlerMap.put(taskEventHandler.getHandleEventType(), taskEventHandler)); } public void submitTaskEvent(TaskEvent taskEvent) { @@ -83,11 +76,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { return; } TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(), - (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, - processService, - workflowExecuteThreadPool, - processInstanceExecCacheManager, - dataQualityResultOperator)); + (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, taskEventHandlerMap)); taskExecuteRunnable.addEvent(taskEvent); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java similarity index 57% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index e88d84312c..870cc1b24c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; import org.apache.dolphinscheduler.server.master.exception.MasterException; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; @@ -43,9 +46,7 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; @@ -53,18 +54,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import 
lombok.NonNull; - /** * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. */ @Service -public class MasterSchedulerService extends BaseDaemonThread { +public class MasterSchedulerBootstrap extends BaseDaemonThread { - /** - * logger of MasterSchedulerService - */ - private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); + private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class); @Autowired private ProcessService processService; @@ -83,12 +79,6 @@ public class MasterSchedulerService extends BaseDaemonThread { */ private ThreadPoolExecutor masterPrepareExecService; - /** - * workflow exec service - */ - @Autowired - private WorkflowExecuteThreadPool workflowExecuteThreadPool; - @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @@ -98,13 +88,15 @@ public class MasterSchedulerService extends BaseDaemonThread { @Autowired private CuringParamsService curingGlobalParamsService; - private final LinkedBlockingQueue submitFailedProcessInstances = new LinkedBlockingQueue<>(); + @Autowired + private WorkflowEventQueue workflowEventQueue; - private Thread failedProcessInstanceResubmitThread; + @Autowired + private WorkflowEventLooper workflowEventLooper; private String masterAddress; - protected MasterSchedulerService() { + protected MasterSchedulerBootstrap() { super("MasterCommandLoopThread"); } @@ -114,23 +106,19 @@ public class MasterSchedulerService extends BaseDaemonThread { public void init() { this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads()); this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); - this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances); - 
ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size); } @Override public synchronized void start() { - logger.info("Master schedule service starting.."); + logger.info("Master schedule bootstrap starting.."); super.start(); - this.failedProcessInstanceResubmitThread.start(); - logger.info("Master schedule service started..."); + workflowEventLooper.start(); + logger.info("Master schedule bootstrap started..."); } public void close() { - logger.info("Master schedule service stopping..."); - // these process instances will be failover, so we can safa clear here - submitFailedProcessInstances.clear(); - logger.info("Master schedule service stopped..."); + logger.info("Master schedule bootstrap stopping..."); + logger.info("Master schedule bootstrap stopped..."); } /** @@ -140,15 +128,51 @@ public class MasterSchedulerService extends BaseDaemonThread { public void run() { while (Stopper.isRunning()) { try { - boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); + // todo: if the workflow event queue grows too large, we need to handle the back pressure + boolean isOverload = + OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); if (isOverload) { MasterServerMetrics.incMasterOverload(); Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } - scheduleWorkflow(); + List commands = findCommands(); + if (CollectionUtils.isEmpty(commands)) { + // no command was found, sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } + + List processInstances = command2ProcessInstance(commands); + if (CollectionUtils.isEmpty(processInstances)) { + // no processInstance could be built from the commands, sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + continue; + } + MasterServerMetrics.incMasterConsumeCommand(commands.size()); + + processInstances.forEach(processInstance -> { + try { +
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + if (processInstanceExecCacheManager.contains(processInstance.getId())) { + logger.error("The workflow instance is already been cached, this case shouldn't be happened"); + } + WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance, + processService, + nettyExecutorManager, + processAlertManager, + masterConfig, + stateWheelExecuteThread, + curingGlobalParamsService); + processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); + workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, + processInstance.getId())); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } + }); } catch (InterruptedException interruptedException) { - logger.warn("Master schedule service interrupted, close the loop", interruptedException); + logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException); Thread.currentThread().interrupt(); break; } catch (Exception e) { @@ -159,72 +183,9 @@ public class MasterSchedulerService extends BaseDaemonThread { } } - /** - * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool. 
- */ - private void scheduleWorkflow() throws InterruptedException, MasterException { - List commands = findCommands(); - if (CollectionUtils.isEmpty(commands)) { - // indicate that no command ,sleep for 1s - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - return; - } - - List processInstances = command2ProcessInstance(commands); - if (CollectionUtils.isEmpty(processInstances)) { - // indicate that the command transform to processInstance error, sleep for 1s - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - return; - } - MasterServerMetrics.incMasterConsumeCommand(commands.size()); - - for (ProcessInstance processInstance : processInstances) { - submitProcessInstance(processInstance); - } - } - - private void submitProcessInstance(@NonNull ProcessInstance processInstance) { - try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); - logger.info("Master schedule service starting workflow instance"); - final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( - processInstance - , processService - , nettyExecutorManager - , processAlertManager - , masterConfig - , stateWheelExecuteThread - , curingGlobalParamsService); - - this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); - if (processInstance.getTimeout() > 0) { - stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); - } - ProcessInstanceMetrics.incProcessInstanceSubmit(); - CompletableFuture workflowSubmitFuture = CompletableFuture.supplyAsync( - workflowExecuteRunnable::call, workflowExecuteThreadPool); - workflowSubmitFuture.thenAccept(workflowSubmitStatue -> { - if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) { - // submit failed - processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); - stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); - submitFailedProcessInstances.add(processInstance); - } - }); - logger.info("Master schedule service started workflow 
instance"); - - } catch (Exception ex) { - processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); - stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); - logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex); - } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); - } - } - private List command2ProcessInstance(List commands) throws InterruptedException { long commandTransformStartTime = System.currentTimeMillis(); - logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size()); + logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}", commands.size()); List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); CountDownLatch latch = new CountDownLatch(commands.size()); for (final Command command : commands) { @@ -254,7 +215,7 @@ public class MasterSchedulerService extends BaseDaemonThread { // make sure to finish handling command each time before next scan latch.await(); - logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}", + logger.info("Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}", commands.size(), processInstances.size()); ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime); return processInstances; @@ -273,7 +234,7 @@ public class MasterSchedulerService extends BaseDaemonThread { int pageSize = masterConfig.getFetchCommandNum(); final List result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); if (CollectionUtils.isNotEmpty(result)) { - logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", + logger.info("Master schedule bootstrap loop 
command success, command size: {}, current slot: {}, total slot size: {}", result.size(), thisMasterSlot, masterCount); } ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime); @@ -297,34 +258,4 @@ public class MasterSchedulerService extends BaseDaemonThread { return state; } - private class FailedProcessInstanceResubmitThread extends Thread { - - private final LinkedBlockingQueue submitFailedProcessInstances; - - public FailedProcessInstanceResubmitThread(LinkedBlockingQueue submitFailedProcessInstances) { - logger.info("Starting workflow resubmit thread"); - this.submitFailedProcessInstances = submitFailedProcessInstances; - this.setDaemon(true); - this.setName("SubmitFailedProcessInstanceHandleThread"); - logger.info("Started workflow resubmit thread"); - } - - @Override - public void run() { - while (Stopper.isRunning()) { - try { - ProcessInstance processInstance = submitFailedProcessInstances.take(); - submitProcessInstance(processInstance); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return"); - break; - } - - // avoid the failed-fast cause CPU higher - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - } - } - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java new file mode 100644 index 0000000000..ee2e70bfd0 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; +import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class WorkflowEventLooper extends BaseDaemonThread { + + private final Logger logger = LoggerFactory.getLogger(WorkflowEventLooper.class); + + @Autowired + private WorkflowEventQueue workflowEventQueue; + + @Autowired + private List 
workflowEventHandlerList; + + private final Map workflowEventHandlerMap = new HashMap<>(); + + protected WorkflowEventLooper() { + super("WorkflowEventLooper"); + } + + @PostConstruct + public void init() { + workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(), + workflowEventHandler)); + } + + @Override + public synchronized void start() { + logger.info("WorkflowEventLooper thread starting"); + super.start(); + logger.info("WorkflowEventLooper thread started"); + } + + public void run() { + WorkflowEvent workflowEvent = null; + while (Stopper.isRunning()) { + try { + workflowEvent = workflowEventQueue.poolEvent(); + LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId()); + logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent); + WorkflowEventHandler workflowEventHandler = + workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType()); + workflowEventHandler.handleWorkflowEvent(workflowEvent); + } catch (InterruptedException e) { + logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e); + Thread.currentThread().interrupt(); + break; + } catch (WorkflowEventHandleException workflowEventHandleException) { + logger.error("Handle workflow event failed, will add this event to event queue again, event: {}", + workflowEvent, workflowEventHandleException); + workflowEventQueue.addEvent(workflowEvent); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } catch (WorkflowEventHandleError workflowEventHandleError) { + logger.error("Handle workflow event error, will drop this event, event: {}", + workflowEvent, + workflowEventHandleError); + } catch (Exception unknownException) { + logger.error( + "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}", + workflowEvent, unknownException); + workflowEventQueue.addEvent(workflowEvent); + 
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 1834e413f8..94f5f51b80 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -119,39 +119,18 @@ import lombok.NonNull; */ public class WorkflowExecuteRunnable implements Callable { - /** - * logger of WorkflowExecuteThread - */ private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class); - /** - * process service - */ private final ProcessService processService; - /** - * alert manager - */ private final ProcessAlertManager processAlertManager; - /** - * netty executor manager - */ private final NettyExecutorManager nettyExecutorManager; - /** - * process instance - */ private final ProcessInstance processInstance; - /** - * process definition - */ private ProcessDefinition processDefinition; - /** - * the object of DAG - */ private DAG dag; /** @@ -159,10 +138,8 @@ public class WorkflowExecuteRunnable implements Callable { */ private String key; - /** - * start flag, true: start nodes submit completely - */ - private volatile boolean isStart = false; + + private WorkflowRunnableStatus workflowRunnableStatus = WorkflowRunnableStatus.CREATED; /** * submit failure nodes @@ -224,7 +201,7 @@ public class WorkflowExecuteRunnable implements Callable { private final ConcurrentLinkedQueue stateEvents = new ConcurrentLinkedQueue<>(); /** - * ready to submit task queue + * The standby task queue, holding tasks that are ready to be submitted. Note: a taskInstance in this queue may not have an id.
*/ private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); @@ -234,14 +211,8 @@ public class WorkflowExecuteRunnable implements Callable { */ private final Map waitToRetryTaskInstanceMap = new ConcurrentHashMap<>(); - /** - * state wheel execute thread - */ private final StateWheelExecuteThread stateWheelExecuteThread; - /** - * curing global params service - */ private final CuringParamsService curingParamsService; private final String masterAddress; @@ -276,14 +247,17 @@ public class WorkflowExecuteRunnable implements Callable { * the process start nodes are submitted completely. */ public boolean isStart() { - return this.isStart; + return WorkflowRunnableStatus.STARTED == workflowRunnableStatus; } /** * handle event */ public void handleEvents() { - if (!isStart) { + if (!isStart()) { + logger.info( + "The workflow instance is not started, will not handle its state event, current state event size: {}", + stateEvents); return; } StateEvent stateEvent = null; @@ -387,45 +361,53 @@ public class WorkflowExecuteRunnable implements Callable { } public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException { - logger.info("TaskInstance finished task code:{} state:{} ", - taskInstance.getTaskCode(), - taskInstance.getState()); - - activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); - stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); - stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); - stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance); - - if (taskInstance.getState().typeIsSuccess()) { - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - // todo: merge the last taskInstance - processInstance.setVarPool(taskInstance.getVarPool()); - processService.saveProcessInstance(processInstance); - if (!processInstance.isBlocked()) { - submitPostNode(Long.toString(taskInstance.getTaskCode())); - } 
- } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { - // retry task - logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); - retryTaskInstance(taskInstance); - } else if (taskInstance.getState().typeIsFailure()) { - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - // There are child nodes and the failure policy is: CONTINUE - if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE - && DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { - submitPostNode(Long.toString(taskInstance.getTaskCode())); - } else { - errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - if (processInstance.getFailureStrategy() == FailureStrategy.END) { - killAllTasks(); + logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState()); + try { + + activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); + stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); + stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); + stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance); + + if (taskInstance.getState().typeIsSuccess()) { + completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + // todo: merge the last taskInstance + processInstance.setVarPool(taskInstance.getVarPool()); + processService.saveProcessInstance(processInstance); + if (!processInstance.isBlocked()) { + submitPostNode(Long.toString(taskInstance.getTaskCode())); + } + } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { + // retry task + logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); + retryTaskInstance(taskInstance); + } else if (taskInstance.getState().typeIsFailure()) { + completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + // 
There are child nodes and the failure policy is: CONTINUE + if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( + Long.toString(taskInstance.getTaskCode()), + dag)) { + submitPostNode(Long.toString(taskInstance.getTaskCode())); + } else { + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + if (processInstance.getFailureStrategy() == FailureStrategy.END) { + killAllTasks(); + } } + } else if (taskInstance.getState().typeIsFinished()) { + // todo: when the task instance type is pause, then it should not in completeTaskMap + completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); } - } else if (taskInstance.getState().typeIsFinished()) { - // todo: when the task instance type is pause, then it should not in completeTaskMap - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}", + taskInstance.getTaskCode(), + taskInstance.getState()); + this.updateProcessInstanceState(); + } catch (Exception ex) { + logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex); + // remove the task from complete map, so that we can finish in the next time. 
+ completeTaskMap.remove(taskInstance.getTaskCode()); + throw ex; } - - this.updateProcessInstanceState(); } /** @@ -672,17 +654,29 @@ public class WorkflowExecuteRunnable implements Callable { */ @Override public WorkflowSubmitStatue call() { - if (this.taskInstanceMap.size() > 0 || isStart) { - logger.warn("The workflow has already been started"); + if (isStart()) { + // This case should not been happened + logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId()); return WorkflowSubmitStatue.DUPLICATED_SUBMITTED; } try { LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); - buildFlowDag(); - initTaskQueue(); - submitPostNode(null); - isStart = true; + if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) { + buildFlowDag(); + workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG; + logger.info("workflowStatue changed to :{}", workflowRunnableStatus); + } + if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) { + initTaskQueue(); + workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE; + logger.info("workflowStatue changed to :{}", workflowRunnableStatus); + } + if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) { + submitPostNode(null); + workflowRunnableStatus = WorkflowRunnableStatus.STARTED; + logger.info("workflowStatue changed to :{}", workflowRunnableStatus); + } return WorkflowSubmitStatue.SUCCESS; } catch (Exception e) { logger.error("Start workflow error", e); @@ -749,9 +743,6 @@ public class WorkflowExecuteRunnable implements Callable { * @throws Exception exception */ private void buildFlowDag() throws Exception { - if (this.dag != null) { - return; - } processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); @@ -886,9 +877,9 @@ public class WorkflowExecuteRunnable implements Callable { } } 
logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}", - dependFailedTaskMap, - completeTaskMap, - errorTaskMap); + dependFailedTaskMap, + completeTaskMap, + errorTaskMap); } /** @@ -911,7 +902,11 @@ public class WorkflowExecuteRunnable implements Callable { boolean submit = taskProcessor.action(TaskAction.SUBMIT); if (!submit) { - logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); + logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", + processInstance.getId(), + processInstance.getName(), + taskInstance.getId(), + taskInstance.getName()); return Optional.empty(); } @@ -962,7 +957,10 @@ public class WorkflowExecuteRunnable implements Callable { } return Optional.of(taskInstance); } catch (Exception e) { - logger.error("submit standby task error", e); + logger.error("submit standby task error, taskCode: {}, taskInstanceId: {}", + taskInstance.getTaskCode(), + taskInstance.getId(), + e); return Optional.empty(); } } @@ -1215,9 +1213,19 @@ public class WorkflowExecuteRunnable implements Callable { */ private Map getCompleteTaskInstanceMap() { Map completeTaskInstanceMap = new HashMap<>(); - for (Integer taskInstanceId : completeTaskMap.values()) { + for (Map.Entry entry : completeTaskMap.entrySet()) { + Long taskConde = entry.getKey(); + Integer taskInstanceId = entry.getValue(); TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); + if (taskInstance == null) { + logger.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}", + taskInstanceId, + taskConde); + // This case will happen when we submit to db failed, then the taskInstanceId is 0 + continue; + } completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance); + } return completeTaskInstanceMap; } @@ -1429,6 +1437,7 @@ public class 
WorkflowExecuteRunnable implements Callable { */ private boolean processFailed() { if (hasFailedTask()) { + logger.info("The current process has failed task, the current process failed"); if (processInstance.getFailureStrategy() == FailureStrategy.END) { return true; } @@ -1504,22 +1513,29 @@ public class WorkflowExecuteRunnable implements Callable { if (activeTaskProcessorMaps.size() > 0 || hasRetryTaskInStandBy()) { // active task and retry task exists - return runningState(state); + ExecutionStatus executionStatus = runningState(state); + logger.info("The workflowInstance has task running, the workflowInstance status is {}", executionStatus); + return executionStatus; } // block if (state == ExecutionStatus.READY_BLOCK) { - return processReadyBlock(); + ExecutionStatus executionStatus = processReadyBlock(); + logger.info("The workflowInstance is ready to block, the workflowInstance status is {}", executionStatus); } // waiting thread if (hasWaitingThreadTask()) { + logger.info("The workflowInstance has waiting thread task, the workflow status is {}", + ExecutionStatus.WAITING_THREAD); return ExecutionStatus.WAITING_THREAD; } // pause if (state == ExecutionStatus.READY_PAUSE) { - return processReadyPause(); + ExecutionStatus executionStatus = processReadyPause(); + logger.info("The workflowInstance is ready to pause, the workflow status is {}", executionStatus); + return executionStatus; } // stop @@ -1527,15 +1543,18 @@ public class WorkflowExecuteRunnable implements Callable { List stopList = getCompleteTaskByState(ExecutionStatus.STOP); List killList = getCompleteTaskByState(ExecutionStatus.KILL); List failList = getCompleteTaskByState(ExecutionStatus.FAILURE); + ExecutionStatus executionStatus; if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || !isComplementEnd()) { - return ExecutionStatus.STOP; + executionStatus = ExecutionStatus.STOP; } else { - return ExecutionStatus.SUCCESS; + 
executionStatus = ExecutionStatus.SUCCESS; } + logger.info("The workflowInstance is ready to stop, the workflow status is {}", executionStatus); } // process failure if (processFailed()) { + logger.info("The workflowInstance is failed, the workflow status is {}", ExecutionStatus.FAILURE); return ExecutionStatus.FAILURE; } @@ -1579,15 +1598,21 @@ public class WorkflowExecuteRunnable implements Callable { private void updateProcessInstanceState() throws StateEventHandleException { ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { + logger.info("Update workflowInstance states, origin state: {}, target state: {}", + processInstance.getState(), + state); updateWorkflowInstanceStatesToDB(state); StateEvent stateEvent = new StateEvent(); stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setProcessInstanceId(this.processInstance.getId()); stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE); - // this.processStateChangeHandler(stateEvent); // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks this.stateEvents.add(stateEvent); + } else { + logger.info("There is no need to update the workflow instance state, origin state: {}, target state: {}", + processInstance.getState(), + state); } } @@ -1602,12 +1627,9 @@ public class WorkflowExecuteRunnable implements Callable { private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException { ExecutionStatus originStates = processInstance.getState(); if (originStates != newStates) { - logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), - processInstance.getName(), - originStates, - newStates, - processInstance.getCommandType()); + logger.info("Begin to update workflow instance state , state will change from {} to {}", + originStates, + newStates); processInstance.setState(newStates); if 
(newStates.typeIsFinished()) { @@ -1657,8 +1679,8 @@ public class WorkflowExecuteRunnable implements Callable { * * @param taskInstance task instance */ - private void removeTaskFromStandbyList(TaskInstance taskInstance) { - readyToSubmitTaskQueue.remove(taskInstance); + private boolean removeTaskFromStandbyList(TaskInstance taskInstance) { + return readyToSubmitTaskQueue.remove(taskInstance); } /** @@ -1748,10 +1770,20 @@ public class WorkflowExecuteRunnable implements Callable { if (!taskInstanceOptional.isPresent()) { this.taskFailedSubmit = true; // Remove and add to complete map and error map - removeTaskFromStandbyList(task); + if (!removeTaskFromStandbyList(task)) { + logger.error( + "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}", + processInstance.getId(), + task.getTaskCode()); + } completeTaskMap.put(task.getTaskCode(), task.getId()); + taskInstanceMap.put(task.getId(), task); errorTaskMap.put(task.getTaskCode(), task.getId()); - logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId()); + activeTaskProcessorMaps.remove(task.getTaskCode()); + logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}", + task.getProcessInstanceId(), + task.getId(), + task.getTaskCode()); } else { removeTaskFromStandbyList(task); } @@ -1927,4 +1959,10 @@ public class WorkflowExecuteRunnable implements Callable { } } + private enum WorkflowRunnableStatus { + CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED, + ; + + } + } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java index 3d33d8c363..0cd81dc3e2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java +++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 5024a944f1..075991c955 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1284,8 +1284,8 @@ public class ProcessServiceImpl implements ProcessService { break; } logger.error( - "task commit to db failed , taskId {} has already retry {} times, please check the database", - taskInstance.getId(), + "task commit to db failed , taskCode: {} has already retry {} times, please check the database", + taskInstance.getTaskCode(), retryTimes); Thread.sleep(commitInterval); } catch (Exception e) { @@ -1298,6 +1298,7 @@ public class ProcessServiceImpl implements ProcessService { } /** + * // todo: This method need to refactor, we find when the db down, but the taskInstanceId is not 0. 
It's better to change to void, rather than return TaskInstance * submit task to db * submit sub process to command * @@ -1316,9 +1317,9 @@ public class ProcessServiceImpl implements ProcessService { TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); if (task == null) { logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ", - taskInstance.getName(), - taskInstance.getProcessInstance(), - processInstance.getState()); + taskInstance.getName(), + taskInstance.getProcessInstance().getId(), + processInstance.getState()); return null; } @@ -1328,8 +1329,8 @@ public class ProcessServiceImpl implements ProcessService { logger.info( "End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}", - taskInstance.getId(), - taskInstance.getName(), + task.getId(), + task.getName(), task.getState(), processInstance.getId(), processInstance.getState()); @@ -1564,7 +1565,10 @@ public class ProcessServiceImpl implements ProcessService { public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { ExecutionStatus processInstanceState = processInstance.getState(); if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) { - logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState); + logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}", + processInstance.getId(), + processInstanceState, + taskInstance.getTaskCode()); return null; } if (processInstanceState == ExecutionStatus.READY_PAUSE) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java index fb3c84da68..f8e47fc43a 100644 --- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.cache; -import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.remote.command.Command; import java.util.Map; @@ -48,7 +48,7 @@ public class ResponseCache { * @param command command * @param event event ACK/RESULT */ - public void cache(Integer taskInstanceId, Command command, Event event) { + public void cache(Integer taskInstanceId, Command command, TaskEventType event) { switch (event) { case RUNNING: runningCache.put(taskInstanceId, command); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 3f70974344..21616a1f60 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; -import org.apache.dolphinscheduler.common.enums.Event; +import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.NettyRemotingClient; @@ -236,7 +236,7 @@ public class TaskCallbackService { public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) { 
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext); // add response cache - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING); send(taskExecutionContext.getTaskInstanceId(), command.convert2Command()); } @@ -256,7 +256,7 @@ public class TaskCallbackService { public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) { TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext); // add response cache - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT); send(taskExecutionContext.getTaskInstanceId(), command.convert2Command()); } @@ -270,7 +270,7 @@ public class TaskCallbackService { */ public void sendRecallCommand(TaskExecutionContext taskExecutionContext) { TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext); - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), Event.WORKER_REJECT); + ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT); send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command()); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index f691f7c8de..73b8d84cf7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -84,6 +84,7 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread { private void retryRunningCommand(ResponseCache instance) { if (!instance.getRunningCache().isEmpty()) { Map runningCache = instance.getRunningCache(); + logger.info("Send task running retry command starting, waiting to retry size: {}", runningCache.size()); for (Map.Entry entry : runningCache.entrySet()) { Integer taskInstanceId = entry.getKey(); Command runningCommand = entry.getValue(); @@ -93,12 +94,14 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread { logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand); } } + logger.info("Send task running retry command finished, waiting to retry size: {}", runningCache.size()); } } private void retryResponseCommand(ResponseCache instance) { - if (!instance.getResponseCache().isEmpty()) { - Map responseCache = instance.getResponseCache(); + Map responseCache = instance.getResponseCache(); + if (!responseCache.isEmpty()) { + logger.info("Send task response retry command starting, waiting to retry size: {}", responseCache.size()); for (Map.Entry entry : responseCache.entrySet()) { Integer taskInstanceId = entry.getKey(); Command responseCommand = entry.getValue(); @@ -108,12 +111,14 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread { logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand); } } + logger.info("Send task response retry command finished, waiting to retry size: {}", responseCache.size()); } } private void retryRecallCommand(ResponseCache instance) { - if (!instance.getRecallCache().isEmpty()) { - Map recallCache = instance.getRecallCache(); + Map recallCache = instance.getRecallCache(); + if (!recallCache.isEmpty()) { + 
logger.info("Send task recall retry command starting, waiting to retry size: {}", recallCache.size()); for (Map.Entry entry : recallCache.entrySet()) { Integer taskInstanceId = entry.getKey(); Command responseCommand = entry.getValue(); @@ -123,6 +128,7 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread { logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand); } } + logger.info("Send task recall retry command finished, waiting to retry size: {}", recallCache.size()); } } }