diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 90c4ecab51..aa48c67448 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; @@ -38,11 +39,23 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; + +import org.apache.commons.lang3.StringUtils; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; @@ -58,16 +71,6 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; -import org.apache.commons.lang3.StringUtils; - -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - /** * executor controller */ @@ -361,4 +364,20 @@ public class ExecutorController extends BaseController { Map result = execService.startCheckByProcessDefinedCode(processDefinitionCode); return returnDataList(result); } + + /** + * query execute data of processInstance from master + */ + @ApiOperation(value = "queryExecutingWorkflow", notes = "QUERY_WORKFLOW_EXECUTE_DATA") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/query-executing-workflow") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_EXECUTING_WORKFLOW_ERROR) + @AccessLogAnnotation + public Result queryExecutingWorkflow(@RequestParam("id") Integer processInstanceId) { + WorkflowExecuteDto workflowExecuteDto = execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId); + return Result.success(workflowExecuteDto); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index 47a9f2183c..90711242f1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -17,19 +17,17 @@ package org.apache.dolphinscheduler.api.controller; -import java.io.IOException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; -import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -40,6 +38,16 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,17 +62,13 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; -import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR; /** * process instance controller diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 91a45a60a5..25212caafb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -216,6 +216,7 @@ public enum Status { QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"), PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"), TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"), + QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 2d5868b251..b5cf04bd5e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import java.util.Map; @@ -111,4 +112,11 @@ public interface ExecutorService { * @return */ Map forceStartTaskInstance(User loginUser, int queueId); + + /** + * query executing workflow data in Master memory + * @param processInstanceId + * @return + */ + WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 2bdd7eb6f4..a414891d63 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -69,6 +69,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.cron.CronUtils; @@ -991,4 +994,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } return null; } + + /** + * query executing data of processInstance by master + * @param processInstanceId + * @return + */ + @Override + public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) { + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + if (processInstance == null) { + return null; + } + Host host = new Host(processInstance.getHost()); + WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand(); + requestCommand.setProcessInstanceId(processInstanceId); + org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command()); + if (command == null) { + return null; + } + WorkflowExecutingDataResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataResponseCommand.class); + return responseCommand.getWorkflowExecuteDto(); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 19f4df3333..2335ff4c88 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -17,26 +17,17 @@ package org.apache.dolphinscheduler.api.service.impl; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; +import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; +import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; @@ -51,7 +42,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -81,21 +71,34 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; -import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; -import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; -import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * process instance service impl @@ -459,7 +462,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce String locations, int timeout, String tenantCode) { Project project = projectMapper.queryByCode(projectCode); //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,INSTANCE_UPDATE ); + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,INSTANCE_UPDATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -833,5 +836,4 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce public List queryByProcessDefineCode(Long processDefinitionCode, int size) { return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java new file mode 100644 index 0000000000..793dc40439 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.controller; + +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.server.master.service.ExecutingService; + +import java.util.Optional; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/workflow/execute") +public class WorkflowExecuteController { + + @Autowired + private ExecutingService executingService; + + /** + * query workflow execute data in memory + * @param processInstanceId + * @return + */ + @GetMapping("") + @ResponseStatus(HttpStatus.OK) + public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int processInstanceId) { + Optional workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(processInstanceId); + return workflowExecuteDtoOptional.orElse(null); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java new file mode 100644 index 0000000000..c8f70d96d0 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.service.ExecutingService; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +/** + * workflow executing data process from api/master + */ +@Component +public class WorkflowExecutingDataRequestProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(WorkflowExecutingDataRequestProcessor.class); + + @Autowired + private ExecutingService executingService; + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); + + WorkflowExecutingDataRequestCommand requestCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataRequestCommand.class); + + logger.info("received command, processInstanceId:{}", requestCommand.getProcessInstanceId()); + + Optional workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(requestCommand.getProcessInstanceId()); + + WorkflowExecutingDataResponseCommand responseCommand = new WorkflowExecutingDataResponseCommand(); + workflowExecuteDtoOptional.ifPresent(responseCommand::setWorkflowExecuteDto); + channel.writeAndFlush(responseCommand.convert2Command(command.getOpaque())); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index e0b65dd077..49d40a4b04 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponsePr import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; +import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; import javax.annotation.PostConstruct; @@ -74,6 +75,9 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private LoggerRequestProcessor loggerRequestProcessor; + @Autowired + private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor; + @PostConstruct private void init() { // init remoting server @@ -88,6 +92,7 @@ public class MasterRPCServer implements AutoCloseable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor); // logger server this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java new file mode 100644 index 0000000000..fa85b66f82 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.compress.utils.Lists; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * executing service, to query executing data from memory, such workflow instance + */ +@Component +public class ExecutingService { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteController.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + public Optional queryWorkflowExecutingData(Integer processInstanceId) { + WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + logger.info("workflow execute data not found, maybe it has finished, workflow id:{}", processInstanceId); + return Optional.empty(); + } + try { + WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto(); + BeanUtils.copyProperties(workflowExecuteDto, workflowExecuteRunnable.getProcessInstance()); + List taskInstanceList = Lists.newArrayList(); + if (CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) { + for (TaskInstance taskInstance : workflowExecuteRunnable.getAllTaskInstances()) { + TaskInstanceExecuteDto taskInstanceExecuteDto = new TaskInstanceExecuteDto(); + BeanUtils.copyProperties(taskInstanceExecuteDto, taskInstance); + taskInstanceList.add(taskInstanceExecuteDto); + } + } + workflowExecuteDto.setTaskInstances(taskInstanceList); + return Optional.of(workflowExecuteDto); + } catch (IllegalAccessException | InvocationTargetException e) { + logger.error("query workflow execute data fail, workflow id:{}", processInstanceId, e); + } + return Optional.empty(); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index e540efddfc..43a64fc559 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -158,5 +158,15 @@ public enum CommandType { /** * task state event request */ - TASK_WAKEUP_EVENT_REQUEST; + TASK_WAKEUP_EVENT_REQUEST, + + /** + * workflow executing data request, from api to master + */ + WORKFLOW_EXECUTING_DATA_REQUEST, + + /** + * workflow executing data response, from master to api + */ + WORKFLOW_EXECUTING_DATA_RESPONSE; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java new file mode 100644 index 0000000000..6085322831 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * workflow executing data request, from api to master + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowExecutingDataRequestCommand implements Serializable { + + private Integer processInstanceId; + + /** + * package request command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java new file mode 100644 index 0000000000..2d2dfa20b0 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * workflow executing data response, from master to api + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowExecutingDataResponseCommand implements Serializable { + + private WorkflowExecuteDto workflowExecuteDto; + + /** + * package request command + * + * @return command + */ + public Command convert2Command(long opaque) { + Command command = new Command(opaque); + command.setType(CommandType.WORKFLOW_EXECUTING_DATA_RESPONSE); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java new file mode 100644 index 0000000000..4bfae1036d --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.dto; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + +import java.util.Date; +import java.util.Map; + +import lombok.Data; + +@Data +public class TaskInstanceExecuteDto { + + private int id; + + private String name; + + private String taskType; + + private int processInstanceId; + + private long taskCode; + + private int taskDefinitionVersion; + + private String processInstanceName; + + private int taskGroupPriority; + + private ExecutionStatus state; + + private Date firstSubmitTime; + + private Date submitTime; + + private Date startTime; + + private Date endTime; + + private String host; + + private String executePath; + + private String logPath; + + private int retryTimes; + + private Flag alertFlag; + + private int pid; + + private String appLink; + + private Flag flag; + + private String duration; + + private int maxRetryTimes; + + private int retryInterval; + + private Priority taskInstancePriority; + + private Priority processInstancePriority; + + private String workerGroup; + + private Long environmentCode; + + private String environmentConfig; + + private int executorId; + + private String varPool; + + private String executorName; + + private Map resources; + + private int delayTime; + + private String taskParams; + + private int dryRun; + + private int taskGroupId; + + private Integer cpuQuota; + + private Integer memoryMax; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java new file mode 100644 index 0000000000..64d5542b94 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.dto; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + +import java.util.Collection; +import java.util.Date; + +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +public class WorkflowExecuteDto { + + private int id; + + private String name; + + private Long processDefinitionCode; + + private int processDefinitionVersion; + + private ExecutionStatus state; + + /** + * recovery flag for failover + */ + private Flag recovery; + + private Date startTime; + + private Date endTime; + + private int runTimes; + + private String host; + + private CommandType commandType; + + private String commandParam; + + /** + * node depend type + */ + private TaskDependType taskDependType; + + private int maxTryTimes; + + /** + * failure strategy when task failed. + */ + private FailureStrategy failureStrategy; + + /** + * warning type + */ + private WarningType warningType; + + private Integer warningGroupId; + + private Date scheduleTime; + + private Date commandStartTime; + + /** + * user define parameters string + */ + private String globalParams; + + /** + * executor id + */ + private int executorId; + + /** + * executor name + */ + private String executorName; + + /** + * tenant code + */ + private String tenantCode; + + /** + * queue + */ + private String queue; + + /** + * process is sub process + */ + private Flag isSubProcess; + + /** + * history command + */ + private String historyCmd; + + /** + * depend processes schedule time + */ + private String dependenceScheduleTimes; + + private String duration; + + private Priority processInstancePriority; + + private String workerGroup; + + private Long environmentCode; + + private int timeout; + + private int tenantId; + + /** + * varPool string + */ + private String varPool; + + private int nextProcessInstanceId; + + private int dryRun; + + private Date restartTime; + + private boolean isBlocked; + + private Collection taskInstances; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java index be564261fb..32a0f4673d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.remote.processor; +import static org.apache.dolphinscheduler.common.Constants.HTTP_CONNECTION_REQUEST_TIMEOUT; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; import java.util.Optional; @@ -110,17 +112,6 @@ public class StateEventCallbackService { REMOTE_CHANNELS.remove(host); } - /** - * Send the command to target address, this method doesn't guarantee the command send success. - * - * @param command command need tp send - */ - public void sendResult(String address, int port, Command command) { - logger.info("send result, host:{}, command:{}", address, command.toString()); - Host host = new Host(address, port); - sendResult(host, command); - } - /** * Send the command to target host, this method doesn't guarantee the command send success. * @@ -133,4 +124,27 @@ public class StateEventCallbackService { nettyRemoteChannel.writeAndFlush(command); }); } + + /** + * send sync and return response command + * @param host + * @param requestCommand + * @return + * @throws RemotingException + * @throws InterruptedException + */ + public Command sendSync(Host host, Command requestCommand) { + try { + return this.nettyRemotingClient.sendSync(host, requestCommand, HTTP_CONNECTION_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e); + Thread.currentThread().interrupt(); + } catch (RemotingException e) { + logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e); + } + finally { + this.nettyRemotingClient.closeChannel(host); + } + return null; + } }