From cc878505cdac486be5c3efd818e4c0ca9e51c0b4 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 17 Mar 2020 16:38:00 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=8Cmaster=20and=20worker=20register=20ip?= =?UTF-8?q?=20use=20OSUtils.getHost()=202=EF=BC=8CProcessInstance=20host?= =?UTF-8?q?=20set=20ip:port=20format=20(#2209)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify * master and worker register ip use OSUtils.getHost() * ProcessInstance host set ip:port format Co-authored-by: qiaozhanwei --- .../server/master/registry/MasterRegistry.java | 5 ++--- .../server/master/runner/MasterSchedulerService.java | 10 +++++++++- .../server/worker/registry/WorkerRegistry.java | 5 ++--- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 0402520e57..b6582981f2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -21,7 +21,6 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -79,7 +78,7 @@ public class MasterRegistry { * registry */ public void registry() { - String address = Constants.LOCAL_ADDRESS; + String address = OSUtils.getHost(); String localNodePath = getMasterPath(); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @@ -125,7 +124,7 @@ public class MasterRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort(); + return OSUtils.getHost() + ":" + masterConfig.getListenPort(); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 4ba0d489c2..2a0e0a97fb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -76,6 +76,7 @@ public class MasterSchedulerService extends Thread { */ private ThreadPoolExecutor masterExecService; + /** * constructor of MasterSchedulerThread */ @@ -122,7 +123,10 @@ public class MasterSchedulerService extends Thread { logger.info(String.format("find one command: id: %d, type: %s", command.getId(),command.getCommandType().toString())); try{ - ProcessInstance processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterConfig.getMasterExecThreads() - activeCount, command); + + ProcessInstance processInstance = processService.handleCommand(logger, + getLocalAddress(), + this.masterConfig.getMasterExecThreads() - activeCount, command); if (processInstance != null) { logger.info("start master exec thread , split DAG ..."); masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient)); @@ -143,4 +147,8 @@ public class MasterSchedulerService extends Thread { } } } + + private String getLocalAddress(){ + return OSUtils.getHost() + ":" + masterConfig.getListenPort(); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index b42386a200..4d723404a5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -22,7 +22,6 @@ import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -86,7 +85,7 @@ public class WorkerRegistry { * registry */ public void registry() { - String address = Constants.LOCAL_ADDRESS; + String address = OSUtils.getHost(); String localNodePath = getWorkerPath(); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @@ -142,7 +141,7 @@ public class WorkerRegistry { * @return */ private String getLocalAddress(){ - return Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort(); + return OSUtils.getHost() + ":" + workerConfig.getListenPort(); } /**