Browse Source

WorkerServer refactor (#2106)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 4 years ago committed by GitHub
parent
commit
043df7e189
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -52,16 +52,6 @@ public class WorkerServer {
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* fetch task executor service
*/
private ExecutorService fetchTaskExecutorService;
/**
* CountDownLatch latch
*/
private CountDownLatch latch;
/**
* worker config
*/
@ -120,9 +110,6 @@ public class WorkerServer {
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
this.workerRegistry.registry();
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
/**
* register hooks, which are called before the process exits
*/
@ -132,13 +119,6 @@ public class WorkerServer {
close("shutdownHook");
}
}));
//let the main thread await
latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException ignore) {
}
}
public void close(String cause) {
@ -169,19 +149,6 @@ public class WorkerServer {
}catch (Exception e){
logger.warn("threadPool service stopped exception:{}",e.getMessage());
}
logger.info("threadPool service stopped");
try {
fetchTaskExecutorService.shutdownNow();
}catch (Exception e){
logger.warn("worker fetch task service stopped exception:{}",e.getMessage());
}
logger.info("worker fetch task service stopped");
latch.countDown();
logger.info("zookeeper service stopped");
} catch (Exception e) {
logger.error("worker server stop exception ", e);
System.exit(-1);

Loading…
Cancel
Save