diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java similarity index 97% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java index 3fb58fe3da..783d166e96 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote.command; +package org.apache.dolphinscheduler.remote.entity; import java.io.Serializable; import java.util.Date; @@ -23,7 +23,7 @@ import java.util.Date; /** * master/worker task transport */ -public class TaskInfo implements Serializable{ +public class TaskExecutionContext implements Serializable{ /** * task instance id @@ -229,7 +229,7 @@ public class TaskInfo implements Serializable{ @Override public String toString() { - return "TaskInfo{" + + return "TaskExecutionContext{" + "taskId=" + taskId + ", taskName='" + taskName + '\'' + ", startTime=" + startTime + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java new file mode 100644 index 0000000000..57e64c1446 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + + +import java.util.Objects; + + +public class Host { + + private String address; + + private String ip; + + private int port; + + public Host() { + } + + public Host(String ip, int port) { + this.ip = ip; + this.port = port; + this.address = ip + ":" + port; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + this.address = ip + ":" + port; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + this.address = ip + ":" + port; + } + + public static Host of(String address){ + String[] parts = address.split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException(String.format("Address : %s illegal.", address)); + } + Host host = new Host(parts[0], Integer.parseInt(parts[1])); + return host; + } + + @Override + public String toString() { + return "Host{" + + "address='" + address + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Host host = (Host) o; + return Objects.equals(getAddress(), host.getAddress()); + } + + @Override + public int hashCode() { + return Objects.hash(getAddress()); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java new file mode 100644 index 0000000000..316ce36d5d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + + +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; + +public interface HostManager { + + Host select(TaskExecutionContext context); + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java new file mode 100644 index 0000000000..18a4659c13 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +@Service +public class RoundRobinHostManager implements HostManager { + + private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class); + + @Autowired + private RoundRobinSelector selector; + + @Autowired + private ZookeeperNodeManager zookeeperNodeManager; + + @Override + public Host select(TaskExecutionContext context){ + Host host = new Host(); + Collection nodes = zookeeperNodeManager.getWorkerNodes(); + if(CollectionUtils.isEmpty(nodes)){ + return host; + } + List candidateHosts = new ArrayList<>(nodes.size()); + nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); + return selector.select(candidateHosts); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java new file mode 100644 index 0000000000..3a3f1237bf --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; +import java.util.Random; + + +public class RandomSelector implements Selector { + + private final Random random = new Random(); + + @Override + public T select(final Collection source) { + + if (source == null || source.size() == 0) { + throw new IllegalArgumentException("Empty source."); + } + + if (source.size() == 1) { + return (T) source.toArray()[0]; + } + + int size = source.size(); + int randomIndex = random.nextInt(size); + + return (T) source.toArray()[randomIndex]; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java new file mode 100644 index 0000000000..d3422963b0 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + + +public class RoundRobinSelector implements Selector { + + private final AtomicInteger index = new AtomicInteger(0); + + @Override + public T select(Collection source) { + if (source == null || source.size() == 0) { + throw new IllegalArgumentException("Empty source."); + } + + if (source.size() == 1) { + return (T)source.toArray()[0]; + } + + int size = source.size(); + return (T) source.toArray()[index.getAndIncrement() % size]; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java new file mode 100644 index 0000000000..c6772f3e03 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.host.assign; + +import java.util.Collection; + + +public interface Selector { + + T select(Collection source); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java new file mode 100644 index 0000000000..a9c111d0c9 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.registry; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * master registry + */ +public class MasterRegistry { + + private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); + + /** + * zookeeper registry center + */ + private final ZookeeperRegistryCenter zookeeperRegistryCenter; + + /** + * port + */ + private final int port; + + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ + public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ + this.zookeeperRegistryCenter = zookeeperRegistryCenter; + this.port = port; + } + + /** + * registry + */ + public void registry() { + String address = Constants.LOCAL_ADDRESS; + String localNodePath = getWorkerPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if(newState == ConnectionState.LOST){ + logger.error("master : {} connection lost from zookeeper", address); + } else if(newState == ConnectionState.RECONNECTED){ + logger.info("master : {} reconnected to zookeeper", address); + zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + } else if(newState == ConnectionState.SUSPENDED){ + logger.warn("master : {} connection SUSPENDED ", address); + } + } + }); + logger.info("master node : {} registry to ZK successfully.", address); + } + + /** + * remove registry info + */ + public void unRegistry() { + String address = getLocalAddress(); + String localNodePath = getWorkerPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + logger.info("worker node : {} unRegistry to ZK.", address); + } + + /** + * get worker path + * @return + */ + private String getWorkerPath() { + String address = getLocalAddress(); + String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; + return localNodePath; + } + + /** + * get local address + * @return + */ + private String getLocalAddress(){ + return Constants.LOCAL_ADDRESS + ":" + port; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 09005a1f27..e7d998e3bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Address; @@ -137,7 +137,7 @@ public class MasterBaseTaskExecThread implements Callable { FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); try { Command responseCommand = nettyRemotingClient.sendSync(address, - taskRequestCommand.convert2Command(), Integer.MAX_VALUE); + taskRequestCommand.convert2Command(), 2000); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( responseCommand.getBody(), ExecuteTaskAckCommand.class); @@ -156,6 +156,7 @@ public class MasterBaseTaskExecThread implements Callable { } + /** * set task instance relation * @@ -203,25 +204,25 @@ public class MasterBaseTaskExecThread implements Callable { * @param taskInstance taskInstance * @return taskInfo */ - private TaskInfo convertToTaskInfo(TaskInstance taskInstance){ - TaskInfo taskInfo = new TaskInfo(); - taskInfo.setTaskId(taskInstance.getId()); - taskInfo.setTaskName(taskInstance.getName()); - taskInfo.setStartTime(taskInstance.getStartTime()); - taskInfo.setTaskType(taskInstance.getTaskType()); - taskInfo.setExecutePath(getExecLocalPath(taskInstance)); - taskInfo.setTaskJson(taskInstance.getTaskJson()); - taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId()); - taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); - taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); - taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); - taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); - taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); - taskInfo.setQueue(taskInstance.getProcessInstance().getQueue()); - taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId()); - taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId()); - - return taskInfo; + private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){ + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskId(taskInstance.getId()); + taskExecutionContext.setTaskName(taskInstance.getName()); + taskExecutionContext.setStartTime(taskInstance.getStartTime()); + taskExecutionContext.setTaskType(taskInstance.getTaskType()); + taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance)); + taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); + taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId()); + taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); + taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); + taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); + taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); + taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue()); + taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId()); + taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId()); + + return taskExecutionContext; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java new file mode 100644 index 0000000000..e3eacafa84 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.registry; + +import org.apache.curator.framework.CuratorFramework; + +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.dolphinscheduler.service.zk.AbstractListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +@Service +public abstract class ZookeeperNodeManager implements InitializingBean { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); + + private final Lock masterLock = new ReentrantLock(); + + private final Lock workerLock = new ReentrantLock(); + + private final Set workerNodes = new HashSet<>(); + + private final Set masterNodes = new HashSet<>(); + + @Autowired + private ZookeeperRegistryCenter registryCenter; + + @Override + public void afterPropertiesSet() throws Exception { + load(); + registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); + registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener()); + } + + private void load(){ + Set schedulerNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(schedulerNodes); + Set workersNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(workersNodes); + } + + class WorkerNodeListener extends AbstractListener { + + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if(registryCenter.isWorkerPath(path)){ + try { + if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { + logger.info("worker node : {} added.", path); + Set previousNodes = new HashSet<>(workerNodes); + Set currentNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(currentNodes); + } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { + logger.info("worker node : {} down.", path); + Set previousNodes = new HashSet<>(workerNodes); + Set currentNodes = registryCenter.getWorkerNodesDirectly(); + syncWorkerNodes(currentNodes); + } + } catch (IllegalArgumentException ignore) { + logger.warn(ignore.getMessage()); + } catch (Exception ex) { + logger.error("WorkerListener capture data change and get data failed", ex); + } + } + } + } + + + class MasterNodeListener extends AbstractListener { + + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + if (registryCenter.isMasterPath(path)) { + try { + if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { + logger.info("master node : {} added.", path); + Set previousNodes = new HashSet<>(masterNodes); + Set currentNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(currentNodes); + } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { + logger.info("master node : {} down.", path); + Set previousNodes = new HashSet<>(masterNodes); + Set currentNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(currentNodes); + } + } catch (Exception ex) { + logger.error("MasterNodeListener capture data change and get data failed.", ex); + } + } + } + } + + public Set getMasterNodes() { + masterLock.lock(); + try { + return Collections.unmodifiableSet(masterNodes); + } finally { + masterLock.unlock(); + } + } + + private void syncMasterNodes(Set nodes){ + masterLock.lock(); + try { + masterNodes.clear(); + masterNodes.addAll(nodes); + } finally { + masterLock.unlock(); + } + } + + private void syncWorkerNodes(Set nodes){ + workerLock.lock(); + try { + workerNodes.clear(); + workerNodes.addAll(nodes); + } finally { + workerLock.unlock(); + } + } + + public Set getWorkerNodes(){ + workerLock.lock(); + try { + return Collections.unmodifiableSet(workerNodes); + } finally { + workerLock.unlock(); + } + } + + public void close(){ + registryCenter.close(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 68c19ea4eb..96b8424b55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -22,6 +22,9 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @Service @@ -76,6 +79,28 @@ public class ZookeeperRegistryCenter implements InitializingBean { return WORKER_PATH; } + public Set getMasterNodesDirectly() { + List masters = getChildrenKeys(MASTER_PATH); + return new HashSet<>(masters); + } + + public Set getWorkerNodesDirectly() { + List workers = getChildrenKeys(WORKER_PATH); + return new HashSet<>(workers); + } + + public boolean isWorkerPath(String path) { + return path != null && path.contains(WORKER_PATH); + } + + public boolean isMasterPath(String path) { + return path != null && path.contains(MASTER_PATH); + } + + public List getChildrenKeys(final String key) { + return zookeeperCachedOperator.getChildrenKeys(key); + } + public ZookeeperCachedOperator getZookeeperCachedOperator() { return zookeeperCachedOperator; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index ba2149492b..cbbb5cee38 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -19,18 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -40,7 +35,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; import java.util.concurrent.ExecutorService; /** @@ -87,23 +81,23 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { String taskInstanceJson = taskRequestCommand.getTaskInfoJson(); - TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class); + TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class); // local execute path - String execLocalPath = getExecLocalPath(taskInfo); + String execLocalPath = getExecLocalPath(taskExecutionContext); logger.info("task instance local execute path : {} ", execLocalPath); try { - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode()); + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } - taskCallbackService.addCallbackChannel(taskInfo.getTaskId(), + taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(), new CallbackChannel(channel, command.getOpaque())); // submit task - workerExecService.submit(new TaskScheduleThread(taskInfo, + workerExecService.submit(new TaskScheduleThread(taskExecutionContext, processService, taskCallbackService)); } @@ -111,13 +105,13 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { /** * get execute local path * - * @param taskInfo taskInfo + * @param taskExecutionContext taskExecutionContext * @return execute local path */ - private String getExecLocalPath(TaskInfo taskInfo){ - return FileUtils.getProcessExecDir(taskInfo.getProjectId(), - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId()); + private String getExecLocalPath(TaskExecutionContext taskExecutionContext){ + return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index fc81638705..a0f4e664b5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -71,7 +71,7 @@ public class WorkerRegistry { } } }); - logger.info("scheduler node : {} registry to ZK successfully.", address); + logger.info("worker node : {} registry to ZK successfully.", address); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index 04ee565871..53a25b1016 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -31,11 +31,9 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.TaskInfo; +import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; @@ -63,7 +61,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance */ - private TaskInfo taskInfo; + private TaskExecutionContext taskExecutionContext; /** * process service @@ -83,67 +81,67 @@ public class TaskScheduleThread implements Runnable { /** * constructor * - * @param taskInfo taskInfo + * @param taskExecutionContext taskExecutionContext * @param processService processService * @param taskInstanceCallbackService taskInstanceCallbackService */ - public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; - this.taskInfo = taskInfo; + this.taskExecutionContext = taskExecutionContext; this.taskInstanceCallbackService = taskInstanceCallbackService; } @Override public void run() { - ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); + ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId()); try { // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType()); - taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand); + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType()); + taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand); - logger.info("script path : {}", taskInfo.getExecutePath()); + logger.info("script path : {}", taskExecutionContext.getExecutePath()); // task node - TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class); + TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); // get resource files List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local downloadResource( - taskInfo.getExecutePath(), + taskExecutionContext.getExecutePath(), resourceFiles, logger); // set task props TaskProps taskProps = new TaskProps(taskNode.getParams(), - taskInfo.getExecutePath(), - taskInfo.getScheduleTime(), - taskInfo.getTaskName(), - taskInfo.getTaskType(), - taskInfo.getTaskId(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getScheduleTime(), + taskExecutionContext.getTaskName(), + taskExecutionContext.getTaskType(), + taskExecutionContext.getTaskId(), CommonUtils.getSystemEnvPath(), - taskInfo.getTenantCode(), - taskInfo.getQueue(), - taskInfo.getStartTime(), + taskExecutionContext.getTenantCode(), + taskExecutionContext.getQueue(), + taskExecutionContext.getStartTime(), getGlobalParamsMap(), null, - CommandType.of(taskInfo.getCmdTypeIfComplement())); + CommandType.of(taskExecutionContext.getCmdTypeIfComplement())); // set task timeout setTaskTimeout(taskProps, taskNode); taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId())); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId())); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInfo.getProcessDefineId(), - taskInfo.getProcessInstanceId(), - taskInfo.getTaskId())); + taskExecutionContext.getProcessDefineId(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskId())); - task = TaskManager.newTask(taskInfo.getTaskType(), + task = TaskManager.newTask(taskExecutionContext.getTaskType(), taskProps, taskLogger); @@ -159,14 +157,14 @@ public class TaskScheduleThread implements Runnable { // responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); - logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus()); + logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus()); }catch (Exception e){ logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand); + taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand); } } @@ -178,7 +176,7 @@ public class TaskScheduleThread implements Runnable { Map globalParamsMap = new HashMap<>(16); // global params string - String globalParamsStr = taskInfo.getGlobalParams(); + String globalParamsStr = taskExecutionContext.getGlobalParams(); if (globalParamsStr != null) { List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); @@ -199,7 +197,7 @@ public class TaskScheduleThread implements Runnable { if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null); }else{ - ackCommand.setExecutePath(taskInfo.getExecutePath()); + ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } return ackCommand; } @@ -215,15 +213,15 @@ public class TaskScheduleThread implements Runnable { .getDiscriminator()).getLogBase(); if (baseLog.startsWith(Constants.SINGLE_SLASH)){ return baseLog + Constants.SINGLE_SLASH + - taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + - taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInfo.getTaskId() + ".log"; + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskId() + ".log"; } return System.getProperty("user.dir") + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH + - taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + - taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskInfo.getTaskId() + ".log"; + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskId() + ".log"; } /** @@ -329,7 +327,7 @@ public class TaskScheduleThread implements Runnable { * @throws Exception exception */ private void checkDownloadPermission(List projectRes) throws Exception { - int userId = taskInfo.getExecutorId(); + int userId = taskExecutionContext.getExecutorId(); String[] resNames = projectRes.toArray(new String[projectRes.size()]); PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); permissionCheck.checkPermission(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java new file mode 100644 index 0000000000..3e3e6c8c20 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +public abstract class AbstractListener implements TreeCacheListener { + + @Override + public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { + String path = null == event.getData() ? "" : event.getData().getPath(); + if (path.isEmpty()) { + return; + } + dataChanged(client, event, path); + } + + protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index dccb768f8b..6c38a68f3e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -20,6 +20,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -32,7 +33,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); - TreeCache treeCache; + private TreeCache treeCache; /** * register a unified listener of /${dsRoot}, */ @@ -72,6 +73,10 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { return treeCache; } + public void addListener(TreeCacheListener listener){ + this.treeCache.getListenable().addListener(listener); + } + @Override public void close() { treeCache.close();