Browse Source

[Improve] Move recover/repeat running to IWorkflowControlClient (#16545)

dev
Wenjun Ruan 2 months ago committed by GitHub
parent
commit
002e9ca9e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java
  2. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java
  3. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java
  4. 15
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java
  5. 35
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java
  6. 39
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java
  7. 35
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java
  8. 39
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java
  9. 35
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java
  10. 39
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java
  11. 76
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java
  12. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java
  13. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java
  14. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java
  15. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java
  16. 19
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java
  17. 117
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java
  18. 202
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java
  19. 130
      dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml
  20. 111
      dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml
  21. 111
      dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml
  22. 131
      dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverFailureTaskInstanceExecutorDelegate.java

@ -18,13 +18,15 @@
package org.apache.dolphinscheduler.api.executor.workflow;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import java.util.Date;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.Getter;
@ -37,7 +39,7 @@ public class RecoverFailureTaskInstanceExecutorDelegate
IExecutorDelegate<RecoverFailureTaskInstanceExecutorDelegate.RecoverFailureTaskInstanceOperation, Void> {
@Autowired
private CommandDao commandDao;
private RegistryClient registryClient;
@Override
public Void execute(RecoverFailureTaskInstanceOperation recoverFailureTaskInstanceOperation) {
@ -48,16 +50,23 @@ public class RecoverFailureTaskInstanceExecutorDelegate
workflowInstance.getName(), workflowInstance.getState()));
}
Command command = Command.builder()
.commandType(CommandType.START_FAILURE_TASK_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.executorId(recoverFailureTaskInstanceOperation.getExecuteUser().getId())
.startTime(new Date())
.updateTime(new Date())
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
if (masterServer == null) {
throw new ServiceException("no master server available");
}
final WorkflowInstanceRecoverFailureTasksRequest recoverFailureTaskRequest =
WorkflowInstanceRecoverFailureTasksRequest.builder()
.workflowInstanceId(workflowInstance.getId())
.userId(recoverFailureTaskInstanceOperation.executeUser.getId())
.build();
commandDao.insert(command);
final WorkflowInstanceRecoverFailureTasksResponse recoverFailureTaskResponse = Clients
.withService(IWorkflowControlClient.class)
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
.triggerFromFailureTasks(recoverFailureTaskRequest);
if (!recoverFailureTaskResponse.isSuccess()) {
throw new ServiceException("Recover workflow instance failed: " + recoverFailureTaskResponse.getMessage());
}
return null;
}

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RecoverSuspendedWorkflowInstanceExecutorDelegate.java

@ -18,13 +18,15 @@
package org.apache.dolphinscheduler.api.executor.workflow;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import java.util.Date;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -35,7 +37,7 @@ public class RecoverSuspendedWorkflowInstanceExecutorDelegate
IExecutorDelegate<RecoverSuspendedWorkflowInstanceExecutorDelegate.RecoverSuspendedWorkflowInstanceOperation, Void> {
@Autowired
private CommandDao commandDao;
private RegistryClient registryClient;
@Override
public Void execute(RecoverSuspendedWorkflowInstanceOperation workflowInstanceControlRequest) {
@ -45,16 +47,23 @@ public class RecoverSuspendedWorkflowInstanceExecutorDelegate
String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(),
workflowInstance.getState()));
}
final Command command = Command.builder()
.commandType(CommandType.RECOVER_SUSPENDED_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.executorId(workflowInstanceControlRequest.executeUser.getId())
.startTime(new Date())
.updateTime(new Date())
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
if (masterServer == null) {
throw new ServiceException("no master server available");
}
final WorkflowInstanceRecoverSuspendTasksRequest recoverSuspendTaskRequest =
WorkflowInstanceRecoverSuspendTasksRequest.builder()
.workflowInstanceId(workflowInstance.getId())
.userId(workflowInstanceControlRequest.executeUser.getId())
.build();
commandDao.insert(command);
final WorkflowInstanceRecoverSuspendTasksResponse recoverSuspendTaskResponse = Clients
.withService(IWorkflowControlClient.class)
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
.triggerFromSuspendTasks(recoverSuspendTaskRequest);
if (!recoverSuspendTaskResponse.isSuccess()) {
throw new ServiceException("Recover workflow instance failed: " + recoverSuspendTaskResponse.getMessage());
}
return null;
}

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/RepeatRunningWorkflowInstanceExecutorDelegate.java

@ -18,13 +18,15 @@
package org.apache.dolphinscheduler.api.executor.workflow;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import java.util.Date;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -35,7 +37,7 @@ public class RepeatRunningWorkflowInstanceExecutorDelegate
IExecutorDelegate<RepeatRunningWorkflowInstanceExecutorDelegate.RepeatRunningWorkflowInstanceOperation, Void> {
@Autowired
private CommandDao commandDao;
private RegistryClient registryClient;
@Override
public Void execute(RepeatRunningWorkflowInstanceOperation workflowInstanceControlRequest) {
@ -45,16 +47,25 @@ public class RepeatRunningWorkflowInstanceExecutorDelegate
String.format("The workflow instance: %s status is %s, cannot repeat running",
workflowInstance.getName(), workflowInstance.getState()));
}
Command command = Command.builder()
.commandType(CommandType.REPEAT_RUNNING)
.processInstanceId(workflowInstance.getId())
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.executorId(workflowInstanceControlRequest.executeUser.getId())
.startTime(new Date())
.updateTime(new Date())
final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null);
if (masterServer == null) {
throw new ServiceException("no master server available");
}
final WorkflowInstanceRepeatRunningRequest repeatRunningRequest = WorkflowInstanceRepeatRunningRequest.builder()
.workflowInstanceId(workflowInstance.getId())
.userId(workflowInstanceControlRequest.executeUser.getId())
.build();
commandDao.insert(command);
final WorkflowInstanceRepeatRunningResponse repeatRunningResponse = Clients
.withService(IWorkflowControlClient.class)
.withHost(masterServer.getHost() + ":" + masterServer.getPort())
.repeatTriggerWorkflowInstance(repeatRunningRequest);
if (!repeatRunningResponse.isSuccess()) {
throw new ServiceException(
"Repeat running workflow instance failed: " + repeatRunningResponse.getMessage());
}
return null;
}

15
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowControlClient.java

@ -23,6 +23,12 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
@ -45,6 +51,15 @@ public interface IWorkflowControlClient {
@RpcMethod
WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(final WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest);
@RpcMethod
WorkflowInstanceRepeatRunningResponse repeatTriggerWorkflowInstance(final WorkflowInstanceRepeatRunningRequest workflowInstanceRepeatRunningRequest);
@RpcMethod
WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest);
@RpcMethod
WorkflowInstanceRecoverSuspendTasksResponse triggerFromSuspendTasks(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest);
@RpcMethod
WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest);

35
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksRequest.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowInstanceRecoverFailureTasksRequest {
private Integer workflowInstanceId;
private Integer userId;
}

39
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverFailureTasksResponse.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WorkflowInstanceRecoverFailureTasksResponse {
private boolean success;
private String message;
public static WorkflowInstanceRecoverFailureTasksResponse success() {
return new WorkflowInstanceRecoverFailureTasksResponse(true, null);
}
public static WorkflowInstanceRecoverFailureTasksResponse fail(String message) {
return new WorkflowInstanceRecoverFailureTasksResponse(false, message);
}
}

35
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksRequest.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowInstanceRecoverSuspendTasksRequest {
private Integer workflowInstanceId;
private Integer userId;
}

39
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRecoverSuspendTasksResponse.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WorkflowInstanceRecoverSuspendTasksResponse {
private boolean success;
private String message;
public static WorkflowInstanceRecoverSuspendTasksResponse success() {
return new WorkflowInstanceRecoverSuspendTasksResponse(true, null);
}
public static WorkflowInstanceRecoverSuspendTasksResponse fail(String message) {
return new WorkflowInstanceRecoverSuspendTasksResponse(false, message);
}
}

35
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningRequest.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowInstanceRepeatRunningRequest {
private Integer workflowInstanceId;
private Integer userId;
}

39
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/workflow/WorkflowInstanceRepeatRunningResponse.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master.transportor.workflow;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class WorkflowInstanceRepeatRunningResponse {
private boolean success;
private String message;
public static WorkflowInstanceRepeatRunningResponse success() {
return new WorkflowInstanceRepeatRunningResponse(true, null);
}
public static WorkflowInstanceRepeatRunningResponse fail(String message) {
return new WorkflowInstanceRepeatRunningResponse(false, message);
}
}

