Browse Source

[IMPROVEMENT-8178] Add Netty processor in Spring container (#8179)

* Add Netty processor in Spring container
3.0.0/version-upgrade
Wenjun Ruan 3 years ago committed by GitHub
parent
commit
f88def8ef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 2
      dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  3. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  4. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
  6. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
  7. 34
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  8. 34
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java
  9. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
  10. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
  11. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
  12. 29
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  13. 24
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  14. 40
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java
  15. 0
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
  16. 2
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  17. 0
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

1
.gitignore vendored

@ -47,6 +47,7 @@ dolphinscheduler-ui/dist
dolphinscheduler-ui/node
dolphinscheduler-common/sql
dolphinscheduler-common/test
dolphinscheduler-worker/logs
# ------------------
# pydolphinscheduler

2
dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -48,12 +48,14 @@ import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/**
* logger request process logic
*/
@Component
public class LoggerRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -89,12 +89,18 @@ public class MasterServer implements IStoppable {
@Autowired
private CacheProcessor cacheProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private EventExecuteService eventExecuteService;
@Autowired
private FailoverExecuteThread failoverExecuteThread;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@ -111,14 +117,13 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
// logger server
LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -62,6 +62,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
@Autowired
private TaskAckProcessor taskAckProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
@ -80,13 +83,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
@PostConstruct
public void init(){
/**
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
}
/**

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -33,6 +34,7 @@ import io.netty.channel.Channel;
/**
* task response processor
*/
@Component
public class TaskKillResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java

@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertClientService {
public class AlertClientService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class);
@ -70,6 +70,7 @@ public class AlertClientService {
/**
* close
*/
@Override
public void close() {
this.client.close();
this.isRunning = false;

34
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -81,6 +81,7 @@ public class WorkerServer implements IStoppable {
/**
* alert model netty remote server
*/
@Autowired
private AlertClientService alertClientService;
@Autowired
@ -98,6 +99,24 @@ public class WorkerServer implements IStoppable {
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private DBTaskAckProcessor dbTaskAckProcessor;
@Autowired
private DBTaskResponseProcessor dbTaskResponseProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
/**
* worker server startup, not use web service
*
@ -113,22 +132,17 @@ public class WorkerServer implements IStoppable {
*/
@PostConstruct
public void run() {
// alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),
workerConfig.getAlertListenPort());
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, dbTaskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, dbTaskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
LoggerRequestProcessor loggerRequestProcessor = new LoggerRequestProcessor();
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);

34
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/BeanConfig.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BeanConfig {
@Bean
public AlertClientService alertClientService(WorkerConfig workerConfig) {
return new AlertClientService(
workerConfig.getAlertListenHost(),
workerConfig.getAlertListenPort());
}
}

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java

@ -25,17 +25,18 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
/**
* db task ack processor
*/
@Component
public class DBTaskAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -35,6 +36,7 @@ import io.netty.channel.Channel;
/**
* db task response processor
*/
@Component
public class DBTaskResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -36,6 +37,7 @@ import io.netty.channel.Channel;
* update process host
* this used when master failover
*/
@Component
public class HostUpdateProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(HostUpdateProcessor.class);

29
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@ -49,6 +48,8 @@ import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -57,6 +58,7 @@ import io.netty.channel.Channel;
/**
* worker request processor
*/
@Component
public class TaskExecuteProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
@ -64,30 +66,29 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* worker config
*/
private final WorkerConfig workerConfig;
@Autowired
private WorkerConfig workerConfig;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
@Autowired
private TaskCallbackService taskCallbackService;
/**
* alert client service
*/
@Autowired
private AlertClientService alertClientService;
@Autowired
private TaskPluginManager taskPluginManager;
/*
/**
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
@Autowired
private WorkerManagerThread workerManager;
/**
* Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache
@ -101,12 +102,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskRequest);
}
public TaskExecuteProcessor(AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
this();
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),

24
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -30,9 +30,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@ -45,6 +43,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
@ -53,30 +53,22 @@ import io.netty.channel.Channel;
/**
* task kill processor
*/
@Component
public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
/**
* worker config
*/
private final WorkerConfig workerConfig;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
@Autowired
private TaskCallbackService taskCallbackService;
/*
/**
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskKillProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
@Autowired
private WorkerManagerThread workerManager;
/**
* task kill process

40
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/config/BeanConfigTest.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {BeanConfig.class, WorkerConfig.class})
public class BeanConfigTest {
@Autowired
private AlertClientService alertClientService;
@Test
public void alertClientService() {
Assert.assertNotNull(alertClientService);
}
}

0
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskCallbackServiceTestConfig.java → dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java

2
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java → dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -49,7 +49,6 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test task execute processor
@ -74,7 +73,6 @@ public class TaskExecuteProcessorTest {
private TaskExecuteRequestCommand taskRequestCommand;
private AlertClientService alertClientService;
private WorkerManagerThread workerManager;

0
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/registry/WorkerRegistryClientTest.java → dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

Loading…
Cancel
Save