diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 292bfaea2c..9ad359765f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -26,19 +25,16 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; -import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,12 +80,6 @@ public class MasterServer { @Autowired private MasterConfig masterConfig; - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - /** * spring application context * only use it for initialization @@ -105,6 +95,7 @@ public class MasterServer { /** * master registry */ + @Autowired private MasterRegistry masterRegistry; /** @@ -126,7 +117,7 @@ public class MasterServer { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(45678); + serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); @@ -134,7 +125,6 @@ public class MasterServer { this.nettyRemotingServer.start(); // - this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval()); this.masterRegistry.registry(); // @@ -166,8 +156,6 @@ public class MasterServer { logger.error("start Quartz failed", e); } - TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class); - taskUpdateQueueConsumer.start(); /** * register hooks, which are called before the process exits */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index e8a8ecbe43..7e6ae5618a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -46,6 +46,17 @@ public class MasterConfig { @Value("${master.host.selector:lowerWeight}") private String hostSelector; + @Value("${master.listen.port:45678}") + private int listenPort; + + public int getListenPort() { + return listenPort; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + public String getHostSelector() { return hostSelector; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java index cccc700aea..e3957afc26 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java @@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; + /** * TaskUpdateQueue consumer */ @@ -66,6 +68,12 @@ public class TaskUpdateQueueConsumer extends Thread{ @Autowired private ExecutorDispatcher dispatcher; + @PostConstruct + public void init(){ + super.setName("TaskUpdateQueueConsumerThread"); + super.start(); + } + @Override public void run() { while (Stopper.isRunning()){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 1eb06b6d65..0402520e57 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -23,10 +23,14 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -37,6 +41,7 @@ import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; /** * master registry */ +@Service public class MasterRegistry { private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); @@ -44,38 +49,28 @@ public class MasterRegistry { /** * zookeeper registry center */ - private final ZookeeperRegistryCenter zookeeperRegistryCenter; + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * port + * master config */ - private final int port; - - /** - * heartbeat interval - */ - private final long heartBeatInterval; + @Autowired + private MasterConfig masterConfig; /** * heartbeat executor */ - private final ScheduledExecutorService heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutor; /** * worker start time */ - private final String startTime; + private String startTime; - /** - * construct - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - * @param heartBeatInterval heartBeatInterval - */ - public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ - this.zookeeperRegistryCenter = zookeeperRegistryCenter; - this.port = port; - this.heartBeatInterval = heartBeatInterval; + + @PostConstruct + public void init(){ this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -100,8 +95,9 @@ public class MasterRegistry { } } }); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); + int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } /** @@ -129,7 +125,7 @@ public class MasterRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + port; + return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort(); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 068d546b0c..441e8db892 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -19,11 +19,9 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; @@ -37,8 +35,6 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; /** * worker server @@ -51,18 +47,6 @@ public class WorkerServer { */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * worker config - */ - @Autowired - private WorkerConfig workerConfig; - - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter zookeeperRegistryCenter; - /** * netty remote server */ @@ -71,8 +55,15 @@ public class WorkerServer { /** * worker registry */ + @Autowired private WorkerRegistry workerRegistry; + /** + * worker config + */ + @Autowired + private WorkerConfig workerConfig; + /** * spring application context * only use it for initialization @@ -87,6 +78,7 @@ public class WorkerServer { * @param args arguments */ public static void main(String[] args) { + System.setProperty("spring.profiles.active","worker"); Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -101,12 +93,13 @@ public class WorkerServer { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(workerConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); + // this.workerRegistry.registry(); /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 3c7500aa8b..f3e701b6c9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,6 +40,17 @@ public class WorkerConfig { @Value("${worker.group: default}") private String workerGroup; + @Value("${worker.listen.port: 12345}") + private int listenPort; + + public int getListenPort() { + return listenPort; + } + + public void setListenPort(int listenPort) { + this.listenPort = listenPort; + } + public String getWorkerGroup() { return workerGroup; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index a1d55240b2..b42386a200 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -25,9 +25,13 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,6 +45,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLASH; /** * worker registry */ +@Service public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); @@ -48,54 +53,31 @@ public class WorkerRegistry { /** * zookeeper registry center */ - private final ZookeeperRegistryCenter zookeeperRegistryCenter; + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * port + * worker config */ - private final int port; - - /** - * heartbeat interval - */ - private final long heartBeatInterval; + @Autowired + private WorkerConfig workerConfig; /** * heartbeat executor */ - private final ScheduledExecutorService heartBeatExecutor; + private ScheduledExecutorService heartBeatExecutor; /** * worker start time */ - private final String startTime; + private String startTime; - /** - * worker group - */ - private String workerGroup; - /** - * construct - * - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - * @param heartBeatInterval heartBeatInterval - */ - public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ - this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP); - } + private String workerGroup; - /** - * construct - * @param zookeeperRegistryCenter zookeeperRegistryCenter - * @param port port - */ - public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){ - this.zookeeperRegistryCenter = zookeeperRegistryCenter; - this.port = port; - this.heartBeatInterval = heartBeatInterval; - this.workerGroup = workerGroup; + @PostConstruct + public void init(){ + this.workerGroup = workerConfig.getWorkerGroup(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -120,8 +102,9 @@ public class WorkerRegistry { } } }); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); - logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); + int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); + logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); } @@ -159,7 +142,7 @@ public class WorkerRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + port; + return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort(); } /**