76
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/AbstractWorkflowInstanceTrigger.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.dao.repository.UserDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
public abstract class AbstractWorkflowInstanceTrigger<TriggerRequest, TriggerResponse>
implements
IWorkflowTrigger<TriggerRequest, TriggerResponse> {
@Autowired
private WorkflowDefinitionLogDao workflowDefinitionDao;
@Autowired
private WorkflowInstanceDao workflowInstanceDao;
@Autowired
private UserDao userDao;
@Autowired
private CommandDao commandDao;
@Override
@Transactional
public TriggerResponse triggerWorkflow(final TriggerRequest triggerRequest) {
final WorkflowInstance workflowInstance = constructWorkflowInstance(triggerRequest);
workflowInstanceDao.updateById(workflowInstance);
final Command command = constructTriggerCommand(triggerRequest, workflowInstance);
commandDao.insert(command);
return onTriggerSuccess(workflowInstance);
}
protected abstract WorkflowInstance constructWorkflowInstance(final TriggerRequest triggerRequest);
protected abstract Command constructTriggerCommand(final TriggerRequest triggerRequest,
final WorkflowInstance workflowInstance);
protected abstract TriggerResponse onTriggerSuccess(final WorkflowInstance workflowInstance);
protected WorkflowInstance getWorkflowInstance(final Integer workflowInstanceId) {
final WorkflowInstance workflowInstance = workflowInstanceDao.queryById(workflowInstanceId);
if (workflowInstance == null) {
throw new IllegalStateException("Workflow instance not found: " + workflowInstanceId);
}
return workflowInstance;
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverFailureTaskTrigger.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse;
import java.util.Date;
import org.springframework.stereotype.Component;
@Component
public class WorkflowInstanceRecoverFailureTaskTrigger
extends
AbstractWorkflowInstanceTrigger<WorkflowInstanceRecoverFailureTasksRequest, WorkflowInstanceRecoverFailureTasksResponse> {
@Override
protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest) {
return getWorkflowInstance(workflowInstanceRecoverFailureTasksRequest.getWorkflowInstanceId());
}
@Override
protected Command constructTriggerCommand(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest,
final WorkflowInstance workflowInstance) {
return Command.builder()
.commandType(CommandType.START_FAILURE_TASK_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.executorId(workflowInstanceRecoverFailureTasksRequest.getUserId())
.startTime(new Date())
.updateTime(new Date())
.build();
}
@Override
protected WorkflowInstanceRecoverFailureTasksResponse onTriggerSuccess(WorkflowInstance workflowInstance) {
return WorkflowInstanceRecoverFailureTasksResponse.success();
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRecoverSuspendTaskTrigger.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse;
import java.util.Date;
import org.springframework.stereotype.Component;
@Component
public class WorkflowInstanceRecoverSuspendTaskTrigger
extends
AbstractWorkflowInstanceTrigger<WorkflowInstanceRecoverSuspendTasksRequest, WorkflowInstanceRecoverSuspendTasksResponse> {
@Override
protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest) {
return getWorkflowInstance(workflowInstanceRecoverSuspendTasksRequest.getWorkflowInstanceId());
}
@Override
protected Command constructTriggerCommand(final WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest,
final WorkflowInstance workflowInstance) {
return Command.builder()
.commandType(CommandType.RECOVER_SUSPENDED_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.executorId(workflowInstanceRecoverSuspendTasksRequest.getUserId())
.startTime(new Date())
.updateTime(new Date())
.build();
}
@Override
protected WorkflowInstanceRecoverSuspendTasksResponse onTriggerSuccess(WorkflowInstance workflowInstance) {
return WorkflowInstanceRecoverSuspendTasksResponse.success();
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/trigger/WorkflowInstanceRepeatTrigger.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.engine.workflow.trigger;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse;
import java.util.Date;
import org.springframework.stereotype.Component;
@Component
public class WorkflowInstanceRepeatTrigger
extends
AbstractWorkflowInstanceTrigger<WorkflowInstanceRepeatRunningRequest, WorkflowInstanceRepeatRunningResponse> {
@Override
protected WorkflowInstance constructWorkflowInstance(final WorkflowInstanceRepeatRunningRequest repeatRunningRequest) {
return getWorkflowInstance(repeatRunningRequest.getWorkflowInstanceId());
}
@Override
protected Command constructTriggerCommand(final WorkflowInstanceRepeatRunningRequest repeatRunningRequest,
final WorkflowInstance workflowInstance) {
return Command.builder()
.commandType(CommandType.REPEAT_RUNNING)
.processInstanceId(workflowInstance.getId())
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.executorId(repeatRunningRequest.getUserId())
.startTime(new Date())
.updateTime(new Date())
.build();
}
@Override
protected WorkflowInstanceRepeatRunningResponse onTriggerSuccess(final WorkflowInstance workflowInstance) {
return WorkflowInstanceRepeatRunningResponse.success();
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowControlClient.java

@ -22,6 +22,12 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
@ -31,6 +37,9 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowS
import org.apache.dolphinscheduler.server.master.engine.WorkflowCacheRepository;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowBackfillTrigger;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverFailureTaskTrigger;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverSuspendTaskTrigger;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRepeatTrigger;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger;
import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowScheduleTrigger;
@ -54,6 +63,15 @@ public class WorkflowControlClient implements IWorkflowControlClient {
@Autowired
private WorkflowScheduleTrigger workflowScheduleTrigger;
@Autowired
private WorkflowInstanceRepeatTrigger workflowInstanceRepeatTrigger;
@Autowired
private WorkflowInstanceRecoverFailureTaskTrigger workflowInstanceRecoverFailureTaskTrigger;
@Autowired
private WorkflowInstanceRecoverSuspendTaskTrigger workflowInstanceRecoverSuspendTaskTrigger;
@Autowired
private WorkflowCacheRepository workflowRepository;
@ -78,7 +96,7 @@ public class WorkflowControlClient implements IWorkflowControlClient {
}
@Override
public WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest) {
public WorkflowScheduleTriggerResponse scheduleTriggerWorkflow(final WorkflowScheduleTriggerRequest workflowScheduleTriggerRequest) {
try {
return workflowScheduleTrigger.triggerWorkflow(workflowScheduleTriggerRequest);
} catch (Exception ex) {
@ -88,6 +106,44 @@ public class WorkflowControlClient implements IWorkflowControlClient {
}
}
@Override
public WorkflowInstanceRepeatRunningResponse repeatTriggerWorkflowInstance(final WorkflowInstanceRepeatRunningRequest workflowInstanceRepeatRunningRequest) {
try {
return workflowInstanceRepeatTrigger.triggerWorkflow(workflowInstanceRepeatRunningRequest);
} catch (Exception ex) {
log.error("Handle workflowInstanceRepeatRunningRequest: {} failed", workflowInstanceRepeatRunningRequest,
ex);
return WorkflowInstanceRepeatRunningResponse
.fail("Repeat trigger workflow instance failed: " + ExceptionUtils.getMessage(ex));
}
}
@Override
public WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks(final WorkflowInstanceRecoverFailureTasksRequest workflowInstanceRecoverFailureTasksRequest) {
try {
return workflowInstanceRecoverFailureTaskTrigger
.triggerWorkflow(workflowInstanceRecoverFailureTasksRequest);
} catch (Exception ex) {
log.error("Handle workflowInstanceRecoverFailureTaskRequest: {} failed",
workflowInstanceRecoverFailureTasksRequest, ex);
return WorkflowInstanceRecoverFailureTasksResponse
.fail("Recover failure task failed: " + ExceptionUtils.getMessage(ex));
}
}
@Override
public WorkflowInstanceRecoverSuspendTasksResponse triggerFromSuspendTasks(WorkflowInstanceRecoverSuspendTasksRequest workflowInstanceRecoverSuspendTasksRequest) {
try {
return workflowInstanceRecoverSuspendTaskTrigger
.triggerWorkflow(workflowInstanceRecoverSuspendTasksRequest);
} catch (Exception ex) {
log.error("Handle workflowInstanceRecoverSuspendTaskRequest: {} failed",
workflowInstanceRecoverSuspendTasksRequest, ex);
return WorkflowInstanceRecoverSuspendTasksResponse
.fail("Recover suspend task failed: " + ExceptionUtils.getMessage(ex));
}
}
@Override
public WorkflowInstancePauseResponse pauseWorkflowInstance(final WorkflowInstancePauseRequest workflowInstancePauseRequest) {
try {

19
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/WorkflowOperator.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowB
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowBackfillTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstancePauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverFailureTasksRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRepeatRunningRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse;
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest;
@ -89,6 +91,23 @@ public class WorkflowOperator {
schedulerApi.insertOrUpdateScheduleTask(project.getId(), schedule);
}
public void repeatRunningWorkflowInstance(final Integer workflowInstanceId) {
final WorkflowInstanceRepeatRunningRequest repeatRunningRequest = WorkflowInstanceRepeatRunningRequest.builder()
.workflowInstanceId(workflowInstanceId)
.userId(1)
.build();
workflowInstanceController.repeatTriggerWorkflowInstance(repeatRunningRequest);
}
public void recoverFailureTasks(final Integer workflowInstanceId) {
final WorkflowInstanceRecoverFailureTasksRequest recoverFailureTasksRequest =
WorkflowInstanceRecoverFailureTasksRequest.builder()
.workflowInstanceId(workflowInstanceId)
.userId(1)
.build();
workflowInstanceController.triggerFromFailureTasks(recoverFailureTasksRequest);
}
public WorkflowInstancePauseResponse pauseWorkflowInstance(Integer workflowInstanceId) {
final WorkflowInstancePauseRequest workflowInstancePauseRequest =
new WorkflowInstancePauseRequest(workflowInstanceId);

117
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRecoverFailureTaskIT.java

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.it.cases;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTest;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import org.apache.dolphinscheduler.server.master.it.Repository;
import org.apache.dolphinscheduler.server.master.it.WorkflowITContext;
import org.apache.dolphinscheduler.server.master.it.WorkflowITContextFactory;
import org.apache.dolphinscheduler.server.master.it.WorkflowOperator;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
* The integration test for recover from failure tasks.
*/
public class WorkflowInstanceRecoverFailureTaskIT extends AbstractMasterIntegrationTest {
@Autowired
private WorkflowITContextFactory workflowITContextFactory;
@Autowired
private WorkflowOperator workflowOperator;
@Autowired
private IWorkflowRepository workflowRepository;
@Autowired
private Repository repository;
@Test
@DisplayName("Test recover from failure tasks")
public void testRepeatRunningWorkflow_with_taskOnly() {
final String yaml = "/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml";
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml);
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
workflowOperator.recoverFailureTasks(workflowInstanceId);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
});
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId);
Assertions
.assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
Assertions
.assertThat(workflowInstance.getRunTimes())
.isEqualTo(2);
final List<TaskInstance> taskInstances = repository.queryTaskInstance(workflowInstanceId);
Assertions
.assertThat(taskInstances)
.hasSize(3);
Assertions
.assertThat(taskInstances.get(0))
.matches(t -> "A".equals(t.getName()))
.matches(t -> t.getState() == TaskExecutionStatus.FAILURE)
.matches(t -> t.getFlag() == Flag.NO);
Assertions
.assertThat(taskInstances.get(1))
.matches(t -> "A".equals(t.getName()))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.YES)
.matches(t -> StringUtils.isNotEmpty(t.getLogPath()));
Assertions
.assertThat(taskInstances.get(2))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.YES)
.matches(t -> StringUtils.isNotEmpty(t.getLogPath()));
});
assertThat(workflowRepository.getAll()).isEmpty();
}
}

202
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/it/cases/WorkflowInstanceRepeatRunningIT.java

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.it.cases;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.AbstractMasterIntegrationTest;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import org.apache.dolphinscheduler.server.master.it.Repository;
import org.apache.dolphinscheduler.server.master.it.WorkflowITContext;
import org.apache.dolphinscheduler.server.master.it.WorkflowITContextFactory;
import org.apache.dolphinscheduler.server.master.it.WorkflowOperator;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
* The integration test for repeat running a workflow instance.
*/
public class WorkflowInstanceRepeatRunningIT extends AbstractMasterIntegrationTest {
@Autowired
private WorkflowITContextFactory workflowITContextFactory;
@Autowired
private WorkflowOperator workflowOperator;
@Autowired
private IWorkflowRepository workflowRepository;
@Autowired
private Repository repository;
@Test
@DisplayName("Test repeat running a workflow instance with one success task")
public void testRepeatRunningWorkflow_with_oneSuccessTask() {
final String yaml = "/it/repeat_running/success_workflow_with_one_fake_task_success.yaml";
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml);
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
});
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId);
Assertions
.assertThat(workflowInstance)
.matches(w -> w.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(w -> w.getRunTimes() == 2);
final List<TaskInstance> taskInstances = repository.queryTaskInstance(workflowInstanceId);
Assertions
.assertThat(taskInstances)
.hasSize(2);
Assertions
.assertThat(taskInstances.get(0))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.NO);
Assertions
.assertThat(taskInstances.get(1))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.YES)
.matches(t -> StringUtils.isNotEmpty(t.getLogPath()));
});
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test repeat running a workflow instance with one failed task")
public void testRepeatRunningWorkflow_with_oneFailedTask() {
final String yaml = "/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml";
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml);
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
});
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId);
Assertions
.assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
Assertions
.assertThat(workflowInstance.getRunTimes())
.isEqualTo(2);
final List<TaskInstance> taskInstances = repository.queryTaskInstance(workflowInstanceId);
Assertions
.assertThat(taskInstances)
.hasSize(2);
Assertions
.assertThat(taskInstances.get(0))
.matches(t -> t.getState() == TaskExecutionStatus.FAILURE)
.matches(t -> t.getFlag() == Flag.NO);
Assertions
.assertThat(taskInstances.get(1))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.YES)
.matches(t -> StringUtils.isNotEmpty(t.getLogPath()));
});
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test repeat running a workflow instance with task only")
public void testRepeatRunningWorkflow_with_taskOnly() {
final String yaml = "/it/repeat_running/success_workflow_with_task_only.yaml";
final WorkflowITContext context = workflowITContextFactory.initializeContextFromYaml(yaml);
final Integer workflowInstanceId = context.getWorkflowInstance().getId();
workflowOperator.repeatRunningWorkflowInstance(workflowInstanceId);
await()
.pollInterval(Duration.ofMillis(100))
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId).getState())
.isEqualTo(WorkflowExecutionStatus.RUNNING_EXECUTION);
});
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
final WorkflowInstance workflowInstance = repository.queryWorkflowInstance(workflowInstanceId);
Assertions
.assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS);
Assertions
.assertThat(workflowInstance.getRunTimes())
.isEqualTo(2);
final List<TaskInstance> taskInstances = repository.queryTaskInstance(workflowInstanceId);
Assertions
.assertThat(taskInstances)
.hasSize(2);
Assertions
.assertThat(taskInstances.get(0))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.NO);
Assertions
.assertThat(taskInstances.get(1))
.matches(t -> t.getState() == TaskExecutionStatus.SUCCESS)
.matches(t -> t.getFlag() == Flag.YES)
.matches(t -> StringUtils.isNotEmpty(t.getLogPath()));
});
assertThat(workflowRepository.getAll()).isEmpty();
}
}

