From e3bd26322fcb75f620c5733c6f009823fb74dc3a Mon Sep 17 00:00:00 2001 From: John Huang Date: Tue, 5 Mar 2024 18:32:32 +0800 Subject: [PATCH] [Improvement][UT] Improve Worker runner coverage (#15428) Co-authored-by: Rick Cheng Co-authored-by: caishunfeng --- .../WorkerTaskExecutorFactoryBuilder.java | 15 + ...xecutionEventAckListenFunctionManager.java | 9 + ...ExecutionFinishEventAckListenFunction.java | 4 + ...ceExecutionInfoEventAckListenFunction.java | 4 + ...xecutionRunningEventAckListenFunction.java | 3 + ...TaskInstanceDispatchOperationFunction.java | 9 + .../TaskInstanceKillOperationFunction.java | 7 + .../TaskInstanceOperationFunctionManager.java | 11 + .../UpdateWorkflowHostOperationFunction.java | 4 + ...ceExecutionEventAckListenFunctionTest.java | 104 +++++++ .../TaskInstanceOperationFunctionTest.java | 280 ++++++++++++++++++ 11 files changed, 450 insertions(+) create mode 100644 dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java create mode 100644 dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java index a9c2948482..599746818d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java @@ -48,6 +48,21 @@ public class WorkerTaskExecutorFactoryBuilder { @Autowired private WorkerRegistryClient workerRegistryClient; + public WorkerTaskExecutorFactoryBuilder( + WorkerConfig workerConfig, + WorkerMessageSender workerMessageSender, + TaskPluginManager taskPluginManager, + WorkerTaskExecutorThreadPool workerManager, + StorageOperate storageOperate, + WorkerRegistryClient workerRegistryClient) { + this.workerConfig = workerConfig; + this.workerMessageSender = workerMessageSender; + this.taskPluginManager = taskPluginManager; + this.workerManager = workerManager; + this.storageOperate = storageOperate; + this.workerRegistryClient = workerRegistryClient; + } + public WorkerTaskExecutorFactory createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) { return new DefaultWorkerTaskExecutorFactory(taskExecutionContext, workerConfig, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java index b4423f4880..3214be8c89 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionManager.java @@ -35,6 +35,15 @@ public class TaskInstanceExecutionEventAckListenFunctionManager { @Autowired private TaskInstanceExecutionInfoEventAckListenFunction taskInstanceExecutionInfoEventAckListenFunction; + public TaskInstanceExecutionEventAckListenFunctionManager( + TaskInstanceExecutionRunningEventAckListenFunction taskInstanceExecutionRunningEventAckListenFunction, + TaskInstanceExecutionFinishEventAckListenFunction taskInstanceExecutionFinishEventAckListenFunction, + TaskInstanceExecutionInfoEventAckListenFunction taskInstanceExecutionInfoEventAckListenFunction) { + this.taskInstanceExecutionRunningEventAckListenFunction = taskInstanceExecutionRunningEventAckListenFunction; + this.taskInstanceExecutionFinishEventAckListenFunction = taskInstanceExecutionFinishEventAckListenFunction; + this.taskInstanceExecutionInfoEventAckListenFunction = taskInstanceExecutionInfoEventAckListenFunction; + } + public TaskInstanceExecutionRunningEventAckListenFunction getTaskInstanceExecutionRunningEventAckListenFunction() { return taskInstanceExecutionRunningEventAckListenFunction; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java index ad7892bc7a..a358623519 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionFinishEventAckListenFunction.java @@ -36,6 +36,10 @@ public class TaskInstanceExecutionFinishEventAckListenFunction @Autowired private MessageRetryRunner messageRetryRunner; + public TaskInstanceExecutionFinishEventAckListenFunction(MessageRetryRunner messageRetryRunner) { + this.messageRetryRunner = messageRetryRunner; + } + @Override public void handleTaskInstanceExecutionEventAck(TaskInstanceExecutionFinishEventAck taskInstanceExecutionFinishEventAck) { try { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java index 971343103a..b3dcc9bf8a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionInfoEventAckListenFunction.java @@ -37,6 +37,10 @@ public class TaskInstanceExecutionInfoEventAckListenFunction @Resource private MessageRetryRunner messageRetryRunner; + public TaskInstanceExecutionInfoEventAckListenFunction(MessageRetryRunner messageRetryRunner) { + this.messageRetryRunner = messageRetryRunner; + } + @Override public void handleTaskInstanceExecutionEventAck(TaskInstanceExecutionInfoEventAck taskInstanceExecutionInfoEventAck) { try { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java index 9d6de78e02..e17d72ad99 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionRunningEventAckListenFunction.java @@ -36,6 +36,9 @@ public class TaskInstanceExecutionRunningEventAckListenFunction @Autowired private MessageRetryRunner messageRetryRunner; + public TaskInstanceExecutionRunningEventAckListenFunction(MessageRetryRunner messageRetryRunner) { + this.messageRetryRunner = messageRetryRunner; + } @Override public void handleTaskInstanceExecutionEventAck(TaskInstanceExecutionRunningEventAck taskInstanceExecutionRunningEventAck) { try { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java index e6d259412f..fc128a9a34 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java @@ -48,6 +48,15 @@ public class TaskInstanceDispatchOperationFunction @Autowired private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool; + public TaskInstanceDispatchOperationFunction( + WorkerConfig workerConfig, + WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder, + WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool) { + this.workerConfig = workerConfig; + this.workerTaskExecutorFactoryBuilder = workerTaskExecutorFactoryBuilder; + this.workerTaskExecutorThreadPool = workerTaskExecutorThreadPool; + } + @Override public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) { log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java index 69e3994a90..d55765d23f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java @@ -50,6 +50,13 @@ public class TaskInstanceKillOperationFunction @Autowired private MessageRetryRunner messageRetryRunner; + public TaskInstanceKillOperationFunction( + WorkerTaskExecutorThreadPool workerManager, + MessageRetryRunner messageRetryRunner) { + this.workerManager = workerManager; + this.messageRetryRunner = messageRetryRunner; + } + @Override public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) { log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java index 99ae193b47..8014b88fd1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionManager.java @@ -35,6 +35,17 @@ public class TaskInstanceOperationFunctionManager { @Autowired private TaskInstancePauseOperationFunction taskInstancePauseOperationFunction; + public TaskInstanceOperationFunctionManager( + TaskInstanceKillOperationFunction taskInstanceKillOperationFunction, + UpdateWorkflowHostOperationFunction updateWorkflowHostOperationFunction, + TaskInstanceDispatchOperationFunction taskInstanceDispatchOperationFunction, + TaskInstancePauseOperationFunction taskInstancePauseOperationFunction) { + this.taskInstanceKillOperationFunction = taskInstanceKillOperationFunction; + this.updateWorkflowHostOperationFunction = updateWorkflowHostOperationFunction; + this.taskInstanceDispatchOperationFunction = taskInstanceDispatchOperationFunction; + this.taskInstancePauseOperationFunction = taskInstancePauseOperationFunction; + } + public TaskInstanceKillOperationFunction getTaskInstanceKillOperationFunction() { return taskInstanceKillOperationFunction; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java index 7485b9230f..c0ab345450 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java @@ -39,6 +39,10 @@ public class UpdateWorkflowHostOperationFunction @Autowired private MessageRetryRunner messageRetryRunner; + public UpdateWorkflowHostOperationFunction(MessageRetryRunner messageRetryRunner) { + this.messageRetryRunner = messageRetryRunner; + } + @Override public UpdateWorkflowHostResponse operate(UpdateWorkflowHostRequest updateWorkflowHostRequest) { try { diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java new file mode 100644 index 0000000000..5044fba11e --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/listener/TaskInstanceExecutionEventAckListenFunctionTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner.listener; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.times; + +import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionFinishEventAck; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionInfoEventAck; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionRunningEventAck; +import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskInstanceExecutionEventAckListenFunctionTest { + + private static final Logger log = LoggerFactory.getLogger(TaskInstanceExecutionEventAckListenFunctionTest.class); + private MessageRetryRunner messageRetryRunner = Mockito.mock(MessageRetryRunner.class); + + @Test + public void testTaskInstanceExecutionEventAckListenFunctionManager() { + TaskInstanceExecutionFinishEventAckListenFunction taskInstanceExecutionFinishEventAckListenFunction = + new TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner); + TaskInstanceExecutionInfoEventAckListenFunction taskInstanceExecutionInfoEventAckListenFunction = + new TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner); + TaskInstanceExecutionRunningEventAckListenFunction taskInstanceExecutionRunningEventAckListenFunction = + new TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner); + TaskInstanceExecutionEventAckListenFunctionManager taskInstanceExecutionEventAckListenFunctionManager = + new TaskInstanceExecutionEventAckListenFunctionManager( + taskInstanceExecutionRunningEventAckListenFunction, + taskInstanceExecutionFinishEventAckListenFunction, + taskInstanceExecutionInfoEventAckListenFunction); + Assertions.assertEquals(taskInstanceExecutionRunningEventAckListenFunction, + taskInstanceExecutionEventAckListenFunctionManager + .getTaskInstanceExecutionRunningEventAckListenFunction()); + Assertions.assertEquals(taskInstanceExecutionInfoEventAckListenFunction, + taskInstanceExecutionEventAckListenFunctionManager + .getTaskInstanceExecutionInfoEventAckListenFunction()); + Assertions.assertEquals(taskInstanceExecutionFinishEventAckListenFunction, + taskInstanceExecutionEventAckListenFunctionManager + .getTaskInstanceExecutionFinishEventAckListenFunction()); + } + + @Test + public void testTaskInstanceExecutionEventAckListenFunctionDryRun() { + int taskInstanceId1 = 111; + int taskInstanceId2 = 222; + int taskInstanceId3 = 333; + TaskInstanceExecutionFinishEventAckListenFunction taskInstanceExecutionFinishEventAckListenFunction = + new TaskInstanceExecutionFinishEventAckListenFunction(messageRetryRunner); + taskInstanceExecutionFinishEventAckListenFunction.handleTaskInstanceExecutionEventAck( + TaskInstanceExecutionFinishEventAck.success(taskInstanceId1)); + + ArgumentCaptor acInt = ArgumentCaptor.forClass(int.class); + ArgumentCaptor acEventType = + ArgumentCaptor.forClass(ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.class); + + Mockito.verify(messageRetryRunner, times(1)).removeRetryMessage( + (int) acInt.capture(), + (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) acEventType.capture()); + + assertEquals(taskInstanceId1, acInt.getValue()); + + TaskInstanceExecutionInfoEventAckListenFunction taskInstanceExecutionInfoEventAckListenFunction = + new TaskInstanceExecutionInfoEventAckListenFunction(messageRetryRunner); + taskInstanceExecutionInfoEventAckListenFunction.handleTaskInstanceExecutionEventAck( + TaskInstanceExecutionInfoEventAck.success(taskInstanceId2)); + + Mockito.verify(messageRetryRunner, times(2)).removeRetryMessage( + (int) acInt.capture(), + (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) acEventType.capture()); + assertEquals(taskInstanceId2, acInt.getValue()); + + TaskInstanceExecutionRunningEventAckListenFunction taskInstanceExecutionRunningEventAckListenFunction = + new TaskInstanceExecutionRunningEventAckListenFunction(messageRetryRunner); + taskInstanceExecutionRunningEventAckListenFunction.handleTaskInstanceExecutionEventAck( + TaskInstanceExecutionRunningEventAck.success(taskInstanceId3)); + Mockito.verify(messageRetryRunner, times(3)).removeRetryMessage( + (int) acInt.capture(), + (ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType) acEventType.capture()); + assertEquals(taskInstanceId3, acInt.getValue()); + } +} diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java new file mode 100644 index 0000000000..592340214f --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceOperationFunctionTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner.operator; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse; +import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; +import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; +import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskInstanceOperationFunctionTest { + + private static final Logger log = LoggerFactory.getLogger(TaskInstanceOperationFunctionTest.class); + private MessageRetryRunner messageRetryRunner = Mockito.mock(MessageRetryRunner.class); + + private WorkerConfig workerConfig = Mockito.mock(WorkerConfig.class); + + private TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + + private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = + Mockito.mock(WorkerTaskExecutorThreadPool.class); + + private WorkerTaskExecutor workerTaskExecutor = Mockito.mock(WorkerTaskExecutor.class); + + private AbstractTask task = Mockito.mock(AbstractTask.class); + + private WorkerMessageSender workerMessageSender = Mockito.mock(WorkerMessageSender.class); + + private TaskPluginManager taskPluginManager = Mockito.mock(TaskPluginManager.class); + + private WorkerTaskExecutorThreadPool workerManager = Mockito.mock(WorkerTaskExecutorThreadPool.class); + + private StorageOperate storageOperate = Mockito.mock(StorageOperate.class); + + private WorkerRegistryClient workerRegistryClient = Mockito.mock(WorkerRegistryClient.class); + + @Test + public void testTaskInstanceOperationFunctionManager() { + TaskInstanceKillOperationFunction taskInstanceKillOperationFunction = new TaskInstanceKillOperationFunction( + workerTaskExecutorThreadPool, + messageRetryRunner); + + TaskInstancePauseOperationFunction taskInstancePauseOperationFunction = + new TaskInstancePauseOperationFunction(); + + UpdateWorkflowHostOperationFunction updateWorkflowHostOperationFunction = + new UpdateWorkflowHostOperationFunction( + messageRetryRunner); + + WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder( + workerConfig, + workerMessageSender, + taskPluginManager, + workerManager, + storageOperate, + workerRegistryClient); + + TaskInstanceDispatchOperationFunction taskInstanceDispatchOperationFunction = + new TaskInstanceDispatchOperationFunction( + workerConfig, + workerTaskExecutorFactoryBuilder, + workerTaskExecutorThreadPool); + + TaskInstanceOperationFunctionManager taskInstanceOperationFunctionManager = + new TaskInstanceOperationFunctionManager( + taskInstanceKillOperationFunction, + updateWorkflowHostOperationFunction, + taskInstanceDispatchOperationFunction, + taskInstancePauseOperationFunction); + + Assertions.assertEquals(taskInstanceKillOperationFunction, + taskInstanceOperationFunctionManager.getTaskInstanceKillOperationFunction()); + Assertions.assertEquals(taskInstancePauseOperationFunction, + taskInstanceOperationFunctionManager.getTaskInstancePauseOperationFunction()); + Assertions.assertEquals(updateWorkflowHostOperationFunction, + taskInstanceOperationFunctionManager.getUpdateWorkflowHostOperationFunction()); + Assertions.assertEquals(taskInstanceDispatchOperationFunction, + taskInstanceOperationFunctionManager.getTaskInstanceDispatchOperationFunction()); + } + + @Test + public void testUpdateWorkflowHostOperationFunction() { + UpdateWorkflowHostOperationFunction updateWorkflowHostOperationFunction = + new UpdateWorkflowHostOperationFunction( + messageRetryRunner); + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceIdMDC(any(Integer.class))) + .then(invocationOnMock -> null); + UpdateWorkflowHostRequest request = new UpdateWorkflowHostRequest(); + request.setTaskInstanceId(1); + request.setWorkflowHost("host"); + UpdateWorkflowHostResponse taskInstanceDispatchResponse = updateWorkflowHostOperationFunction.operate( + request); + Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), false); + } + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceIdMDC(any(Integer.class))) + .then(invocationOnMock -> null); + + try ( + MockedStatic workerTaskExecutorHolderMockedStatic = + Mockito.mockStatic(WorkerTaskExecutorHolder.class)) { + given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext); + workerTaskExecutorHolderMockedStatic + .when(() -> WorkerTaskExecutorHolder.get(any(Integer.class))) + .thenReturn(workerTaskExecutor); + int taskInstanceId = 111; + UpdateWorkflowHostRequest request = new UpdateWorkflowHostRequest(); + request.setTaskInstanceId(taskInstanceId); + request.setWorkflowHost("host"); + + UpdateWorkflowHostResponse taskInstanceDispatchResponse = updateWorkflowHostOperationFunction.operate( + request); + Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), true); + } + } + } + + @Test + public void testTaskInstancePauseOperationFunction() { + TaskInstancePauseOperationFunction taskInstancePauseOperationFunction = + new TaskInstancePauseOperationFunction(); + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceIdMDC(any(Integer.class))) + .then(invocationOnMock -> null); + TaskInstancePauseRequest request = new TaskInstancePauseRequest(); + request.setTaskInstanceId(1); + TaskInstancePauseResponse taskInstanceDispatchResponse = taskInstancePauseOperationFunction.operate( + request); + Assertions.assertEquals(taskInstanceDispatchResponse.isSuccess(), true); + } + } + + @Test + public void testTaskInstanceDispatchOperationFunction() { + WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder = new WorkerTaskExecutorFactoryBuilder( + workerConfig, + workerMessageSender, + taskPluginManager, + workerManager, + storageOperate, + workerRegistryClient); + + TaskInstanceDispatchOperationFunction taskInstanceDispatchOperationFunction = + new TaskInstanceDispatchOperationFunction( + workerConfig, + workerTaskExecutorFactoryBuilder, + workerTaskExecutorThreadPool); + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + logUtilsMockedStatic + .when(() -> LogUtils + .getTaskInstanceLogFullPath(any(TaskExecutionContext.class))) + .thenReturn("test"); + TaskInstanceDispatchResponse taskInstanceDispatchResponse = taskInstanceDispatchOperationFunction.operate( + new TaskInstanceDispatchRequest(taskExecutionContext)); + Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(), false); + logUtilsMockedStatic.verify(times(1), () -> LogUtils.removeWorkflowAndTaskInstanceIdMDC()); + + given(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(any())).willReturn(true); + taskInstanceDispatchResponse = taskInstanceDispatchOperationFunction.operate( + new TaskInstanceDispatchRequest(taskExecutionContext)); + Assertions.assertEquals(taskInstanceDispatchResponse.isDispatchSuccess(), true); + logUtilsMockedStatic.verify(times(2), () -> LogUtils.removeWorkflowAndTaskInstanceIdMDC()); + } + } + + @Test + public void testTaskInstanceKillOperationFunction() { + TaskInstanceKillOperationFunction taskInstanceKillOperationFunction = new TaskInstanceKillOperationFunction( + workerManager, + messageRetryRunner); + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + int taskInstanceId = 111; + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceLogFullPathMDC(any(String.class))) + .then(invocationOnMock -> null); + TaskInstanceKillResponse response = taskInstanceKillOperationFunction.operate( + new TaskInstanceKillRequest(taskInstanceId)); + Assertions.assertEquals("Cannot find WorkerTaskExecutor", response.getMessage()); + } + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + int processId = 12; + int taskInstanceId = 111; + Mockito.reset(taskExecutionContext); + given(taskExecutionContext.getProcessId()).willReturn(processId); + given(taskExecutionContext.getLogPath()).willReturn("logpath"); + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceLogFullPathMDC(any(String.class))) + .then(invocationOnMock -> null); + taskInstanceKillOperationFunction.operate( + new TaskInstanceKillRequest(taskInstanceId)); + logUtilsMockedStatic.verify(times(1), () -> LogUtils.removeTaskInstanceIdMDC()); + logUtilsMockedStatic.verify(times(1), () -> LogUtils.removeTaskInstanceLogFullPathMDC()); + } + + try (MockedStatic logUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + try ( + MockedStatic workerTaskExecutorHolderMockedStatic = + Mockito.mockStatic(WorkerTaskExecutorHolder.class)) { + given(workerTaskExecutor.getTaskExecutionContext()).willReturn(taskExecutionContext); + workerTaskExecutorHolderMockedStatic + .when(() -> WorkerTaskExecutorHolder.get(any(Integer.class))) + .thenReturn(workerTaskExecutor); + int processId = 12; + int taskInstanceId = 111; + Mockito.reset(taskExecutionContext); + given(taskExecutionContext.getProcessId()).willReturn(processId); + given(taskExecutionContext.getLogPath()).willReturn("logpath"); + logUtilsMockedStatic + .when(() -> LogUtils + .setTaskInstanceLogFullPathMDC(any(String.class))) + .then(invocationOnMock -> null); + when(workerTaskExecutor.getTask()).thenReturn(task); + // given(workerManager.getTaskExecuteThread(taskInstanceId)).willReturn(workerTaskExecutor); + taskInstanceKillOperationFunction.operate( + new TaskInstanceKillRequest(taskInstanceId)); + verify(task, times(1)).cancel(); + } + + } + } +}