Browse Source

Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
3800a2a99b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 250
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java
  2. 95
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java
  3. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java
  4. 57
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java
  5. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java
  6. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java
  7. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java
  8. 104
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  9. 43
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  10. 159
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  11. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  12. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  14. 80
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  15. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
  16. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

250
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/entity/TaskExecutionContext.java

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.entity;
import java.io.Serializable;
import java.util.Date;
/**
* master/worker task transport
*/
public class TaskExecutionContext implements Serializable{
/**
* task instance id
*/
private Integer taskId;
/**
* taks name
*/
private String taskName;
/**
* task start time
*/
private Date startTime;
/**
* task type
*/
private String taskType;
/**
* task execute path
*/
private String executePath;
/**
* task json
*/
private String taskJson;
/**
* process instance id
*/
private Integer processInstanceId;
/**
* process instance schedule time
*/
private Date scheduleTime;
/**
* process instance global parameters
*/
private String globalParams;
/**
* execute user id
*/
private Integer executorId;
/**
* command type if complement
*/
private Integer cmdTypeIfComplement;
/**
* tenant code
*/
private String tenantCode;
/**
* task queue
*/
private String queue;
/**
* process define id
*/
private Integer processDefineId;
/**
* project id
*/
private Integer projectId;
public Integer getTaskId() {
return taskId;
}
public void setTaskId(Integer taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public Integer getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public Integer getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(Integer processDefineId) {
this.processDefineId = processDefineId;
}
public Integer getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
this.projectId = projectId;
}
public Integer getExecutorId() {
return executorId;
}
public void setExecutorId(Integer executorId) {
this.executorId = executorId;
}
public Integer getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
@Override
public String toString() {
return "TaskExecutionContext{" +
"taskId=" + taskId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
", executePath='" + executePath + '\'' +
", taskJson='" + taskJson + '\'' +
", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime +
", globalParams='" + globalParams + '\'' +
", executorId=" + executorId +
", cmdTypeIfComplement=" + cmdTypeIfComplement +
", tenantCode='" + tenantCode + '\'' +
", queue='" + queue + '\'' +
", processDefineId=" + processDefineId +
", projectId=" + projectId +
'}';
}
}

95
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/Host.java

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import java.util.Objects;
public class Host {
private String address;
private String ip;
private int port;
public Host() {
}
public Host(String ip, int port) {
this.ip = ip;
this.port = port;
this.address = ip + ":" + port;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
this.address = ip + ":" + port;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
this.address = ip + ":" + port;
}
public static Host of(String address){
String[] parts = address.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException(String.format("Address : %s illegal.", address));
}
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
return host;
}
@Override
public String toString() {
return "Host{" +
"address='" + address + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Host host = (Host) o;
return Objects.equals(getAddress(), host.getAddress());
}
@Override
public int hashCode() {
return Objects.hash(getAddress());
}
}

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/HostManager.java

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
public interface HostManager {
Host select(TaskExecutionContext context);
}

57
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/RoundRobinHostManager.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@Service
public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
@Autowired
private RoundRobinSelector<Host> selector;
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
@Override
public Host select(TaskExecutionContext context){
Host host = new Host();
Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
return selector.select(candidateHosts);
}
}

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RandomSelector.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
import java.util.Random;
public class RandomSelector<T> implements Selector<T> {
private final Random random = new Random();
@Override
public T select(final Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
if (source.size() == 1) {
return (T) source.toArray()[0];
}
int size = source.size();
int randomIndex = random.nextInt(size);
return (T) source.toArray()[randomIndex];
}
}

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/RoundRobinSelector.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
public class RoundRobinSelector<T> implements Selector<T> {
private final AtomicInteger index = new AtomicInteger(0);
@Override
public T select(Collection<T> source) {
if (source == null || source.size() == 0) {
throw new IllegalArgumentException("Empty source.");
}
if (source.size() == 1) {
return (T)source.toArray()[0];
}
int size = source.size();
return (T) source.toArray()[index.getAndIncrement() % size];
}
}

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/host/assign/Selector.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.host.assign;
import java.util.Collection;
public interface Selector<T> {
T select(Collection<T> source);
}

