Browse Source

Refactor worker (#2266)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test

* move mybatis config from application.properties to SpringConnectionFactory

* move mybatis-plus config from application.properties to SpringConnectionFactory

* refactor TaskCallbackService

* add ZookeeperNodeManagerTest

* add NettyExecutorManagerTest
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
d678447ecc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  2. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  3. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  4. 104
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  5. 92
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -487,7 +487,7 @@ public class ProcessInstance {
* @return whether complement data
*/
public Boolean isComplementData(){
if(!StringUtils.isNotEmpty(this.historyCmd)){
if(StringUtils.isEmpty(this.historyCmd)){
return false;
}
return historyCmd.startsWith(CommandType.COMPLEMENT_DATA.toString());

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
/**
* execution context
*/
@ -48,8 +50,7 @@ public class ExecutionContext {
public ExecutionContext(Command command, ExecutorType executorType) {
this.command = command;
this.executorType = executorType;
this(command, executorType, DEFAULT_WORKER_GROUP);
}
public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@ -59,9 +60,16 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
*/
private final NettyRemotingClient nettyRemotingClient;
/**
* constructor
*/
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@PostConstruct
public void init(){
/**
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
@ -71,7 +79,6 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
}
/**
* execute logic
* @param context context

104
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* netty executor manager test
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class,
ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class})
public class NettyExecutorManagerTest {
@Autowired
private NettyExecutorManager nettyExecutorManager;
@Test
public void testExecute() throws ExecuteException{
final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000);
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(org.apache.dolphinscheduler.remote.command.CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
nettyRemotingServer.start();
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":" + serverConfig.getListenPort()));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assert.assertTrue(execute);
nettyRemotingServer.close();
}
@Test(expected = ExecuteException.class)
public void testExecuteWithException() throws ExecuteException{
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.COMPLEMENT_DATA);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext context = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER);
executionContext.setHost(Host.of(OSUtils.getHost() + ":4444"));
nettyExecutorManager.execute(executionContext);
}
}

92
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@ -18,14 +18,16 @@
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* dependency config for ZookeeperNodeManager
* dependency config
*/
@Configuration
public class DependencyConfig {
@ -45,4 +47,88 @@ public class DependencyConfig {
return Mockito.mock(UserAlertGroupMapper.class);
}
@Bean
public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl(){
return Mockito.mock(TaskInstanceCacheManagerImpl.class);
}
@Bean
public ProcessService processService(){
return Mockito.mock(ProcessService.class);
}
@Bean
public UserMapper userMapper(){
return Mockito.mock(UserMapper.class);
}
@Bean
public ProcessDefinitionMapper processDefineMapper(){
return Mockito.mock(ProcessDefinitionMapper.class);
}
@Bean
public ProcessInstanceMapper processInstanceMapper(){
return Mockito.mock(ProcessInstanceMapper.class);
}
@Bean
public DataSourceMapper dataSourceMapper(){
return Mockito.mock(DataSourceMapper.class);
}
@Bean
public ProcessInstanceMapMapper processInstanceMapMapper(){
return Mockito.mock(ProcessInstanceMapMapper.class);
}
@Bean
public TaskInstanceMapper taskInstanceMapper(){
return Mockito.mock(TaskInstanceMapper.class);
}
@Bean
public CommandMapper commandMapper(){
return Mockito.mock(CommandMapper.class);
}
@Bean
public ScheduleMapper scheduleMapper(){
return Mockito.mock(ScheduleMapper.class);
}
@Bean
public UdfFuncMapper udfFuncMapper(){
return Mockito.mock(UdfFuncMapper.class);
}
@Bean
public ResourceMapper resourceMapper(){
return Mockito.mock(ResourceMapper.class);
}
@Bean
public WorkerGroupMapper workerGroupMapper(){
return Mockito.mock(WorkerGroupMapper.class);
}
@Bean
public ErrorCommandMapper errorCommandMapper(){
return Mockito.mock(ErrorCommandMapper.class);
}
@Bean
public TenantMapper tenantMapper(){
return Mockito.mock(TenantMapper.class);
}
@Bean
public ProjectMapper projectMapper(){
return Mockito.mock(ProjectMapper.class);
}
@Bean
public TaskCallbackService taskCallbackService(){
return Mockito.mock(TaskCallbackService.class);
}
}

Loading…
Cancel
Save