From 394805b2c737e7cebf8e9718bc64093ede9513e6 Mon Sep 17 00:00:00 2001 From: Eric Gao Date: Thu, 9 Mar 2023 11:30:21 +0800 Subject: [PATCH] [Feature][Metrics] Tag workflow related metrics with process definition code (workflow id) (#13640) * Tag workflow related metrics with process definition code (workflow id) * Clean up related metrics when deleting workflow definition * Add license headers * Update related UT cases * Add an example in grafana-demo * Add related docs --- docs/docs/en/guide/metrics/metrics.md | 3 +- docs/docs/zh/guide/metrics/metrics.md | 2 +- .../api/service/MetricsCleanUpService.java | 24 ++ .../impl/MetricsCleanUpServiceImpl.java | 62 ++++ .../impl/ProcessDefinitionServiceImpl.java | 6 + .../service/ProcessDefinitionServiceTest.java | 10 +- .../event/WorkflowStartEventHandler.java | 3 +- .../event/WorkflowStateEventHandler.java | 13 +- .../WorkflowTimeoutStateEventHandler.java | 5 +- .../metrics/ProcessInstanceMetrics.java | 38 ++- .../WorkflowMetricsCleanUpProcessor.java | 47 +++ .../server/master/rpc/MasterRPCServer.java | 6 + .../master/service/MasterFailoverService.java | 3 +- .../master/service/FailoverServiceTest.java | 1 + .../grafana/DolphinSchedulerMaster.json | 320 ++++++++++++++---- .../remote/command/CommandType.java | 4 +- .../WorkflowMetricsCleanUpCommand.java | 44 +++ 17 files changed, 507 insertions(+), 84 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java diff --git a/docs/docs/en/guide/metrics/metrics.md 
b/docs/docs/en/guide/metrics/metrics.md index fa0f07bf3f..951cccc4ac 100644 --- a/docs/docs/en/guide/metrics/metrics.md +++ b/docs/docs/en/guide/metrics/metrics.md @@ -83,7 +83,8 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua - ds.workflow.create.command.count: (counter) the number of commands created and inserted by workflows - ds.workflow.instance.submit.count: (counter) the number of submitted workflow instances - ds.workflow.instance.running: (gauge) the number of running workflow instances -- ds.workflow.instance.count: (counter) the number of workflow instances, sliced by the tag `state`: +- ds.workflow.instance.count: (counter) the number of workflow instances, sliced by tags `process.definition.code` and `state`. To monitor a specific workflow, you can filter the metrics by the tag `process.definition.code`, which refers to the definition code of your workflow. There are seven different states for workflow instances as follows: + - submit: the number of submitted workflow instances - timeout: the number of timeout workflow instances - finish: the number of finished workflow instances, both successes and failures included - success: the number of successful workflow instances diff --git a/docs/docs/zh/guide/metrics/metrics.md b/docs/docs/zh/guide/metrics/metrics.md index 4868ba7686..03b601f066 100644 --- a/docs/docs/zh/guide/metrics/metrics.md +++ b/docs/docs/zh/guide/metrics/metrics.md @@ -83,7 +83,7 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: ` - ds.workflow.create.command.count: (counter) 工作量创建并插入的命令数量 - ds.workflow.instance.running: (gauge) 正在运行的工作流实例数量 -- ds.workflow.instance.count: (counter) 工作流实例数量,由tag `state`按状态切分: +- ds.workflow.instance.count: (counter) 工作流实例数量,由tag `process.definition.code` 和 `state` 切分。您可以通过 `process.definition.code` 这个tag筛选出和某个workflow相关的指标,这里的 `process.definition.code` 指的是您工作流定义的编号代码。工作流实例有如下七种状态: - submit:已提交的工作量实例数量 - timeout:运行超时的工作流实例数量 - finish:已完成的工作流实例数量,包含成功和失败 
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java new file mode 100644 index 0000000000..7878f91981 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MetricsCleanUpService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service; + +public interface MetricsCleanUpService { + + void cleanUpWorkflowMetricsByDefinitionCode(String workflowDefinitionCode); + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java new file mode 100644 index 0000000000..abea381251 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MetricsCleanUpServiceImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.rpc.ApiRpcClient; +import org.apache.dolphinscheduler.api.service.MetricsCleanUpService; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.remote.command.WorkflowMetricsCleanUpCommand; +import org.apache.dolphinscheduler.remote.utils.Host; + +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class MetricsCleanUpServiceImpl implements MetricsCleanUpService { + + @Autowired + private ApiRpcClient apiRpcClient; + + @Autowired + private RegistryClient registryClient; + + @Override + public void cleanUpWorkflowMetricsByDefinitionCode(String workflowDefinitionCode) { + WorkflowMetricsCleanUpCommand workflowMetricsCleanUpCommand = new WorkflowMetricsCleanUpCommand(); + workflowMetricsCleanUpCommand.setProcessDefinitionCode(workflowDefinitionCode); + List masterNodeList = registryClient.getServerList(NodeType.MASTER); + for (Server server : masterNodeList) { + try { + final 
String host = String.format("%s:%s", server.getHost(), server.getPort()); + apiRpcClient.send(Host.of(host), workflowMetricsCleanUpCommand.convert2Command()); + } catch (Exception e) { + log.error( + "Fail to clean up workflow related metrics on {} when deleting workflow definition {}, error message {}", + server.getHost(), workflowDefinitionCode, e.getMessage()); + } + } + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index b8def57492..fde462e9e9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -54,6 +54,7 @@ import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.service.MetricsCleanUpService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -252,6 +253,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private WorkFlowLineageService workFlowLineageService; + @Autowired + private MetricsCleanUpService metricsCleanUpService; + /** * create process definition * @@ -1033,6 +1037,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro for (ProcessDefinition process : processDefinitionList) { try { this.deleteProcessDefinitionByCode(loginUser, process.getCode()); + 
metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(process.getCode())); } catch (Exception e) { throw new ServiceException(Status.DELETE_PROCESS_DEFINE_ERROR, process.getName(), e.getMessage()); } @@ -1117,6 +1122,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // we delete the workflow definition at last to avoid using transaction here. // If delete error, we can call this interface again. processDefinitionDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); + metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(code)); log.info("Success delete workflow definition workflowDefinitionCode: {}", code); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 1d59ba8c58..d1c493a64b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT; import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.times; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest; @@ -175,6 +176,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { @Mock private WorkFlowLineageService workFlowLineageService; + @Mock + private MetricsCleanUpService metricsCleanUpService; + @Mock private TaskDefinitionService taskDefinitionService; @@ -478,6 +482,7 @@ public class 
ProcessDefinitionServiceTest extends BaseServiceTestTool { @Test public void deleteProcessDefinitionByCodeTest() { Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + Mockito.doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L)); Project project = getProject(projectCode); @@ -525,6 +530,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) .thenReturn(Collections.emptySet()); processDefinitionService.deleteProcessDefinitionByCode(user, 46L); + Mockito.verify(metricsCleanUpService, times(1)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L)); // scheduler online Schedule schedule = getSchedule(); @@ -551,7 +557,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) .thenReturn(Collections.emptySet()); Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); - + Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(46L)); } @Test @@ -600,9 +606,11 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), process.getCode())) .thenReturn(Collections.emptySet()); putMsg(result, Status.SUCCESS, projectCode); + Mockito.doNothing().when(metricsCleanUpService).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(11L)); Map deleteSuccess = processDefinitionService.batchDeleteProcessDefinitionByCodes(user, projectCode, singleCodes); Assertions.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS)); + Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(11L)); } @Test diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java index 83af487bfe..e08e33a8d2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java @@ -59,8 +59,9 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler { throw new WorkflowEventHandleError( "The workflow start event is invalid, cannot find the workflow instance from cache"); } - ProcessInstanceMetrics.incProcessInstanceByState("submit"); ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", + processInstance.getProcessDefinitionCode().toString()); CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool) .thenAccept(workflowSubmitStatue -> { if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java index ba92d5b33a..11ad217d81 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java @@ -35,9 +35,9 @@ public class WorkflowStateEventHandler implements StateEventHandler { public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) throws StateEventHandleException { WorkflowStateEvent 
workflowStateEvent = (WorkflowStateEvent) stateEvent; - measureProcessState(workflowStateEvent); ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); ProcessDefinition processDefinition = processInstance.getProcessDefinition(); + measureProcessState(workflowStateEvent, processInstance.getProcessDefinitionCode().toString()); log.info( "Handle workflow instance state event, the current workflow instance state {} will be changed to {}", @@ -74,19 +74,20 @@ public class WorkflowStateEventHandler implements StateEventHandler { return StateEventType.PROCESS_STATE_CHANGE; } - private void measureProcessState(WorkflowStateEvent processStateEvent) { + private void measureProcessState(WorkflowStateEvent processStateEvent, String processDefinitionCode) { if (processStateEvent.getStatus().isFinished()) { - ProcessInstanceMetrics.incProcessInstanceByState("finish"); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("finish", processDefinitionCode); } switch (processStateEvent.getStatus()) { case STOP: - ProcessInstanceMetrics.incProcessInstanceByState("stop"); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("stop", processDefinitionCode); break; case SUCCESS: - ProcessInstanceMetrics.incProcessInstanceByState("success"); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("success", + processDefinitionCode); break; case FAILURE: - ProcessInstanceMetrics.incProcessInstanceByState("fail"); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("fail", processDefinitionCode); break; default: break; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java index e7bc579ea4..3d280673a6 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.event; import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; @@ -32,7 +33,9 @@ public class WorkflowTimeoutStateEventHandler implements StateEventHandler { @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) { log.info("Handle workflow instance timeout event"); - ProcessInstanceMetrics.incProcessInstanceByState("timeout"); + ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("timeout", + processInstance.getProcessDefinitionCode().toString()); workflowExecuteRunnable.processTimeout(); return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java index d610778424..dc1341db1d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java @@ -17,13 +17,12 @@ package org.apache.dolphinscheduler.server.master.metrics; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import 
lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; import com.google.common.collect.ImmutableSet; @@ -33,21 +32,18 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; @UtilityClass +@Slf4j public class ProcessInstanceMetrics { - private final Map processInstanceCounters = new HashMap<>(); - private final Set processInstanceStates = ImmutableSet.of( "submit", "timeout", "finish", "failover", "success", "fail", "stop"); static { for (final String state : processInstanceStates) { - processInstanceCounters.put( - state, - Counter.builder("ds.workflow.instance.count") - .tag("state", state) - .description(String.format("Process instance %s total count", state)) - .register(Metrics.globalRegistry)); + Counter.builder("ds.workflow.instance.count") + .tags("state", state, "process.definition.code", "dummy") + .description(String.format("Process instance total count by state and definition code")) + .register(Metrics.globalRegistry); } } @@ -82,8 +78,26 @@ public class ProcessInstanceMetrics { .register(Metrics.globalRegistry); } - public void incProcessInstanceByState(final String state) { - processInstanceCounters.get(state).increment(); + public void incProcessInstanceByStateAndProcessDefinitionCode(final String state, + final String processDefinitionCode) { + // When tags need to be determined from local context, + // you have no choice but to construct or lookup the Meter inside your method body. + // The lookup cost is just a single hash lookup, so it is acceptable for most use cases. 
+ Metrics.globalRegistry.counter( + "ds.workflow.instance.count", + "state", state, + "process.definition.code", processDefinitionCode) + .increment(); + } + + public void cleanUpProcessInstanceCountMetricsByDefinitionCode(final String processDefinitionCode) { + for (final String state : processInstanceStates) { + final Counter counter = Metrics.globalRegistry.counter( + "ds.workflow.instance.count", + "state", state, + "process.definition.code", processDefinitionCode); + Metrics.globalRegistry.remove(counter); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java new file mode 100644 index 0000000000..30254d63a8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/WorkflowMetricsCleanUpProcessor.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.WorkflowMetricsCleanUpCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; + +import org.springframework.stereotype.Component; + +import com.google.common.base.Preconditions; +import io.netty.channel.Channel; + +@Component +public class WorkflowMetricsCleanUpProcessor implements NettyRequestProcessor { + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.WORKFLOW_METRICS_CLEANUP == command.getType(), + String.format("invalid command type: %s", command.getType())); + + WorkflowMetricsCleanUpCommand workflowMetricsCleanUpCommand = + JSONUtils.parseObject(command.getBody(), WorkflowMetricsCleanUpCommand.class); + + ProcessInstanceMetrics.cleanUpProcessInstanceCountMetricsByDefinitionCode( + workflowMetricsCleanUpCommand.getProcessDefinitionCode()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index a72d73b849..d039aac814 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskUpdatePidProcessor; import 
org.apache.dolphinscheduler.server.master.processor.WorkflowExecutingDataRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.WorkflowMetricsCleanUpProcessor; import lombok.extern.slf4j.Slf4j; @@ -83,6 +84,9 @@ public class MasterRPCServer implements AutoCloseable { @Autowired private TaskExecuteStartProcessor taskExecuteStartProcessor; + @Autowired + private WorkflowMetricsCleanUpProcessor workflowMetricsCleanUpProcessor; + public void start() { log.info("Starting Master RPC Server..."); // init remoting server @@ -101,6 +105,8 @@ public class MasterRPCServer implements AutoCloseable { this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_START, taskExecuteStartProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_METRICS_CLEANUP, + workflowMetricsCleanUpProcessor); // log server this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 1d36b1b47b..915b8f6ec9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -193,7 +193,8 @@ public class MasterFailoverService { } } - ProcessInstanceMetrics.incProcessInstanceByState("failover"); + ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("failover", + processInstance.getProcessDefinitionCode().toString()); // updateProcessInstance host is null to mark this processInstance has been failover // and insert a failover command 
processInstance.setHost(Constants.NULL); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 1bb9e4dbeb..8289da93b9 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -148,6 +148,7 @@ public class FailoverServiceTest { processInstance.setRestartTime(new Date()); processInstance.setHistoryCmd("xxx"); processInstance.setCommandType(CommandType.STOP); + processInstance.setProcessDefinitionCode(123L); masterTaskInstance = new TaskInstance(); masterTaskInstance.setId(1); diff --git a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json index 3599ba8490..d5b7f1f397 100644 --- a/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json +++ b/dolphinscheduler-meter/src/main/resources/grafana/DolphinSchedulerMaster.json @@ -70,6 +70,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -125,7 +127,8 @@ "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -159,6 +162,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -214,7 +219,8 @@ "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -248,6 +254,8 @@ "mode": "palette-classic" }, "custom": { + 
"axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -303,7 +311,8 @@ "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -362,6 +371,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -417,7 +428,8 @@ "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -502,7 +514,7 @@ "text": {}, "textMode": "auto" }, - "pluginVersion": "9.0.5", + "pluginVersion": "9.4.3", "targets": [ { "exemplar": true, @@ -569,7 +581,7 @@ "showThresholdMarkers": true, "text": {} }, - "pluginVersion": "9.0.5", + "pluginVersion": "9.4.3", "targets": [ { "exemplar": true, @@ -624,7 +636,7 @@ "alertThreshold": true }, "percentage": false, - "pluginVersion": "9.0.5", + "pluginVersion": "9.4.3", "pointradius": 5, "points": false, "renderer": "flot", @@ -731,7 +743,7 @@ "alertThreshold": true }, "percentage": false, - "pluginVersion": "9.0.5", + "pluginVersion": "9.4.3", "pointradius": 5, "points": false, "renderer": "flot", @@ -835,7 +847,7 @@ "alertThreshold": true }, "percentage": false, - "pluginVersion": "9.0.5", + "pluginVersion": "9.4.3", "pointradius": 5, "points": false, "renderer": "flot", @@ -920,6 +932,21 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, + "fieldConfig": { + "defaults": { + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "scaleDistribution": { + "type": "linear" + } + } + }, + "overrides": [] + }, "gridPos": { "h": 9, "w": 6, @@ -933,7 +960,44 @@ "legend": { "show": true }, - "pluginVersion": "9.0.5", + "options": { + "calculate": true, + "calculation": {}, + "cellGap": 2, + "cellValues": {}, + "color": { + "exponent": 0.5, + "fill": "#F2495C", + 
"mode": "opacity", + "reverse": false, + "scale": "exponential", + "scheme": "Oranges", + "steps": 128 + }, + "exemplars": { + "color": "rgba(255,0,255,0.7)" + }, + "filterValues": { + "le": 1e-9 + }, + "legend": { + "show": true + }, + "rowsFrame": { + "layout": "auto" + }, + "showValue": "never", + "tooltip": { + "show": true, + "yHistogram": false + }, + "yAxis": { + "axisPlacement": "left", + "reverse": false, + "unit": "s" + } + }, + "pluginVersion": "9.4.3", "reverseYBuckets": false, "targets": [ { @@ -984,6 +1048,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1030,16 +1096,109 @@ }, "gridPos": { "h": 8, - "w": 12, + "w": 24, "x": 0, "y": 28 }, + "id": 196, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "ds_workflow_instance_count_total{process_definition_code=\"dummy\"}", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Process Instance State Count By Definition Code", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + 
"thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 0, + "y": 36 + }, "id": 152, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1070,6 +1229,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1116,16 +1277,17 @@ }, "gridPos": { "h": 8, - "w": 12, - "x": 12, - "y": 28 + "w": 7, + "x": 6, + "y": 36 }, - "id": 162, + "id": 160, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1138,11 +1300,11 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "sum(increase(ds_workflow_instance_count_total{state=\"finish\"}[1m]))", + "expr": "sum(increase(ds_workflow_instance_count_total{state=\"stop\"}[1m]))", "refId": "A" } ], - "title": "Process Instance Finish/1m", + "title": "Process Instance Stop/1m", "type": "timeseries" }, { @@ -1156,6 +1318,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1203,15 +1367,16 @@ "gridPos": { "h": 8, "w": 6, - "x": 0, + "x": 14, "y": 36 }, - "id": 156, + "id": 162, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1224,11 +1389,11 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "sum(increase(ds_workflow_instance_count_total{state=\"success\"}[1m]))", + "expr": 
"sum(increase(ds_workflow_instance_count_total{state=\"finish\"}[1m]))", "refId": "A" } ], - "title": "Process Instance Success /1m", + "title": "Process Instance Finish/1m", "type": "timeseries" }, { @@ -1242,6 +1407,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1289,15 +1456,16 @@ "gridPos": { "h": 8, "w": 6, - "x": 6, - "y": 36 + "x": 0, + "y": 44 }, - "id": 160, + "id": 156, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1310,11 +1478,11 @@ "type": "prometheus", "uid": "PBFA97CFB590B2093" }, - "expr": "sum(increase(ds_workflow_instance_count_total{state=\"stop\"}[1m]))", + "expr": "sum(increase(ds_workflow_instance_count_total{state=\"success\"}[1m]))", "refId": "A" } ], - "title": "Process Instance Stop/1m", + "title": "Process Instance Success /1m", "type": "timeseries" }, { @@ -1328,6 +1496,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1375,15 +1545,16 @@ "gridPos": { "h": 8, "w": 6, - "x": 12, - "y": 36 + "x": 6, + "y": 44 }, "id": 154, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1414,6 +1585,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1461,15 +1634,16 @@ "gridPos": { "h": 8, "w": 6, - "x": 18, - "y": 36 + "x": 12, + "y": 44 }, "id": 158, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1495,7 +1669,7 @@ "h": 1, "w": 24, "x": 0, - "y": 44 
+ "y": 52 }, "id": 172, "panels": [], @@ -1513,6 +1687,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1561,14 +1737,15 @@ "h": 8, "w": 8, "x": 0, - "y": 45 + "y": 53 }, "id": 178, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1617,6 +1794,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1665,14 +1844,15 @@ "h": 8, "w": 8, "x": 8, - "y": 45 + "y": 53 }, "id": 180, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1703,6 +1883,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1751,14 +1933,15 @@ "h": 8, "w": 8, "x": 16, - "y": 45 + "y": 53 }, "id": 182, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1789,6 +1972,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1837,14 +2022,15 @@ "h": 8, "w": 8, "x": 0, - "y": 53 + "y": 61 }, "id": 184, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1875,6 +2061,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -1923,14 +2111,15 @@ "h": 8, "w": 8, "x": 8, - "y": 53 + "y": 61 }, 
"id": 186, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -1961,6 +2150,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2009,14 +2200,15 @@ "h": 8, "w": 8, "x": 16, - "y": 53 + "y": 61 }, "id": 188, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -2047,6 +2239,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2095,14 +2289,15 @@ "h": 8, "w": 8, "x": 0, - "y": 61 + "y": 69 }, "id": 190, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -2133,6 +2328,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2181,14 +2378,15 @@ "h": 8, "w": 8, "x": 8, - "y": 61 + "y": 69 }, "id": 192, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -2219,6 +2417,8 @@ "mode": "palette-classic" }, "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, @@ -2267,14 +2467,15 @@ "h": 8, "w": 8, "x": 16, - "y": 61 + "y": 69 }, "id": 194, "options": { "legend": { "calcs": [], "displayMode": "list", - "placement": "bottom" + "placement": "bottom", + "showLegend": true }, "tooltip": { "mode": "single", @@ -2295,8 +2496,9 @@ "type": "timeseries" } ], - "refresh": "5s", - "schemaVersion": 36, + "refresh": false, + 
"revision": 1, + "schemaVersion": 38, "style": "dark", "tags": [], "templating": { @@ -2430,8 +2632,8 @@ ] }, "time": { - "from": "now-30m", - "to": "now" + "from": "2023-03-07T04:20:35.626Z", + "to": "2023-03-07T04:22:11.080Z" }, "timepicker": { "now": true, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 0ee631ae28..88f0f117f3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -144,5 +144,7 @@ public enum CommandType { /** * workflow executing data response, from master to api */ - WORKFLOW_EXECUTING_DATA_RESPONSE; + WORKFLOW_EXECUTING_DATA_RESPONSE, + + WORKFLOW_METRICS_CLEANUP; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java new file mode 100644 index 0000000000..337bd05333 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowMetricsCleanUpCommand.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +import lombok.Data; + +@Data +public class WorkflowMetricsCleanUpCommand implements Serializable { + + private String processDefinitionCode; + + /** + * Packages this request into a remote {@link Command}: the type is set to {@link CommandType#WORKFLOW_METRICS_CLEANUP} and the body to the bytes returned by {@code JSONUtils.toJsonByteArray(this)}. + * + * @return the newly created command; a fresh instance is built on every call + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.WORKFLOW_METRICS_CLEANUP); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + +}