Browse Source

Add execute function to handle the workflow instance operation (#13610)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
4351a25f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 62
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java
  5. 51
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java
  6. 37
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java
  7. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java
  8. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java
  9. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java
  10. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java
  11. 64
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java
  12. 59
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java
  13. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java
  14. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java
  15. 80
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java
  16. 60
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java
  17. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java
  18. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java
  19. 78
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java
  20. 60
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java
  21. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java
  22. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java
  23. 93
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java
  24. 59
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java
  25. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java
  26. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java
  27. 87
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java
  28. 56
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java
  29. 33
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java
  30. 33
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java
  31. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java
  32. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  33. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  34. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
  35. 250
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  36. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  37. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  38. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  39. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
  40. 49
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  41. 21
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  42. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java
  43. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  44. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java
  45. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  46. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -306,8 +306,6 @@ public class ExecutorController extends BaseController {
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType) {
log.info("Start to execute process instance, projectCode:{}, processInstanceId:{}.", projectCode,
processInstanceId);
Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
return returnDataList(result);
}

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java

@ -30,14 +30,43 @@ public enum ExecuteType {
* 4 stop
* 5 pause
*/
NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
NONE(0, "NONE"),
// ******************************* Workflow ***************************
REPEAT_RUNNING(1, "REPEAT_RUNNING"),
RECOVER_SUSPENDED_PROCESS(2, "RECOVER_SUSPENDED_PROCESS"),
START_FAILURE_TASK_PROCESS(3, "START_FAILURE_TASK_PROCESS"),
STOP(4, "STOP"),
PAUSE(5, "PAUSE"),
// ******************************* Workflow ***************************
// ******************************* Task *******************************
EXECUTE_TASK(6, "EXECUTE_TASK"),
// ******************************* Task *******************************
;
private final int code;
private final String desc;
ExecuteType(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
public static ExecuteType getEnum(int value) {
for (ExecuteType e : ExecuteType.values()) {
if (e.ordinal() == value) {
if (e.getCode() == value) {
return e;
}
}
return null;
return NONE;
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -532,7 +532,7 @@ public enum Status {
NO_CURRENT_OPERATING_PERMISSION(1400001, "The current user does not have this permission.", "当前用户无此权限"),
FUNCTION_DISABLED(1400002, "The current feature is disabled.", "当前功能已被禁用"),
SCHEDULE_TIME_NUMBER(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", "描述过长"),
DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
"delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");

62
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteClient.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;
/**
* This is the main class for executing workflow/workflowInstance/tasks.
* <pre>
* ExecuteContext executeContext = ExecuteContext.builder()
* .processInstance(processInstance)
* .executeType(...)
* .build();
* executeClient.execute(executeContext);
* </pre>
*/
@Component
@SuppressWarnings("unchecked")
public class ExecuteClient {
private final Map<ExecuteType, ExecuteFunctionBuilder> executorFunctionBuilderMap;
public ExecuteClient(List<ExecuteFunctionBuilder> executeFunctionBuilders) {
executorFunctionBuilderMap = executeFunctionBuilders.stream()
.collect(Collectors.toMap(ExecuteFunctionBuilder::getExecuteType, Function.identity()));
}
public ExecuteResult executeWorkflowInstance(ExecuteContext executeContext) throws ExecuteRuntimeException {
ExecuteFunctionBuilder<ExecuteRequest, ExecuteResult> executeFunctionBuilder = checkNotNull(
executorFunctionBuilderMap.get(executeContext.getExecuteType()),
String.format("The executeType: %s is not supported", executeContext.getExecuteType()));
return executeFunctionBuilder.createWorkflowInstanceExecuteFunction(executeContext)
.thenCombine(executeFunctionBuilder.createWorkflowInstanceExecuteRequest(executeContext),
ExecuteFunction::execute)
.join();
}
}

51
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteContext.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import lombok.Data;
// todo: to be interface
@Data
public class ExecuteContext {
private final ProcessInstance workflowInstance;
private final ProcessDefinition workflowDefinition;
private final User executeUser;
private final ExecuteType executeType;
public ExecuteContext(ProcessInstance workflowInstance,
ProcessDefinition workflowDefinition,
User executeUser,
ExecuteType executeType) {
this.workflowInstance = checkNotNull(workflowInstance, "workflowInstance cannot be null");
this.workflowDefinition = checkNotNull(workflowDefinition, "workflowDefinition cannot be null");
this.executeUser = checkNotNull(executeUser, "executeUser cannot be null");
this.executeType = checkNotNull(executeType, "executeType cannot be null");
}
}

37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunction.java

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
public interface ExecuteFunction<Request extends ExecuteRequest, Result extends ExecuteResult> {
/**
* Execute the workflow by the given request.
*
* @param request execute request
* @return execute result
* @throws ExecuteRuntimeException If there is an exception during execution, it will be thrown.
*/
Result execute(Request request) throws ExecuteRuntimeException;
/**
* @return the type of the executor
*/
ExecuteType getExecuteType();
}

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteFunctionBuilder.java

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import java.util.concurrent.CompletableFuture;
public interface ExecuteFunctionBuilder<Request extends ExecuteRequest, Result extends ExecuteResult> {
CompletableFuture<ExecuteFunction<Request, Result>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext);
CompletableFuture<Request> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext);
ExecuteType getExecuteType();
}

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRequest.java

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
public interface ExecuteRequest {
}

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteResult.java

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
public interface ExecuteResult {
}

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/ExecuteRuntimeException.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor;
// todo: implement from DolphinSchedulerRuntimeException
public class ExecuteRuntimeException extends RuntimeException {
private static final long serialVersionUID = 1L;
private static final String EXECUTE_WORKFLOW_INSTANCE_ERROR =
"Execute workflow instance %s failed, execute type is %s";
public ExecuteRuntimeException(String message) {
super(message);
}
public ExecuteRuntimeException(String message, Throwable cause) {
super(message, cause);
}
public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext) {
return executeWorkflowInstanceError(executeContext, null);
}
public static ExecuteRuntimeException executeWorkflowInstanceError(ExecuteContext executeContext, Throwable cause) {
return new ExecuteRuntimeException(
String.format(EXECUTE_WORKFLOW_INSTANCE_ERROR, executeContext.getWorkflowInstance().getName(),
executeContext.getExecuteType()),
cause);
}
}

64
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunction.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.service.command.CommandService;
public class FailureRecoveryExecuteFunction implements ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult> {
private final CommandService commandService;
public FailureRecoveryExecuteFunction(CommandService commandService) {
this.commandService = commandService;
}
@Override
public FailureRecoveryResult execute(FailureRecoveryRequest request) throws ExecuteRuntimeException {
ProcessInstance workflowInstance = request.getWorkflowInstance();
if (!workflowInstance.getState().isFailure()) {
throw new ExecuteRuntimeException(
String.format("The workflow instance: %s status is %s, can not be recovered",
workflowInstance.getName(), workflowInstance.getState()));
}
Command command = Command.builder()
.commandType(CommandType.START_FAILURE_TASK_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.executorId(request.getExecuteUser().getId())
.testFlag(workflowInstance.getTestFlag())
.build();
if (commandService.createCommand(command) <= 0) {
throw new ExecuteRuntimeException(
"Failure recovery workflow instance failed, due to insert command to db failed");
}
return new FailureRecoveryResult(command.getId());
}
@Override
public ExecuteType getExecuteType() {
return FailureRecoveryExecuteFunctionBuilder.EXECUTE_TYPE;
}
}

59
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryExecuteFunctionBuilder.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.service.command.CommandService;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FailureRecoveryExecuteFunctionBuilder
implements
ExecuteFunctionBuilder<FailureRecoveryRequest, FailureRecoveryResult> {
public static final ExecuteType EXECUTE_TYPE = ExecuteType.START_FAILURE_TASK_PROCESS;
@Autowired
private CommandService commandService;
@Override
public CompletableFuture<ExecuteFunction<FailureRecoveryRequest, FailureRecoveryResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new FailureRecoveryExecuteFunction(commandService));
}
@Override
public CompletableFuture<FailureRecoveryRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(
new FailureRecoveryRequest(
executeContext.getWorkflowInstance(),
executeContext.getWorkflowDefinition(),
executeContext.getExecuteUser()));
}
@Override
public ExecuteType getExecuteType() {
return EXECUTE_TYPE;
}
}

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryRequest.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class FailureRecoveryRequest implements ExecuteRequest {
private final ProcessInstance workflowInstance;
private final ProcessDefinition workflowDefinition;
private final User executeUser;
}

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/failure/recovery/FailureRecoveryResult.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.failure.recovery;
import org.apache.dolphinscheduler.api.executor.ExecuteResult;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class FailureRecoveryResult implements ExecuteResult {
private final Integer commandId;
}

80
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
public class PauseExecuteFunction implements ExecuteFunction<PauseExecuteRequest, PauseExecuteResult> {
private final ProcessInstanceDao processInstanceDao;
private final ApiRpcClient apiRpcClient;
public PauseExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}
@Override
public PauseExecuteResult execute(PauseExecuteRequest request) throws ExecuteRuntimeException {
ProcessInstance workflowInstance = request.getWorkflowInstance();
if (!workflowInstance.getState().isRunning()) {
throw new ExecuteRuntimeException(
String.format("The workflow instance: %s status is %s, can not pause", workflowInstance.getName(),
workflowInstance.getState()));
}
workflowInstance.setCommandType(CommandType.PAUSE);
workflowInstance.addHistoryCmd(CommandType.PAUSE);
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_PAUSE,
CommandType.PAUSE.getDescp() + " by " + request.getExecuteUser().getUserName());
if (processInstanceDao.updateProcessInstance(workflowInstance) <= 0) {
throw new ExecuteRuntimeException(
String.format(
"The workflow instance: %s pause failed, due to update the workflow instance status in DB failed",
workflowInstance.getName()));
}
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()), workflowStateEventChangeCommand.convert2Command());
} catch (RemotingException e) {
throw new ExecuteRuntimeException(
String.format(
"The workflow instance: %s pause failed, due to send rpc request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
e);
}
return new PauseExecuteResult(workflowInstance);
}
@Override
public ExecuteType getExecuteType() {
return PauseExecuteFunctionBuilder.EXECUTE_TYPE;
}
}