104
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* master registry
*/
public class MasterRegistry {
private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
/**
* zookeeper registry center
*/
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* port
*/
private final int port;
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
}
/**
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if(newState == ConnectionState.LOST){
logger.error("master : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("master : {} connection SUSPENDED ", address);
}
}
});
logger.info("master node : {} registry to ZK successfully.", address);
}
/**
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
logger.info("worker node : {} unRegistry to ZK.", address);
}
/**
* get worker path
* @return
*/
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
}
/**
* get local address
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
}

43
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.Address;
@ -137,7 +137,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance))); FastJsonSerializer.serializeToString(convertToTaskInfo(destTaskInstance)));
try { try {
Command responseCommand = nettyRemotingClient.sendSync(address, Command responseCommand = nettyRemotingClient.sendSync(address,
taskRequestCommand.convert2Command(), Integer.MAX_VALUE); taskRequestCommand.convert2Command(), 2000);
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize( ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
responseCommand.getBody(), ExecuteTaskAckCommand.class); responseCommand.getBody(), ExecuteTaskAckCommand.class);
@ -155,7 +155,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
} }
/** /**
* set task instance relation * set task instance relation
* *
@ -203,25 +202,25 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @return taskInfo * @return taskInfo
*/ */
private TaskInfo convertToTaskInfo(TaskInstance taskInstance){ private TaskExecutionContext convertToTaskInfo(TaskInstance taskInstance){
TaskInfo taskInfo = new TaskInfo(); TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskInfo.setTaskId(taskInstance.getId()); taskExecutionContext.setTaskId(taskInstance.getId());
taskInfo.setTaskName(taskInstance.getName()); taskExecutionContext.setTaskName(taskInstance.getName());
taskInfo.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskInfo.setTaskType(taskInstance.getTaskType()); taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskInfo.setExecutePath(getExecLocalPath(taskInstance)); taskExecutionContext.setExecutePath(getExecLocalPath(taskInstance));
taskInfo.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskInfo.setProcessInstanceId(taskInstance.getProcessInstance().getId()); taskExecutionContext.setProcessInstanceId(taskInstance.getProcessInstance().getId());
taskInfo.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime()); taskExecutionContext.setScheduleTime(taskInstance.getProcessInstance().getScheduleTime());
taskInfo.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams()); taskExecutionContext.setGlobalParams(taskInstance.getProcessInstance().getGlobalParams());
taskInfo.setExecutorId(taskInstance.getProcessInstance().getExecutorId()); taskExecutionContext.setExecutorId(taskInstance.getProcessInstance().getExecutorId());
taskInfo.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode()); taskExecutionContext.setCmdTypeIfComplement(taskInstance.getProcessInstance().getCmdTypeIfComplement().getCode());
taskInfo.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); taskExecutionContext.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
taskInfo.setQueue(taskInstance.getProcessInstance().getQueue()); taskExecutionContext.setQueue(taskInstance.getProcessInstance().getQueue());
taskInfo.setProcessDefineId(taskInstance.getProcessDefine().getId()); taskExecutionContext.setProcessDefineId(taskInstance.getProcessDefine().getId());
taskInfo.setProjectId(taskInstance.getProcessDefine().getProjectId()); taskExecutionContext.setProjectId(taskInstance.getProcessDefine().getProjectId());
return taskInfo; return taskExecutionContext;
} }

159
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Service
public abstract class ZookeeperNodeManager implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
private final Lock masterLock = new ReentrantLock();
private final Lock workerLock = new ReentrantLock();
private final Set<String> workerNodes = new HashSet<>();
private final Set<String> masterNodes = new HashSet<>();
@Autowired
private ZookeeperRegistryCenter registryCenter;
@Override
public void afterPropertiesSet() throws Exception {
load();
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
registryCenter.getZookeeperCachedOperator().addListener(new WorkerNodeListener());
}
private void load(){
Set<String> schedulerNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(schedulerNodes);
Set<String> workersNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(workersNodes);
}
class WorkerNodeListener extends AbstractListener {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(registryCenter.isWorkerPath(path)){
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("worker node : {} added.", path);
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("worker node : {} down.", path);
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerNodesDirectly();
syncWorkerNodes(currentNodes);
}
} catch (IllegalArgumentException ignore) {
logger.warn(ignore.getMessage());
} catch (Exception ex) {
logger.error("WorkerListener capture data change and get data failed", ex);
}
}
}
}
class MasterNodeListener extends AbstractListener {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if (registryCenter.isMasterPath(path)) {
try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("master node : {} added.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("master node : {} down.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes);
}
} catch (Exception ex) {
logger.error("MasterNodeListener capture data change and get data failed.", ex);
}
}
}
}
public Set<String> getMasterNodes() {
masterLock.lock();
try {
return Collections.unmodifiableSet(masterNodes);
} finally {
masterLock.unlock();
}
}
private void syncMasterNodes(Set<String> nodes){
masterLock.lock();
try {
masterNodes.clear();
masterNodes.addAll(nodes);
} finally {
masterLock.unlock();
}
}
private void syncWorkerNodes(Set<String> nodes){
workerLock.lock();
try {
workerNodes.clear();
workerNodes.addAll(nodes);
} finally {
workerLock.unlock();
}
}
public Set<String> getWorkerNodes(){
workerLock.lock();
try {
return Collections.unmodifiableSet(workerNodes);
} finally {
workerLock.unlock();
}
}
public void close(){
registryCenter.close();
}
}

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -22,6 +22,9 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@Service @Service
@ -76,6 +79,28 @@ public class ZookeeperRegistryCenter implements InitializingBean {
return WORKER_PATH; return WORKER_PATH;
} }
public Set<String> getMasterNodesDirectly() {
List<String> masters = getChildrenKeys(MASTER_PATH);
return new HashSet<>(masters);
}
public Set<String> getWorkerNodesDirectly() {
List<String> workers = getChildrenKeys(WORKER_PATH);
return new HashSet<>(workers);
}
public boolean isWorkerPath(String path) {
return path != null && path.contains(WORKER_PATH);
}
public boolean isMasterPath(String path) {
return path != null && path.contains(MASTER_PATH);
}
public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key);
}
public ZookeeperCachedOperator getZookeeperCachedOperator() { public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator; return zookeeperCachedOperator;
} }

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -19,18 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -40,7 +35,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
@ -87,37 +81,35 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
String taskInstanceJson = taskRequestCommand.getTaskInfoJson(); String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
TaskInfo taskInfo = JSONObject.parseObject(taskInstanceJson, TaskInfo.class); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskInfo); String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath); logger.info("task instance local execute path : {} ", execLocalPath);
try { try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskInfo.getTenantCode()); FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
} catch (Exception ex){ } catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
} }
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
taskCallbackService.addCallbackChannel(taskInfo.getTaskId(),
new CallbackChannel(channel, command.getOpaque())); new CallbackChannel(channel, command.getOpaque()));
// submit task // submit task
workerExecService.submit(new TaskScheduleThread(taskInfo, workerExecService.submit(new TaskScheduleThread(taskExecutionContext,
processService, taskCallbackService)); processService, taskCallbackService));
} }
/** /**
* get execute local path * get execute local path
* * @param taskExecutionContext taskExecutionContext
* @param taskInfo taskInfo
* @return execute local path * @return execute local path
*/ */
private String getExecLocalPath(TaskInfo taskInfo){ private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
return FileUtils.getProcessExecDir(taskInfo.getProjectId(), return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskInfo.getProcessDefineId(), taskExecutionContext.getProcessDefineId(),
taskInfo.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskInfo.getTaskId()); taskExecutionContext.getTaskId());
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -71,7 +71,7 @@ public class WorkerRegistry {
} }
} }
}); });
logger.info("scheduler node : {} registry to ZK successfully.", address); logger.info("worker node : {} registry to ZK successfully.", address);
} }
/** /**

80
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -31,11 +31,9 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskInfo; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskManager;
@ -63,7 +61,7 @@ public class TaskScheduleThread implements Runnable {
/** /**
* task instance * task instance
*/ */
private TaskInfo taskInfo; private TaskExecutionContext taskExecutionContext;
/** /**
* process service * process service
@ -82,68 +80,67 @@ public class TaskScheduleThread implements Runnable {
/** /**
* constructor * constructor
* * @param taskExecutionContext taskExecutionContext
* @param taskInfo taskInfo
* @param processService processService * @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService * @param taskInstanceCallbackService taskInstanceCallbackService
*/ */
public TaskScheduleThread(TaskInfo taskInfo, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService; this.processService = processService;
this.taskInfo = taskInfo; this.taskExecutionContext = taskExecutionContext;
this.taskInstanceCallbackService = taskInstanceCallbackService; this.taskInstanceCallbackService = taskInstanceCallbackService;
} }
@Override @Override
public void run() { public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
try { try {
// tell master that task is in executing // tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInfo.getTaskType()); ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
taskInstanceCallbackService.sendAck(taskInfo.getTaskId(), ackCommand); taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand);
logger.info("script path : {}", taskInfo.getExecutePath()); logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node // task node
TaskNode taskNode = JSONObject.parseObject(taskInfo.getTaskJson(), TaskNode.class); TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
// get resource files // get resource files
List<String> resourceFiles = createProjectResFiles(taskNode); List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local // copy hdfs/minio file to local
downloadResource( downloadResource(
taskInfo.getExecutePath(), taskExecutionContext.getExecutePath(),
resourceFiles, resourceFiles,
logger); logger);
// set task props // set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(), TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInfo.getExecutePath(), taskExecutionContext.getExecutePath(),
taskInfo.getScheduleTime(), taskExecutionContext.getScheduleTime(),
taskInfo.getTaskName(), taskExecutionContext.getTaskName(),
taskInfo.getTaskType(), taskExecutionContext.getTaskType(),
taskInfo.getTaskId(), taskExecutionContext.getTaskId(),
CommonUtils.getSystemEnvPath(), CommonUtils.getSystemEnvPath(),
taskInfo.getTenantCode(), taskExecutionContext.getTenantCode(),
taskInfo.getQueue(), taskExecutionContext.getQueue(),
taskInfo.getStartTime(), taskExecutionContext.getStartTime(),
getGlobalParamsMap(), getGlobalParamsMap(),
null, null,
CommandType.of(taskInfo.getCmdTypeIfComplement())); CommandType.of(taskExecutionContext.getCmdTypeIfComplement()));
// set task timeout // set task timeout
setTaskTimeout(taskProps, taskNode); setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s", taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInfo.getProcessDefineId(), taskExecutionContext.getProcessDefineId(),
taskInfo.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskInfo.getTaskId())); taskExecutionContext.getTaskId()));
// custom logger // custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInfo.getProcessDefineId(), taskExecutionContext.getProcessDefineId(),
taskInfo.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskInfo.getTaskId())); taskExecutionContext.getTaskId()));
task = TaskManager.newTask(taskInfo.getTaskType(), task = TaskManager.newTask(taskExecutionContext.getTaskType(),
taskProps, taskProps,
taskLogger); taskLogger);
@ -159,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
// //
responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskInfo.getTaskId(), task.getExitStatus()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus());
}catch (Exception e){ }catch (Exception e){
logger.error("task scheduler failure", e); logger.error("task scheduler failure", e);
kill(); kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
} finally { } finally {
taskInstanceCallbackService.sendResult(taskInfo.getTaskId(), responseCommand); taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
} }
} }
@ -178,8 +175,7 @@ public class TaskScheduleThread implements Runnable {
Map<String,String> globalParamsMap = new HashMap<>(16); Map<String,String> globalParamsMap = new HashMap<>(16);
// global params string // global params string
String globalParamsStr = taskInfo.getGlobalParams(); String globalParamsStr = taskExecutionContext.getGlobalParams();
if (globalParamsStr != null) { if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue))); globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
@ -199,7 +195,7 @@ public class TaskScheduleThread implements Runnable {
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null); ackCommand.setExecutePath(null);
}else{ }else{
ackCommand.setExecutePath(taskInfo.getExecutePath()); ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
} }
return ackCommand; return ackCommand;
} }
@ -215,15 +211,15 @@ public class TaskScheduleThread implements Runnable {
.getDiscriminator()).getLogBase(); .getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){ if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH + return baseLog + Constants.SINGLE_SLASH +
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log"; taskExecutionContext.getTaskId() + ".log";
} }
return System.getProperty("user.dir") + Constants.SINGLE_SLASH + return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH + baseLog + Constants.SINGLE_SLASH +
taskInfo.getProcessDefineId() + Constants.SINGLE_SLASH + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskInfo.getProcessInstanceId() + Constants.SINGLE_SLASH + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInfo.getTaskId() + ".log"; taskExecutionContext.getTaskId() + ".log";
} }
/** /**
@ -329,7 +325,7 @@ public class TaskScheduleThread implements Runnable {
* @throws Exception exception * @throws Exception exception
*/ */
private void checkDownloadPermission(List<String> projectRes) throws Exception { private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInfo.getExecutorId(); int userId = taskExecutionContext.getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]); String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger); PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
permissionCheck.checkPermission(); permissionCheck.checkPermission();

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public abstract class AbstractListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
}
protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
}

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -20,6 +20,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -32,7 +33,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
TreeCache treeCache; private TreeCache treeCache;
/** /**
* register a unified listener of /${dsRoot}, * register a unified listener of /${dsRoot},
*/ */
@ -72,6 +73,10 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return treeCache; return treeCache;
} }
public void addListener(TreeCacheListener listener){
this.treeCache.getListenable().addListener(listener);
}
@Override @Override
public void close() { public void close() {
treeCache.close(); treeCache.close();

Loading…
Cancel
Save