130
dolphinscheduler-master/src/test/resources/it/recover_failure_tasks/failure_workflow_with_two_serial_fake_task.yaml

@ -0,0 +1,130 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
workflowInstance:
id: 1
name: workflow_with_two_serial_fake_task_success-20240816071251690
processDefinitionCode: 1
processDefinitionVersion: 1
projectCode: 1
state: FAILURE
recovery: NO
startTime: 2024-08-16 07:12:52
endTime: 2024-08-16 07:12:57
runTimes: 1
host: 127.0.0.1:5678
commandType: START_PROCESS
commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
commandStartTime: 2024-08-16 07:12:52
isSubProcess: NO
executorId: 1
historyCmd: START_PROCESS
workerGroup: default
globalParams: '[]'
varPool: '[]'
dryRun: 0
taskInstances:
- id: 1
name: A
taskType: LogicFakeTask
processInstanceId: 1
processInstanceName: workflow_with_two_parallel_fake_task_success-20240816071251690
projectCode: 1
taskCode: 1
taskDefinitionVersion: 1
state: FAILURE
firstSubmitTime: 2024-08-16 07:12:52
submitTime: 2024-08-16 07:12:57
startTime: 2024-08-16 07:12:57
endTime: 2024-08-16 07:12:57
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
flag: YES
retryInterval: 0
delayTime: 0
workerGroup: default
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
workflow:
name: workflow_with_two_serial_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with two serial tasks
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL
tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo hello"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 1
preTaskVersion: 1
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