60
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunctionBuilder.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PauseExecuteFunctionBuilder implements ExecuteFunctionBuilder<PauseExecuteRequest, PauseExecuteResult> {
public static final ExecuteType EXECUTE_TYPE = ExecuteType.PAUSE;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private ApiRpcClient apiRpcClient;
@Override
public CompletableFuture<ExecuteFunction<PauseExecuteRequest, PauseExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new PauseExecuteFunction(processInstanceDao, apiRpcClient));
}
@Override
public CompletableFuture<PauseExecuteRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(
new PauseExecuteRequest(
executeContext.getWorkflowDefinition(),
executeContext.getWorkflowInstance(),
executeContext.getExecuteUser()));
}
@Override
public ExecuteType getExecuteType() {
return EXECUTE_TYPE;
}
}

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteRequest.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class PauseExecuteRequest implements ExecuteRequest {
private final ProcessDefinition processDefinition;
private final ProcessInstance workflowInstance;
private final User executeUser;
}

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteResult.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.pause;
import org.apache.dolphinscheduler.api.executor.ExecuteResult;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class PauseExecuteResult implements ExecuteResult {
private final ProcessInstance processInstance;
}

78
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunction.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.service.command.CommandService;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
public class RecoverExecuteFunction implements ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult> {
private final CommandService commandService;
public RecoverExecuteFunction(CommandService commandService) {
this.commandService = commandService;
}
@Override
public RecoverExecuteResult execute(RecoverExecuteRequest request) throws ExecuteRuntimeException {
ProcessInstance workflowInstance = request.getWorkflowInstance();
if (!workflowInstance.getState().isPause()) {
throw new ExecuteRuntimeException(
String.format("The workflow instance: %s state is %s, cannot recovery", workflowInstance.getName(),
workflowInstance.getState()));
}
Command command = Command.builder()
.commandType(CommandType.RECOVER_SUSPENDED_PROCESS)
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.commandParam(JSONUtils.toJsonString(createCommandParam(workflowInstance)))
.executorId(request.getExecuteUser().getId())
.testFlag(workflowInstance.getTestFlag())
.build();
if (commandService.createCommand(command) <= 0) {
throw new ExecuteRuntimeException(
String.format("Recovery workflow instance: %s failed, due to insert command to db failed",
workflowInstance.getName()));
}
return new RecoverExecuteResult(command);
}
private Map<String, Object> createCommandParam(ProcessInstance workflowInstance) {
return new ImmutableMap.Builder<String, Object>()
.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId())
.build();
}
@Override
public ExecuteType getExecuteType() {
return RecoverExecuteFunctionBuilder.EXECUTE_TYPE;
}
}

60
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteFunctionBuilder.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.service.command.CommandService;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RecoverExecuteFunctionBuilder
implements
ExecuteFunctionBuilder<RecoverExecuteRequest, RecoverExecuteResult> {
public static final ExecuteType EXECUTE_TYPE = ExecuteType.RECOVER_SUSPENDED_PROCESS;
@Autowired
private CommandService commandService;
@Override
public CompletableFuture<ExecuteFunction<RecoverExecuteRequest, RecoverExecuteResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(
new RecoverExecuteFunction(commandService));
}
@Override
public CompletableFuture<RecoverExecuteRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(
new RecoverExecuteRequest(
executeContext.getWorkflowInstance(),
executeContext.getWorkflowDefinition(),
executeContext.getExecuteUser()));
}
@Override
public ExecuteType getExecuteType() {
return EXECUTE_TYPE;
}
}

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteRequest.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class RecoverExecuteRequest implements ExecuteRequest {
private final ProcessInstance workflowInstance;
private final ProcessDefinition processDefinition;
private final User executeUser;
}

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/recover/RecoverExecuteResult.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.pause.recover;
import org.apache.dolphinscheduler.api.executor.ExecuteResult;
import org.apache.dolphinscheduler.dao.entity.Command;
import lombok.AllArgsConstructor;
@AllArgsConstructor
public class RecoverExecuteResult implements ExecuteResult {
private final Command command;
}

