diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 0a6fed5072..6532dbda1c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; @@ -37,9 +38,21 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; + +import org.apache.commons.lang3.StringUtils; + +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; @@ -55,12 +68,6 @@ import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** * executor controller @@ -303,4 +310,20 @@ public class ExecutorController extends BaseController { Map result = execService.startCheckByProcessDefinedCode(processDefinitionCode); return returnDataList(result); } + + /** + * query execute data of processInstance from master + */ + @ApiOperation(value = "queryExecutingWorkflow", notes = "QUERY_WORKFLOW_EXECUTE_DATA") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/query-executing-workflow") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_EXECUTING_WORKFLOW_ERROR) + @AccessLogAnnotation + public Result queryExecutingWorkflow(@RequestParam("id") Integer processInstanceId) { + WorkflowExecuteDto workflowExecuteDto = execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId); + return Result.success(workflowExecuteDto); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index 2484d9ec7b..2000b80245 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -43,7 +43,10 @@ import org.apache.commons.lang.StringUtils; import java.io.IOException; import java.text.MessageFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 3315ef671e..7017fa6d95 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -216,6 +216,7 @@ public enum Status { QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"), PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"), TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"), + QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 2d5868b251..b5cf04bd5e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import java.util.Map; @@ -111,4 +112,11 @@ public interface ExecutorService { * @return */ Map forceStartTaskInstance(User loginUser, int queueId); + + /** + * query executing workflow data in Master memory + * @param processInstanceId + * @return + */ + WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index b58cc9dbcd..da9f9a742e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -64,6 +64,9 @@ import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -851,4 +854,26 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return validDependentProcessDefinitionList; } + + /** + * query executing data of processInstance by master + * @param processInstanceId + * @return + */ + @Override + public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) { + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + if (processInstance == null) { + return null; + } + Host host = new Host(processInstance.getHost()); + WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand(); + requestCommand.setProcessInstanceId(processInstanceId); + org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command()); + if (command == null) { + return null; + } + WorkflowExecutingDataResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataResponseCommand.class); + return responseCommand.getWorkflowExecuteDto(); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index ae570bc725..dd17255fdb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -829,5 +829,4 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce public List queryByProcessDefineCode(Long processDefinitionCode, int size) { return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java new file mode 100644 index 0000000000..793dc40439 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/controller/WorkflowExecuteController.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.controller; + +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.server.master.service.ExecutingService; + +import java.util.Optional; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/workflow/execute") +public class WorkflowExecuteController { + + @Autowired + private ExecutingService executingService; + + /** + * query workflow execute data in memory + * @param processInstanceId + * @return + */ + @GetMapping("") + @ResponseStatus(HttpStatus.OK) + public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int processInstanceId) { + Optional workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(processInstanceId); + return workflowExecuteDtoOptional.orElse(null); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java new file mode 100644 index 0000000000..c8f70d96d0 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowExecutingDataRequestProcessor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.service.ExecutingService; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +/** + * workflow executing data process from api/master + */ +@Component +public class WorkflowExecutingDataRequestProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(WorkflowExecutingDataRequestProcessor.class); + + @Autowired + private ExecutingService executingService; + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); + + WorkflowExecutingDataRequestCommand requestCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataRequestCommand.class); + + logger.info("received command, processInstanceId:{}", requestCommand.getProcessInstanceId()); + + Optional workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(requestCommand.getProcessInstanceId()); + + WorkflowExecutingDataResponseCommand responseCommand = new WorkflowExecutingDataResponseCommand(); + workflowExecuteDtoOptional.ifPresent(responseCommand::setWorkflowExecuteDto); + channel.writeAndFlush(responseCommand.convert2Command(command.getOpaque())); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index 24c3530462..12bbef36f9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; import javax.annotation.PostConstruct; @@ -70,6 +71,9 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private LoggerRequestProcessor loggerRequestProcessor; + @Autowired + private WorkflowExecutingDataRequestProcessor workflowExecutingDataRequestProcessor; + @PostConstruct private void init() { // init remoting server @@ -83,6 +87,7 @@ public class MasterRPCServer implements AutoCloseable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor); // logger server this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index bb49d9dea1..e61643ab36 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; @@ -27,8 +26,10 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang.StringUtils; @@ -46,8 +47,6 @@ import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import com.google.common.base.Strings; - import lombok.NonNull; /** @@ -191,11 +190,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId()); return; } - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId() ); - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + stateEventCallbackService.sendResult(new Host(host), stateEventChangeCommand.convert2Command()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 747c3dd77a..6c27adae57 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -210,9 +211,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0 ); - String address = subProcessInstance.getHost().split(":")[0]; - int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]); - this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + this.stateEventCallbackService.sendResult(new Host(subProcessInstance.getHost()), stateEventChangeCommand.convert2Command()); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java new file mode 100644 index 0000000000..fa85b66f82 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.service; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.dto.TaskInstanceExecuteDto; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.controller.WorkflowExecuteController; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; + +import org.apache.commons.beanutils.BeanUtils; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.compress.utils.Lists; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * executing service, to query executing data from memory, such workflow instance + */ +@Component +public class ExecutingService { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteController.class); + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + public Optional queryWorkflowExecutingData(Integer processInstanceId) { + WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable == null) { + logger.info("workflow execute data not found, maybe it has finished, workflow id:{}", processInstanceId); + return Optional.empty(); + } + try { + WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto(); + BeanUtils.copyProperties(workflowExecuteDto, workflowExecuteRunnable.getProcessInstance()); + List taskInstanceList = Lists.newArrayList(); + if (CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) { + for (TaskInstance taskInstance : workflowExecuteRunnable.getAllTaskInstances()) { + TaskInstanceExecuteDto taskInstanceExecuteDto = new TaskInstanceExecuteDto(); + BeanUtils.copyProperties(taskInstanceExecuteDto, taskInstance); + taskInstanceList.add(taskInstanceExecuteDto); + } + } + workflowExecuteDto.setTaskInstances(taskInstanceList); + return Optional.of(workflowExecuteDto); + } catch (IllegalAccessException | InvocationTargetException e) { + logger.error("query workflow execute data fail, workflow id:{}", processInstanceId, e); + } + return Optional.empty(); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index ed4c9ab94c..62bc7a53fc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -130,5 +130,15 @@ public enum CommandType { /** * task state event request */ - TASK_WAKEUP_EVENT_REQUEST; + TASK_WAKEUP_EVENT_REQUEST, + + /** + * workflow executing data request, from api to master + */ + WORKFLOW_EXECUTING_DATA_REQUEST, + + /** + * workflow executing data response, from master to api + */ + WORKFLOW_EXECUTING_DATA_RESPONSE; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java new file mode 100644 index 0000000000..6085322831 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataRequestCommand.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * workflow executing data request, from api to master + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowExecutingDataRequestCommand implements Serializable { + + private Integer processInstanceId; + + /** + * package request command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java new file mode 100644 index 0000000000..2d2dfa20b0 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowExecutingDataResponseCommand.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * workflow executing data response, from master to api + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkflowExecutingDataResponseCommand implements Serializable { + + private WorkflowExecuteDto workflowExecuteDto; + + /** + * package request command + * + * @return command + */ + public Command convert2Command(long opaque) { + Command command = new Command(opaque); + command.setType(CommandType.WORKFLOW_EXECUTING_DATA_RESPONSE); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java new file mode 100644 index 0000000000..4bfae1036d --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/TaskInstanceExecuteDto.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.dto; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + +import java.util.Date; +import java.util.Map; + +import lombok.Data; + +@Data +public class TaskInstanceExecuteDto { + + private int id; + + private String name; + + private String taskType; + + private int processInstanceId; + + private long taskCode; + + private int taskDefinitionVersion; + + private String processInstanceName; + + private int taskGroupPriority; + + private ExecutionStatus state; + + private Date firstSubmitTime; + + private Date submitTime; + + private Date startTime; + + private Date endTime; + + private String host; + + private String executePath; + + private String logPath; + + private int retryTimes; + + private Flag alertFlag; + + private int pid; + + private String appLink; + + private Flag flag; + + private String duration; + + private int maxRetryTimes; + + private int retryInterval; + + private Priority taskInstancePriority; + + private Priority processInstancePriority; + + private String workerGroup; + + private Long environmentCode; + + private String environmentConfig; + + private int executorId; + + private String varPool; + + private String executorName; + + private Map resources; + + private int delayTime; + + private String taskParams; + + private int dryRun; + + private int taskGroupId; + + private Integer cpuQuota; + + private Integer memoryMax; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java new file mode 100644 index 0000000000..64d5542b94 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/dto/WorkflowExecuteDto.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.dto; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; + +import java.util.Collection; +import java.util.Date; + +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +public class WorkflowExecuteDto { + + private int id; + + private String name; + + private Long processDefinitionCode; + + private int processDefinitionVersion; + + private ExecutionStatus state; + + /** + * recovery flag for failover + */ + private Flag recovery; + + private Date startTime; + + private Date endTime; + + private int runTimes; + + private String host; + + private CommandType commandType; + + private String commandParam; + + /** + * node depend type + */ + private TaskDependType taskDependType; + + private int maxTryTimes; + + /** + * failure strategy when task failed. + */ + private FailureStrategy failureStrategy; + + /** + * warning type + */ + private WarningType warningType; + + private Integer warningGroupId; + + private Date scheduleTime; + + private Date commandStartTime; + + /** + * user define parameters string + */ + private String globalParams; + + /** + * executor id + */ + private int executorId; + + /** + * executor name + */ + private String executorName; + + /** + * tenant code + */ + private String tenantCode; + + /** + * queue + */ + private String queue; + + /** + * process is sub process + */ + private Flag isSubProcess; + + /** + * history command + */ + private String historyCmd; + + /** + * depend processes schedule time + */ + private String dependenceScheduleTimes; + + private String duration; + + private Priority processInstancePriority; + + private String workerGroup; + + private Long environmentCode; + + private int timeout; + + private int tenantId; + + /** + * varPool string + */ + private String varPool; + + private int nextProcessInstanceId; + + private int dryRun; + + private Date restartTime; + + private boolean isBlocked; + + private Collection taskInstances; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java index be564261fb..32a0f4673d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java @@ -17,11 +17,13 @@ package org.apache.dolphinscheduler.remote.processor; +import static org.apache.dolphinscheduler.common.Constants.HTTP_CONNECTION_REQUEST_TIMEOUT; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; import java.util.Optional; @@ -110,17 +112,6 @@ public class StateEventCallbackService { REMOTE_CHANNELS.remove(host); } - /** - * Send the command to target address, this method doesn't guarantee the command send success. - * - * @param command command need tp send - */ - public void sendResult(String address, int port, Command command) { - logger.info("send result, host:{}, command:{}", address, command.toString()); - Host host = new Host(address, port); - sendResult(host, command); - } - /** * Send the command to target host, this method doesn't guarantee the command send success. * @@ -133,4 +124,27 @@ public class StateEventCallbackService { nettyRemoteChannel.writeAndFlush(command); }); } + + /** + * send sync and return response command + * @param host + * @param requestCommand + * @return + * @throws RemotingException + * @throws InterruptedException + */ + public Command sendSync(Host host, Command requestCommand) { + try { + return this.nettyRemotingClient.sendSync(host, requestCommand, HTTP_CONNECTION_REQUEST_TIMEOUT); + } catch (InterruptedException e) { + logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e); + Thread.currentThread().interrupt(); + } catch (RemotingException e) { + logger.error("send sync fail, host:{}, command:{}", host, requestCommand, e); + } + finally { + this.nettyRemotingClient.closeChannel(host); + } + return null; + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 9cce9152f2..2f0e1c077a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.service.process; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; @@ -30,8 +31,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; -import static java.util.stream.Collectors.toSet; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -334,14 +333,12 @@ public class ProcessServiceImpl implements ProcessService { int update = updateProcessInstance(info); // determine whether the process is normal if (update > 0) { - String host = info.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); + Host host = new Host(info.getHost()); StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( info.getId(), 0, info.getState(), info.getId(), 0 ); try { - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command()); } catch (Exception e) { logger.error("sendResultError"); } @@ -3035,13 +3032,11 @@ public class ProcessServiceImpl implements ProcessService { @Override public void sendStartTask2Master(ProcessInstance processInstance, int taskId, org.apache.dolphinscheduler.remote.command.CommandType taskType) { - String host = processInstance.getHost(); - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); + Host host = new Host(processInstance.getHost()); TaskEventChangeCommand taskEventChangeCommand = new TaskEventChangeCommand( processInstance.getId(), taskId ); - stateEventCallbackService.sendResult(address, port, taskEventChangeCommand.convert2Command(taskType)); + stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType)); } @Override