Browse Source

[Feature-10871] add workflow executing data query (#10875)

* add workflow executing data query
* fix sonar check for interrupted
3.0.0/version-upgrade
caishunfeng 2 years ago
parent
commit
b9e3187ad5
  1. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  3. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  5. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  6. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  7. 51
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java
  8. 65
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java
  9. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  10. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  11. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  12. 75
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java
  13. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  14. 50
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java
  15. 51
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java
  16. 109
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java
  17. 154
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java
  18. 36
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
  19. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
@ -37,9 +38,21 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestAttribute;
@ -55,12 +68,6 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore; import springfox.documentation.annotations.ApiIgnore;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* executor controller * executor controller
@ -303,4 +310,20 @@ public class ExecutorController extends BaseController {
Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode); Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode);
return returnDataList(result); return returnDataList(result);
} }
/**
* query execute data of processInstance from master
*/
@ApiOperation(value = "queryExecutingWorkflow", notes = "QUERY_WORKFLOW_EXECUTE_DATA")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/query-executing-workflow")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_EXECUTING_WORKFLOW_ERROR)
@AccessLogAnnotation
public Result queryExecutingWorkflow(@RequestParam("id") Integer processInstanceId) {
WorkflowExecuteDto workflowExecuteDto = execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId);
return Result.success(workflowExecuteDto);
}
} }

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -43,7 +43,10 @@ import org.apache.commons.lang.StringUtils;
import java.io.IOException; import java.io.IOException;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -216,6 +216,7 @@ public enum Status {
QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"), QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"),
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"), PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"),
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"), TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"),
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import java.util.Map; import java.util.Map;
@ -111,4 +112,11 @@ public interface ExecutorService {
* @return * @return
*/ */
Map<String, Object> forceStartTaskInstance(User loginUser, int queueId); Map<String, Object> forceStartTaskInstance(User loginUser, int queueId);
/**
* query executing workflow data in Master memory
* @param processInstanceId
* @return
*/
WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId);
} }

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -64,6 +64,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -851,4 +854,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return validDependentProcessDefinitionList; return validDependentProcessDefinitionList;
} }
/**
* query executing data of processInstance by master
* @param processInstanceId
* @return
*/
@Override
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
return null;
}
Host host = new Host(processInstance.getHost());
WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand();
requestCommand.setProcessInstanceId(processInstanceId);
org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
if (command == null) {
return null;
}
WorkflowExecutingDataResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataResponseCommand.class);
return responseCommand.getWorkflowExecuteDto();
}
} }

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -829,5 +829,4 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) { public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
} }
} }

51
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.controller;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.server.master.service.ExecutingService;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/workflow/execute")
public class WorkflowExecuteController {
@Autowired
private ExecutingService executingService;
/**
* query workflow execute data in memory
* @param processInstanceId
* @return
*/
@GetMapping("")
@ResponseStatus(HttpStatus.OK)
public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int processInstanceId) {
Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(processInstanceId);
return workflowExecuteDtoOptional.orElse(null);
}
}

