diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java index ee4e960a80..4c575a29e8 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStopStrategy.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.registry; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.StrategyType; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import lombok.extern.slf4j.Slf4j; @@ -34,8 +33,6 @@ public class WorkerStopStrategy implements WorkerConnectStrategy { @Autowired public RegistryClient registryClient; - @Autowired - private WorkerConfig workerConfig; @Override public void disconnect() { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java index 4437e3545d..2be156b5df 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThread import java.time.Duration; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -52,6 +53,16 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy { @Autowired private WorkerTaskExecutorThreadPool workerManagerThread; + public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig, + @NonNull RegistryClient registryClient, + @NonNull MessageRetryRunner messageRetryRunner, + @NonNull WorkerTaskExecutorThreadPool workerManagerThread) { + this.workerConfig = workerConfig; + this.registryClient = registryClient; + this.messageRetryRunner = messageRetryRunner; + this.workerManagerThread = workerManagerThread; + } + @Override public void disconnect() { try { diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java new file mode 100644 index 0000000000..14d75779b9 --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerConnectionStateListenerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.registry; + +import static org.mockito.Mockito.times; + +import org.apache.dolphinscheduler.registry.api.ConnectionState; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * worker registry test + */ +@ExtendWith(MockitoExtension.class) +public class WorkerConnectionStateListenerTest { + + private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class); + @InjectMocks + private WorkerConnectionStateListener workerConnectionStateListener; + @Mock + private WorkerConfig workerConfig; + @Mock + private WorkerConnectStrategy workerConnectStrategy; + + @Test + public void testWorkerConnectionStateListener() { + workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED); + + workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED); + Mockito.verify(workerConnectStrategy, times(1)).reconnect(); + + workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED); + + workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED); + Mockito.verify(workerConnectStrategy, times(1)).disconnect(); + } +} diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index f44c759420..8c7071cf60 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.mockito.BDDMockito.given; +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; @@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtecti import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Optional; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -37,6 +42,8 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * worker registry test @@ -44,26 +51,24 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class WorkerRegistryClientTest { + private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class); @InjectMocks private WorkerRegistryClient workerRegistryClient; - @Mock private RegistryClient registryClient; - @Mock private WorkerConfig workerConfig; - @Mock private MetricsProvider metricsProvider; - @Mock private WorkerTaskExecutorThreadPool workerManagerThread; - @Mock private WorkerConnectStrategy workerConnectStrategy; + @Mock + private IStoppable stoppable; @Test - public void testStart() { + public void testWorkerRegistryClientbasic() { given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234)); given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); @@ -75,16 +80,23 @@ public class WorkerRegistryClientTest { workerRegistryClient.initWorkRegistry(); workerRegistryClient.start(); - Assertions.assertTrue(true); + workerRegistryClient.setRegistryStoppable(stoppable); } @Test - public void testUnRegistry() { - - } - - @Test - public void testGetWorkerZkPaths() { - + public void testWorkerRegistryClientgetAlertServerAddress() { + given(registryClient.getServerList(Mockito.any(RegistryNodeType.class))) + .willReturn(new ArrayList()); + Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(), Optional.empty()); + Mockito.reset(registryClient); + String host = "test"; + Integer port = 1; + Server server = new Server(); + server.setHost(host); + server.setPort(port); + given(registryClient.getServerList(Mockito.any(RegistryNodeType.class))) + .willReturn(new ArrayList(Arrays.asList(server))); + Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(), + String.format("%s:%d", host, port)); } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java new file mode 100644 index 0000000000..671fac5277 --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerStrategyTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.registry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.doNothing; + +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.StrategyType; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; +import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; + +import java.time.Duration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * worker registry test + */ +@ExtendWith(MockitoExtension.class) +public class WorkerStrategyTest { + + private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class); + @Mock + private RegistryClient registryClient; + @Mock + private IStoppable stoppable; + @Mock + private WorkerConfig workerConfig; + @Mock + private WorkerRpcServer workerRpcServer; + @Mock + private MessageRetryRunner messageRetryRunner; + @Mock + private WorkerTaskExecutorThreadPool workerManagerThread; + @Mock + private ConnectStrategyProperties connectStrategyProperties; + + @Test + public void testWorkerStopStrategy() { + given(registryClient.getStoppable()) + .willReturn(stoppable); + WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy(); + workerStopStrategy.registryClient = registryClient; + workerStopStrategy.reconnect(); + workerStopStrategy.disconnect(); + Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP); + } + + @Test + public void testWorkerWaitingStrategyreconnect() { + WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy( + workerConfig, + registryClient, + messageRetryRunner, + workerManagerThread); + Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING); + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.isRunning()) + .thenReturn(true); + workerWaitingStrategy.reconnect(); + + } + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + doNothing().when(stoppable).stop(anyString()); + given(registryClient.getStoppable()) + .willReturn(stoppable); + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.recoverFromWaiting()) + .thenThrow(new ServerLifeCycleException("")); + workerWaitingStrategy.reconnect(); + } + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.recoverFromWaiting()) + .thenAnswer(invocation -> null); + workerWaitingStrategy.reconnect(); + } + } + + @Test + public void testWorkerWaitingStrategydisconnect() { + WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy( + workerConfig, + registryClient, + messageRetryRunner, + workerManagerThread); + Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING); + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + doNothing().when(stoppable).stop(anyString()); + given(registryClient.getStoppable()) + .willReturn(stoppable); + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.toWaiting()) + .thenThrow(new ServerLifeCycleException("")); + workerWaitingStrategy.disconnect(); + } + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1)); + given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties); + Mockito.reset(registryClient); + doNothing().when(registryClient).connectUntilTimeout(any()); + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.toWaiting()) + .thenAnswer(invocation -> null); + workerWaitingStrategy.disconnect(); + } + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1)); + given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties); + Mockito.reset(registryClient); + doNothing().when(stoppable).stop(anyString()); + given(registryClient.getStoppable()) + .willReturn(stoppable); + Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any()); + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.toWaiting()) + .thenAnswer(invocation -> null); + workerWaitingStrategy.disconnect(); + } + + try ( + MockedStatic serverLifeCycleManagerMockedStatic = + Mockito.mockStatic(ServerLifeCycleManager.class)) { + Mockito.reset(workerConfig); + given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException("")); + doNothing().when(stoppable).stop(anyString()); + given(registryClient.getStoppable()) + .willReturn(stoppable); + serverLifeCycleManagerMockedStatic + .when(() -> ServerLifeCycleManager.toWaiting()) + .thenAnswer(invocation -> null); + workerWaitingStrategy.disconnect(); + } + } +}