Browse Source

Refactor worker (#7)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
f0f8644753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
  3. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
  4. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  5. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired @Autowired
private RoundRobinHostManager hostManager; private RoundRobinHostManager hostManager;
private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers; private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
public ExecutorDispatcher(){ public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>(); this.executorManagers = new ConcurrentHashMap<>();
@ -61,11 +61,11 @@ public class ExecutorDispatcher implements InitializingBean {
* @param context context * @param context context
* @throws ExecuteException * @throws ExecuteException
*/ */
public void dispatch(final ExecutionContext context) throws ExecuteException { public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/** /**
* get executor manager * get executor manager
*/ */
ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){ if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
} }
@ -83,7 +83,7 @@ public class ExecutorDispatcher implements InitializingBean {
/** /**
* task execute * task execute
*/ */
executorManager.execute(context); return executorManager.execute(context);
} finally { } finally {
executorManager.afterExecute(context); executorManager.afterExecute(context);
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
/** /**
* abstract executor manager * abstract executor manager
*/ */
public abstract class AbstractExecutorManager implements ExecutorManager{ public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
/** /**
* before execute , add time monitor timeout * before execute , add time monitor timeout

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
/** /**
* executor manager * executor manager
*/ */
public interface ExecutorManager { public interface ExecutorManager<T> {
/** /**
* before execute * before execute
@ -37,7 +37,7 @@ public interface ExecutorManager {
* @param context context * @param context context
* @throws ExecuteException * @throws ExecuteException
*/ */
void execute(ExecutionContext context) throws ExecuteException; T execute(ExecutionContext context) throws ExecuteException;
/** /**
* after execute * after execute

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -46,7 +46,7 @@ import java.util.Set;
* netty executor manager * netty executor manager
*/ */
@Service @Service
public class NettyExecutorManager extends AbstractExecutorManager{ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{
public NettyExecutorManager(){ public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
/**
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
} }
@ -74,7 +78,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
* @throws ExecuteException * @throws ExecuteException
*/ */
@Override @Override
public void execute(ExecutionContext context) throws ExecuteException { public Boolean execute(ExecutionContext context) throws ExecuteException {
/** /**
* all nodes * all nodes
@ -118,6 +122,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{
} }
} }
} }
return success;
} }
/** /**
@ -189,7 +195,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{
break; break;
case CLIENT: case CLIENT:
break; break;
default: default:
throw new IllegalArgumentException("invalid executor type : " + executorType); throw new IllegalArgumentException("invalid executor type : " + executorType);
} }

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -138,14 +125,16 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* dispatch task to worker * dispatch task to worker
* @param taskInstance * @param taskInstance
*/ */
public void dispatch(TaskInstance taskInstance){ private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance); TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER); ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
try { try {
dispatcher.dispatch(executionContext); return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) { } catch (ExecuteException e) {
logger.error("execute exception", e); logger.error("execute exception", e);
return false;
} }
} }
/** /**
@ -234,8 +223,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
if(submitDB && !submitQueue){ if(submitDB && !submitQueue){
// submit task to queue // submit task to queue
dispatch(task); submitQueue = dispatch(task);
submitQueue = true;
} }
if(submitDB && submitQueue){ if(submitDB && submitQueue){
return task; return task;

Loading…
Cancel
Save