65
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.service.ExecutingService;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* workflow executing data process from api/master
*/
@Component
public class WorkflowExecutingDataRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(WorkflowExecutingDataRequestProcessor.class);
@Autowired
private ExecutingService executingService;
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));
WorkflowExecutingDataRequestCommand requestCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataRequestCommand.class);
logger.info("received command, processInstanceId:{}", requestCommand.getProcessInstanceId());
Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(requestCommand.getProcessInstanceId());
WorkflowExecutingDataResponseCommand responseCommand = new WorkflowExecutingDataResponseCommand();
workflowExecuteDtoOptional.ifPresent(responseCommand::setWorkflowExecuteDto);
channel.writeAndFlush(responseCommand.convert2Command(command.getOpaque()));
}
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -70,6 +71,9 @@ public class MasterRPCServer implements AutoCloseable {
@Autowired @Autowired
private LoggerRequestProcessor loggerRequestProcessor; private LoggerRequestProcessor loggerRequestProcessor;
@Autowired
private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor;
@PostConstruct @PostConstruct
private void init() { private void init() {
// init remoting server // init remoting server
@ -83,6 +87,7 @@ public class MasterRPCServer implements AutoCloseable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
// logger server // logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -27,8 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -46,8 +47,6 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
import lombok.NonNull; import lombok.NonNull;
/** /**
@ -191,11 +190,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId()); logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId());
return; return;
} }
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId() finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
); );
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); stateEventCallbackService.sendResult(new Host(host), stateEventChangeCommand.convert2Command());
} }
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -210,9 +211,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0 processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
); );
String address = subProcessInstance.getHost().split(":")[0]; this.stateEventCallbackService.sendResult(new Host(subProcessInstance.getHost()), stateEventChangeCommand.convert2Command());
int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
} }
@Override @Override

75
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* executing service, to query executing data from memory, such workflow instance
*/
@Component
public class ExecutingService {
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteController.class);
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
public Optional<WorkflowExecuteDto> queryWorkflowExecutingData(Integer processInstanceId) {
WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
logger.info("workflow execute data not found, maybe it has finished, workflow id:{}", processInstanceId);
return Optional.empty();
}
try {
WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto();
BeanUtils.copyProperties(workflowExecuteDto, workflowExecuteRunnable.getProcessInstance());
List<TaskInstanceExecuteDto> taskInstanceList = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) {
for (TaskInstance taskInstance : workflowExecuteRunnable.getAllTaskInstances()) {
TaskInstanceExecuteDto taskInstanceExecuteDto = new TaskInstanceExecuteDto();
BeanUtils.copyProperties(taskInstanceExecuteDto, taskInstance);
taskInstanceList.add(taskInstanceExecuteDto);
}
}
workflowExecuteDto.setTaskInstances(taskInstanceList);
return Optional.of(workflowExecuteDto);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("query workflow execute data fail, workflow id:{}", processInstanceId, e);
}
return Optional.empty();
}
}

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -130,5 +130,15 @@ public enum CommandType {
/** /**
* task state event request * task state event request
*/ */
TASK_WAKEUP_EVENT_REQUEST; TASK_WAKEUP_EVENT_REQUEST,
/**
* workflow executing data request, from api to master
*/
WORKFLOW_EXECUTING_DATA_REQUEST,
/**
* workflow executing data response, from master to api
*/
WORKFLOW_EXECUTING_DATA_RESPONSE;
} }