93
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunction.java

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.commons.collections4.MapUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.core.type.TypeReference;
public class RepeatRunningExecuteFunction implements ExecuteFunction<RepeatRunningRequest, RepeatRunningResult> {
private final CommandService commandService;
public RepeatRunningExecuteFunction(CommandService commandService) {
this.commandService = commandService;
}
@Override
public RepeatRunningResult execute(RepeatRunningRequest request) throws ExecuteRuntimeException {
checkNotNull(request, "request cannot be null");
// todo: check workflow definition valid? or we don't need to do this check, since we will check in master
// again.
// todo: check tenant valid? or we don't need to do this check, since we need to check in master again.
ProcessInstance workflowInstance = request.getWorkflowInstance();
if (workflowInstance.getState() == null || !workflowInstance.getState().isFinished()) {
throw new ExecuteRuntimeException(
String.format("The workflow instance: %s status is %s, cannot repeat running",
workflowInstance.getName(), workflowInstance.getState()));
}
Command command = Command.builder()
.commandType(CommandType.REPEAT_RUNNING)
.commandParam(JSONUtils.toJsonString(createCommandParams(workflowInstance)))
.processDefinitionCode(workflowInstance.getProcessDefinitionCode())
.processDefinitionVersion(workflowInstance.getProcessDefinitionVersion())
.processInstanceId(workflowInstance.getId())
.processInstancePriority(workflowInstance.getProcessInstancePriority())
.testFlag(workflowInstance.getTestFlag())
.build();
if (commandService.createCommand(command) <= 0) {
throw new ExecuteRuntimeException(
String.format("Repeat running workflow instance: %s failed, due to insert command to db failed",
workflowInstance.getName()));
}
return new RepeatRunningResult(command.getId());
}
@Override
public ExecuteType getExecuteType() {
return RepeatRunningExecuteFunctionBuilder.EXECUTE_TYPE;
}
private Map<String, Object> createCommandParams(ProcessInstance workflowInstance) {
Map<String, Object> commandMap =
JSONUtils.parseObject(workflowInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
Map<String, Object> repeatRunningCommandParams = new HashMap<>();
Optional.ofNullable(MapUtils.getObject(commandMap, CMD_PARAM_START_PARAMS))
.ifPresent(startParams -> repeatRunningCommandParams.put(CMD_PARAM_START_PARAMS, startParams));
repeatRunningCommandParams.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, workflowInstance.getId());
return repeatRunningCommandParams;
}
}

59
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningExecuteFunctionBuilder.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.service.command.CommandService;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RepeatRunningExecuteFunctionBuilder
implements
ExecuteFunctionBuilder<RepeatRunningRequest, RepeatRunningResult> {
public static final ExecuteType EXECUTE_TYPE = ExecuteType.REPEAT_RUNNING;
@Autowired
private CommandService commandService;
@Override
public CompletableFuture<ExecuteFunction<RepeatRunningRequest, RepeatRunningResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new RepeatRunningExecuteFunction(commandService));
}
@Override
public CompletableFuture<RepeatRunningRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(
new RepeatRunningRequest(
executeContext.getWorkflowInstance(),
executeContext.getWorkflowDefinition(),
executeContext.getExecuteUser()));
}
@Override
public ExecuteType getExecuteType() {
return EXECUTE_TYPE;
}
}

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningRequest.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class RepeatRunningRequest implements ExecuteRequest {
private final ProcessInstance workflowInstance;
private final ProcessDefinition processDefinition;
private final User executeUser;
}

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/rerun/RepeatRunningResult.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.rerun;
import org.apache.dolphinscheduler.api.executor.ExecuteResult;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class RepeatRunningResult implements ExecuteResult {
private final Integer commandId;
}