111
dolphinscheduler-master/src/test/resources/it/repeat_running/failed_workflow_with_one_fake_task_failed.yaml

@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
workflow:
name: workflow_with_one_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL
workflowInstance:
id: 1
name: workflow_with_one_fake_task_success-20240816071251690
processDefinitionCode: 1
processDefinitionVersion: 1
projectCode: 1
state: FAILURE
recovery: NO
startTime: 2024-08-16 07:12:52
endTime: 2024-08-16 07:12:57
runTimes: 1
host: 127.0.0.1:5678
commandType: START_PROCESS
commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
commandStartTime: 2024-08-16 07:12:52
isSubProcess: NO
executorId: 1
historyCmd: START_PROCESS
workerGroup: default
globalParams: '[]'
varPool: '[]'
dryRun: 0
taskInstances:
- id: 1
name: A
taskType: LogicFakeTask
processInstanceId: 1
processInstanceName: workflow_with_one_fake_task_success-20240816071251690
projectCode: 1
taskCode: 1
taskDefinitionVersion: 1
state: FAILURE
firstSubmitTime: 2024-08-16 07:12:52
submitTime: 2024-08-16 07:12:57
startTime: 2024-08-16 07:12:57
endTime: 2024-08-16 07:12:57
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
flag: YES
retryInterval: 0
delayTime: 0
workerGroup: default
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