50
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* workflow executing data request, from api to master
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowExecutingDataRequestCommand implements Serializable {
private Integer processInstanceId;
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

51
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* workflow executing data response, from master to api
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowExecutingDataResponseCommand implements Serializable {
private WorkflowExecuteDto workflowExecuteDto;
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque) {
Command command = new Command(opaque);
command.setType(CommandType.WORKFLOW_EXECUTING_DATA_RESPONSE);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
}

109
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.dto;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.Date;
import java.util.Map;
import lombok.Data;
@Data
public class TaskInstanceExecuteDto {
private int id;
private String name;
private String taskType;
private int processInstanceId;
private long taskCode;
private int taskDefinitionVersion;
private String processInstanceName;
private int taskGroupPriority;
private ExecutionStatus state;
private Date firstSubmitTime;
private Date submitTime;
private Date startTime;
private Date endTime;
private String host;
private String executePath;
private String logPath;
private int retryTimes;
private Flag alertFlag;
private int pid;
private String appLink;
private Flag flag;
private String duration;
private int maxRetryTimes;
private int retryInterval;
private Priority taskInstancePriority;
private Priority processInstancePriority;
private String workerGroup;
private Long environmentCode;
private String environmentConfig;
private int executorId;
private String varPool;
private String executorName;
private Map<String, String> resources;
private int delayTime;
private String taskParams;
private int dryRun;
private int taskGroupId;
private Integer cpuQuota;
private Integer memoryMax;
}

154
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.dto;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.Collection;
import java.util.Date;
import lombok.Getter;
import lombok.Setter;
@Setter
@Getter
public class WorkflowExecuteDto {
private int id;
private String name;
private Long processDefinitionCode;
private int processDefinitionVersion;
private ExecutionStatus state;
/**
* recovery flag for failover
*/
private Flag recovery;
private Date startTime;
private Date endTime;
private int runTimes;
private String host;
private CommandType commandType;
private String commandParam;
/**
* node depend type
*/
private TaskDependType taskDependType;
private int maxTryTimes;
/**
* failure strategy when task failed.
*/
private FailureStrategy failureStrategy;
/**
* warning type
*/
private WarningType warningType;
private Integer warningGroupId;
private Date scheduleTime;
private Date commandStartTime;
/**
* user define parameters string
*/
private String globalParams;
/**
* executor id
*/
private int executorId;
/**
* executor name
*/
private String executorName;
/**
* tenant code
*/
private String tenantCode;
/**
* queue
*/
private String queue;
/**
* process is sub process
*/
private Flag isSubProcess;
/**
* history command
*/
private String historyCmd;
/**
* depend processes schedule time
*/
private String dependenceScheduleTimes;
private String duration;
private Priority processInstancePriority;
private String workerGroup;
private Long environmentCode;
private int timeout;
private int tenantId;
/**
* varPool string
*/
private String varPool;
private int nextProcessInstanceId;
private int dryRun;
private Date restartTime;
private boolean isBlocked;
private Collection<TaskInstanceExecuteDto> taskInstances;
}

36
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.remote.processor; package org.apache.dolphinscheduler.remote.processor;
import static org.apache.dolphinscheduler.common.Constants.HTTP_CONNECTION_REQUEST_TIMEOUT;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Optional; import java.util.Optional;
@ -110,17 +112,6 @@ public class StateEventCallbackService {
REMOTE_CHANNELS.remove(host); REMOTE_CHANNELS.remove(host);
} }
/**
* Send the command to target address, this method doesn't guarantee the command send success.
*
* @param command command need tp send
*/
public void sendResult(String address, int port, Command command) {
logger.info("send result, host:{}, command:{}", address, command.toString());
Host host = new Host(address, port);
sendResult(host, command);
}
/** /**
* Send the command to target host, this method doesn't guarantee the command send success. * Send the command to target host, this method doesn't guarantee the command send success.
* *
@ -133,4 +124,27 @@ public class StateEventCallbackService {
nettyRemoteChannel.writeAndFlush(command); nettyRemoteChannel.writeAndFlush(command);
}); });
} }
/**
* send sync and return response command
* @param host
* @param requestCommand
* @return
* @throws RemotingException
* @throws InterruptedException
*/
public Command sendSync(Host host, Command requestCommand) {
try {
return this.nettyRemotingClient.sendSync(host, requestCommand, HTTP_CONNECTION_REQUEST_TIMEOUT);
} catch (InterruptedException e) {
logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e);
Thread.currentThread().interrupt();
} catch (RemotingException e) {
logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e);
}
finally {
this.nettyRemotingClient.closeChannel(host);
}
return null;
}
} }

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@ -30,8 +31,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -334,14 +333,12 @@ public class ProcessServiceImpl implements ProcessService {
int update = updateProcessInstance(info); int update = updateProcessInstance(info);
// determine whether the process is normal // determine whether the process is normal
if (update > 0) { if (update > 0) {
String host = info.getHost(); Host host = new Host(info.getHost());
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0 info.getId(), 0, info.getState(), info.getId(), 0
); );
try { try {
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
} catch (Exception e) { } catch (Exception e) {
logger.error("sendResultError"); logger.error("sendResultError");
} }
@ -3035,13 +3032,11 @@ public class ProcessServiceImpl implements ProcessService {
@Override @Override
public void sendStartTask2Master(ProcessInstance processInstance, int taskId, public void sendStartTask2Master(ProcessInstance processInstance, int taskId,
org.apache.dolphinscheduler.remote.command.CommandType taskType) { org.apache.dolphinscheduler.remote.command.CommandType taskType) {
String host = processInstance.getHost(); Host host = new Host(processInstance.getHost());
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand( TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand(
processInstance.getId(), taskId processInstance.getId(), taskId
); );
stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType)); stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType));
} }
@Override @Override

Loading…
Cancel
Save