Browse Source

Optimize MasterServer, add MasterRPCService (#10371)

* Optimize MasterServer, avoid NPE
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
3ecbee3885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 72
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
  4. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
  5. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  6. 42
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  7. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  8. 62
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java
  9. 71
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  10. 114
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java
  11. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  12. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  13. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  14. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  15. 89
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  16. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  17. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  18. 19
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
  19. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
  20. 7
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  21. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  22. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -775,7 +775,7 @@ public final class Constants {
*/
public static final String PSTREE = "pstree";
public static final Boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
public static final boolean KUBERNETES_MODE = !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && !StringUtils.isEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
/**
* dry run flag

72
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -20,20 +20,9 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
@ -42,7 +31,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.PostConstruct;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,14 +48,9 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
public class MasterServer implements IStoppable {
private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
@Autowired
private MasterConfig masterConfig;
@Autowired
private SpringApplicationContext springApplicationContext;
private NettyRemotingServer nettyRemotingServer;
@Autowired
private MasterRegistryClient masterRegistryClient;
@ -80,27 +63,6 @@ public class MasterServer implements IStoppable {
@Autowired
private SchedulerApi schedulerApi;
@Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskEventProcessor taskEventProcessor;
@Autowired
private StateEventProcessor stateEventProcessor;
@Autowired
private CacheProcessor cacheProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskRecallProcessor taskRecallProcessor;
@Autowired
private EventExecuteService eventExecuteService;
@ -108,7 +70,7 @@ public class MasterServer implements IStoppable {
private FailoverExecuteThread failoverExecuteThread;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
private MasterRPCServer masterRPCServer;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
@ -120,26 +82,8 @@ public class MasterServer implements IStoppable {
*/
@PostConstruct
public void run() throws SchedulerException {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// init rpc server
this.masterRPCServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
@ -174,6 +118,7 @@ public class MasterServer implements IStoppable {
try {
// execute only once
if (Stopper.isStopped()) {
logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
return;
}
@ -191,10 +136,13 @@ public class MasterServer implements IStoppable {
// close
this.schedulerApi.close();
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("MasterServer stopped...");
} catch (Exception e) {
logger.error("master server stop exception ", e);
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java vendored

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@ -32,7 +32,7 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
*/
WorkflowExecuteThread getByProcessInstanceId(int processInstanceId);
WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);
/**
* judge the process instance does it exist
@ -55,12 +55,12 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread);
void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteThread> getAll();
Collection<WorkflowExecuteRunnable> getAll();
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java vendored

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
@ -33,10 +33,10 @@ import com.google.common.collect.ImmutableList;
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();
@Override
public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) {
public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId);
}
@ -51,7 +51,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) {
public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
@ -59,7 +59,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
public Collection<WorkflowExecuteThread> getAll() {
public Collection<WorkflowExecuteRunnable> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import java.util.ArrayList;
@ -135,7 +135,7 @@ public class StateEventResponseService {
return;
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
switch (stateEvent.getType()) {
case TASK_STATE_CHANGE:
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());

42
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThread.java → dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -26,13 +26,12 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
@ -43,9 +42,9 @@ import io.netty.channel.Channel;
/**
* task execute thread
*/
public class TaskExecuteThread {
public class TaskExecuteRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteRunnable.class);
private final int processInstanceId;
@ -59,7 +58,7 @@ public class TaskExecuteThread {
private DataQualityResultOperator dataQualityResultOperator;
public TaskExecuteThread(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
this.processInstanceId = processInstanceId;
this.processService = processService;
@ -68,6 +67,7 @@ public class TaskExecuteThread {
this.dataQualityResultOperator = dataQualityResultOperator;
}
@Override
public void run() {
while (!this.events.isEmpty()) {
TaskEvent event = this.events.peek();
@ -116,12 +116,12 @@ public class TaskExecuteThread {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
TaskInstance taskInstance;
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
Optional<TaskInstance> taskInstance;
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
taskInstance = processService.findTaskInstanceById(taskInstanceId);
taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
switch (event) {
@ -137,7 +137,7 @@ public class TaskExecuteThread {
handleResultEvent(taskEvent, taskInstance);
break;
case WORKER_REJECT:
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteThread);
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
@ -154,11 +154,12 @@ public class TaskExecuteThread {
/**
* handle dispatch event
*/
private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
if (taskInstance == null) {
private void handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
return;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
return;
}
@ -170,10 +171,11 @@ public class TaskExecuteThread {
/**
* handle running event
*/
private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
private void handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
} else {
@ -200,10 +202,11 @@ public class TaskExecuteThread {
/**
* handle result event
*/
private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
private void handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
taskInstance.setStartTime(taskEvent.getStartTime());
@ -231,7 +234,8 @@ public class TaskExecuteThread {
/**
* handle result event
*/
private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) {
private void handleWorkerRejectEvent(Channel channel, Optional<TaskInstance> taskInstanceOptional, WorkflowExecuteRunnable executeThread) {
TaskInstance taskInstance = taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null"));
try {
if (executeThread != null) {
executeThread.resubmit(taskInstance.getTaskCode());

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -40,7 +40,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadPool.class);
private final ConcurrentHashMap<String, TaskExecuteThread> multiThreadFilterMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TaskExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
@Autowired
private MasterConfig masterConfig;
@ -67,7 +67,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* task event thread map
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, TaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
@ -83,26 +83,26 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(
TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable(
taskEvent.getProcessInstanceId(),
processService, workflowExecuteThreadPool,
processInstanceExecCacheManager,
dataQualityResultOperator);
taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
}
TaskExecuteThread taskExecuteThread = taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
if (taskExecuteThread != null) {
taskExecuteThread.addEvent(taskEvent);
TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
if (taskExecuteRunnable != null) {
taskExecuteRunnable.addEvent(taskEvent);
}
}
public void eventHandler() {
for (TaskExecuteThread taskExecuteThread: taskExecuteThreadMap.values()) {
for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
executeEvent(taskExecuteThread);
}
}
public void executeEvent(TaskExecuteThread taskExecuteThread) {
public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
if (taskExecuteThread.eventSize() == 0) {
return;
}

62
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterConnectionStateListener.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MasterConnectionStateListener implements ConnectionListener {
private static final Logger logger = LoggerFactory.getLogger(MasterConnectionStateListener.class);
private final String masterNodePath;
private final RegistryClient registryClient;
public MasterConnectionStateListener(String masterNodePath, RegistryClient registryClient) {
this.masterNodePath = checkNotNull(masterNodePath);
this.registryClient = checkNotNull(registryClient);
}
@Override
public void onUpdate(ConnectionState state) {
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
break;
case SUSPENDED:
logger.warn("registry connection state is {}, ready to retry connection", state);
break;
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.remove(masterNodePath);
registryClient.persistEphemeral(masterNodePath, "");
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
break;
default:
}
}
}

71
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -20,32 +20,36 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_NODE;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.service.FailoverService;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
/**
* zookeeper master client
* <p>
* single instance
* <p>DolphinScheduler master register client, used to connect to registry and hand the registry events.
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient {
@ -79,8 +83,10 @@ public class MasterRegistryClient {
* master startup time, ms
*/
private long startupTime;
private String masterAddress;
public void init() {
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.startupTime = System.currentTimeMillis();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
@ -89,11 +95,11 @@ public class MasterRegistryClient {
try {
// master registry
registry();
registryClient.addConnectionStateListener(new MasterConnectionStateListener(getCurrentNodePath(), registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
throw new RuntimeException("master start up error", e);
throw new RegistryException("Master registry client start up error", e);
}
}
@ -176,16 +182,16 @@ public class MasterRegistryClient {
}
/**
* registry
* Registry the current master server itself to registry.
*/
public void registry() {
String address = NetUtils.getAddr(masterConfig.getListenPort());
String localNodePath = getMasterPath();
void registry() {
logger.info("master node : {} registering to registry center...", masterAddress);
String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(),
Sets.newHashSet(getMasterPath()),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
registryClient);
@ -194,6 +200,7 @@ public class MasterRegistryClient {
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@ -203,37 +210,15 @@ public class MasterRegistryClient {
// delete dead server
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
registryClient.addConnectionStateListener(this::handleConnectionState);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
}
public void handleConnectionState(ConnectionState state) {
String localNodePath = getMasterPath();
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
break;
case SUSPENDED:
logger.warn("registry connection state is {}, ready to retry connection", state);
break;
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.persistEphemeral(localNodePath, "");
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
break;
default:
}
}
public void deregister() {
try {
String address = getLocalAddress();
String localNodePath = getMasterPath();
String localNodePath = getCurrentNodePath();
registryClient.remove(localNodePath);
logger.info("master node : {} unRegistry to register center.", address);
heartBeatExecutor.shutdown();
@ -247,7 +232,7 @@ public class MasterRegistryClient {
/**
* get master path
*/
private String getMasterPath() {
private String getCurrentNodePath() {
String address = getLocalAddress();
return REGISTRY_DOLPHINSCHEDULER_MASTERS + "/" + address;
}

114
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Master RPC Server, used to send/receive request to other system.
*/
@Service
public class MasterRPCServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MasterRPCServer.class);
private NettyRemotingServer nettyRemotingServer;
@Autowired
private MasterConfig masterConfig;
@Autowired
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskEventProcessor taskEventProcessor;
@Autowired
private StateEventProcessor stateEventProcessor;
@Autowired
private CacheProcessor cacheProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskRecallProcessor taskRecallProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
@PostConstruct
private void init() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
}
public void start() {
logger.info("Starting Master RPC Server...");
this.nettyRemotingServer.start();
logger.info("Started Master RPC Server...");
}
@Override
public void close() {
logger.info("Closing Master RPC Server...");
this.nettyRemotingServer.close();
logger.info("Closed Master RPC Server...");
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -62,7 +62,7 @@ public class EventExecuteService extends Thread {
}
private void eventHandler() {
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -56,6 +56,8 @@ public class FailoverExecuteThread extends Thread {
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
failoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("failover execute error", e);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -165,7 +165,7 @@ public class MasterSchedulerService extends Thread {
continue;
}
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
@ -112,7 +113,7 @@ public class StateWheelExecuteThread extends Thread {
if (processInstanceId == null) {
continue;
}
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
processInstanceTimeoutCheckList.remove(processInstanceId);
@ -219,20 +220,21 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
@ -252,7 +254,7 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
@ -261,7 +263,7 @@ public class StateWheelExecuteThread extends Thread {
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
@ -270,13 +272,14 @@ public class StateWheelExecuteThread extends Thread {
break;
}
if (taskInstance == null) {
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
@ -297,20 +300,21 @@ public class StateWheelExecuteThread extends Thread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (taskInstance == null) {
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
continue;
}

89
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java → dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -87,6 +87,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -99,34 +100,34 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
* master exec thread,split dag
* Workflow execute task, used to execute a workflow instance.
*/
public class WorkflowExecuteThread {
public class WorkflowExecuteRunnable implements Runnable {
/**
* logger of WorkflowExecuteThread
*/
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
/**
* master config
*/
private MasterConfig masterConfig;
private final MasterConfig masterConfig;
/**
* process service
*/
private ProcessService processService;
private final ProcessService processService;
/**
* alert manager
*/
private ProcessAlertManager processAlertManager;
private final ProcessAlertManager processAlertManager;
/**
* netty executor manager
*/
private NettyExecutorManager nettyExecutorManager;
private final NettyExecutorManager nettyExecutorManager;
/**
* process instance
@ -161,7 +162,7 @@ public class WorkflowExecuteThread {
/**
* task instance hash map, taskId as key
*/
private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
* running taskProcessor, taskCode as key, taskProcessor as value
@ -173,34 +174,34 @@ public class WorkflowExecuteThread {
* valid task map, taskCode as key, taskId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
private final Map<Long, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
* error task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
* complete task map, taskCode as key, taskInstanceId as value
* in a DAG, only one taskInstance per taskCode is valid
*/
private Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
* depend failed task map, taskCode as key, taskId as value
*/
private Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
private final Map<Long, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
/**
* forbidden task map, code as key
*/
private Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
private final Map<Long, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
/**
* skip task map, code as key
*/
private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
private final Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
/**
* complement date list
@ -210,27 +211,25 @@ public class WorkflowExecuteThread {
/**
* state event queue
*/
private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
/**
* ready to submit task queue
*/
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* wait to retry taskInstance map, taskCode as key, taskInstance as value
* before retry, the taskInstance id is 0
*/
private Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
/**
* state wheel execute thread
*/
private StateWheelExecuteThread stateWheelExecuteThread;
private final StateWheelExecuteThread stateWheelExecuteThread;
/**
* constructor of WorkflowExecuteThread
*
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
@ -238,7 +237,7 @@ public class WorkflowExecuteThread {
* @param masterConfig masterConfig
* @param stateWheelExecuteThread stateWheelExecuteThread
*/
public WorkflowExecuteThread(ProcessInstance processInstance
public WorkflowExecuteRunnable(ProcessInstance processInstance
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
@ -401,7 +400,10 @@ public class WorkflowExecuteThread {
return true;
}
TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(
() -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
@ -632,37 +634,37 @@ public class WorkflowExecuteThread {
/**
* get task instance from memory
*/
public TaskInstance getTaskInstance(int taskInstanceId) {
public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
if (taskInstanceMap.containsKey(taskInstanceId)) {
return taskInstanceMap.get(taskInstanceId);
return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
}
return null;
return Optional.empty();
}
public TaskInstance getTaskInstance(long taskCode) {
if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
return null;
public Optional<TaskInstance> getTaskInstance(long taskCode) {
if (taskInstanceMap.isEmpty()) {
return Optional.empty();
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
if (taskInstance.getTaskCode() == taskCode) {
return taskInstance;
return Optional.of(taskInstance);
}
}
return null;
return Optional.empty();
}
public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) {
public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
if (activeTaskProcessorMaps.containsKey(taskCode)) {
return activeTaskProcessorMaps.get(taskCode).taskInstance();
return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance());
}
return null;
return Optional.empty();
}
public TaskInstance getRetryTaskInstanceByTaskCode(long taskCode) {
public Optional<TaskInstance> getRetryTaskInstanceByTaskCode(long taskCode) {
if (waitToRetryTaskInstanceMap.containsKey(taskCode)) {
return waitToRetryTaskInstanceMap.get(taskCode);
return Optional.ofNullable(waitToRetryTaskInstanceMap.get(taskCode));
}
return null;
return Optional.empty();
}
private boolean processStateChangeHandler(StateEvent stateEvent) {
@ -697,7 +699,9 @@ public class WorkflowExecuteThread {
private boolean processBlockHandler(StateEvent stateEvent) {
try {
TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(
() -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
if (!checkTaskInstanceByStateEvent(stateEvent)) {
logger.error("task {} is not a blocking task", task.getTaskCode());
return false;
@ -789,9 +793,10 @@ public class WorkflowExecuteThread {
}
/**
* process start handle
* ProcessInstance start entrypoint.
*/
public void startProcess() {
@Override
public void run() {
if (this.taskInstanceMap.size() > 0) {
return;
}
@ -1304,9 +1309,9 @@ public class WorkflowExecuteThread {
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode());
if (existTaskInstance != null) {
taskInstances.add(existTaskInstance);
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode());
if (existTaskInstanceOptional.isPresent()) {
taskInstances.add(existTaskInstanceOptional.get());
continue;
}
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -66,7 +66,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* multi-thread filter, avoid handling workflow at the same time
*/
private ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap();
private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap();
@PostConstruct
private void init() {
@ -80,7 +80,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
* submit state event
*/
public void submitStateEvent(StateEvent stateEvent) {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
return;
@ -91,14 +91,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* start workflow
*/
public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
submit(workflowExecuteThread::startProcess);
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
submit(workflowExecuteThread);
}
/**
* execute workflow
*/
public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
@ -37,12 +40,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,12 +75,14 @@ public class FailoverService {
/**
* check master failover
*/
@Counted(value = "failover_scheduler_check_task_count")
@Timed(value = "failover_scheduler_check_task_time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
public void checkMasterFailover() {
List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
return;
}
LOGGER.info("need failover hosts:{}", hosts);
LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
@ -114,9 +121,9 @@ public class FailoverService {
}
/**
* failover master
* <p>
* failover process instance and associated task instance
* Failover master, will failover process instance and associated task instance.
* <p>When the process instance belongs to the given masterHost and the restartTime is before the current server start up time,
* then the process instance will be failovered.
*
* @param masterHost master host
*/
@ -125,11 +132,11 @@ public class FailoverService {
return;
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
long startTime = System.currentTimeMillis();
StopWatch failoverTimeCost = StopWatch.createStarted();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
LOGGER.info("start master[{}] failover, need to failover process list size:{}", masterHost, needFailoverProcessInstanceList.size());
// servers need to contains master hosts and worker hosts, otherwise the logic task will failover fail.
// servers need to contain master hosts and worker hosts, otherwise the logic task will failover fail.
List<Server> servers = registryClient.getServerList(NodeType.WORKER);
servers.addAll(registryClient.getServerList(NodeType.MASTER));
@ -155,7 +162,8 @@ public class FailoverService {
processService.processNeedFailoverProcessInstances(processInstance);
}
LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
failoverTimeCost.stop();
LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
}
/**
@ -248,7 +256,10 @@ public class FailoverService {
}
/**
* get need failover master servers
* Get need failover master servers.
* <p>
* Query the process instances from database, if the processInstance's host doesn't exist in registry
* or the host is the currentServer, then it will need to failover.
*
* @return need failover master servers
*/

19
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java → dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java

@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -37,7 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -70,10 +69,10 @@ import org.springframework.context.ApplicationContext;
* test for WorkflowExecuteThread
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({WorkflowExecuteThread.class})
public class WorkflowExecuteThreadTest {
@PrepareForTest({WorkflowExecuteRunnable.class})
public class WorkflowExecuteTaskTest {
private WorkflowExecuteThread workflowExecuteThread;
private WorkflowExecuteRunnable workflowExecuteThread;
private ProcessInstance processInstance;
@ -118,9 +117,9 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread));
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(workflowExecuteThread, new DAG());
PowerMockito.doNothing().when(workflowExecuteThread, "endProcess");
@ -132,7 +131,7 @@ public class WorkflowExecuteThreadTest {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);
method.setAccessible(true);
List<String> nodeNames = (List<String>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@ -158,7 +157,7 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processService.findTaskInstanceByIdList(
Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
method.setAccessible(true);
List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
@ -202,7 +201,7 @@ public class WorkflowExecuteThreadTest {
completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
completeTaskMapField.setAccessible(true);

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java vendored

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
@ -37,7 +37,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Mock
private WorkflowExecuteThread workflowExecuteThread;
private WorkflowExecuteRunnable workflowExecuteThread;
@Before
public void before() {
@ -47,7 +47,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
@Test
public void testGetByProcessInstanceId() {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey());
}
@ -59,20 +59,20 @@ public class ProcessInstanceExecCacheManagerImplTest {
@Test
public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null);
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testRemoveByProcessInstanceId() {
processInstanceExecCacheManager.removeByProcessInstanceId(1);
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testGetAll() {
Collection<WorkflowExecuteThread> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
Collection<WorkflowExecuteRunnable> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
Assert.assertEquals(1, workflowExecuteThreads.size());
}
}

7
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -111,13 +111,6 @@ public class MasterRegistryClientTest {
masterRegistryClient.registry();
}
@Test
public void handleConnectionStateTest() {
masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
}
@Test
public void removeNodePathTest() {
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -89,7 +89,7 @@ public class HeartBeatTask implements Runnable {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
logger.error("HeartBeat task execute failed", ex);
}
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.registry;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.ADD_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_OP;
@ -29,8 +30,6 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;

Loading…
Cancel
Save