111
dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_one_fake_task_success.yaml

@ -0,0 +1,111 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
workflow:
name: workflow_with_one_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL
workflowInstance:
id: 1
name: workflow_with_one_fake_task_success-20240816071251690
processDefinitionCode: 1
processDefinitionVersion: 1
projectCode: 1
state: SUCCESS
recovery: NO
startTime: 2024-08-16 07:12:52
endTime: 2024-08-16 07:12:57
runTimes: 1
host: 127.0.0.1:5678
commandType: START_PROCESS
commandParam: '{"commandType":"START_PROCESS","startNodes":[],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_POST
commandStartTime: 2024-08-16 07:12:52
isSubProcess: NO
executorId: 1
historyCmd: START_PROCESS
workerGroup: default
globalParams: '[]'
varPool: '[]'
dryRun: 0
taskInstances:
- id: 1
name: A
taskType: LogicFakeTask
processInstanceId: 1
processInstanceName: workflow_with_one_fake_task_success-20240816071251690
projectCode: 1
taskCode: 1
taskDefinitionVersion: 1
state: SUCCESS
firstSubmitTime: 2024-08-16 07:12:52
submitTime: 2024-08-16 07:12:57
startTime: 2024-08-16 07:12:57
endTime: 2024-08-16 07:12:57
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
flag: YES
retryInterval: 0
delayTime: 0
workerGroup: default
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

