Browse Source

1,master and worker register ip use OSUtils.getHost() 2,ProcessInstance host set ip:port format (#2209)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 4 years ago committed by GitHub
parent
commit
cc878505cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  3. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -21,7 +21,6 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -79,7 +78,7 @@ public class MasterRegistry {
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String address = OSUtils.getHost();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -125,7 +124,7 @@ public class MasterRegistry {
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort();
return OSUtils.getHost() + ":" + masterConfig.getListenPort();
}
/**

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -76,6 +76,7 @@ public class MasterSchedulerService extends Thread {
*/
private ThreadPoolExecutor masterExecService;
/**
* constructor of MasterSchedulerThread
*/
@ -122,7 +123,10 @@ public class MasterSchedulerService extends Thread {
logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString()));
try{
ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command);
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
this.masterConfig.getMasterExecThreads() - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
@ -143,4 +147,8 @@ public class MasterSchedulerService extends Thread {
}
}
}
private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort();
}
}

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -22,7 +22,6 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -86,7 +85,7 @@ public class WorkerRegistry {
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String address = OSUtils.getHost();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@ -142,7 +141,7 @@ public class WorkerRegistry {
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort();
return OSUtils.getHost() + ":" + workerConfig.getListenPort();
}
/**

Loading…
Cancel
Save