diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index a508177010..f966591df4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -79,8 +79,11 @@ public class TaskCallbackService { * @param taskInstanceId taskInstanceId * @return callback channel */ - public NettyRemoteChannel getRemoteChannel(int taskInstanceId){ + private NettyRemoteChannel getRemoteChannel(int taskInstanceId){ NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); + if(nettyRemoteChannel == null){ + throw new IllegalArgumentException("nettyRemoteChannel is empty, should call addRemoteChannel first"); + } if(nettyRemoteChannel.isActive()){ return nettyRemoteChannel; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java new file mode 100644 index 0000000000..1d7e03e981 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomSelectorTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +/** + * random selector + */ +public class RandomSelectorTest { + + @Test(expected = IllegalArgumentException.class) + public void testSelectWithIllegalArgumentException(){ + RandomSelector selector = new RandomSelector(); + selector.select(Collections.EMPTY_LIST); + } + + @Test + public void testSelect1(){ + RandomSelector selector = new RandomSelector(); + String result = selector.select(Arrays.asList("1")); + Assert.assertTrue(StringUtils.isNotEmpty(result)); + Assert.assertTrue(result.equalsIgnoreCase("1")); + } + + @Test + public void testSelect(){ + RandomSelector selector = new RandomSelector(); + int result = selector.select(Arrays.asList(1,2,3,4,5,6,7)); + Assert.assertTrue(result >= 1 && result <= 7); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java new file mode 100644 index 0000000000..a34e667fa4 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinSelectorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * round robin selector + */ +public class RoundRobinSelectorTest { + + @Test(expected = IllegalArgumentException.class) + public void testSelectWithIllegalArgumentException(){ + RoundRobinSelector selector = new RoundRobinSelector(); + selector.select(Collections.EMPTY_LIST); + } + + @Test + public void testSelect1(){ + RoundRobinSelector selector = new RoundRobinSelector(); + String result = selector.select(Arrays.asList("1")); + Assert.assertTrue(StringUtils.isNotEmpty(result)); + Assert.assertTrue(result.equalsIgnoreCase("1")); + } + + @Test + public void testSelect(){ + RoundRobinSelector selector = new RoundRobinSelector(); + List sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7); + int result = selector.select(sources); + Assert.assertTrue(result == 1); + int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7)); + Assert.assertTrue(result2 == 2); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java new file mode 100644 index 0000000000..5f44e1cee2 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.Date; + +/** + * test task call back service + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, MasterRegistry.class, WorkerRegistry.class, + ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class}) +public class TaskCallbackServiceTest { + + @Autowired + private TaskCallbackService taskCallbackService; + + @Autowired + private MasterRegistry masterRegistry; + + @Test + public void testSendAck(){ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); + ackCommand.setTaskInstanceId(1); + ackCommand.setStartTime(new Date()); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + + nettyRemotingServer.close(); + nettyRemotingClient.close(); + } + + @Test(expected = IllegalArgumentException.class) + public void testSendAckWithIllegalArgumentException(){ + TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + } + + @Test(expected = IllegalStateException.class) + public void testSendAckWithIllegalStateException1(){ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + channel.close(); + TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); + ackCommand.setTaskInstanceId(1); + ackCommand.setStartTime(new Date()); + + nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + } + + @Test(expected = IllegalStateException.class) + public void testSendAckWithIllegalStateException2(){ + masterRegistry.registry(); + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, Mockito.mock(TaskAckProcessor.class)); + nettyRemotingServer.start(); + + final NettyClientConfig clientConfig = new NettyClientConfig(); + NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); + Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); + taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); + channel.close(); + TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); + ackCommand.setTaskInstanceId(1); + ackCommand.setStartTime(new Date()); + + nettyRemotingServer.close(); + taskCallbackService.sendAck(1, ackCommand.convert2Command()); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java new file mode 100644 index 0000000000..e6dd8e721e --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.mockito.Mockito; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * dependency config + */ +@Configuration +public class TaskCallbackServiceTestConfig { + + + @Bean + public AlertDao alertDao() { + return new AlertDao(); + } + + @Bean + public AlertMapper alertMapper() { + return Mockito.mock(AlertMapper.class); + } + + @Bean + public UserAlertGroupMapper userAlertGroupMapper() { + return Mockito.mock(UserAlertGroupMapper.class); + } + + @Bean + public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){ + return Mockito.mock(TaskInstanceCacheManagerImpl.class); + } + + @Bean + public ProcessService processService(){ + return Mockito.mock(ProcessService.class); + } + + @Bean + public UserMapper userMapper(){ + return Mockito.mock(UserMapper.class); + } + + @Bean + public ProcessDefinitionMapper processDefineMapper(){ + return Mockito.mock(ProcessDefinitionMapper.class); + } + + @Bean + public ProcessInstanceMapper processInstanceMapper(){ + return Mockito.mock(ProcessInstanceMapper.class); + } + + @Bean + public DataSourceMapper dataSourceMapper(){ + return Mockito.mock(DataSourceMapper.class); + } + + @Bean + public ProcessInstanceMapMapper processInstanceMapMapper(){ + return Mockito.mock(ProcessInstanceMapMapper.class); + } + + @Bean + public TaskInstanceMapper taskInstanceMapper(){ + return Mockito.mock(TaskInstanceMapper.class); + } + + @Bean + public CommandMapper commandMapper(){ + return Mockito.mock(CommandMapper.class); + } + + @Bean + public ScheduleMapper scheduleMapper(){ + return Mockito.mock(ScheduleMapper.class); + } + + @Bean + public UdfFuncMapper udfFuncMapper(){ + return Mockito.mock(UdfFuncMapper.class); + } + + @Bean + public ResourceMapper resourceMapper(){ + return Mockito.mock(ResourceMapper.class); + } + + @Bean + public WorkerGroupMapper workerGroupMapper(){ + return Mockito.mock(WorkerGroupMapper.class); + } + + @Bean + public ErrorCommandMapper errorCommandMapper(){ + return Mockito.mock(ErrorCommandMapper.class); + } + + @Bean + public TenantMapper tenantMapper(){ + return Mockito.mock(TenantMapper.class); + } + + @Bean + public ProjectMapper projectMapper(){ + return Mockito.mock(ProjectMapper.class); + } + +}