131
dolphinscheduler-master/src/test/resources/it/repeat_running/success_workflow_with_task_only.yaml

@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
workflowInstance:
id: 1
name: workflow_with_two_parallel_fake_task_success-20240816071251690
processDefinitionCode: 1
processDefinitionVersion: 1
projectCode: 1
state: SUCCESS
recovery: NO
startTime: 2024-08-16 07:12:52
endTime: 2024-08-16 07:12:57
runTimes: 1
host: 127.0.0.1:5678
commandType: START_PROCESS
commandParam: '{"commandType":"START_PROCESS","startNodes":[1],"commandParams":[],"timeZone":"UTC"}'
taskDependType: TASK_ONLY
commandStartTime: 2024-08-16 07:12:52
isSubProcess: NO
executorId: 1
historyCmd: START_PROCESS
workerGroup: default
globalParams: '[]'
varPool: '[]'
dryRun: 0
taskInstances:
- id: 1
name: A
taskType: LogicFakeTask
processInstanceId: 1
processInstanceName: workflow_with_two_parallel_fake_task_success-20240816071251690
projectCode: 1
taskCode: 1
taskDefinitionVersion: 1
state: SUCCESS
firstSubmitTime: 2024-08-16 07:12:52
submitTime: 2024-08-16 07:12:57
startTime: 2024-08-16 07:12:57
endTime: 2024-08-16 07:12:57
retryTimes: 0
host: 127.0.0.1:1234
maxRetryTimes: 0
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
flag: YES
retryInterval: 0
delayTime: 0
workerGroup: default
executorId: 1
varPool: '[]'
taskExecuteType: BATCH
workflow:
name: workflow_with_two_parallel_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with two parallel success tasks
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL
tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
processDefinitionCode: 1
processDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Loading…
Cancel
Save