87
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunction.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteRuntimeException;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StopExecuteFunction implements ExecuteFunction<StopRequest, StopResult> {
private final ProcessInstanceDao processInstanceDao;
// todo: Use ApiRpcClient instead of NettyRemotingClient
private final ApiRpcClient apiRpcClient;
public StopExecuteFunction(ProcessInstanceDao processInstanceDao, ApiRpcClient apiRpcClient) {
this.processInstanceDao = processInstanceDao;
this.apiRpcClient = apiRpcClient;
}
@Override
public StopResult execute(StopRequest request) throws ExecuteRuntimeException {
ProcessInstance workflowInstance = request.getWorkflowInstance();
if (!workflowInstance.getState().canStop()
|| workflowInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
throw new ExecuteRuntimeException(
String.format("The workflow instance: %s status is %s, can not be stopped",
workflowInstance.getName(), workflowInstance.getState()));
}
// update the workflow instance's status to stop
workflowInstance.setCommandType(CommandType.STOP);
workflowInstance.addHistoryCmd(CommandType.STOP);
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_STOP, CommandType.STOP.getDescp() + " by user");
if (processInstanceDao.updateProcessInstance(workflowInstance) > 0) {
log.info("Workflow instance {} ready to stop success, will call master to stop the workflow instance",
workflowInstance.getName());
// todo: Use specific stop command instead of WorkflowStateEventChangeCommand
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
workflowInstance.getId(), 0, workflowInstance.getState(), workflowInstance.getId(), 0);
try {
apiRpcClient.send(Host.of(workflowInstance.getHost()),
workflowStateEventChangeCommand.convert2Command());
} catch (RemotingException e) {
throw new ExecuteRuntimeException(
String.format("Workflow instance: %s stop failed, due to send request to master: %s failed",
workflowInstance.getName(), workflowInstance.getHost()),
e);
}
// todo: use async and inject the completeFuture in the result.
return new StopResult(workflowInstance);
}
throw new ExecuteRuntimeException(
"Workflow instance stop failed, due to update the workflow instance status failed");
}
@Override
public ExecuteType getExecuteType() {
return StopExecuteFunctionBuilder.EXECUTE_TYPE;
}
}

56
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopExecuteFunctionBuilder.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.executor.ExecuteFunction;
import org.apache.dolphinscheduler.api.executor.ExecuteFunctionBuilder;
import org.apache.dolphinscheduler.api.rpc.ApiRpcClient;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import java.util.concurrent.CompletableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class StopExecuteFunctionBuilder implements ExecuteFunctionBuilder<StopRequest, StopResult> {
public static final ExecuteType EXECUTE_TYPE = ExecuteType.STOP;
@Autowired
private ProcessInstanceDao processInstanceDao;
@Autowired
private ApiRpcClient apiRpcClient;
@Override
public CompletableFuture<ExecuteFunction<StopRequest, StopResult>> createWorkflowInstanceExecuteFunction(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new StopExecuteFunction(processInstanceDao, apiRpcClient));
}
@Override
public CompletableFuture<StopRequest> createWorkflowInstanceExecuteRequest(ExecuteContext executeContext) {
return CompletableFuture.completedFuture(new StopRequest(executeContext.getWorkflowInstance()));
}
@Override
public ExecuteType getExecuteType() {
return EXECUTE_TYPE;
}
}

33
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopRequest.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
import org.apache.dolphinscheduler.api.executor.ExecuteRequest;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
@Data
@AllArgsConstructor
public class StopRequest implements ExecuteRequest {
@NonNull
private final ProcessInstance workflowInstance;
}

33
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/stop/StopResult.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.executor.workflow.instance.stop;
import org.apache.dolphinscheduler.api.executor.ExecuteResult;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
@Data
@AllArgsConstructor
public class StopResult implements ExecuteResult {
@NonNull
private final ProcessInstance workflowInstance;
}

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/rpc/ApiRpcClient.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.springframework.stereotype.Component;
@Component
public class ApiRpcClient {
private final NettyRemotingClient nettyRemotingClient;
public ApiRpcClient() {
this.nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
}
public void send(Host host, Command command) throws RemotingException {
nettyRemotingClient.send(host, command);
}
}

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletResponse;
@ -152,6 +153,10 @@ public interface ProcessDefinitionService {
ProcessDefinition getProcessDefinition(User loginUser,
long code);
Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode, int workflowDefinitionVersion);
ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
int workflowDefinitionVersion);
/**
* query detail of process definition
*

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -57,6 +57,8 @@ public interface ProcessInstanceService {
long projectCode,
Integer processId);
ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer processId);
/**
* query process instance by id
*

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
@ -66,7 +67,9 @@ public interface ProjectService {
*/
Map<String, Object> checkProjectAndAuth(User loginUser, Project project, long projectCode, String perm);
void checkProjectAndAuthThrowException(User loginUser, Project project, String permission);
void checkProjectAndAuthThrowException(User loginUser, Project project, String permission) throws ServiceException;
void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) throws ServiceException;
boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String perm);

