executorManagers;
+
+ public ExecutorDispatcher(){
+ this.executorManagers = new ConcurrentHashMap<>();
+ }
+
+ public void dispatch(final ExecutionContext executeContext) throws ExecuteException {
+ ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType());
+ if(executorManager == null){
+ throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType());
+ }
+ Host host = hostManager.select(executeContext);
+ if (StringUtils.isEmpty(host.getAddress())) {
+ throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext()));
+ }
+ executeContext.setHost(host);
+ executorManager.beforeExecute(executeContext);
+ try {
+ executorManager.execute(executeContext);
+ } finally {
+ executorManager.afterExecute(executeContext);
+ }
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ register(ExecutorType.WORKER, nettyExecutorManager);
+ register(ExecutorType.CLIENT, nettyExecutorManager);
+ }
+
+ public void register(ExecutorType type, ExecutorManager executorManager){
+ executorManagers.put(type, executorManager);
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
new file mode 100644
index 0000000000..4bccba0d7a
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.master.dispatch.context;
+
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+
+public class ExecutionContext {
+
+ private Host host;
+
+ private final Object context;
+
+ private final ExecutorType executorType;
+
+ public ExecutionContext(Object context, ExecutorType executorType) {
+ this.context = context;
+ this.executorType = executorType;
+ }
+
+ public ExecutorType getExecutorType() {
+ return executorType;
+ }
+
+ public Object getContext() {
+ return context;
+ }
+
+ public Host getHost() {
+ return host;
+ }
+
+ public void setHost(Host host) {
+ this.host = host;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
similarity index 79%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
index 316ce36d5d..70aaeaeda2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/enums/ExecutorType.java
@@ -14,14 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.dolphinscheduler.server.master.dispatch.enums;
-package org.apache.dolphinscheduler.server.master.host;
+public enum ExecutorType {
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-
-public interface HostManager {
-
- Host select(TaskExecutionContext context);
+ WORKER,
+ CLIENT;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
new file mode 100644
index 0000000000..d8ca50a9f9
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/ExecuteException.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
+
+
+public class ExecuteException extends Exception{
+
+ public ExecuteException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public ExecuteException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. Note that the detail message associated with
+ * {@code cause} is not automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public ExecuteException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail
+ * message of (cause==null ? null : cause.toString()) (which
+ * typically contains the class and detail message of cause).
+ * This constructor is useful for exceptions that are little more than
+ * wrappers for other throwables (for example, {@link
+ * java.security.PrivilegedActionException}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public ExecuteException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message,
+ * cause, suppression enabled or disabled, and writable stack
+ * trace enabled or disabled.
+ *
+ * @param message the detail message.
+ * @param cause the cause. (A {@code null} value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ * @param enableSuppression whether or not suppression is enabled
+ * or disabled
+ * @param writableStackTrace whether or not the stack trace should
+ * be writable
+ * @since 1.7
+ */
+ protected ExecuteException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
new file mode 100644
index 0000000000..65ed15eb50
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+
+
+public abstract class AbstractExecutorManager implements ExecutorManager{
+
+ @Override
+ public void beforeExecute(ExecutionContext executeContext) throws ExecuteException {
+ //TODO add time monitor
+ }
+
+ @Override
+ public void afterExecute(ExecutionContext executeContext) throws ExecuteException {
+ //TODO add dispatch monitor
+
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
new file mode 100644
index 0000000000..98d391e7ea
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+
+
+public interface ExecutorManager {
+
+ void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
+
+ void execute(ExecutionContext executeContext) throws ExecuteException;
+
+ void afterExecute(ExecutionContext executeContext) throws ExecuteException;
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
new file mode 100644
index 0000000000..dac8d791f4
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.executor;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
+import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+
+@Service
+public class NettyExecutorManager extends AbstractExecutorManager{
+
+ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
+
+ @Autowired
+ private ZookeeperNodeManager zookeeperNodeManager;
+
+ private final NettyRemotingClient nettyRemotingClient;
+
+ public NettyExecutorManager(){
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
+ }
+
+ @Override
+ public void execute(ExecutionContext executeContext) throws ExecuteException {
+ Set allNodes = getAllNodes(executeContext);
+ Set failNodeSet = new HashSet<>();
+ //
+ Command command = buildCommand(executeContext);
+ Host host = executeContext.getHost();
+ boolean success = false;
+ //
+ while (!success) {
+ try {
+ doExecute(host, command);
+ success = true;
+ executeContext.setHost(host);
+ } catch (ExecuteException ex) {
+ logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
+ try {
+ failNodeSet.add(host.getAddress());
+ Set tmpAllIps = new HashSet<>(allNodes);
+ Collection remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
+ if (remained != null && remained.size() > 0) {
+ host = Host.of(remained.iterator().next());
+ logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
+ } else {
+ throw new ExecuteException("fail after try all nodes");
+ }
+ } catch (Throwable t) {
+ throw new ExecuteException("fail after try all nodes");
+ }
+ }
+ }
+ }
+
+ private Command buildCommand(ExecutionContext context) {
+ ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext();
+ requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executor type : " + executorType);
+
+ }
+ return requestCommand.convert2Command();
+ }
+
+ private void doExecute(final Host host, final Command command) throws ExecuteException {
+ int retryCount = 3;
+ boolean success = false;
+ do {
+ try {
+ nettyRemotingClient.send(host, command);
+ success = true;
+ } catch (Exception ex) {
+ logger.error(String.format("send command : %s to %s error", command, host), ex);
+ retryCount--;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignore) {}
+ }
+ } while (retryCount >= 0 && !success);
+
+ if (!success) {
+ throw new ExecuteException(String.format("send command : %s to %s error", command, host));
+ }
+ }
+
+ private Set getAllNodes(ExecutionContext context){
+ Set nodes = Collections.EMPTY_SET;
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ nodes = zookeeperNodeManager.getWorkerNodes();
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executor type : " + executorType);
+
+ }
+ return nodes;
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
new file mode 100644
index 0000000000..87082738da
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.dispatch.host;
+
+
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+
+public interface HostManager {
+
+ Host select(ExecutionContext context);
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
similarity index 62%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index 18a4659c13..1c222b84af 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host;
+package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
+import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,21 +39,36 @@ public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
- @Autowired
- private RoundRobinSelector selector;
-
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
+ private final Selector selector;
+
+ public RoundRobinHostManager(){
+ this.selector = new RoundRobinSelector<>();
+ }
+
@Override
- public Host select(TaskExecutionContext context){
+ public Host select(ExecutionContext context){
Host host = new Host();
- Collection nodes = zookeeperNodeManager.getWorkerNodes();
+ Collection nodes = null;
+ ExecutorType executorType = context.getExecutorType();
+ switch (executorType){
+ case WORKER:
+ nodes = zookeeperNodeManager.getWorkerNodes();
+ break;
+ case CLIENT:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid executorType : " + executorType);
+
+ }
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
+
return selector.select(candidateHosts);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
similarity index 95%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index 3a3f1237bf..cf8c0e84d4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Random;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
similarity index 91%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index d3422963b0..90319de122 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
+
+import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
-
+@Service
public class RoundRobinSelector implements Selector {
private final AtomicInteger index = new AtomicInteger(0);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
similarity index 92%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
index c6772f3e03..bd7c4ac5b9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.host.assign;
+package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
new file mode 100644
index 0000000000..1103b2310a
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * task ack processor
+ */
+public class TaskAckProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
+
+ /**
+ * process service
+ */
+ private final ProcessService processService;
+
+ public TaskAckProcessor(ProcessService processService){
+ this.processService = processService;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
+ ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
+ logger.info("taskAckCommand : {}",taskAckCommand);
+ processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+ taskAckCommand.getStartTime(),
+ taskAckCommand.getHost(),
+ taskAckCommand.getExecutePath(),
+ taskAckCommand.getLogPath(),
+ taskAckCommand.getTaskInstanceId());
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index c3b6a05676..b62bb773d9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
-import org.apache.dolphinscheduler.server.master.future.TaskFuture;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +58,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
logger.info("received command : {}", command);
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
- TaskFuture.notify(command);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index 7106cc6240..f675493bf3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -32,16 +32,23 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.future.InvokeCallback;
+import org.apache.dolphinscheduler.remote.future.ResponseFuture;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
+import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
+import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
+import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable;
@@ -92,9 +99,9 @@ public class MasterBaseTaskExecThread implements Callable {
/**
- * netty remoting client
+ * executor dispatcher
*/
- private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
+ private ExecutorDispatcher dispatcher;
/**
* constructor of MasterBaseTaskExecThread
@@ -102,13 +109,14 @@ public class MasterBaseTaskExecThread implements Callable {
* @param processInstance process instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
- this.processService = BeanContext.getBean(ProcessService.class);
- this.alertDao = BeanContext.getBean(AlertDao.class);
+ this.processService = SpringApplicationContext.getBean(ProcessService.class);
+ this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
+ this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
}
/**
@@ -126,30 +134,17 @@ public class MasterBaseTaskExecThread implements Callable {
this.cancel = true;
}
-
- // TODO send task to worker
- public void sendToWorker(TaskInstance taskInstance){
- final Address address = new Address("127.0.0.1", 12346);
-
- ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
- FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
+ /**
+ * dispatch task to worker
+ * @param taskInstance
+ */
+ public void dispatch(TaskInstance taskInstance){
+ TaskExecutionContext context = getTaskExecutionContext(taskInstance);
+ ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
try {
- Command responseCommand = nettyRemotingClient.sendSync(address,
- taskRequestCommand.convert2Command(), 2000);
-
- ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
- responseCommand.getBody(), ExecuteTaskAckCommand.class);
-
- logger.info("taskAckCommand : {}",taskAckCommand);
- processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
- taskAckCommand.getStartTime(),
- taskAckCommand.getHost(),
- taskAckCommand.getExecutePath(),
- taskAckCommand.getLogPath(),
- taskInstance.getId());
-
- } catch (InterruptedException | RemotingException ex) {
- logger.error(String.format("send command to : %s error", address), ex);
+ dispatcher.dispatch(executionContext);
+ } catch (ExecuteException e) {
+ logger.error("execute exception", e);
}
}
@@ -239,7 +234,7 @@ public class MasterBaseTaskExecThread implements Callable {
}
if(submitDB && !submitQueue){
// submit task to queue
- sendToWorker(task);
+ dispatch(task);
submitQueue = true;
}
if(submitDB && submitQueue){
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index b8bf1c9074..d0f49272bd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -33,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.command.Command;
-import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index e3eacafa84..c7a2d0bdfd 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Service
-public abstract class ZookeeperNodeManager implements InitializingBean {
+public class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 96b8424b55..3364a94a23 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -47,7 +47,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
-
+ init();
}
public void init() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
index cd62e98a9b..632d2f7624 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
@@ -76,7 +76,7 @@ public class TaskCallbackService {
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
- callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
+ callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
}
/**
@@ -87,8 +87,7 @@ public class TaskCallbackService {
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
- callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
- callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){
+ callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
index 038b8ef7de..39dc136de7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
@@ -79,9 +79,9 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
- String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
+ String contextJson = taskRequestCommand.getTaskExecutionContext();
- TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
+ TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
@@ -92,7 +92,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
- taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
+ taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task
@@ -110,6 +110,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId());
+ taskExecutionContext.getTaskInstanceId());
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
index c54842b921..b288aeace3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
@@ -93,12 +93,12 @@ public class TaskScheduleThread implements Runnable {
@Override
public void run() {
- ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
+ ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
- taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand);
+ taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
@@ -118,7 +118,7 @@ public class TaskScheduleThread implements Runnable {
taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(),
- taskExecutionContext.getTaskId(),
+ taskExecutionContext.getTaskInstanceId(),
CommonUtils.getSystemEnvPath(),
taskExecutionContext.getTenantCode(),
taskExecutionContext.getQueue(),
@@ -132,13 +132,13 @@ public class TaskScheduleThread implements Runnable {
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId()));
+ taskExecutionContext.getTaskInstanceId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskId()));
+ taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext.getTaskType(),
taskProps,
@@ -156,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
//
responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
- logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus());
+ logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
}catch (Exception e){
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
} finally {
- taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
+ taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
}
}
@@ -213,13 +213,13 @@ public class TaskScheduleThread implements Runnable {
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskId() + ".log";
+ taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
- taskExecutionContext.getTaskId() + ".log";
+ taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
@@ -325,9 +325,9 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception
*/
private void checkDownloadPermission(List projectRes) throws Exception {
- int userId = taskExecutionContext.getExecutorId();
+ int executorId = taskExecutionContext.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
- PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
+ PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,executorId,logger);
permissionCheck.checkPermission();
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 5daf535625..c979eb25ec 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -16,12 +16,11 @@
*/
package org.apache.dolphinscheduler.service.log;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
-import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +72,7 @@ public class LogClientService {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);
@@ -101,7 +100,7 @@ public class LogClientService {
logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);
@@ -129,7 +128,7 @@ public class LogClientService {
logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null;
- final Address address = new Address(host, port);
+ final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, logRequestTimeout);