From 4351a25f2a1d0eb965ebb0db080c46575b0b0e25 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 24 Feb 2023 13:47:41 +0800 Subject: [PATCH] Add execute function to handle the workflow instance operation (#13610) --- .../api/controller/ExecutorController.java | 2 - .../api/enums/ExecuteType.java | 35 ++- .../dolphinscheduler/api/enums/Status.java | 2 +- .../api/executor/ExecuteClient.java | 62 +++++ .../api/executor/ExecuteContext.java | 51 ++++ .../api/executor/ExecuteFunction.java | 37 +++ .../api/executor/ExecuteFunctionBuilder.java | 32 +++ .../api/executor/ExecuteRequest.java | 22 ++ .../api/executor/ExecuteResult.java | 22 ++ .../api/executor/ExecuteRuntimeException.java | 46 ++++ .../FailureRecoveryExecuteFunction.java | 64 +++++ ...FailureRecoveryExecuteFunctionBuilder.java | 59 ++++ .../recovery/FailureRecoveryRequest.java | 36 +++ .../recovery/FailureRecoveryResult.java | 30 +++ .../pause/pause/PauseExecuteFunction.java | 80 ++++++ .../pause/PauseExecuteFunctionBuilder.java | 60 +++++ .../pause/pause/PauseExecuteRequest.java | 35 +++ .../pause/pause/PauseExecuteResult.java | 29 ++ .../pause/recover/RecoverExecuteFunction.java | 78 ++++++ .../RecoverExecuteFunctionBuilder.java | 60 +++++ .../pause/recover/RecoverExecuteRequest.java | 36 +++ .../pause/recover/RecoverExecuteResult.java | 29 ++ .../rerun/RepeatRunningExecuteFunction.java | 93 +++++++ .../RepeatRunningExecuteFunctionBuilder.java | 59 ++++ .../instance/rerun/RepeatRunningRequest.java | 38 +++ .../instance/rerun/RepeatRunningResult.java | 30 +++ .../instance/stop/StopExecuteFunction.java | 87 ++++++ .../stop/StopExecuteFunctionBuilder.java | 56 ++++ .../workflow/instance/stop/StopRequest.java | 33 +++ .../workflow/instance/stop/StopResult.java | 33 +++ .../api/rpc/ApiRpcClient.java | 41 +++ .../api/service/ProcessDefinitionService.java | 11 +- .../api/service/ProcessInstanceService.java | 2 + .../api/service/ProjectService.java | 5 +- .../api/service/impl/ExecutorServiceImpl.java | 252 ++++-------------- .../impl/ProcessDefinitionServiceImpl.java | 20 ++ .../impl/ProcessInstanceServiceImpl.java | 9 + .../api/service/impl/ProjectServiceImpl.java | 6 + ...ava => ExecuteFunctionControllerTest.java} | 2 +- ...t.java => ExecuteFunctionServiceTest.java} | 49 +++- .../dolphinscheduler/dao/entity/Command.java | 21 +- .../repository/ProcessDefinitionLogDao.java | 4 + .../dao/repository/ProcessInstanceDao.java | 1 + .../impl/ProcessDefinitionLogDaoImpl.java | 7 + .../impl/ProcessInstanceDaoImpl.java | 5 + .../plugin/task/api/utils/ShellUtils.java | 7 +- 46 files changed, 1541 insertions(+), 237 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java rename dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/{ExecutorControllerTest.java => ExecuteFunctionControllerTest.java} (99%) rename dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/{ExecutorServiceTest.java => ExecuteFunctionServiceTest.java} (94%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 16ec43dbdd..4f474821a7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -306,8 +306,6 @@ public class ExecutorController extends BaseController { @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType) { - log.info("Start to execute process instance, projectCode:{}, processInstanceId:{}.", projectCode, - processInstanceId); Map result = execService.execute(loginUser, projectCode, processInstanceId, executeType); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java index fea69bf22d..ff3f122a23 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java @@ -30,14 +30,43 @@ public enum ExecuteType { * 4 stop * 5 pause */ - NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK; + NONE(0, "NONE"), + + // ******************************* Workflow *************************** + REPEAT_RUNNING(1, "REPEAT_RUNNING"), + RECOVER_SUSPENDED_PROCESS(2, "RECOVER_SUSPENDED_PROCESS"), + START_FAILURE_TASK_PROCESS(3, "START_FAILURE_TASK_PROCESS"), + STOP(4, "STOP"), + PAUSE(5, "PAUSE"), + // ******************************* Workflow *************************** + + // ******************************* Task ******************************* + EXECUTE_TASK(6, "EXECUTE_TASK"), + // ******************************* Task ******************************* + ; + + private final int code; + private final String desc; + + ExecuteType(int code, String desc) { + this.code = code; + this.desc = desc; + } + + public int getCode() { + return code; + } + + public String getDesc() { + return desc; + } public static ExecuteType getEnum(int value) { for (ExecuteType e : ExecuteType.values()) { - if (e.ordinal() == value) { + if (e.getCode() == value) { return e; } } - return null; + return NONE; } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 97db8eb78f..168d0f94ae 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -532,7 +532,7 @@ public enum Status { NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have this permission.", "当前用户无此权限"), FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"), - SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"), + SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"), DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", "描述过长"), DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005, "delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}"); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java new file mode 100644 index 0000000000..f8f685b4b3 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Component; + +/** + * This is the main class for executing workflow/workflowInstance/tasks. + *
+ *     ExecuteContext executeContext = ExecuteContext.builder()
+ *         .processInstance(processInstance)
+ *         .executeType(...)
+ *         .build();
+ *     executeClient.execute(executeContext);
+ * 
+ */ +@Component +@SuppressWarnings("unchecked") +public class ExecuteClient { + + private final Map executorFunctionBuilderMap; + + public ExecuteClient(List executeFunctionBuilders) { + executorFunctionBuilderMap = executeFunctionBuilders.stream() + .collect(Collectors.toMap(ExecuteFunctionBuilder::getExecuteType, Function.identity())); + } + + public ExecuteResult executeWorkflowInstance(ExecuteContext executeContext) throws ExecuteRuntimeException { + ExecuteFunctionBuilder executeFunctionBuilder = checkNotNull( + executorFunctionBuilderMap.get(executeContext.getExecuteType()), + String.format("The executeType: %s is not supported", executeContext.getExecuteType())); + + return executeFunctionBuilder.createWorkflowInstanceExecuteFunction(executeContext) + .thenCombine(executeFunctionBuilder.createWorkflowInstanceExecuteRequest(executeContext), + ExecuteFunction::execute) + .join(); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java new file mode 100644 index 0000000000..84c100b019 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import lombok.Data; + +// todo: to be interface +@Data +public class ExecuteContext { + + private final ProcessInstance workflowInstance; + + private final ProcessDefinition workflowDefinition; + + private final User executeUser; + + private final ExecuteType executeType; + + public ExecuteContext(ProcessInstance workflowInstance, + ProcessDefinition workflowDefinition, + User executeUser, + ExecuteType executeType) { + this.workflowInstance = checkNotNull(workflowInstance, "workflowInstance cannot be null"); + this.workflowDefinition = checkNotNull(workflowDefinition, "workflowDefinition cannot be null"); + this.executeUser = checkNotNull(executeUser, "executeUser cannot be null"); + this.executeType = checkNotNull(executeType, "executeType cannot be null"); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java new file mode 100644 index 0000000000..8945c0d678 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; + +public interface ExecuteFunction { + + /** + * Execute the workflow by the given request. + * + * @param request execute request + * @return execute result + * @throws ExecuteRuntimeException If there is an exception during execution, it will be thrown. + */ + Result execute(Request request) throws ExecuteRuntimeException; + + /** + * @return the type of the executor + */ + ExecuteType getExecuteType(); +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java new file mode 100644 index 0000000000..49dc9e9f8d --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; + +import java.util.concurrent.CompletableFuture; + +public interface ExecuteFunctionBuilder { + + CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext); + + CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext); + + ExecuteType getExecuteType(); + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java new file mode 100644 index 0000000000..d5b02c5eba --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +public interface ExecuteRequest { + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java new file mode 100644 index 0000000000..52ae3c8d5f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +public interface ExecuteResult { + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java new file mode 100644 index 0000000000..1cdebb0952 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor; + +// todo: implement from DolphinSchedulerRuntimeException +public class ExecuteRuntimeException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + private static final String EXECUTE_WORKFLOW_INSTANCE_ERROR = + "Execute workflow instance %s failed, execute type is %s"; + + public ExecuteRuntimeException(String message) { + super(message); + } + + public ExecuteRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext) { + return executeWorkflowInstanceError(executeContext, null); + } + + public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext, Throwable cause) { + return new ExecuteRuntimeException( + String.format(EXECUTE_WORKFLOW_INSTANCE_ERROR, executeContext.getWorkflowInstance().getName(), + executeContext.getExecuteType()), + cause); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java new file mode 100644 index 0000000000..546e632b80 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.service.command.CommandService; + +public class FailureRecoveryExecuteFunction implements ExecuteFunction { + + private final CommandService commandService; + + public FailureRecoveryExecuteFunction(CommandService commandService) { + this.commandService = commandService; + } + + @Override + public FailureRecoveryResult execute(FailureRecoveryRequest request) throws ExecuteRuntimeException { + ProcessInstance workflowInstance = request.getWorkflowInstance(); + if (!workflowInstance.getState().isFailure()) { + throw new ExecuteRuntimeException( + String.format("The workflow instance: %s status is %s, can not be recovered", + workflowInstance.getName(), workflowInstance.getState())); + } + + Command command = Command.builder() + .commandType(CommandType.START_FAILURE_TASK_PROCESS) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .processInstanceId(workflowInstance.getId()) + .executorId(request.getExecuteUser().getId()) + .testFlag(workflowInstance.getTestFlag()) + .build(); + if (commandService.createCommand(command) <= 0) { + throw new ExecuteRuntimeException( + "Failure recovery workflow instance failed, due to insert command to db failed"); + } + return new FailureRecoveryResult(command.getId()); + } + + @Override + public ExecuteType getExecuteType() { + return FailureRecoveryExecuteFunctionBuilder.EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java new file mode 100644 index 0000000000..1509220bee --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder; +import org.apache.dolphinscheduler.service.command.CommandService; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class FailureRecoveryExecuteFunctionBuilder + implements + ExecuteFunctionBuilder { + + public static final ExecuteType EXECUTE_TYPE = ExecuteType.START_FAILURE_TASK_PROCESS; + + @Autowired + private CommandService commandService; + + @Override + public CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) { + return CompletableFuture.completedFuture(new FailureRecoveryExecuteFunction(commandService)); + } + + @Override + public CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) { + return CompletableFuture.completedFuture( + new FailureRecoveryRequest( + executeContext.getWorkflowInstance(), + executeContext.getWorkflowDefinition(), + executeContext.getExecuteUser())); + } + + @Override + public ExecuteType getExecuteType() { + return EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java new file mode 100644 index 0000000000..e6e7231540 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery; + +import org.apache.dolphinscheduler.api.executor.ExecuteRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class FailureRecoveryRequest implements ExecuteRequest { + + private final ProcessInstance workflowInstance; + private final ProcessDefinition workflowDefinition; + private final User executeUser; + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java new file mode 100644 index 0000000000..c75ed591c7 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery; + +import org.apache.dolphinscheduler.api.executor.ExecuteResult; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class FailureRecoveryResult implements ExecuteResult { + + private final Integer commandId; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java new file mode 100644 index 0000000000..83711660f8 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException; +import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; + +public class PauseExecuteFunction implements ExecuteFunction { + + private final ProcessInstanceDao processInstanceDao; + + private final ApiRpcClient apiRpcClient; + + public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) { + this.processInstanceDao = processInstanceDao; + this.apiRpcClient = apiRpcClient; + } + + @Override + public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRuntimeException { + ProcessInstance workflowInstance = request.getWorkflowInstance(); + if (!workflowInstance.getState().isRunning()) { + throw new ExecuteRuntimeException( + String.format("The workflow instance: %s status is %s, can not pause", workflowInstance.getName(), + workflowInstance.getState())); + } + workflowInstance.setCommandType(CommandType.PAUSE); + workflowInstance.addHistoryCmd(CommandType.PAUSE); + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE, + CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName()); + + if (processInstanceDao.updateProcessInstance(workflowInstance) <= 0) { + throw new ExecuteRuntimeException( + String.format( + "The workflow instance: %s pause failed, due to update the workflow instance status in DB failed", + workflowInstance.getName())); + } + WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand( + workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0); + try { + apiRpcClient.send(Host.of(workflowInstance.getHost()), workflowStateEventChangeCommand.convert2Command()); + } catch (RemotingException e) { + throw new ExecuteRuntimeException( + String.format( + "The workflow instance: %s pause failed, due to send rpc request to master: %s failed", + workflowInstance.getName(), workflowInstance.getHost()), + e); + } + return new PauseExecuteResult(workflowInstance); + } + + @Override + public ExecuteType getExecuteType() { + return PauseExecuteFunctionBuilder.EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java new file mode 100644 index 0000000000..2e8cf21afd --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder; +import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class PauseExecuteFunctionBuilder implements ExecuteFunctionBuilder { + + public static final ExecuteType EXECUTE_TYPE = ExecuteType.PAUSE; + + @Autowired + private ProcessInstanceDao processInstanceDao; + @Autowired + private ApiRpcClient apiRpcClient; + + @Override + public CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) { + return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao, apiRpcClient)); + } + + @Override + public CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) { + return CompletableFuture.completedFuture( + new PauseExecuteRequest( + executeContext.getWorkflowDefinition(), + executeContext.getWorkflowInstance(), + executeContext.getExecuteUser())); + } + + @Override + public ExecuteType getExecuteType() { + return EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java new file mode 100644 index 0000000000..02cb471961 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause; + +import org.apache.dolphinscheduler.api.executor.ExecuteRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class PauseExecuteRequest implements ExecuteRequest { + + private final ProcessDefinition processDefinition; + private final ProcessInstance workflowInstance; + private final User executeUser; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java new file mode 100644 index 0000000000..38bbc03c07 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause; + +import org.apache.dolphinscheduler.api.executor.ExecuteResult; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class PauseExecuteResult implements ExecuteResult { + + private final ProcessInstance processInstance; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java new file mode 100644 index 0000000000..149e1abd29 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover; + +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.service.command.CommandService; + +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +public class RecoverExecuteFunction implements ExecuteFunction { + + private final CommandService commandService; + + public RecoverExecuteFunction(CommandService commandService) { + this.commandService = commandService; + } + + @Override + public RecoverExecuteResult execute(RecoverExecuteRequest request) throws ExecuteRuntimeException { + ProcessInstance workflowInstance = request.getWorkflowInstance(); + if (!workflowInstance.getState().isPause()) { + throw new ExecuteRuntimeException( + String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(), + workflowInstance.getState())); + } + Command command = Command.builder() + .commandType(CommandType.RECOVER_SUSPENDED_PROCESS) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .processInstanceId(workflowInstance.getId()) + .commandParam(JSONUtils.toJsonString(createCommandParam(workflowInstance))) + .executorId(request.getExecuteUser().getId()) + .testFlag(workflowInstance.getTestFlag()) + .build(); + if (commandService.createCommand(command) <= 0) { + throw new ExecuteRuntimeException( + String.format("Recovery workflow instance: %s failed, due to insert command to db failed", + workflowInstance.getName())); + } + return new RecoverExecuteResult(command); + } + + private Map createCommandParam(ProcessInstance workflowInstance) { + return new ImmutableMap.Builder() + .put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId()) + .build(); + } + + @Override + public ExecuteType getExecuteType() { + return RecoverExecuteFunctionBuilder.EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java new file mode 100644 index 0000000000..cbd88a0d2d --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder; +import org.apache.dolphinscheduler.service.command.CommandService; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class RecoverExecuteFunctionBuilder + implements + ExecuteFunctionBuilder { + + public static final ExecuteType EXECUTE_TYPE = ExecuteType.RECOVER_SUSPENDED_PROCESS; + + @Autowired + private CommandService commandService; + + @Override + public CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) { + return CompletableFuture.completedFuture( + new RecoverExecuteFunction(commandService)); + } + + @Override + public CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) { + return CompletableFuture.completedFuture( + new RecoverExecuteRequest( + executeContext.getWorkflowInstance(), + executeContext.getWorkflowDefinition(), + executeContext.getExecuteUser())); + } + + @Override + public ExecuteType getExecuteType() { + return EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java new file mode 100644 index 0000000000..8f8b8f4e84 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover; + +import org.apache.dolphinscheduler.api.executor.ExecuteRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class RecoverExecuteRequest implements ExecuteRequest { + + private final ProcessInstance workflowInstance; + private final ProcessDefinition processDefinition; + private final User executeUser; + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java new file mode 100644 index 0000000000..882174cddd --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover; + +import org.apache.dolphinscheduler.api.executor.ExecuteResult; +import org.apache.dolphinscheduler.dao.entity.Command; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class RecoverExecuteResult implements ExecuteResult { + + private final Command command; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java new file mode 100644 index 0000000000..82c59b907f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.service.command.CommandService; + +import org.apache.commons.collections4.MapUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import com.fasterxml.jackson.core.type.TypeReference; + +public class RepeatRunningExecuteFunction implements ExecuteFunction { + + private final CommandService commandService; + + public RepeatRunningExecuteFunction(CommandService commandService) { + this.commandService = commandService; + } + + @Override + public RepeatRunningResult execute(RepeatRunningRequest request) throws ExecuteRuntimeException { + checkNotNull(request, "request cannot be null"); + // todo: check workflow definition valid? or we don't need to do this check, since we will check in master + // again. + // todo: check tenant valid? or we don't need to do this check, since we need to check in master again. + ProcessInstance workflowInstance = request.getWorkflowInstance(); + if (workflowInstance.getState() == null || !workflowInstance.getState().isFinished()) { + throw new ExecuteRuntimeException( + String.format("The workflow instance: %s status is %s, cannot repeat running", + workflowInstance.getName(), workflowInstance.getState())); + } + Command command = Command.builder() + .commandType(CommandType.REPEAT_RUNNING) + .commandParam(JSONUtils.toJsonString(createCommandParams(workflowInstance))) + .processDefinitionCode(workflowInstance.getProcessDefinitionCode()) + .processDefinitionVersion(workflowInstance.getProcessDefinitionVersion()) + .processInstanceId(workflowInstance.getId()) + .processInstancePriority(workflowInstance.getProcessInstancePriority()) + .testFlag(workflowInstance.getTestFlag()) + .build(); + if (commandService.createCommand(command) <= 0) { + throw new ExecuteRuntimeException( + String.format("Repeat running workflow instance: %s failed, due to insert command to db failed", + workflowInstance.getName())); + } + return new RepeatRunningResult(command.getId()); + } + + @Override + public ExecuteType getExecuteType() { + return RepeatRunningExecuteFunctionBuilder.EXECUTE_TYPE; + } + + private Map createCommandParams(ProcessInstance workflowInstance) { + Map commandMap = + JSONUtils.parseObject(workflowInstance.getCommandParam(), new TypeReference>() { + }); + Map repeatRunningCommandParams = new HashMap<>(); + Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS)) + .ifPresent(startParams -> repeatRunningCommandParams.put(CMD_PARAM_START_PARAMS, startParams)); + repeatRunningCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId()); + return repeatRunningCommandParams; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java new file mode 100644 index 0000000000..f5363f90da --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder; +import org.apache.dolphinscheduler.service.command.CommandService; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class RepeatRunningExecuteFunctionBuilder + implements + ExecuteFunctionBuilder { + + public static final ExecuteType EXECUTE_TYPE = ExecuteType.REPEAT_RUNNING; + + @Autowired + private CommandService commandService; + + @Override + public CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) { + return CompletableFuture.completedFuture(new RepeatRunningExecuteFunction(commandService)); + } + + @Override + public CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) { + return CompletableFuture.completedFuture( + new RepeatRunningRequest( + executeContext.getWorkflowInstance(), + executeContext.getWorkflowDefinition(), + executeContext.getExecuteUser())); + } + + @Override + public ExecuteType getExecuteType() { + return EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java new file mode 100644 index 0000000000..c0631190c2 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun; + +import org.apache.dolphinscheduler.api.executor.ExecuteRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class RepeatRunningRequest implements ExecuteRequest { + + private final ProcessInstance workflowInstance; + + private final ProcessDefinition processDefinition; + + private final User executeUser; + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java new file mode 100644 index 0000000000..9fa7ed0004 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun; + +import org.apache.dolphinscheduler.api.executor.ExecuteResult; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class RepeatRunningResult implements ExecuteResult { + + private final Integer commandId; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java new file mode 100644 index 0000000000..2e37423e9a --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.stop; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException; +import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class StopExecuteFunction implements ExecuteFunction { + + private final ProcessInstanceDao processInstanceDao; + // todo: Use ApiRpcClient instead of NettyRemotingClient + private final ApiRpcClient apiRpcClient; + + public StopExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) { + this.processInstanceDao = processInstanceDao; + this.apiRpcClient = apiRpcClient; + } + + @Override + public StopResult execute(StopRequest request) throws ExecuteRuntimeException { + ProcessInstance workflowInstance = request.getWorkflowInstance(); + + if (!workflowInstance.getState().canStop() + || workflowInstance.getState() == WorkflowExecutionStatus.READY_STOP) { + throw new ExecuteRuntimeException( + String.format("The workflow instance: %s status is %s, can not be stopped", + workflowInstance.getName(), workflowInstance.getState())); + } + // update the workflow instance's status to stop + workflowInstance.setCommandType(CommandType.STOP); + workflowInstance.addHistoryCmd(CommandType.STOP); + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, CommandType.STOP.getDescp() + " by user"); + if (processInstanceDao.updateProcessInstance(workflowInstance) > 0) { + log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance", + workflowInstance.getName()); + // todo: Use specific stop command instead of WorkflowStateEventChangeCommand + WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand( + workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0); + try { + apiRpcClient.send(Host.of(workflowInstance.getHost()), + workflowStateEventChangeCommand.convert2Command()); + } catch (RemotingException e) { + throw new ExecuteRuntimeException( + String.format("Workflow instance: %s stop failed, due to send request to master: %s failed", + workflowInstance.getName(), workflowInstance.getHost()), + e); + } + // todo: use async and inject the completeFuture in the result. + return new StopResult(workflowInstance); + } + throw new ExecuteRuntimeException( + "Workflow instance stop failed, due to update the workflow instance status failed"); + } + + @Override + public ExecuteType getExecuteType() { + return StopExecuteFunctionBuilder.EXECUTE_TYPE; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java new file mode 100644 index 0000000000..2fc06493f0 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.stop; + +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; +import org.apache.dolphinscheduler.api.executor.ExecuteFunction; +import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder; +import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; + +import java.util.concurrent.CompletableFuture; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class StopExecuteFunctionBuilder implements ExecuteFunctionBuilder { + + public static final ExecuteType EXECUTE_TYPE = ExecuteType.STOP; + + @Autowired + private ProcessInstanceDao processInstanceDao; + @Autowired + private ApiRpcClient apiRpcClient; + + @Override + public CompletableFuture> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) { + return CompletableFuture.completedFuture(new StopExecuteFunction(processInstanceDao, apiRpcClient)); + } + + @Override + public CompletableFuture createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) { + return CompletableFuture.completedFuture(new StopRequest(executeContext.getWorkflowInstance())); + } + + @Override + public ExecuteType getExecuteType() { + return EXECUTE_TYPE; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java new file mode 100644 index 0000000000..55a4b3a108 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.stop; + +import org.apache.dolphinscheduler.api.executor.ExecuteRequest; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; + +@Data +@AllArgsConstructor +public class StopRequest implements ExecuteRequest { + + @NonNull + private final ProcessInstance workflowInstance; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java new file mode 100644 index 0000000000..cf7ddb237e --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.workflow.instance.stop; + +import org.apache.dolphinscheduler.api.executor.ExecuteResult; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; + +@Data +@AllArgsConstructor +public class StopResult implements ExecuteResult { + + @NonNull + private final ProcessInstance workflowInstance; +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java new file mode 100644 index 0000000000..17f24dc67e --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.rpc; + +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; +import org.apache.dolphinscheduler.remote.utils.Host; + +import org.springframework.stereotype.Component; + +@Component +public class ApiRpcClient { + + private final NettyRemotingClient nettyRemotingClient; + + public ApiRpcClient() { + this.nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig()); + } + + public void send(Host host, Command command) throws RemotingException { + nettyRemotingClient.send(host, command); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 0fc682d403..12cc8a1829 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.servlet.http.HttpServletResponse; @@ -146,18 +147,22 @@ public interface ProcessDefinitionService { * Get resource workflow * * @param loginUser login user - * @param code process definition code + * @param code process definition code * @return Process definition Object */ ProcessDefinition getProcessDefinition(User loginUser, long code); + Optional queryWorkflowDefinition(long workflowDefinitionCode, int workflowDefinitionVersion); + ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode, + int workflowDefinitionVersion); + /** * query detail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param name process definition name + * @param name process definition name * @return process definition detail */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index eb760722fa..f866fc9313 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -57,6 +57,8 @@ public interface ProcessInstanceService { long projectCode, Integer processId); + ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer processId); + /** * query process instance by id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java index 54a66c79ec..61e8540eb8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; @@ -66,7 +67,9 @@ public interface ProjectService { */ Map checkProjectAndAuth(User loginUser, Project project, long projectCode, String perm); - void checkProjectAndAuthThrowException(User loginUser, Project project, String permission); + void checkProjectAndAuthThrowException(User loginUser, Project project, String permission) throws ServiceException; + + void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) throws ServiceException; boolean hasProjectAndPerm(User loginUser, Project project, Map result, String perm); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 533a254eea..3f2a5f87df 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.service.impl; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; @@ -34,8 +36,11 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.executor.ExecuteClient; +import org.apache.dolphinscheduler.api.executor.ExecuteContext; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.MonitorService; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.common.constants.Constants; @@ -91,7 +96,6 @@ import org.apache.dolphinscheduler.service.process.TriggerRelationService; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import java.time.ZonedDateTime; @@ -101,6 +105,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -111,7 +116,6 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Lists; /** @@ -144,10 +148,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ private ProcessService processService; @Autowired - private CommandService commandService; + private ProcessInstanceDao processInstanceDao; @Autowired - private ProcessInstanceDao processInstanceDao; + private ProcessDefinitionService processDefinitionService; + + @Autowired + private CommandService commandService; @Autowired private TaskDefinitionLogMapper taskDefinitionLogMapper; @@ -169,6 +176,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private TriggerRelationService triggerRelationService; + + @Autowired + private ExecuteClient executeClient; + /** * execute process instance * @@ -236,15 +247,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - if (!checkScheduleTimeNum(commandType, cronTime)) { - putMsg(result, Status.SCHEDULE_TIME_NUMBER); - return result; - } - - // check master exists - if (!checkMasterExists(result)) { - return result; - } + checkScheduleTimeNumExceed(commandType, cronTime); + checkMasterExists(); long triggerCode = CodeGenerateUtils.getInstance().genCode(); @@ -275,46 +279,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - /** - * check whether master exists - * - * @param result result - * @return master exists return true , otherwise return false - */ - private boolean checkMasterExists(Map result) { + private void checkMasterExists() { // check master server exists List masterServers = monitorService.getServerListFromRegistry(true); // no master if (masterServers.isEmpty()) { - log.error("Master does not exist."); - putMsg(result, Status.MASTER_NOT_EXISTS); - return false; + throw new ServiceException(Status.MASTER_NOT_EXISTS); } - return true; } - /** - * @param complementData - * @param cronTime - * @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false - */ - private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) { + private void checkScheduleTimeNumExceed(CommandType complementData, String cronTime) { if (!CommandType.COMPLEMENT_DATA.equals(complementData)) { - return true; + return; } if (cronTime == null) { - return true; + return; } Map cronMap = JSONUtils.toMap(cronTime); if (cronMap.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { String[] stringDates = cronMap.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA); if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) { log.warn("Parameter cornTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH); - return false; + throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED); } } - return true; } /** @@ -322,19 +311,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * * @param projectCode project code * @param processDefinition process definition - * @param processDefineCode process definition code - * @param version process instance version */ @Override public void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode, Integer version) { // check process definition exists - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode)); + if (projectCode != processDefinition.getProjectCode()) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinition.getCode()); } // check process definition online if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version); + throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getCode(), + processDefinition.getVersion()); } // check sub process definition online if (!checkSubProcessDefinitionValid(processDefinition)) { @@ -381,8 +369,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop,rerun failed task - - * * @param loginUser login user * @param projectCode project code @@ -391,103 +377,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return execute result code */ @Override - public Map execute(User loginUser, long projectCode, Integer processInstanceId, + public Map execute(User loginUser, + long projectCode, + Integer processInstanceId, ExecuteType executeType) { - Project project = projectMapper.queryByCode(projectCode); - // check user access for project + checkNotNull(processInstanceId, "workflowInstanceId cannot be null"); + checkNotNull(executeType, "executeType cannot be null"); - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, + // check user access for project + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, ApiFuncIdentificationConstant.map.get(executeType)); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } + checkMasterExists(); - // check master exists - if (!checkMasterExists(result)) { - return result; - } + ProcessInstance workflowInstance = + Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)) + .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId) - .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); + checkState(workflowInstance.getProjectCode() == projectCode, + "The workflow instance's project code doesn't equals to the given project"); + ProcessDefinition processDefinition = processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound( + workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); - ProcessDefinition processDefinition = - processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - processDefinition.setReleaseState(ReleaseState.ONLINE); - if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { - this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - } + executeClient.executeWorkflowInstance(new ExecuteContext( + workflowInstance, + processDefinition, + loginUser, + executeType)); - result = checkExecuteType(processInstance, executeType); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - if (!checkTenantSuitable(processDefinition)) { - log.error( - "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ", - processDefinition.getId(), processDefinition.getName()); - putMsg(result, Status.TENANT_NOT_SUITABLE); - } - - // get the startParams user specified at the first starting while repeat running is needed - Map commandMap = - JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference>() { - }); - String startParams = null; - if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) { - Object startParamsJson = commandMap.get(CMD_PARAM_START_PARAMS); - if (startParamsJson != null) { - startParams = startParamsJson.toString(); - } - } - - switch (executeType) { - case REPEAT_RUNNING: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, - processInstance.getTestFlag()); - break; - case RECOVER_SUSPENDED_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, - processInstance.getTestFlag()); - break; - case START_FAILURE_TASK_PROCESS: - result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, - processInstance.getTestFlag()); - break; - case STOP: - if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) { - log.warn("Process instance status is already {}, processInstanceName:{}.", - WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); - putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), - processInstance.getState()); - } else { - result = - updateProcessInstancePrepare(processInstance, CommandType.STOP, - WorkflowExecutionStatus.READY_STOP); - } - break; - case PAUSE: - if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) { - log.warn("Process instance status is already {}, processInstanceName:{}.", - WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); - putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), - processInstance.getState()); - } else { - result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, - WorkflowExecutionStatus.READY_PAUSE); - } - break; - default: - log.warn("Unknown execute type for process instance, processInstanceId:{}.", - processInstance.getId()); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); - - break; - } + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); return result; } @@ -504,10 +422,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Override public Map execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) { ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId); - ProcessDefinition processDefinition = - processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); - - return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType); + return execute(loginUser, processInstance.getProjectCode(), workflowInstanceId, executeType); } /** @@ -633,10 +548,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - // check master exists - if (!checkMasterExists(result)) { - return result; - } + checkMasterExists(); return forceStart(processInstance, taskGroupQueue); } @@ -773,62 +685,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - /** - * insert command, used in the implementation of the page, rerun, recovery (pause / failure) execution - * - * @param loginUser login user - * @param instanceId instance id - * @param processDefinitionCode process definition code - * @param processVersion - * @param commandType command type - * @return insert result code - */ - private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, - int processVersion, CommandType commandType, String startParams, - int testFlag) { - Map result = new HashMap<>(); - - // To add startParams only when repeat running is needed - Map cmdParam = new HashMap<>(); - cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId); - if (!StringUtils.isEmpty(startParams)) { - cmdParam.put(CMD_PARAM_START_PARAMS, startParams); - } - - Command command = new Command(); - command.setCommandType(commandType); - command.setProcessDefinitionCode(processDefinitionCode); - command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - command.setExecutorId(loginUser.getId()); - command.setProcessDefinitionVersion(processVersion); - command.setProcessInstanceId(instanceId); - command.setTestFlag(testFlag); - if (!commandService.verifyIsNeedCreateCommand(command)) { - log.warn( - "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", - processDefinitionCode, processVersion, instanceId); - putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode)); - return result; - } - - log.info("Creating command, commandInfo:{}.", command); - int create = commandService.createCommand(command); - - if (create > 0) { - log.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", - command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion); - putMsg(result, Status.SUCCESS); - } else { - log.error( - "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", - command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, - instanceId); - putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); - } - - return result; - } - /** * check whether sub processes are offline before starting process definition * @@ -1319,11 +1175,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - // check master exists - if (!checkMasterExists(result)) { - return result; - } - + checkMasterExists(); // todo dispatch improvement List masterServerList = monitorService.getServerListFromRegistry(true); Host host = new Host(masterServerList.get(0).getHost(), masterServerList.get(0).getPort()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index c7e36efb3a..b8def57492 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -236,6 +236,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private TaskDefinitionMapper taskDefinitionMapper; + @Lazy @Autowired private SchedulerService schedulerService; @@ -724,6 +725,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return processDefinition; } + @Override + public Optional queryWorkflowDefinition(long workflowDefinitionCode, + int workflowDefinitionVersion) { + ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null); + if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) { + workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode, + workflowDefinitionVersion); + } + return Optional.ofNullable(workflowDefinition); + } + + @Override + public ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode, + int workflowDefinitionVersion) { + return queryWorkflowDefinition(workflowDefinitionCode, workflowDefinitionVersion) + .orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, + String.valueOf(workflowDefinitionCode))); + } + @Override public Map queryProcessDefinitionByName(User loginUser, long projectCode, String name) { Project project = projectMapper.queryByCode(projectCode); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 9d151e528f..2f7417a4cd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -278,6 +278,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + @Override + public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) { + ProcessInstance processInstance = processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId); + if (processInstance == null) { + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId); + } + return processInstance; + } + /** * query workflow instance by id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java index bd1349f4fa..35aeacf798 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java @@ -242,6 +242,12 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic } } + @Override + public void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) { + Project project = projectMapper.queryByCode(projectCode); + checkProjectAndAuthThrowException(loginUser, project, permission); + } + @Override public boolean hasProjectAndPerm(User loginUser, Project project, Map result, String permission) { boolean checkResult = false; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java similarity index 99% rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java index 68ec7a5c0d..2f5cfb960a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java @@ -55,7 +55,7 @@ import com.google.gson.JsonObject; /** * executor controller test */ -public class ExecutorControllerTest extends AbstractControllerTest { +public class ExecuteFunctionControllerTest extends AbstractControllerTest { final Gson gson = new Gson(); final long projectCode = 1L; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java similarity index 94% rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index fe19b5b402..21630b0e1d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.executor.ExecuteClient; import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; -import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.common.constants.Constants; @@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.TriggerRelationService; @@ -96,11 +97,9 @@ import org.slf4j.LoggerFactory; */ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) -public class ExecutorServiceTest { +public class ExecuteFunctionServiceTest { - private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class); - - private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class); + private static final Logger logger = LoggerFactory.getLogger(ExecuteFunctionServiceTest.class); @Mock private ResourcePermissionCheckService resourcePermissionCheckService; @@ -147,6 +146,15 @@ public class ExecutorServiceTest { @Mock private TriggerRelationService triggerRelationService; + @Mock + private ExecuteClient executeClient; + + @Mock + private ProcessInstanceDao processInstanceDao; + + @Mock + private ProcessDefinitionService processDefinitionService; + private int processDefinitionId = 1; private int processDefinitionVersion = 1; @@ -195,6 +203,7 @@ public class ExecutorServiceTest { // processInstance processInstance.setId(processInstanceId); + processInstance.setProjectCode(projectCode); processInstance.setState(WorkflowExecutionStatus.FAILURE); processInstance.setExecutorId(userId); processInstance.setTenantId(tenantId); @@ -453,25 +462,39 @@ public class ExecutorServiceTest { public void testNoMasterServers() { Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>()); - Map result = executorService.execProcessInstance(loginUser, projectCode, + Assertions.assertThrows(ServiceException.class, () -> executorService.execProcessInstance( + loginUser, + projectCode, processDefinitionCode, "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, - null, null, - null, null, null, + null, + null, + null, + null, + null, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, + Constants.DEFAULT_WORKER_GROUP, + 100L, + 110, + null, + 0, + Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, - ComplementDependentMode.OFF_MODE, null); - Assertions.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); + ComplementDependentMode.OFF_MODE, + null)); } @Test public void testExecuteRepeatRunning() { - Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) + when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) .thenReturn(checkProjectAndAuth()); + when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance); + when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode, + processDefinitionVersion)).thenReturn(processDefinition); Map result = executorService.execute(loginUser, projectCode, processInstanceId, ExecuteType.REPEAT_RUNNING); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 290be4377c..12d0a69fb7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import java.util.Date; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; @@ -33,6 +36,9 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor @TableName("t_ds_command") public class Command { @@ -55,10 +61,10 @@ public class Command { private String commandParam; @TableField("task_depend_type") - private TaskDependType taskDependType; + private TaskDependType taskDependType = TaskDependType.TASK_POST; @TableField("failure_strategy") - private FailureStrategy failureStrategy; + private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; @TableField("warning_type") private WarningType warningType; @@ -70,13 +76,13 @@ public class Command { private Date scheduleTime; @TableField("start_time") - private Date startTime; + private Date startTime = new Date(); @TableField("process_instance_priority") private Priority processInstancePriority; @TableField("update_time") - private Date updateTime; + private Date updateTime = new Date(); @TableField("worker_group") private String workerGroup; @@ -99,13 +105,6 @@ public class Command { @TableField("test_flag") private int testFlag; - public Command() { - this.taskDependType = TaskDependType.TASK_POST; - this.failureStrategy = FailureStrategy.CONTINUE; - this.startTime = new Date(); - this.updateTime = new Date(); - } - public Command( CommandType commandType, TaskDependType taskDependType, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java index 7ba19d6892..4f32d142cd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java @@ -17,7 +17,11 @@ package org.apache.dolphinscheduler.dao.repository; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; + public interface ProcessDefinitionLogDao { + ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion); + void deleteByWorkflowDefinitionCode(long workflowDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index f9002c823d..b249e265e7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -38,4 +38,5 @@ public interface ProcessInstanceDao { void deleteById(Integer workflowInstanceId); + ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java index 47a1747c4f..0f757193fb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.repository.impl; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; @@ -29,6 +30,12 @@ public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao { @Autowired private ProcessDefinitionLogMapper processDefinitionLogMapper; + @Override + public ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion) { + return processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode, + workflowDefinitionVersion); + } + @Override public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) { processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index 2832e0a3f4..ae36ca924e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -69,4 +69,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { public void deleteById(Integer workflowInstanceId) { processInstanceMapper.deleteById(workflowInstanceId); } + + @Override + public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) { + return processInstanceMapper.selectById(workflowInstanceId); + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java index 7988bd0459..595823f7b9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.utils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.commons.lang3.StringUtils; + import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -30,8 +32,9 @@ import lombok.experimental.UtilityClass; public class ShellUtils { public List ENV_SOURCE_LIST = Arrays.stream( - Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")) - .map(s -> s.split(",")).orElse(new String[0])) + Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(",")) + .orElse(new String[0])) .map(String::trim) + .filter(StringUtils::isNotBlank) .collect(Collectors.toList()); }