From d678447ecc755e7b8eea44f9257960dd7f2bf053 Mon Sep 17 00:00:00 2001 From: Tboy Date: Sun, 22 Mar 2020 15:33:42 +0800 Subject: [PATCH] Refactor worker (#2266) * let quartz use the same datasource * move master/worker config from dao.properties to each config add master/worker registry test * move mybatis config from application.properties to SpringConnectionFactory * move mybatis-plus config from application.properties to SpringConnectionFactory * refactor TaskCallbackService * add ZookeeperNodeManagerTest * add NettyExecutorManagerTest --- .../dao/entity/ProcessInstance.java | 2 +- .../dispatch/context/ExecutionContext.java | 5 +- .../executor/NettyExecutorManager.java | 9 +- .../executor/NettyExecutorManagerTest.java | 104 ++++++++++++++++++ .../server/registry/DependencyConfig.java | 92 +++++++++++++++- 5 files changed, 205 insertions(+), 7 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 77e148a8f0..720232f771 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -487,7 +487,7 @@ public class ProcessInstance { * @return whether complement data */ public Boolean isComplementData(){ - if(!StringUtils.isNotEmpty(this.historyCmd)){ + if(StringUtils.isEmpty(this.historyCmd)){ return false; } return historyCmd.startsWith(CommandType.COMPLEMENT_DATA.toString()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index 9d04511848..fd673ca678 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + /** * execution context */ @@ -48,8 +50,7 @@ public class ExecutionContext { public ExecutionContext(Command command, ExecutorType executorType) { - this.command = command; - this.executorType = executorType; + this(command, executorType, DEFAULT_WORKER_GROUP); } public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 7c6517d6a0..7ded3b0056 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -59,9 +60,16 @@ public class NettyExecutorManager extends AbstractExecutorManager{ */ private final NettyRemotingClient nettyRemotingClient; + /** + * constructor + */ public NettyExecutorManager(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + } + + @PostConstruct + public void init(){ /** * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor * register EXECUTE_TASK_ACK command type TaskAckProcessor @@ -71,7 +79,6 @@ public class NettyExecutorManager extends AbstractExecutorManager{ this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); } - /** * execute logic * @param context context diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java new file mode 100644 index 0000000000..5955f46056 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.dispatch.executor; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * netty executor manager test + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, + ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class}) +public class NettyExecutorManagerTest { + + @Autowired + private NettyExecutorManager nettyExecutorManager; + + + @Test + public void testExecute() throws ExecuteException{ + final NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(30000); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); + nettyRemotingServer.start(); + TaskInstance taskInstance = Mockito.mock(TaskInstance.class); + ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.COMPLEMENT_DATA); + taskInstance.setProcessInstance(processInstance); + TaskExecutionContext context = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .buildProcessDefinitionRelatedInfo(processDefinition) + .create(); + ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER); + executionContext.setHost(Host.of(OSUtils.getHost() + ":" + serverConfig.getListenPort())); + Boolean execute = nettyExecutorManager.execute(executionContext); + Assert.assertTrue(execute); + nettyRemotingServer.close(); + } + + @Test(expected = ExecuteException.class) + public void testExecuteWithException() throws ExecuteException{ + TaskInstance taskInstance = Mockito.mock(TaskInstance.class); + ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.COMPLEMENT_DATA); + taskInstance.setProcessInstance(processInstance); + TaskExecutionContext context = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .buildProcessDefinitionRelatedInfo(processDefinition) + .create(); + ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER); + executionContext.setHost(Host.of(OSUtils.getHost() + ":4444")); + nettyExecutorManager.execute(executionContext); + + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 9f8b0b2962..cd5a221a8b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -18,14 +18,16 @@ package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.mapper.AlertMapper; -import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * dependency config for ZookeeperNodeManager + * dependency config */ @Configuration public class DependencyConfig { @@ -45,4 +47,88 @@ public class DependencyConfig { return Mockito.mock(UserAlertGroupMapper.class); } + @Bean + public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){ + return Mockito.mock(TaskInstanceCacheManagerImpl.class); + } + + @Bean + public ProcessService processService(){ + return Mockito.mock(ProcessService.class); + } + + @Bean + public UserMapper userMapper(){ + return Mockito.mock(UserMapper.class); + } + + @Bean + public ProcessDefinitionMapper processDefineMapper(){ + return Mockito.mock(ProcessDefinitionMapper.class); + } + + @Bean + public ProcessInstanceMapper processInstanceMapper(){ + return Mockito.mock(ProcessInstanceMapper.class); + } + + @Bean + public DataSourceMapper dataSourceMapper(){ + return Mockito.mock(DataSourceMapper.class); + } + + @Bean + public ProcessInstanceMapMapper processInstanceMapMapper(){ + return Mockito.mock(ProcessInstanceMapMapper.class); + } + + @Bean + public TaskInstanceMapper taskInstanceMapper(){ + return Mockito.mock(TaskInstanceMapper.class); + } + + @Bean + public CommandMapper commandMapper(){ + return Mockito.mock(CommandMapper.class); + } + + @Bean + public ScheduleMapper scheduleMapper(){ + return Mockito.mock(ScheduleMapper.class); + } + + @Bean + public UdfFuncMapper udfFuncMapper(){ + return Mockito.mock(UdfFuncMapper.class); + } + + @Bean + public ResourceMapper resourceMapper(){ + return Mockito.mock(ResourceMapper.class); + } + + @Bean + public WorkerGroupMapper workerGroupMapper(){ + return Mockito.mock(WorkerGroupMapper.class); + } + + @Bean + public ErrorCommandMapper errorCommandMapper(){ + return Mockito.mock(ErrorCommandMapper.class); + } + + @Bean + public TenantMapper tenantMapper(){ + return Mockito.mock(TenantMapper.class); + } + + @Bean + public ProjectMapper projectMapper(){ + return Mockito.mock(ProjectMapper.class); + } + + @Bean + public TaskCallbackService taskCallbackService(){ + return Mockito.mock(TaskCallbackService.class); + } }