250
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
@ -34,8 +36,11 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import org.apache.dolphinscheduler.api.executor.ExecuteContext;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
@ -91,7 +96,6 @@ import org.apache.dolphinscheduler.service.process.TriggerRelationService;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.ZonedDateTime;
@ -101,6 +105,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -111,7 +116,6 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
/**
@ -144,10 +148,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private ProcessService processService;
@Autowired
private CommandService commandService;
private ProcessInstanceDao processInstanceDao;
@Autowired
private ProcessInstanceDao processInstanceDao;
private ProcessDefinitionService processDefinitionService;
@Autowired
private CommandService commandService;
@Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@ -169,6 +176,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TriggerRelationService triggerRelationService;
@Autowired
private ExecuteClient executeClient;
/**
* execute process instance
*
@ -236,15 +247,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
if (!checkScheduleTimeNum(commandType, cronTime)) {
putMsg(result, Status.SCHEDULE_TIME_NUMBER);
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
checkScheduleTimeNumExceed(commandType, cronTime);
checkMasterExists();
long triggerCode = CodeGenerateUtils.getInstance().genCode();
@ -275,46 +279,31 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
/**
* check whether master exists
*
* @param result result
* @return master exists return true , otherwise return false
*/
private boolean checkMasterExists(Map<String, Object> result) {
private void checkMasterExists() {
// check master server exists
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
// no master
if (masterServers.isEmpty()) {
log.error("Master does not exist.");
putMsg(result, Status.MASTER_NOT_EXISTS);
return false;
throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
return true;
}
/**
* @param complementData
* @param cronTime
* @return CommandType is COMPLEMENT_DATA and cronTime's number is not greater than 100 return true , otherwise return false
*/
private boolean checkScheduleTimeNum(CommandType complementData, String cronTime) {
private void checkScheduleTimeNumExceed(CommandType complementData, String cronTime) {
if (!CommandType.COMPLEMENT_DATA.equals(complementData)) {
return true;
return;
}
if (cronTime == null) {
return true;
return;
}
Map<String, String> cronMap = JSONUtils.toMap(cronTime);
if (cronMap.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
String[] stringDates = cronMap.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
log.warn("Parameter cornTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH);
return false;
throw new ServiceException(Status.SCHEDULE_TIME_NUMBER_EXCEED);
}
}
return true;
}
/**
@ -322,19 +311,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*
* @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @param version process instance version
*/
@Override
public void checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition,
long processDefineCode, Integer version) {
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
if (projectCode != processDefinition.getProjectCode()) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, processDefinition.getCode());
}
// check process definition online
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getCode(),
processDefinition.getVersion());
}
// check sub process definition online
if (!checkSubProcessDefinitionValid(processDefinition)) {
@ -381,8 +369,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stoprerun failed task
*
* @param loginUser login user
* @param projectCode project code
@ -391,103 +377,35 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return execute result code
*/
@Override
public Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId,
public Map<String, Object> execute(User loginUser,
long projectCode,
Integer processInstanceId,
ExecuteType executeType) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
checkNotNull(processInstanceId, "workflowInstanceId cannot be null");
checkNotNull(executeType, "executeType cannot be null");
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,
// check user access for project
projectService.checkProjectAndAuthThrowException(loginUser, projectCode,
ApiFuncIdentificationConstant.map.get(executeType));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
checkMasterExists();
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
ProcessInstance workflowInstance =
Optional.ofNullable(processInstanceDao.queryByWorkflowInstanceId(processInstanceId))
.orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processDefinition.setReleaseState(ReleaseState.ONLINE);
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
}
checkState(workflowInstance.getProjectCode() == projectCode,
"The workflow instance's project code doesn't equals to the given project");
ProcessDefinition processDefinition = processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(
workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion());
result = checkExecuteType(processInstance, executeType);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (!checkTenantSuitable(processDefinition)) {
log.error(
"There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
executeClient.executeWorkflowInstance(new ExecuteContext(
workflowInstance,
processDefinition,
loginUser,
executeType));
// get the startParams user specified at the first starting while repeat running is needed
Map<String, Object> commandMap =
JSONUtils.parseObject(processInstance.getCommandParam(), new TypeReference<Map<String, Object>>() {
});
String startParams = null;
if (MapUtils.isNotEmpty(commandMap) && executeType == ExecuteType.REPEAT_RUNNING) {
Object startParamsJson = commandMap.get(CMD_PARAM_START_PARAMS);
if (startParamsJson != null) {
startParams = startParamsJson.toString();
}
}
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams,
processInstance.getTestFlag());
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
processInstance.getTestFlag());
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams,
processInstance.getTestFlag());
break;
case STOP:
if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
log.warn("Process instance status is already {}, processInstanceName:{}.",
WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
result =
updateProcessInstancePrepare(processInstance, CommandType.STOP,
WorkflowExecutionStatus.READY_STOP);
}
break;
case PAUSE:
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
log.warn("Process instance status is already {}, processInstanceName:{}.",
WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE,
WorkflowExecutionStatus.READY_PAUSE);
}
break;
default:
log.warn("Unknown execute type for process instance, processInstanceId:{}.",
processInstance.getId());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
}
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
@ -504,10 +422,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Override
public Map<String, Object> execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) {
ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId);
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType);
return execute(loginUser, processInstance.getProjectCode(), workflowInstanceId, executeType);
}
/**
@ -633,10 +548,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
}
@ -773,62 +685,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
/**
* insert command, used in the implementation of the page, rerun, recovery (pause / failure) execution
*
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionCode process definition code
* @param processVersion
* @param commandType command type
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
int processVersion, CommandType commandType, String startParams,
int testFlag) {
Map<String, Object> result = new HashMap<>();
// To add startParams only when repeat running is needed
Map<String, Object> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, instanceId);
if (!StringUtils.isEmpty(startParams)) {
cmdParam.put(CMD_PARAM_START_PARAMS, startParams);
}
Command command = new Command();
command.setCommandType(commandType);
command.setProcessDefinitionCode(processDefinitionCode);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(loginUser.getId());
command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag);
if (!commandService.verifyIsNeedCreateCommand(command)) {
log.warn(
"Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
}
log.info("Creating command, commandInfo:{}.", command);
int create = commandService.createCommand(command);
if (create > 0) {
log.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
putMsg(result, Status.SUCCESS);
} else {
log.error(
"Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
instanceId);
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* check whether sub processes are offline before starting process definition
*
@ -1319,11 +1175,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
// check master exists
if (!checkMasterExists(result)) {
return result;
}
checkMasterExists();
// todo dispatch improvement
List<Server> masterServerList = monitorService.getServerListFromRegistry(true);
Host host = new Host(masterServerList.get(0).getHost(), masterServerList.get(0).getPort());

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -236,6 +236,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Lazy
@Autowired
private SchedulerService schedulerService;
@ -724,6 +725,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return processDefinition;
}
@Override
public Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode,
int workflowDefinitionVersion) {
ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null);
if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) {
workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode,
workflowDefinitionVersion);
}
return Optional.ofNullable(workflowDefinition);
}
@Override
public ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode,
int workflowDefinitionVersion) {
return queryWorkflowDefinition(workflowDefinitionCode, workflowDefinitionVersion)
.orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(workflowDefinitionCode)));
}
@Override
public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) {
Project project = projectMapper.queryByCode(projectCode);

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -278,6 +278,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
@Override
public ProcessInstance queryByWorkflowInstanceIdThrowExceptionIfNotFound(Integer workflowInstanceId) {
ProcessInstance processInstance = processInstanceDao.queryByWorkflowInstanceId(workflowInstanceId);
if (processInstance == null) {
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId);
}
return processInstance;
}
/**
* query workflow instance by id
*

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -242,6 +242,12 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
}
}
@Override
public void checkProjectAndAuthThrowException(User loginUser, long projectCode, String permission) {
Project project = projectMapper.queryByCode(projectCode);
checkProjectAndAuthThrowException(loginUser, project, permission);
}
@Override
public boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String permission) {
boolean checkResult = false;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java

@ -55,7 +55,7 @@ import com.google.gson.JsonObject;
/**
* executor controller test
*/
public class ExecutorControllerTest extends AbstractControllerTest {
public class ExecuteFunctionControllerTest extends AbstractControllerTest {
final Gson gson = new Gson();
final long projectCode = 1L;

49
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java → dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.executor.ExecuteClient;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.TriggerRelationService;
@ -96,11 +97,9 @@ import org.slf4j.LoggerFactory;
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class ExecutorServiceTest {
public class ExecuteFunctionServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTest.class);
private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(ExecuteFunctionServiceTest.class);
@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;
@ -147,6 +146,15 @@ public class ExecutorServiceTest {
@Mock
private TriggerRelationService triggerRelationService;
@Mock
private ExecuteClient executeClient;
@Mock
private ProcessInstanceDao processInstanceDao;
@Mock
private ProcessDefinitionService processDefinitionService;
private int processDefinitionId = 1;
private int processDefinitionVersion = 1;
@ -195,6 +203,7 @@ public class ExecutorServiceTest {
// processInstance
processInstance.setId(processInstanceId);
processInstance.setProjectCode(projectCode);
processInstance.setState(WorkflowExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
@ -453,25 +462,39 @@ public class ExecutorServiceTest {
public void testNoMasterServers() {
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
Assertions.assertThrows(ServiceException.class, () -> executorService.execProcessInstance(
loginUser,
projectCode,
processDefinitionCode,
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA,
null, null,
null, null, null,
null,
null,
null,
null,
null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW,
Constants.DEFAULT_WORKER_GROUP,
100L,
110,
null,
0,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null);
Assertions.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
ComplementDependentMode.OFF_MODE,
null));
}
@Test
public void testExecuteRepeatRunning() {
Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth());
when(processInstanceDao.queryByWorkflowInstanceId(processInstanceId)).thenReturn(processInstance);
when(processDefinitionService.queryWorkflowDefinitionThrowExceptionIfNotFound(processDefinitionCode,
processDefinitionVersion)).thenReturn(processDefinition);
Map<String, Object> result =
executorService.execute(loginUser, projectCode, processInstanceId, ExecuteType.REPEAT_RUNNING);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

21
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
@ -33,6 +36,9 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_command")
public class Command {
@ -55,10 +61,10 @@ public class Command {
private String commandParam;
@TableField("task_depend_type")
private TaskDependType taskDependType;
private TaskDependType taskDependType = TaskDependType.TASK_POST;
@TableField("failure_strategy")
private FailureStrategy failureStrategy;
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;
@TableField("warning_type")
private WarningType warningType;
@ -70,13 +76,13 @@ public class Command {
private Date scheduleTime;
@TableField("start_time")
private Date startTime;
private Date startTime = new Date();
@TableField("process_instance_priority")
private Priority processInstancePriority;
@TableField("update_time")
private Date updateTime;
private Date updateTime = new Date();
@TableField("worker_group")
private String workerGroup;
@ -99,13 +105,6 @@ public class Command {
@TableField("test_flag")
private int testFlag;
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE;
this.startTime = new Date();
this.updateTime = new Date();
}
public Command(
CommandType commandType,
TaskDependType taskDependType,

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionLogDao.java

@ -17,7 +17,11 @@
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
public interface ProcessDefinitionLogDao {
ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
}

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -38,4 +38,5 @@ public interface ProcessInstanceDao {
void deleteById(Integer workflowInstanceId);
ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionLogDaoImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
@ -29,6 +30,12 @@ public class ProcessDefinitionLogDaoImpl implements ProcessDefinitionLogDao {
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Override
public ProcessDefinitionLog queryProcessDefinitionLog(long workflowDefinitionCode, int workflowDefinitionVersion) {
return processDefinitionLogMapper.queryByDefinitionCodeAndVersion(workflowDefinitionCode,
workflowDefinitionVersion);
}
@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processDefinitionLogMapper.deleteByProcessDefinitionCode(workflowDefinitionCode);

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -69,4 +69,9 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
public void deleteById(Integer workflowInstanceId) {
processInstanceMapper.deleteById(workflowInstanceId);
}
@Override
public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
return processInstanceMapper.selectById(workflowInstanceId);
}
}

7
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ShellUtils.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@ -30,8 +32,9 @@ import lombok.experimental.UtilityClass;
public class ShellUtils {
public List<String> ENV_SOURCE_LIST = Arrays.stream(
Optional.ofNullable(PropertyUtils.getString("shell.env_source_list"))
.map(s -> s.split(",")).orElse(new String[0]))
Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
.orElse(new String[0]))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList());
}

Loading…
Cancel
Save