From 002e9ca9e3e955ad6d65333eef6c828a79ee2430 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 30 Aug 2024 10:07:16 +0800 Subject: [PATCH] [Improve] Move recover/repeat running to IWorkflowControlClient (#16545) --- ...erFailureTaskInstanceExecutorDelegate.java | 41 ++-- ...endedWorkflowInstanceExecutorDelegate.java | 41 ++-- ...nningWorkflowInstanceExecutorDelegate.java | 41 ++-- .../master/IWorkflowControlClient.java | 15 ++ ...lowInstanceRecoverFailureTasksRequest.java | 35 +++ ...owInstanceRecoverFailureTasksResponse.java | 39 ++++ ...lowInstanceRecoverSuspendTasksRequest.java | 35 +++ ...owInstanceRecoverSuspendTasksResponse.java | 39 ++++ .../WorkflowInstanceRepeatRunningRequest.java | 35 +++ ...WorkflowInstanceRepeatRunningResponse.java | 39 ++++ .../AbstractWorkflowInstanceTrigger.java | 76 +++++++ ...flowInstanceRecoverFailureTaskTrigger.java | 58 +++++ ...flowInstanceRecoverSuspendTaskTrigger.java | 58 +++++ .../WorkflowInstanceRepeatTrigger.java | 58 +++++ .../master/rpc/WorkflowControlClient.java | 58 ++++- .../server/master/it/WorkflowOperator.java | 19 ++ .../WorkflowInstanceRecoverFailureTaskIT.java | 117 ++++++++++ .../WorkflowInstanceRepeatRunningIT.java | 202 ++++++++++++++++++ ...re_workflow_with_two_serial_fake_task.yaml | 130 +++++++++++ ...ed_workflow_with_one_fake_task_failed.yaml | 111 ++++++++++ ...s_workflow_with_one_fake_task_success.yaml | 111 ++++++++++ .../success_workflow_with_task_only.yaml | 131 ++++++++++++ 22 files changed, 1441 insertions(+), 48 deletions(-) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java create mode 100644 dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml create mode 100644 dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml create mode 100644 dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml create mode 100644 dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java index 12678af1a6..744881e7be 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java @@ -18,13 +18,15 @@ package org.apache.dolphinscheduler.api.executor.workflow; import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.CommandDao; - -import java.util.Date; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import lombok.Getter; @@ -37,7 +39,7 @@ public class RecoverFailureTaskInstanceExecutorDelegate IExecutorDelegate { @Autowired - private CommandDao commandDao; + private RegistryClient registryClient; @Override public Void execute(RecoverFailureTaskInstanceOperation recoverFailureTaskInstanceOperation) { @@ -48,16 +50,23 @@ public class RecoverFailureTaskInstanceExecutorDelegate workflowInstance.getName(), workflowInstance.getState())); } - Command command = Command.builder() - .commandType(CommandType.START_FAILURE_TASK_PROCESS) - .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) - .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) - .processInstanceId(workflowInstance.getId()) - .executorId(recoverFailureTaskInstanceOperation.getExecuteUser().getId()) - .startTime(new Date()) - .updateTime(new Date()) - .build(); - commandDao.insert(command); + final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null); + if (masterServer == null) { + throw new ServiceException("no master server available"); + } + final WorkflowInstanceRecoverFailureTasksRequest recoverFailureTaskRequest = + WorkflowInstanceRecoverFailureTasksRequest.builder() + .workflowInstanceId(workflowInstance.getId()) + .userId(recoverFailureTaskInstanceOperation.executeUser.getId()) + .build(); + + final WorkflowInstanceRecoverFailureTasksResponse recoverFailureTaskResponse = Clients + .withService(IWorkflowControlClient.class) + .withHost(masterServer.getHost() + ":" + masterServer.getPort()) + .triggerFromFailureTasks(recoverFailureTaskRequest); + if (!recoverFailureTaskResponse.isSuccess()) { + throw new ServiceException("Recover workflow instance failed: " + recoverFailureTaskResponse.getMessage()); + } return null; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java index 4a87f34c77..0e107fb9a7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java @@ -18,13 +18,15 @@ package org.apache.dolphinscheduler.api.executor.workflow; import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.CommandDao; - -import java.util.Date; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -35,7 +37,7 @@ public class RecoverSuspendedWorkflowInstanceExecutorDelegate IExecutorDelegate { @Autowired - private CommandDao commandDao; + private RegistryClient registryClient; @Override public Void execute(RecoverSuspendedWorkflowInstanceOperation workflowInstanceControlRequest) { @@ -45,16 +47,23 @@ public class RecoverSuspendedWorkflowInstanceExecutorDelegate String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(), workflowInstance.getState())); } - final Command command = Command.builder() - .commandType(CommandType.RECOVER_SUSPENDED_PROCESS) - .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) - .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) - .processInstanceId(workflowInstance.getId()) - .executorId(workflowInstanceControlRequest.executeUser.getId()) - .startTime(new Date()) - .updateTime(new Date()) - .build(); - commandDao.insert(command); + final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null); + if (masterServer == null) { + throw new ServiceException("no master server available"); + } + final WorkflowInstanceRecoverSuspendTasksRequest recoverSuspendTaskRequest = + WorkflowInstanceRecoverSuspendTasksRequest.builder() + .workflowInstanceId(workflowInstance.getId()) + .userId(workflowInstanceControlRequest.executeUser.getId()) + .build(); + + final WorkflowInstanceRecoverSuspendTasksResponse recoverSuspendTaskResponse = Clients + .withService(IWorkflowControlClient.class) + .withHost(masterServer.getHost() + ":" + masterServer.getPort()) + .triggerFromSuspendTasks(recoverSuspendTaskRequest); + if (!recoverSuspendTaskResponse.isSuccess()) { + throw new ServiceException("Recover workflow instance failed: " + recoverSuspendTaskResponse.getMessage()); + } return null; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java index a0f86bf805..8fde42e90d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java @@ -18,13 +18,15 @@ package org.apache.dolphinscheduler.api.executor.workflow; import org.apache.dolphinscheduler.api.exceptions.ServiceException; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.repository.CommandDao; - -import java.util.Date; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -35,7 +37,7 @@ public class RepeatRunningWorkflowInstanceExecutorDelegate IExecutorDelegate { @Autowired - private CommandDao commandDao; + private RegistryClient registryClient; @Override public Void execute(RepeatRunningWorkflowInstanceOperation workflowInstanceControlRequest) { @@ -45,16 +47,25 @@ public class RepeatRunningWorkflowInstanceExecutorDelegate String.format("The workflow instance: %s status is %s, cannot repeat running", workflowInstance.getName(), workflowInstance.getState())); } - Command command = Command.builder() - .commandType(CommandType.REPEAT_RUNNING) - .processInstanceId(workflowInstance.getId()) - .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) - .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) - .executorId(workflowInstanceControlRequest.executeUser.getId()) - .startTime(new Date()) - .updateTime(new Date()) + + final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null); + if (masterServer == null) { + throw new ServiceException("no master server available"); + } + final WorkflowInstanceRepeatRunningRequest repeatRunningRequest = WorkflowInstanceRepeatRunningRequest.builder() + .workflowInstanceId(workflowInstance.getId()) + .userId(workflowInstanceControlRequest.executeUser.getId()) .build(); - commandDao.insert(command); + + final WorkflowInstanceRepeatRunningResponse repeatRunningResponse = Clients + .withService(IWorkflowControlClient.class) + .withHost(masterServer.getHost() + ":" + masterServer.getPort()) + .repeatTriggerWorkflowInstance(repeatRunningRequest); + if (!repeatRunningResponse.isSuccess()) { + throw new ServiceException( + "Repeat running workflow instance failed: " + repeatRunningResponse.getMessage()); + } + return null; } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java index be1fc1f85e..ed4dc2cab7 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java @@ -23,6 +23,12 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; @@ -45,6 +51,15 @@ public interface IWorkflowControlClient { @RpcMethod WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(final WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest); + @RpcMethod + WorkflowInstanceRepeatRunningResponse repeatTriggerWorkflowInstance(final WorkflowInstanceRepeatRunningRequest workflowInstanceRepeatRunningRequest); + + @RpcMethod + WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest); + + @RpcMethod + WorkflowInstanceRecoverSuspendTasksResponse triggerFromSuspendTasks(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest); + @RpcMethod WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest); diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java new file mode 100644 index 0000000000..0ea5c6cf92 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceRecoverFailureTasksRequest { + + private Integer workflowInstanceId; + + private Integer userId; + +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java new file mode 100644 index 0000000000..b4fb1c99f4 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowInstanceRecoverFailureTasksResponse { + + private boolean success; + private String message; + + public static WorkflowInstanceRecoverFailureTasksResponse success() { + return new WorkflowInstanceRecoverFailureTasksResponse(true, null); + } + + public static WorkflowInstanceRecoverFailureTasksResponse fail(String message) { + return new WorkflowInstanceRecoverFailureTasksResponse(false, message); + } +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java new file mode 100644 index 0000000000..b589dcd51e --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceRecoverSuspendTasksRequest { + + private Integer workflowInstanceId; + + private Integer userId; + +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java new file mode 100644 index 0000000000..925be6fb52 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowInstanceRecoverSuspendTasksResponse { + + private boolean success; + private String message; + + public static WorkflowInstanceRecoverSuspendTasksResponse success() { + return new WorkflowInstanceRecoverSuspendTasksResponse(true, null); + } + + public static WorkflowInstanceRecoverSuspendTasksResponse fail(String message) { + return new WorkflowInstanceRecoverSuspendTasksResponse(false, message); + } +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java new file mode 100644 index 0000000000..7d55c61bc0 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowInstanceRepeatRunningRequest { + + private Integer workflowInstanceId; + + private Integer userId; + +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java new file mode 100644 index 0000000000..813ed16943 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor.workflow; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class WorkflowInstanceRepeatRunningResponse { + + private boolean success; + private String message; + + public static WorkflowInstanceRepeatRunningResponse success() { + return new WorkflowInstanceRepeatRunningResponse(true, null); + } + + public static WorkflowInstanceRepeatRunningResponse fail(String message) { + return new WorkflowInstanceRepeatRunningResponse(false, message); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java new file mode 100644 index 0000000000..866d758419 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; + +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.repository.CommandDao; +import org.apache.dolphinscheduler.dao.repository.UserDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +public abstract class AbstractWorkflowInstanceTrigger + implements + IWorkflowTrigger { + + @Autowired + private WorkflowDefinitionLogDao workflowDefinitionDao; + + @Autowired + private WorkflowInstanceDao workflowInstanceDao; + + @Autowired + private UserDao userDao; + + @Autowired + private CommandDao commandDao; + + @Override + @Transactional + public TriggerResponse triggerWorkflow(final TriggerRequest triggerRequest) { + final WorkflowInstance workflowInstance = constructWorkflowInstance(triggerRequest); + workflowInstanceDao.updateById(workflowInstance); + + final Command command = constructTriggerCommand(triggerRequest, workflowInstance); + commandDao.insert(command); + + return onTriggerSuccess(workflowInstance); + } + + protected abstract WorkflowInstance constructWorkflowInstance(final TriggerRequest triggerRequest); + + protected abstract Command constructTriggerCommand(final TriggerRequest triggerRequest, + final WorkflowInstance workflowInstance); + + protected abstract TriggerResponse onTriggerSuccess(final WorkflowInstance workflowInstance); + + protected WorkflowInstance getWorkflowInstance(final Integer workflowInstanceId) { + final WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId); + if (workflowInstance == null) { + throw new IllegalStateException("Workflow instance not found: " + workflowInstanceId); + } + return workflowInstance; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java new file mode 100644 index 0000000000..cc9ba05b37 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse; + +import java.util.Date; + +import org.springframework.stereotype.Component; + +@Component +public class WorkflowInstanceRecoverFailureTaskTrigger + extends + AbstractWorkflowInstanceTrigger { + + @Override + protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest) { + return getWorkflowInstance(workflowInstanceRecoverFailureTasksRequest.getWorkflowInstanceId()); + } + + @Override + protected Command constructTriggerCommand(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest, + final WorkflowInstance workflowInstance) { + return Command.builder() + .commandType(CommandType.START_FAILURE_TASK_PROCESS) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .processInstanceId(workflowInstance.getId()) + .executorId(workflowInstanceRecoverFailureTasksRequest.getUserId()) + .startTime(new Date()) + .updateTime(new Date()) + .build(); + } + + @Override + protected WorkflowInstanceRecoverFailureTasksResponse onTriggerSuccess(WorkflowInstance workflowInstance) { + return WorkflowInstanceRecoverFailureTasksResponse.success(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java new file mode 100644 index 0000000000..56ee9b4a5c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; + +import java.util.Date; + +import org.springframework.stereotype.Component; + +@Component +public class WorkflowInstanceRecoverSuspendTaskTrigger + extends + AbstractWorkflowInstanceTrigger { + + @Override + protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest) { + return getWorkflowInstance(workflowInstanceRecoverSuspendTasksRequest.getWorkflowInstanceId()); + } + + @Override + protected Command constructTriggerCommand(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest, + final WorkflowInstance workflowInstance) { + return Command.builder() + .commandType(CommandType.RECOVER_SUSPENDED_PROCESS) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .processInstanceId(workflowInstance.getId()) + .executorId(workflowInstanceRecoverSuspendTasksRequest.getUserId()) + .startTime(new Date()) + .updateTime(new Date()) + .build(); + } + + @Override + protected WorkflowInstanceRecoverSuspendTasksResponse onTriggerSuccess(WorkflowInstance workflowInstance) { + return WorkflowInstanceRecoverSuspendTasksResponse.success(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java new file mode 100644 index 0000000000..358b6c30a8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.trigger; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse; + +import java.util.Date; + +import org.springframework.stereotype.Component; + +@Component +public class WorkflowInstanceRepeatTrigger + extends + AbstractWorkflowInstanceTrigger { + + @Override + protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRepeatRunningRequest repeatRunningRequest) { + return getWorkflowInstance(repeatRunningRequest.getWorkflowInstanceId()); + } + + @Override + protected Command constructTriggerCommand(final WorkflowInstanceRepeatRunningRequest repeatRunningRequest, + final WorkflowInstance workflowInstance) { + return Command.builder() + .commandType(CommandType.REPEAT_RUNNING) + .processInstanceId(workflowInstance.getId()) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .executorId(repeatRunningRequest.getUserId()) + .startTime(new Date()) + .updateTime(new Date()) + .build(); + } + + @Override + protected WorkflowInstanceRepeatRunningResponse onTriggerSuccess(final WorkflowInstance workflowInstance) { + return WorkflowInstanceRepeatRunningResponse.success(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java index eca632ac45..9506548788 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java @@ -22,6 +22,12 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; @@ -31,6 +37,9 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowS import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowBackfillTrigger; +import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverFailureTaskTrigger; +import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverSuspendTaskTrigger; +import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRepeatTrigger; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowScheduleTrigger; @@ -54,6 +63,15 @@ public class WorkflowControlClient implements IWorkflowControlClient { @Autowired private WorkflowScheduleTrigger workflowScheduleTrigger; + @Autowired + private WorkflowInstanceRepeatTrigger workflowInstanceRepeatTrigger; + + @Autowired + private WorkflowInstanceRecoverFailureTaskTrigger workflowInstanceRecoverFailureTaskTrigger; + + @Autowired + private WorkflowInstanceRecoverSuspendTaskTrigger workflowInstanceRecoverSuspendTaskTrigger; + @Autowired private WorkflowCacheRepository workflowRepository; @@ -78,7 +96,7 @@ public class WorkflowControlClient implements IWorkflowControlClient { } @Override - public WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest) { + public WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(final WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest) { try { return workflowScheduleTrigger.triggerWorkflow(workflowScheduleTriggerRequest); } catch (Exception ex) { @@ -88,6 +106,44 @@ public class WorkflowControlClient implements IWorkflowControlClient { } } + @Override + public WorkflowInstanceRepeatRunningResponse repeatTriggerWorkflowInstance(final WorkflowInstanceRepeatRunningRequest workflowInstanceRepeatRunningRequest) { + try { + return workflowInstanceRepeatTrigger.triggerWorkflow(workflowInstanceRepeatRunningRequest); + } catch (Exception ex) { + log.error("Handle workflowInstanceRepeatRunningRequest: {} failed", workflowInstanceRepeatRunningRequest, + ex); + return WorkflowInstanceRepeatRunningResponse + .fail("Repeat trigger workflow instance failed: " + ExceptionUtils.getMessage(ex)); + } + } + + @Override + public WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest) { + try { + return workflowInstanceRecoverFailureTaskTrigger + .triggerWorkflow(workflowInstanceRecoverFailureTasksRequest); + } catch (Exception ex) { + log.error("Handle workflowInstanceRecoverFailureTaskRequest: {} failed", + workflowInstanceRecoverFailureTasksRequest, ex); + return WorkflowInstanceRecoverFailureTasksResponse + .fail("Recover failure task failed: " + ExceptionUtils.getMessage(ex)); + } + } + + @Override + public WorkflowInstanceRecoverSuspendTasksResponse triggerFromSuspendTasks(WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest) { + try { + return workflowInstanceRecoverSuspendTaskTrigger + .triggerWorkflow(workflowInstanceRecoverSuspendTasksRequest); + } catch (Exception ex) { + log.error("Handle workflowInstanceRecoverSuspendTaskRequest: {} failed", + workflowInstanceRecoverSuspendTasksRequest, ex); + return WorkflowInstanceRecoverSuspendTasksResponse + .fail("Recover suspend task failed: " + ExceptionUtils.getMessage(ex)); + } + } + @Override public WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest) { try { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java index a6e4f16784..b3ba955e00 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest; +import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; @@ -89,6 +91,23 @@ public class WorkflowOperator { schedulerApi.insertOrUpdateScheduleTask(project.getId(), schedule); } + public void repeatRunningWorkflowInstance(final Integer workflowInstanceId) { + final WorkflowInstanceRepeatRunningRequest repeatRunningRequest = WorkflowInstanceRepeatRunningRequest.builder() + .workflowInstanceId(workflowInstanceId) + .userId(1) + .build(); + workflowInstanceController.repeatTriggerWorkflowInstance(repeatRunningRequest); + } + + public void recoverFailureTasks(final Integer workflowInstanceId) { + final WorkflowInstanceRecoverFailureTasksRequest recoverFailureTasksRequest = + WorkflowInstanceRecoverFailureTasksRequest.builder() + .workflowInstanceId(workflowInstanceId) + .userId(1) + .build(); + workflowInstanceController.triggerFromFailureTasks(recoverFailureTasksRequest); + } + public WorkflowInstancePauseResponse pauseWorkflowInstance(Integer workflowInstanceId) { final WorkflowInstancePauseRequest workflowInstancePauseRequest = new WorkflowInstancePauseRequest(workflowInstanceId); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java new file mode 100644 index 0000000000..9626bc4278 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.it.cases; + +import static com.google.common.truth.Truth.assertThat; +import static org.awaitility.Awaitility.await; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTest; +import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; +import org.apache.dolphinscheduler.server.master.it.Repository; +import org.apache.dolphinscheduler.server.master.it.WorkflowITContext; +import org.apache.dolphinscheduler.server.master.it.WorkflowITContextFactory; +import org.apache.dolphinscheduler.server.master.it.WorkflowOperator; + +import org.apache.commons.lang3.StringUtils; + +import java.time.Duration; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * The integration test for recover from failure tasks. + */ +public class WorkflowInstanceRecoverFailureTaskIT extends AbstractMasterIntegrationTest { + + @Autowired + private WorkflowITContextFactory workflowITContextFactory; + + @Autowired + private WorkflowOperator workflowOperator; + + @Autowired + private IWorkflowRepository workflowRepository; + + @Autowired + private Repository repository; + + @Test + @DisplayName("Test recover from failure tasks") + public void testRepeatRunningWorkflow_with_taskOnly() { + final String yaml = "/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml"; + final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); + + final Integer workflowInstanceId = context.getWorkflowInstance().getId(); + workflowOperator.recoverFailureTasks(workflowInstanceId); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + }); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId); + Assertions + .assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS); + Assertions + .assertThat(workflowInstance.getRunTimes()) + .isEqualTo(2); + final List taskInstances = repository.queryTaskInstance(workflowInstanceId); + Assertions + .assertThat(taskInstances) + .hasSize(3); + + Assertions + .assertThat(taskInstances.get(0)) + .matches(t -> "A".equals(t.getName())) + .matches(t -> t.getState() == TaskExecutionStatus.FAILURE) + .matches(t -> t.getFlag() == Flag.NO); + + Assertions + .assertThat(taskInstances.get(1)) + .matches(t -> "A".equals(t.getName())) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.YES) + .matches(t -> StringUtils.isNotEmpty(t.getLogPath())); + + Assertions + .assertThat(taskInstances.get(2)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.YES) + .matches(t -> StringUtils.isNotEmpty(t.getLogPath())); + }); + assertThat(workflowRepository.getAll()).isEmpty(); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java new file mode 100644 index 0000000000..451edd4b13 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.it.cases; + +import static com.google.common.truth.Truth.assertThat; +import static org.awaitility.Awaitility.await; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTest; +import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; +import org.apache.dolphinscheduler.server.master.it.Repository; +import org.apache.dolphinscheduler.server.master.it.WorkflowITContext; +import org.apache.dolphinscheduler.server.master.it.WorkflowITContextFactory; +import org.apache.dolphinscheduler.server.master.it.WorkflowOperator; + +import org.apache.commons.lang3.StringUtils; + +import java.time.Duration; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * The integration test for repeat running a workflow instance. + */ +public class WorkflowInstanceRepeatRunningIT extends AbstractMasterIntegrationTest { + + @Autowired + private WorkflowITContextFactory workflowITContextFactory; + + @Autowired + private WorkflowOperator workflowOperator; + + @Autowired + private IWorkflowRepository workflowRepository; + + @Autowired + private Repository repository; + + @Test + @DisplayName("Test repeat running a workflow instance with one success task") + public void testRepeatRunningWorkflow_with_oneSuccessTask() { + final String yaml = "/it/repeat_running/success_workflow_with_one_fake_task_success.yaml"; + final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); + + final Integer workflowInstanceId = context.getWorkflowInstance().getId(); + workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + }); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId); + Assertions + .assertThat(workflowInstance) + .matches(w -> w.getState() == WorkflowExecutionStatus.SUCCESS) + .matches(w -> w.getRunTimes() == 2); + + final List taskInstances = repository.queryTaskInstance(workflowInstanceId); + Assertions + .assertThat(taskInstances) + .hasSize(2); + + Assertions + .assertThat(taskInstances.get(0)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.NO); + + Assertions + .assertThat(taskInstances.get(1)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.YES) + .matches(t -> StringUtils.isNotEmpty(t.getLogPath())); + }); + assertThat(workflowRepository.getAll()).isEmpty(); + } + + @Test + @DisplayName("Test repeat running a workflow instance with one failed task") + public void testRepeatRunningWorkflow_with_oneFailedTask() { + final String yaml = "/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml"; + final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); + + final Integer workflowInstanceId = context.getWorkflowInstance().getId(); + workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + }); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId); + Assertions + .assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS); + Assertions + .assertThat(workflowInstance.getRunTimes()) + .isEqualTo(2); + final List taskInstances = repository.queryTaskInstance(workflowInstanceId); + Assertions + .assertThat(taskInstances) + .hasSize(2); + + Assertions + .assertThat(taskInstances.get(0)) + .matches(t -> t.getState() == TaskExecutionStatus.FAILURE) + .matches(t -> t.getFlag() == Flag.NO); + + Assertions + .assertThat(taskInstances.get(1)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.YES) + .matches(t -> StringUtils.isNotEmpty(t.getLogPath())); + }); + assertThat(workflowRepository.getAll()).isEmpty(); + } + + @Test + @DisplayName("Test repeat running a workflow instance with task only") + public void testRepeatRunningWorkflow_with_taskOnly() { + final String yaml = "/it/repeat_running/success_workflow_with_task_only.yaml"; + final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml); + + final Integer workflowInstanceId = context.getWorkflowInstance().getId(); + workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId); + + await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState()) + .isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION); + }); + + await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(() -> { + final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId); + Assertions + .assertThat(workflowInstance.getState()) + .isEqualTo(WorkflowExecutionStatus.SUCCESS); + Assertions + .assertThat(workflowInstance.getRunTimes()) + .isEqualTo(2); + final List taskInstances = repository.queryTaskInstance(workflowInstanceId); + Assertions + .assertThat(taskInstances) + .hasSize(2); + + Assertions + .assertThat(taskInstances.get(0)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.NO); + + Assertions + .assertThat(taskInstances.get(1)) + .matches(t -> t.getState() == TaskExecutionStatus.SUCCESS) + .matches(t -> t.getFlag() == Flag.YES) + .matches(t -> StringUtils.isNotEmpty(t.getLogPath())); + }); + assertThat(workflowRepository.getAll()).isEmpty(); + } + +} diff --git a/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml b/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml new file mode 100644 index 0000000000..8cb566fc4f --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml @@ -0,0 +1,130 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflowInstance: + id: 1 + name: workflow_with_two_serial_fake_task_success-20240816071251690 + processDefinitionCode: 1 + processDefinitionVersion: 1 + projectCode: 1 + state: FAILURE + recovery: NO + startTime: 2024-08-16 07:12:52 + endTime: 2024-08-16 07:12:57 + runTimes: 1 + host: 127.0.0.1:5678 + commandType: START_PROCESS + commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}' + taskDependType: TASK_POST + commandStartTime: 2024-08-16 07:12:52 + isSubProcess: NO + executorId: 1 + historyCmd: START_PROCESS + workerGroup: default + globalParams: '[]' + varPool: '[]' + dryRun: 0 + +taskInstances: + - id: 1 + name: A + taskType: LogicFakeTask + processInstanceId: 1 + processInstanceName: workflow_with_two_parallel_fake_task_success-20240816071251690 + projectCode: 1 + taskCode: 1 + taskDefinitionVersion: 1 + state: FAILURE + firstSubmitTime: 2024-08-16 07:12:52 + submitTime: 2024-08-16 07:12:57 + startTime: 2024-08-16 07:12:57 + endTime: 2024-08-16 07:12:57 + retryTimes: 0 + host: 127.0.0.1:1234 + maxRetryTimes: 0 + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + flag: YES + retryInterval: 0 + delayTime: 0 + workerGroup: default + executorId: 1 + varPool: '[]' + taskExecuteType: BATCH + +workflow: + name: workflow_with_two_serial_fake_task_success + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with two serial tasks + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: A + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + - name: B + code: 2 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 1 + preTaskVersion: 1 + postTaskCode: 2 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml b/dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml new file mode 100644 index 0000000000..aa49cc572a --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflow: + name: workflow_with_one_fake_task_success + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +workflowInstance: + id: 1 + name: workflow_with_one_fake_task_success-20240816071251690 + processDefinitionCode: 1 + processDefinitionVersion: 1 + projectCode: 1 + state: FAILURE + recovery: NO + startTime: 2024-08-16 07:12:52 + endTime: 2024-08-16 07:12:57 + runTimes: 1 + host: 127.0.0.1:5678 + commandType: START_PROCESS + commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}' + taskDependType: TASK_POST + commandStartTime: 2024-08-16 07:12:52 + isSubProcess: NO + executorId: 1 + historyCmd: START_PROCESS + workerGroup: default + globalParams: '[]' + varPool: '[]' + dryRun: 0 + +taskInstances: + - id: 1 + name: A + taskType: LogicFakeTask + processInstanceId: 1 + processInstanceName: workflow_with_one_fake_task_success-20240816071251690 + projectCode: 1 + taskCode: 1 + taskDefinitionVersion: 1 + state: FAILURE + firstSubmitTime: 2024-08-16 07:12:52 + submitTime: 2024-08-16 07:12:57 + startTime: 2024-08-16 07:12:57 + endTime: 2024-08-16 07:12:57 + retryTimes: 0 + host: 127.0.0.1:1234 + maxRetryTimes: 0 + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + flag: YES + retryInterval: 0 + delayTime: 0 + workerGroup: default + executorId: 1 + varPool: '[]' + taskExecuteType: BATCH + +tasks: + - name: A + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml b/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml new file mode 100644 index 0000000000..20541fa708 --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml @@ -0,0 +1,111 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflow: + name: workflow_with_one_fake_task_success + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with single task + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +workflowInstance: + id: 1 + name: workflow_with_one_fake_task_success-20240816071251690 + processDefinitionCode: 1 + processDefinitionVersion: 1 + projectCode: 1 + state: SUCCESS + recovery: NO + startTime: 2024-08-16 07:12:52 + endTime: 2024-08-16 07:12:57 + runTimes: 1 + host: 127.0.0.1:5678 + commandType: START_PROCESS + commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}' + taskDependType: TASK_POST + commandStartTime: 2024-08-16 07:12:52 + isSubProcess: NO + executorId: 1 + historyCmd: START_PROCESS + workerGroup: default + globalParams: '[]' + varPool: '[]' + dryRun: 0 + +taskInstances: + - id: 1 + name: A + taskType: LogicFakeTask + processInstanceId: 1 + processInstanceName: workflow_with_one_fake_task_success-20240816071251690 + projectCode: 1 + taskCode: 1 + taskDefinitionVersion: 1 + state: SUCCESS + firstSubmitTime: 2024-08-16 07:12:52 + submitTime: 2024-08-16 07:12:57 + startTime: 2024-08-16 07:12:57 + endTime: 2024-08-16 07:12:57 + retryTimes: 0 + host: 127.0.0.1:1234 + maxRetryTimes: 0 + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + flag: YES + retryInterval: 0 + delayTime: 0 + workerGroup: default + executorId: 1 + varPool: '[]' + taskExecuteType: BATCH + +tasks: + - name: A + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 diff --git a/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml b/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml new file mode 100644 index 0000000000..5d13b080dc --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + +workflowInstance: + id: 1 + name: workflow_with_two_parallel_fake_task_success-20240816071251690 + processDefinitionCode: 1 + processDefinitionVersion: 1 + projectCode: 1 + state: SUCCESS + recovery: NO + startTime: 2024-08-16 07:12:52 + endTime: 2024-08-16 07:12:57 + runTimes: 1 + host: 127.0.0.1:5678 + commandType: START_PROCESS + commandParam: '{"commandType":"START_PROCESS","startNodes":[1],"commandParams":[],"timeZone":"UTC"}' + taskDependType: TASK_ONLY + commandStartTime: 2024-08-16 07:12:52 + isSubProcess: NO + executorId: 1 + historyCmd: START_PROCESS + workerGroup: default + globalParams: '[]' + varPool: '[]' + dryRun: 0 + +taskInstances: + - id: 1 + name: A + taskType: LogicFakeTask + processInstanceId: 1 + processInstanceName: workflow_with_two_parallel_fake_task_success-20240816071251690 + projectCode: 1 + taskCode: 1 + taskDefinitionVersion: 1 + state: SUCCESS + firstSubmitTime: 2024-08-16 07:12:52 + submitTime: 2024-08-16 07:12:57 + startTime: 2024-08-16 07:12:57 + endTime: 2024-08-16 07:12:57 + retryTimes: 0 + host: 127.0.0.1:1234 + maxRetryTimes: 0 + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + flag: YES + retryInterval: 0 + delayTime: 0 + workerGroup: default + executorId: 1 + varPool: '[]' + taskExecuteType: BATCH + +workflow: + name: workflow_with_two_parallel_fake_task_success + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with two parallel success tasks + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + +tasks: + - name: A + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + - name: B + code: 2 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2021-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + - projectCode: 1 + processDefinitionCode: 1 + processDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 2 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00