diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 7af48d14d2..9c16ce697e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -775,7 +775,7 @@ public final class Constants { */ public static final String PSTREE = "pstree"; - public static final Boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT")); + public static final boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT")); /** * dry run flag diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index a7145fc049..52ab59c87d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -20,20 +20,9 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.remote.NettyRemotingServer; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; -import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; -import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; -import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; +import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; @@ -42,7 +31,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager; import javax.annotation.PostConstruct; -import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,14 +48,9 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; public class MasterServer implements IStoppable { private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); - @Autowired - private MasterConfig masterConfig; - @Autowired private SpringApplicationContext springApplicationContext; - private NettyRemotingServer nettyRemotingServer; - @Autowired private MasterRegistryClient masterRegistryClient; @@ -80,27 +63,6 @@ public class MasterServer implements IStoppable { @Autowired private SchedulerApi schedulerApi; - @Autowired - private TaskExecuteRunningProcessor taskExecuteRunningProcessor; - - @Autowired - private TaskExecuteResponseProcessor taskExecuteResponseProcessor; - - @Autowired - private TaskEventProcessor taskEventProcessor; - - @Autowired - private StateEventProcessor stateEventProcessor; - - @Autowired - private CacheProcessor cacheProcessor; - - @Autowired - private TaskKillResponseProcessor taskKillResponseProcessor; - - @Autowired - private TaskRecallProcessor taskRecallProcessor; - @Autowired private EventExecuteService eventExecuteService; @@ -108,7 +70,7 @@ public class MasterServer implements IStoppable { private FailoverExecuteThread failoverExecuteThread; @Autowired - private LoggerRequestProcessor loggerRequestProcessor; + private MasterRPCServer masterRPCServer; public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); @@ -120,26 +82,8 @@ public class MasterServer implements IStoppable { */ @PostConstruct public void run() throws SchedulerException { - // init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(masterConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); - - // logger server - this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); - - this.nettyRemotingServer.start(); + // init rpc server + this.masterRPCServer.start(); // install task plugin this.taskPluginManager.installPlugin(); @@ -174,6 +118,7 @@ public class MasterServer implements IStoppable { try { // execute only once if (Stopper.isStopped()) { + logger.warn("MasterServer has been stopped ..., current cause: {}", cause); return; } @@ -191,10 +136,13 @@ public class MasterServer implements IStoppable { // close this.schedulerApi.close(); this.masterSchedulerService.close(); - this.nettyRemotingServer.close(); + this.masterRPCServer.close(); this.masterRegistryClient.closeRegistry(); - // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc + // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. + // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc springApplicationContext.close(); + + logger.info("MasterServer stopped..."); } catch (Exception e) { logger.error("master server stop exception ", e); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java index 5fdf64493c..2f5f6dc472 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.cache; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import java.util.Collection; @@ -32,7 +32,7 @@ public interface ProcessInstanceExecCacheManager { * @param processInstanceId processInstanceId * @return WorkflowExecuteThread */ - WorkflowExecuteThread getByProcessInstanceId(int processInstanceId); + WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId); /** * judge the process instance does it exist @@ -55,12 +55,12 @@ public interface ProcessInstanceExecCacheManager { * @param processInstanceId processInstanceId * @param workflowExecuteThread if it is null, will not be cached */ - void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread); + void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread); /** * get all WorkflowExecuteThread from cache * * @return all WorkflowExecuteThread in cache */ - Collection getAll(); + Collection getAll(); } \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java index 1d0ab4841a..137bd4ecc5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.cache.impl; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; @@ -33,10 +33,10 @@ import com.google.common.collect.ImmutableList; @Component public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager { - private final ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); + private final ConcurrentHashMap processInstanceExecMaps = new ConcurrentHashMap<>(); @Override - public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) { + public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) { return processInstanceExecMaps.get(processInstanceId); } @@ -51,7 +51,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC } @Override - public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) { + public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) { if (workflowExecuteThread == null) { return; } @@ -59,7 +59,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC } @Override - public Collection getAll() { + public Collection getAll() { return ImmutableList.copyOf(processInstanceExecMaps.values()); } } \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index cba592c63d..83772a1054 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import java.util.ArrayList; @@ -135,7 +135,7 @@ public class StateEventResponseService { return; } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); switch (stateEvent.getType()) { case TASK_STATE_CHANGE: workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java similarity index 83% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index b2778ae542..e455e27ea8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -26,13 +26,12 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; -import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; @@ -43,9 +42,9 @@ import io.netty.channel.Channel; /** * task execute thread */ -public class TaskExecuteThread { +public class TaskExecuteRunnable implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); + private static final Logger logger = LoggerFactory.getLogger(TaskExecuteRunnable.class); private final int processInstanceId; @@ -59,8 +58,8 @@ public class TaskExecuteThread { private DataQualityResultOperator dataQualityResultOperator; - public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, - ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) { + public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool, + ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) { this.processInstanceId = processInstanceId; this.processService = processService; this.workflowExecuteThreadPool = workflowExecuteThreadPool; @@ -68,6 +67,7 @@ public class TaskExecuteThread { this.dataQualityResultOperator = dataQualityResultOperator; } + @Override public void run() { while (!this.events.isEmpty()) { TaskEvent event = this.events.peek(); @@ -116,12 +116,12 @@ public class TaskExecuteThread { int taskInstanceId = taskEvent.getTaskInstanceId(); int processInstanceId = taskEvent.getProcessInstanceId(); - TaskInstance taskInstance; - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) { - taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + Optional taskInstance; + WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) { + taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId); } else { - taskInstance = processService.findTaskInstanceById(taskInstanceId); + taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId)); } switch (event) { @@ -137,7 +137,7 @@ public class TaskExecuteThread { handleResultEvent(taskEvent, taskInstance); break; case WORKER_REJECT: - handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteThread); + handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable); break; default: throw new IllegalArgumentException("invalid event type : " + event); @@ -154,11 +154,12 @@ public class TaskExecuteThread { /** * handle dispatch event */ - private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) { - if (taskInstance == null) { + private void handleDispatchEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { + if (!taskInstanceOptional.isPresent()) { logger.error("taskInstance is null"); return; } + TaskInstance taskInstance = taskInstanceOptional.get(); if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { return; } @@ -170,10 +171,11 @@ public class TaskExecuteThread { /** * handle running event */ - private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + private void handleRunningEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { Channel channel = taskEvent.getChannel(); try { - if (taskInstance != null) { + if (taskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceOptional.get(); if (taskInstance.getState().typeIsFinished()) { logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); } else { @@ -200,10 +202,11 @@ public class TaskExecuteThread { /** * handle result event */ - private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) { + private void handleResultEvent(TaskEvent taskEvent, Optional taskInstanceOptional) { Channel channel = taskEvent.getChannel(); try { - if (taskInstance != null) { + if (taskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceOptional.get(); dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); taskInstance.setStartTime(taskEvent.getStartTime()); @@ -231,7 +234,8 @@ public class TaskExecuteThread { /** * handle result event */ - private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) { + private void handleWorkerRejectEvent(Channel channel, Optional taskInstanceOptional, WorkflowExecuteRunnable executeThread) { + TaskInstance taskInstance = taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null")); try { if (executeThread != null) { executeThread.resubmit(taskInstance.getTaskCode()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 75c0272519..6bf3f862ae 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -40,7 +40,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class); - private final ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); @Autowired private MasterConfig masterConfig; @@ -67,7 +67,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { /** * task event thread map */ - private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); @PostConstruct private void init() { @@ -83,26 +83,26 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { return; } if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { - TaskExecuteThread taskExecuteThread = new TaskExecuteThread( + TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable( taskEvent.getProcessInstanceId(), processService, workflowExecuteThreadPool, processInstanceExecCacheManager, dataQualityResultOperator); taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread); } - TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId()); - if (taskExecuteThread != null) { - taskExecuteThread.addEvent(taskEvent); + TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId()); + if (taskExecuteRunnable != null) { + taskExecuteRunnable.addEvent(taskEvent); } } public void eventHandler() { - for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) { + for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) { executeEvent(taskExecuteThread); } } - public void executeEvent(TaskExecuteThread taskExecuteThread) { + public void executeEvent(TaskExecuteRunnable taskExecuteThread) { if (taskExecuteThread.eventSize() == 0) { return; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java new file mode 100644 index 0000000000..bc1b217f9e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.registry; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.registry.api.ConnectionListener; +import org.apache.dolphinscheduler.registry.api.ConnectionState; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MasterConnectionStateListener implements ConnectionListener { + + private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class); + + private final String masterNodePath; + private final RegistryClient registryClient; + + public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient) { + this.masterNodePath = checkNotNull(masterNodePath); + this.registryClient = checkNotNull(registryClient); + } + + @Override + public void onUpdate(ConnectionState state) { + switch (state) { + case CONNECTED: + logger.debug("registry connection state is {}", state); + break; + case SUSPENDED: + logger.warn("registry connection state is {}, ready to retry connection", state); + break; + case RECONNECTED: + logger.debug("registry connection state is {}, clean the node info", state); + registryClient.remove(masterNodePath); + registryClient.persistEphemeral(masterNodePath, ""); + break; + case DISCONNECTED: + logger.warn("registry connection state is {}, ready to stop myself", state); + registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself"); + break; + default: + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 371075dfd4..c4bac5ab70 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -20,32 +20,36 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; -import java.util.Collections; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.StringUtils; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.registry.api.ConnectionState; +import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.service.FailoverService; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.google.common.collect.Sets; + /** - * zookeeper master client - *

- * single instance + *

DolphinScheduler master register client, used to connect to registry and hand the registry events. + *

When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry. */ @Component public class MasterRegistryClient { @@ -79,8 +83,10 @@ public class MasterRegistryClient { * master startup time, ms */ private long startupTime; + private String masterAddress; public void init() { + this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); this.startupTime = System.currentTimeMillis(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -89,11 +95,11 @@ public class MasterRegistryClient { try { // master registry registry(); - + registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient)); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); } catch (Exception e) { logger.error("master start up exception", e); - throw new RuntimeException("master start up error", e); + throw new RegistryException("Master registry client start up error", e); } } @@ -176,16 +182,16 @@ public class MasterRegistryClient { } /** - * registry + * Registry the current master server itself to registry. */ - public void registry() { - String address = NetUtils.getAddr(masterConfig.getListenPort()); - String localNodePath = getMasterPath(); + void registry() { + logger.info("master node : {} registering to registry center...", masterAddress); + String localNodePath = getCurrentNodePath(); int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory(), - Sets.newHashSet(getMasterPath()), + Sets.newHashSet(localNodePath), Constants.MASTER_TYPE, registryClient); @@ -194,6 +200,7 @@ public class MasterRegistryClient { registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { + logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost()); ThreadUtils.sleep(SLEEP_TIME_MILLIS); } @@ -203,37 +210,15 @@ public class MasterRegistryClient { // delete dead server registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); - registryClient.addConnectionStateListener(this::handleConnectionState); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); } - public void handleConnectionState(ConnectionState state) { - String localNodePath = getMasterPath(); - switch (state) { - case CONNECTED: - logger.debug("registry connection state is {}", state); - break; - case SUSPENDED: - logger.warn("registry connection state is {}, ready to retry connection", state); - break; - case RECONNECTED: - logger.debug("registry connection state is {}, clean the node info", state); - registryClient.persistEphemeral(localNodePath, ""); - break; - case DISCONNECTED: - logger.warn("registry connection state is {}, ready to stop myself", state); - registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself"); - break; - default: - } - } - public void deregister() { try { String address = getLocalAddress(); - String localNodePath = getMasterPath(); + String localNodePath = getCurrentNodePath(); registryClient.remove(localNodePath); logger.info("master node : {} unRegistry to register center.", address); heartBeatExecutor.shutdown(); @@ -247,7 +232,7 @@ public class MasterRegistryClient { /** * get master path */ - private String getMasterPath() { + private String getCurrentNodePath() { String address = getLocalAddress(); return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java new file mode 100644 index 0000000000..e0b65dd077 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; +import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Master RPC Server, used to send/receive request to other system. + */ +@Service +public class MasterRPCServer implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(MasterRPCServer.class); + + private NettyRemotingServer nettyRemotingServer; + + @Autowired + private MasterConfig masterConfig; + + @Autowired + private TaskExecuteRunningProcessor taskExecuteRunningProcessor; + + @Autowired + private TaskExecuteResponseProcessor taskExecuteResponseProcessor; + + @Autowired + private TaskEventProcessor taskEventProcessor; + + @Autowired + private StateEventProcessor stateEventProcessor; + + @Autowired + private CacheProcessor cacheProcessor; + + @Autowired + private TaskKillResponseProcessor taskKillResponseProcessor; + + @Autowired + private TaskRecallProcessor taskRecallProcessor; + + @Autowired + private LoggerRequestProcessor loggerRequestProcessor; + + @PostConstruct + private void init() { + // init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(masterConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor); + + // logger server + this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); + + this.nettyRemotingServer.start(); + } + + public void start() { + logger.info("Starting Master RPC Server..."); + this.nettyRemotingServer.start(); + logger.info("Started Master RPC Server..."); + } + + @Override + public void close() { + logger.info("Closing Master RPC Server..."); + this.nettyRemotingServer.close(); + logger.info("Closed Master RPC Server..."); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 6f6f718c2f..fc9c4fd854 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -62,7 +62,7 @@ public class EventExecuteService extends Thread { } private void eventHandler() { - for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { + for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { workflowExecuteThreadPool.executeEvent(workflowExecuteThread); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index aa894f8b3b..5fd812f162 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -56,6 +56,8 @@ public class FailoverExecuteThread extends Thread { logger.info("failover execute thread started"); while (Stopper.isRunning()) { try { + // todo: DO we need to schedule a task to do this kind of check + // This kind of check may only need to be executed when a master server start failoverService.checkMasterFailover(); } catch (Exception e) { logger.error("failover execute error", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 5c97c0b41f..e688863d06 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -165,7 +165,7 @@ public class MasterSchedulerService extends Thread { continue; } - WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread( + WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable( processInstance , processService , nettyExecutorManager diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index ba614e573b..7635209ebb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.hadoop.util.ThreadUtil; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; @@ -112,7 +113,7 @@ public class StateWheelExecuteThread extends Thread { if (processInstanceId == null) { continue; } - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId); processInstanceTimeoutCheckList.remove(processInstanceId); @@ -219,20 +220,21 @@ public class StateWheelExecuteThread extends Thread { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceTimeoutCheckList.remove(taskInstanceKey); continue; } - TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); - if (taskInstance == null) { + Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); + if (!taskInstanceOptional.isPresent()) { logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceTimeoutCheckList.remove(taskInstanceKey); continue; } + TaskInstance taskInstance = taskInstanceOptional.get(); if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); if (timeRemain < 0) { @@ -252,7 +254,7 @@ public class StateWheelExecuteThread extends Thread { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", @@ -261,7 +263,7 @@ public class StateWheelExecuteThread extends Thread { continue; } - TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); + Optional taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); if (processInstance.getState() == ExecutionStatus.READY_STOP) { @@ -270,13 +272,14 @@ public class StateWheelExecuteThread extends Thread { break; } - if (taskInstance == null) { + if (!taskInstanceOptional.isPresent()) { logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } + TaskInstance taskInstance = taskInstanceOptional.get(); if (taskInstance.retryTaskIntervalOverTime()) { // reset taskInstance endTime and state // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance @@ -297,20 +300,21 @@ public class StateWheelExecuteThread extends Thread { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceStateCheckList.remove(taskInstanceKey); continue; } - TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); - if (taskInstance == null) { + Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); + if (!taskInstanceOptional.isPresent()) { logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", processInstanceId, taskCode); taskInstanceStateCheckList.remove(taskInstanceKey); continue; } + TaskInstance taskInstance = taskInstanceOptional.get(); if (taskInstance.getState().typeIsFinished()) { continue; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java similarity index 96% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 6b70b25149..ca6a7aa4fa 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -87,6 +87,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -99,34 +100,34 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; /** - * master exec thread,split dag + * Workflow execute task, used to execute a workflow instance. */ -public class WorkflowExecuteThread { +public class WorkflowExecuteRunnable implements Runnable { /** * logger of WorkflowExecuteThread */ - private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class); + private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class); /** * master config */ - private MasterConfig masterConfig; + private final MasterConfig masterConfig; /** * process service */ - private ProcessService processService; + private final ProcessService processService; /** * alert manager */ - private ProcessAlertManager processAlertManager; + private final ProcessAlertManager processAlertManager; /** * netty executor manager */ - private NettyExecutorManager nettyExecutorManager; + private final NettyExecutorManager nettyExecutorManager; /** * process instance @@ -161,7 +162,7 @@ public class WorkflowExecuteThread { /** * task instance hash map, taskId as key */ - private Map taskInstanceMap = new ConcurrentHashMap<>(); + private final Map taskInstanceMap = new ConcurrentHashMap<>(); /** * running taskProcessor, taskCode as key, taskProcessor as value @@ -173,34 +174,34 @@ public class WorkflowExecuteThread { * valid task map, taskCode as key, taskId as value * in a DAG, only one taskInstance per taskCode is valid */ - private Map validTaskMap = new ConcurrentHashMap<>(); + private final Map validTaskMap = new ConcurrentHashMap<>(); /** * error task map, taskCode as key, taskInstanceId as value * in a DAG, only one taskInstance per taskCode is valid */ - private Map errorTaskMap = new ConcurrentHashMap<>(); + private final Map errorTaskMap = new ConcurrentHashMap<>(); /** * complete task map, taskCode as key, taskInstanceId as value * in a DAG, only one taskInstance per taskCode is valid */ - private Map completeTaskMap = new ConcurrentHashMap<>(); + private final Map completeTaskMap = new ConcurrentHashMap<>(); /** * depend failed task map, taskCode as key, taskId as value */ - private Map dependFailedTaskMap = new ConcurrentHashMap<>(); + private final Map dependFailedTaskMap = new ConcurrentHashMap<>(); /** * forbidden task map, code as key */ - private Map forbiddenTaskMap = new ConcurrentHashMap<>(); + private final Map forbiddenTaskMap = new ConcurrentHashMap<>(); /** * skip task map, code as key */ - private Map skipTaskNodeMap = new ConcurrentHashMap<>(); + private final Map skipTaskNodeMap = new ConcurrentHashMap<>(); /** * complement date list @@ -210,27 +211,25 @@ public class WorkflowExecuteThread { /** * state event queue */ - private ConcurrentLinkedQueue stateEvents = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue stateEvents = new ConcurrentLinkedQueue<>(); /** * ready to submit task queue */ - private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); /** * wait to retry taskInstance map, taskCode as key, taskInstance as value * before retry, the taskInstance id is 0 */ - private Map waitToRetryTaskInstanceMap = new ConcurrentHashMap<>(); + private final Map waitToRetryTaskInstanceMap = new ConcurrentHashMap<>(); /** * state wheel execute thread */ - private StateWheelExecuteThread stateWheelExecuteThread; + private final StateWheelExecuteThread stateWheelExecuteThread; /** - * constructor of WorkflowExecuteThread - * * @param processInstance processInstance * @param processService processService * @param nettyExecutorManager nettyExecutorManager @@ -238,12 +237,12 @@ public class WorkflowExecuteThread { * @param masterConfig masterConfig * @param stateWheelExecuteThread stateWheelExecuteThread */ - public WorkflowExecuteThread(ProcessInstance processInstance - , ProcessService processService - , NettyExecutorManager nettyExecutorManager - , ProcessAlertManager processAlertManager - , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread) { + public WorkflowExecuteRunnable(ProcessInstance processInstance + , ProcessService processService + , NettyExecutorManager nettyExecutorManager + , ProcessAlertManager processAlertManager + , MasterConfig masterConfig + , StateWheelExecuteThread stateWheelExecuteThread) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; @@ -401,7 +400,10 @@ public class WorkflowExecuteThread { return true; } - TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId()); + Optional taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); + TaskInstance task = taskInstanceOptional.orElseThrow( + () -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId())); + if (task.getState() == null) { logger.error("task state is null, state handler error: {}", stateEvent); return true; @@ -632,37 +634,37 @@ public class WorkflowExecuteThread { /** * get task instance from memory */ - public TaskInstance getTaskInstance(int taskInstanceId) { + public Optional getTaskInstance(int taskInstanceId) { if (taskInstanceMap.containsKey(taskInstanceId)) { - return taskInstanceMap.get(taskInstanceId); + return Optional.ofNullable(taskInstanceMap.get(taskInstanceId)); } - return null; + return Optional.empty(); } - public TaskInstance getTaskInstance(long taskCode) { - if (taskInstanceMap == null || taskInstanceMap.size() == 0) { - return null; + public Optional getTaskInstance(long taskCode) { + if (taskInstanceMap.isEmpty()) { + return Optional.empty(); } for (TaskInstance taskInstance : taskInstanceMap.values()) { if (taskInstance.getTaskCode() == taskCode) { - return taskInstance; + return Optional.of(taskInstance); } } - return null; + return Optional.empty(); } - public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) { + public Optional getActiveTaskInstanceByTaskCode(long taskCode) { if (activeTaskProcessorMaps.containsKey(taskCode)) { - return activeTaskProcessorMaps.get(taskCode).taskInstance(); + return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance()); } - return null; + return Optional.empty(); } - public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) { + public Optional getRetryTaskInstanceByTaskCode(long taskCode) { if (waitToRetryTaskInstanceMap.containsKey(taskCode)) { - return waitToRetryTaskInstanceMap.get(taskCode); + return Optional.ofNullable(waitToRetryTaskInstanceMap.get(taskCode)); } - return null; + return Optional.empty(); } private boolean processStateChangeHandler(StateEvent stateEvent) { @@ -697,7 +699,9 @@ public class WorkflowExecuteThread { private boolean processBlockHandler(StateEvent stateEvent) { try { - TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId()); + Optional taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId()); + TaskInstance task = taskInstanceOptional.orElseThrow( + () -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId())); if (!checkTaskInstanceByStateEvent(stateEvent)) { logger.error("task {} is not a blocking task", task.getTaskCode()); return false; @@ -789,9 +793,10 @@ public class WorkflowExecuteThread { } /** - * process start handle + * ProcessInstance start entrypoint. */ - public void startProcess() { + @Override + public void run() { if (this.taskInstanceMap.size() > 0) { return; } @@ -1304,9 +1309,9 @@ public class WorkflowExecuteThread { List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); - TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode()); - if (existTaskInstance != null) { - taskInstances.add(existTaskInstance); + Optional existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode()); + if (existTaskInstanceOptional.isPresent()) { + taskInstances.add(existTaskInstanceOptional.get()); continue; } TaskInstance task = createTaskInstance(processInstance, taskNodeObject); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 5920232855..642e33b6d6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -66,7 +66,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * multi-thread filter, avoid handling workflow at the same time */ - private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap(); + private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap(); @PostConstruct private void init() { @@ -80,7 +80,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { * submit state event */ public void submitStateEvent(StateEvent stateEvent) { - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); if (workflowExecuteThread == null) { logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent); return; @@ -91,14 +91,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * start workflow */ - public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) { - submit(workflowExecuteThread::startProcess); + public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) { + submit(workflowExecuteThread); } /** * execute workflow */ - public void executeEvent(WorkflowExecuteThread workflowExecuteThread) { + public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index f3409ab49f..dad2dbfe1e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.server.master.service; +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.StateEvent; @@ -37,12 +40,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,12 +75,14 @@ public class FailoverService { /** * check master failover */ + @Counted(value = "failover_scheduler_check_task_count") + @Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) public void checkMasterFailover() { List hosts = getNeedFailoverMasterServers(); if (CollectionUtils.isEmpty(hosts)) { return; } - LOGGER.info("need failover hosts:{}", hosts); + LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts); for (String host : hosts) { failoverMasterWithLock(host); @@ -114,9 +121,9 @@ public class FailoverService { } /** - * failover master - *

- * failover process instance and associated task instance + * Failover master, will failover process instance and associated task instance. + *

When the process instance belongs to the given masterHost and the restartTime is before the current server start up time, + * then the process instance will be failovered. * * @param masterHost master host */ @@ -125,11 +132,11 @@ public class FailoverService { return; } Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost); - long startTime = System.currentTimeMillis(); + StopWatch failoverTimeCost = StopWatch.createStarted(); List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); - LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size()); + LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size()); - // servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail. + // servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail. List servers = registryClient.getServerList(NodeType.WORKER); servers.addAll(registryClient.getServerList(NodeType.MASTER)); @@ -145,7 +152,7 @@ public class FailoverService { } if (serverStartupTime != null && processInstance.getRestartTime() != null - && processInstance.getRestartTime().after(serverStartupTime)) { + && processInstance.getRestartTime().after(serverStartupTime)) { continue; } @@ -155,7 +162,8 @@ public class FailoverService { processService.processNeedFailoverProcessInstances(processInstance); } - LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime); + failoverTimeCost.stop(); + LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS)); } /** @@ -248,7 +256,10 @@ public class FailoverService { } /** - * get need failover master servers + * Get need failover master servers. + *

+ * Query the process instances from database, if the processInstance's host doesn't exist in registry + * or the host is the currentServer, then it will need to failover. * * @return need failover master servers */ diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java similarity index 95% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java index 6e1c84382d..339c5e80ae 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; - import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -37,7 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -70,10 +69,10 @@ import org.springframework.context.ApplicationContext; * test for WorkflowExecuteThread */ @RunWith(PowerMockRunner.class) -@PrepareForTest({WorkflowExecuteThread.class}) -public class WorkflowExecuteThreadTest { +@PrepareForTest({WorkflowExecuteRunnable.class}) +public class WorkflowExecuteTaskTest { - private WorkflowExecuteThread workflowExecuteThread; + private WorkflowExecuteRunnable workflowExecuteThread; private ProcessInstance processInstance; @@ -118,9 +117,9 @@ public class WorkflowExecuteThreadTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); stateWheelExecuteThread = mock(StateWheelExecuteThread.class); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread)); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread)); // prepareProcess init dag - Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); + Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); dag.set(workflowExecuteThread, new DAG()); PowerMockito.doNothing().when(workflowExecuteThread, "endProcess"); @@ -132,7 +131,7 @@ public class WorkflowExecuteThreadTest { Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_START_NODES, "1,2,3"); Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam)); - Class masterExecThreadClass = WorkflowExecuteThread.class; + Class masterExecThreadClass = WorkflowExecuteRunnable.class; Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class); method.setAccessible(true); List nodeNames = (List) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam)); @@ -158,7 +157,7 @@ public class WorkflowExecuteThreadTest { Mockito.when(processService.findTaskInstanceByIdList( Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId())) ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); - Class masterExecThreadClass = WorkflowExecuteThread.class; + Class masterExecThreadClass = WorkflowExecuteRunnable.class; Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class); method.setAccessible(true); List taskInstances = (List) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam)); @@ -202,7 +201,7 @@ public class WorkflowExecuteThreadTest { completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId()); completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); - Class masterExecThreadClass = WorkflowExecuteThread.class; + Class masterExecThreadClass = WorkflowExecuteRunnable.class; Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap"); completeTaskMapField.setAccessible(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java index 6fa9116e63..d7b53f3c34 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.cache.impl; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import java.util.Collection; @@ -37,7 +37,7 @@ public class ProcessInstanceExecCacheManagerImplTest { private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; @Mock - private WorkflowExecuteThread workflowExecuteThread; + private WorkflowExecuteRunnable workflowExecuteThread; @Before public void before() { @@ -47,7 +47,7 @@ public class ProcessInstanceExecCacheManagerImplTest { @Test public void testGetByProcessInstanceId() { - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey()); } @@ -59,20 +59,20 @@ public class ProcessInstanceExecCacheManagerImplTest { @Test public void testCacheNull() { processInstanceExecCacheManager.cache(2, null); - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); Assert.assertNull(workflowExecuteThread); } @Test public void testRemoveByProcessInstanceId() { processInstanceExecCacheManager.removeByProcessInstanceId(1); - WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); Assert.assertNull(workflowExecuteThread); } @Test public void testGetAll() { - Collection workflowExecuteThreads = processInstanceExecCacheManager.getAll(); + Collection workflowExecuteThreads = processInstanceExecCacheManager.getAll(); Assert.assertEquals(1, workflowExecuteThreads.size()); } } \ No newline at end of file diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index d3d5b03de9..2a4b4c33ff 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -111,13 +111,6 @@ public class MasterRegistryClientTest { masterRegistryClient.registry(); } - @Test - public void handleConnectionStateTest() { - masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED); - masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED); - masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED); - } - @Test public void removeNodePathTest() { masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 17b4df14a9..129aaff0de 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable { registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); } } catch (Throwable ex) { - logger.error("error write heartbeat info", ex); + logger.error("HeartBeat task execute failed", ex); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index 2f98b09b34..c606007e88 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.service.registry; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.dolphinscheduler.common.Constants.ADD_OP; import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.DELETE_OP; @@ -29,8 +30,6 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; -import static com.google.common.base.Preconditions.checkArgument; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType;