Browse Source

Refactor worker (#1975)

* updates

* move FetchTaskThread logic to WorkerNettyRequestProcessor

* add NettyRemotingClient to scheduler thread
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
f9500c58b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  2. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  3. 142
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java
  4. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
  6. 83
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  7. 49
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  8. 106
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java
  9. 77
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ private String applicationName; /** * group name */ private String groupName; /** * task name */ private String taskName; /** * connector port */ private int connectorPort; /** * description info */ private String description; /** * class name */ private String className; /** * method name */ private String methodName; /** * parameters */ private String params; /** * shard itemds */ private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -38,4 +38,7 @@ public class Constants {
*/
public static final int CPUS = Runtime.getRuntime().availableProcessors();
public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address();
}

142
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/IPUtils.java

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class IPUtils {
private static final Logger logger = LoggerFactory.getLogger(IPUtils.class);
private static String IP_REGEX = "([1-9]|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3}";
private static String LOCAL_HOST = "unknown";
static {
String host = System.getenv("HOSTNAME");
if (isNotEmpty(host)) {
LOCAL_HOST = host;
} else {
try {
String hostName = InetAddress.getLocalHost().getHostName();
if (isNotEmpty(hostName)) {
LOCAL_HOST = hostName;
}
} catch (UnknownHostException e) {
logger.error("get hostName error!", e);
}
}
}
public static String getLocalHost() {
return LOCAL_HOST;
}
public static String getFirstNoLoopbackIP4Address() {
Collection<String> allNoLoopbackIP4Addresses = getNoLoopbackIP4Addresses();
if (allNoLoopbackIP4Addresses.isEmpty()) {
return null;
}
return allNoLoopbackIP4Addresses.iterator().next();
}
public static Collection<String> getNoLoopbackIP4Addresses() {
Collection<String> noLoopbackIP4Addresses = new ArrayList<>();
Collection<InetAddress> allInetAddresses = getAllHostAddress();
for (InetAddress address : allInetAddresses) {
if (!address.isLoopbackAddress() && !address.isSiteLocalAddress()
&& !Inet6Address.class.isInstance(address)) {
noLoopbackIP4Addresses.add(address.getHostAddress());
}
}
if (noLoopbackIP4Addresses.isEmpty()) {
for (InetAddress address : allInetAddresses) {
if (!address.isLoopbackAddress() && !Inet6Address.class.isInstance(address)) {
noLoopbackIP4Addresses.add(address.getHostAddress());
}
}
}
return noLoopbackIP4Addresses;
}
public static Collection<InetAddress> getAllHostAddress() {
try {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
Collection<InetAddress> addresses = new ArrayList<>();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
addresses.add(inetAddress);
}
}
return addresses;
} catch (SocketException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public static String getIpByHostName(String host) {
InetAddress address = null;
try {
address = InetAddress.getByName(host);
} catch (UnknownHostException e) {
logger.error("get IP error", e);
}
if (address == null) {
return "";
}
return address.getHostAddress();
}
private static boolean isEmpty(final CharSequence cs) {
return cs == null || cs.length() == 0;
}
private static boolean isNotEmpty(final CharSequence cs) {
return !isEmpty(cs);
}
public static boolean isIp(String addr) {
if (addr.length() < 7 || addr.length() > 15 || "".equals(addr)) {
return false;
}
Pattern pat = Pattern.compile(IP_REGEX);
Matcher mat = pat.matcher(addr);
boolean ipAddress = mat.find();
return ipAddress;
}
}

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -32,6 +32,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -134,12 +139,17 @@ public class MasterExecThread implements Runnable {
*/
private MasterConfig masterConfig;
/**
*
*/
private NettyRemotingClient nettyRemotingClient;
/**
* constructor of MasterExecThread
* @param processInstance process instance
* @param processService process dao
*/
public MasterExecThread(ProcessInstance processInstance, ProcessService processService){
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
this.processService = processService;
this.processInstance = processInstance;
@ -147,6 +157,22 @@ public class MasterExecThread implements Runnable {
int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
masterTaskExecNum);
this.nettyRemotingClient = nettyRemotingClient;
}
//TODO
/**端口默认是123456
* 需要构造ExecuteTaskRequestCommand里面就是TaskInstance的属性
*/
private void sendToWorker(){
final Address address = new Address("localhost", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand();
try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), 5000);
//结果可能为空,所以不用管,能发过去,就行。
} catch (InterruptedException | RemotingException ex) {
logger.error(String.format("send command to : %s error", address), ex);
}
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -70,6 +72,11 @@ public class MasterSchedulerThread implements Runnable {
*/
private MasterConfig masterConfig;
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient;
/**
* constructor of MasterSchedulerThread
@ -83,6 +90,9 @@ public class MasterSchedulerThread implements Runnable {
this.masterExecThreadNum = masterExecThreadNum;
this.masterExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread",masterExecThreadNum);
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
//
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
@ -123,7 +133,7 @@ public class MasterSchedulerThread implements Runnable {
processInstance = processService.handleCommand(logger, OSUtils.getHost(), this.masterExecThreadNum - activeCount, command);
if (processInstance != null) {
logger.info("start master exec thread , split DAG ...");
masterExecService.execute(new MasterExecThread(processInstance, processService));
masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
}
}catch (Exception e){
logger.error("scan command error ", e);
@ -140,6 +150,7 @@ public class MasterSchedulerThread implements Runnable {
AbstractZKClient.releaseMutex(mutex);
}
}
nettyRemotingClient.close();
logger.info("master server stopped...");
}

83
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
public class ZookeeperRegistryCenter implements InitializingBean {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
public static final String NAMESPACE = "/dolphinscheduler";
public static final String NODES = NAMESPACE + "/nodes";
public static final String MASTER_PATH = NODES + "/master";
public static final String WORKER_PATH = NODES + "/worker";
public static final String EMPTY = "";
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
@Override
public void afterPropertiesSet() throws Exception {
}
public void init() {
if (isStarted.compareAndSet(false, true)) {
//TODO
// zookeeperCachedOperator.start(NODES);
initNodes();
}
}
private void initNodes() {
zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
}
public void close() {
if (isStarted.compareAndSet(true, false)) {
if (zookeeperCachedOperator != null) {
zookeeperCachedOperator.close();
}
}
}
public String getMasterPath() {
return MASTER_PATH;
}
public String getWorkerPath() {
return WORKER_PATH;
}
public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator;
}
}

49
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,8 +29,14 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -112,9 +118,28 @@ public class WorkerServer implements IStoppable {
@Value("${server.is-combined-server:false}")
private Boolean isCombinedServer;
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
/**
* worker registry
*/
private WorkerRegistry workerRegistry;
/**
* spring application context
* only use it for initialization
@ -141,7 +166,17 @@ public class WorkerServer implements IStoppable {
public void run(){
logger.info("start worker server...");
zkWorkerClient.init();
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService));
this.nettyRemotingServer.start();
//worker registry
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
this.workerRegistry.registry();
this.zkWorkerClient.init();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
@ -167,10 +202,10 @@ public class WorkerServer implements IStoppable {
killExecutorService.execute(killProcessThread);
// new fetch task thread
FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
// submit fetch task thread
fetchTaskExecutorService.execute(fetchTaskThread);
// FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue);
//
// // submit fetch task thread
// fetchTaskExecutorService.execute(fetchTaskThread);
/**
* register hooks, which are called before the process exits
@ -217,6 +252,9 @@ public class WorkerServer implements IStoppable {
logger.warn("thread sleep exception", e);
}
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
try {
heartbeatWorkerService.shutdownNow();
}catch (Exception e){
@ -260,7 +298,6 @@ public class WorkerServer implements IStoppable {
}
}
/**
* heartbeat thread implement
*

106
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class);
private final ProcessService processService;
private final ExecutorService workerExecService;
private final WorkerConfig workerConfig;
public WorkerNettyRequestProcessor(ProcessService processService){
this.processService = processService;
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command);
TaskInstance taskInstance = FastJsonSerializer.deserialize(command.getBody(), TaskInstance.class);
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
return;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
// local execute path
String execLocalPath = getExecLocalPath(taskInstance);
logger.info("task instance local execute path : {} ", execLocalPath);
// init task
taskInstance.init(OSUtils.getHost(), new Date(), execLocalPath);
try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode());
} catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
}
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processService));
}
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
}

77
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
private final int port;
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
}
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if(newState == ConnectionState.LOST){
logger.error("worker : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("worker : {} connection SUSPENDED ", address);
}
}
});
logger.info("scheduler node : {} registry to ZK successfully.", address);
}
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
logger.info("worker node : {} unRegistry to ZK.", address);
}
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
}
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
}
Loading…
Cancel
Save