Browse Source

worker fault tolerance modify (#2212)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

* worker fault tolerance modify

* Constants and .env modify

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/2/head
qiaozhanwei 5 years ago committed by GitHub
parent
commit
055bb28de4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
  2. 124
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  4. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  5. 45
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  6. 58
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  7. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java

@ -60,14 +60,14 @@ public class LoggerController extends BaseController {
*/ */
@ApiOperation(value = "queryLog", notes= "QUERY_TASK_INSTANCE_LOG_NOTES") @ApiOperation(value = "queryLog", notes= "QUERY_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "taskInstId", value = "TASK_ID", dataType = "Int", example = "100"), @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", dataType ="Int", example = "100"), @ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", dataType ="Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", dataType ="Int", example = "100") @ApiImplicitParam(name = "limit", value = "LIMIT", dataType ="Int", example = "100")
}) })
@GetMapping(value = "/detail") @GetMapping(value = "/detail")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstId") int taskInstanceId, @RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum, @RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) { @RequestParam(value = "limit") int limit) {
try { try {
@ -91,12 +91,12 @@ public class LoggerController extends BaseController {
*/ */
@ApiOperation(value = "downloadTaskLog", notes= "DOWNLOAD_TASK_INSTANCE_LOG_NOTES") @ApiOperation(value = "downloadTaskLog", notes= "DOWNLOAD_TASK_INSTANCE_LOG_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "taskInstId", value = "TASK_ID",dataType = "Int", example = "100") @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID",dataType = "Int", example = "100")
}) })
@GetMapping(value = "/download-log") @GetMapping(value = "/download-log")
@ResponseBody @ResponseBody
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskInstId") int taskInstanceId) { @RequestParam(value = "taskInstanceId") int taskInstanceId) {
try { try {
byte[] logBytes = loggerService.getLogBytes(taskInstanceId); byte[] logBytes = loggerService.getLogBytes(taskInstanceId);
return ResponseEntity return ResponseEntity

124
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -118,20 +118,15 @@ public final class Constants {
*/ */
public static final String RES_UPLOAD_STARTUP_TYPE = "res.upload.startup.type"; public static final String RES_UPLOAD_STARTUP_TYPE = "res.upload.startup.type";
/**
* zookeeper quorum
*/
public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
/** /**
* MasterServer directory registered in zookeeper * MasterServer directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/nodes/masters";
/** /**
* WorkerServer directory registered in zookeeper * WorkerServer directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/nodes/worker";
/** /**
* all servers directory registered in zookeeper * all servers directory registered in zookeeper
@ -143,10 +138,6 @@ public final class Constants {
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters";
/**
* WorkerServer lock directory registered in zookeeper
*/
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "/lock/workers";
/** /**
* MasterServer failover directory registered in zookeeper * MasterServer failover directory registered in zookeeper
@ -163,10 +154,6 @@ public final class Constants {
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
/**
* need send warn times when master server or worker server failover
*/
public static final int DOLPHINSCHEDULER_WARN_TIMES_FAILOVER = 3;
/** /**
* comma , * comma ,
@ -203,37 +190,6 @@ public final class Constants {
*/ */
public static final String EQUAL_SIGN = "="; public static final String EQUAL_SIGN = "=";
/**
* ZOOKEEPER_SESSION_TIMEOUT
*/
public static final String ZOOKEEPER_SESSION_TIMEOUT = "zookeeper.session.timeout";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "zookeeper.connection.timeout";
public static final String ZOOKEEPER_RETRY_SLEEP = "zookeeper.retry.sleep";
public static final String ZOOKEEPER_RETRY_BASE_SLEEP = "zookeeper.retry.base.sleep";
public static final String ZOOKEEPER_RETRY_MAX_SLEEP = "zookeeper.retry.max.sleep";
public static final String ZOOKEEPER_RETRY_MAXTIME = "zookeeper.retry.maxtime";
public static final String MASTER_HEARTBEAT_INTERVAL = "master.heartbeat.interval";
public static final String MASTER_EXEC_THREADS = "master.exec.threads";
public static final String MASTER_EXEC_TASK_THREADS = "master.exec.task.number";
public static final String MASTER_COMMIT_RETRY_TIMES = "master.task.commit.retryTimes";
public static final String MASTER_COMMIT_RETRY_INTERVAL = "master.task.commit.interval";
public static final String WORKER_EXEC_THREADS = "worker.exec.threads";
public static final String WORKER_HEARTBEAT_INTERVAL = "worker.heartbeat.interval";
public static final String WORKER_FETCH_TASK_NUM = "worker.fetch.task.num";
public static final String WORKER_MAX_CPULOAD_AVG = "worker.max.cpuload.avg"; public static final String WORKER_MAX_CPULOAD_AVG = "worker.max.cpuload.avg";
@ -244,17 +200,6 @@ public final class Constants {
public static final String MASTER_RESERVED_MEMORY = "master.reserved.memory"; public static final String MASTER_RESERVED_MEMORY = "master.reserved.memory";
/**
* dolphinscheduler tasks queue
*/
public static final String DOLPHINSCHEDULER_TASKS_QUEUE = "tasks_queue";
/**
* dolphinscheduler need kill tasks queue
*/
public static final String DOLPHINSCHEDULER_TASKS_KILL = "tasks_kill";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_ROOT = "zookeeper.dolphinscheduler.root";
public static final String SCHEDULER_QUEUE_IMPL = "dolphinscheduler.queue.impl"; public static final String SCHEDULER_QUEUE_IMPL = "dolphinscheduler.queue.impl";
@ -350,26 +295,6 @@ public final class Constants {
public static final int MAX_TASK_TIMEOUT = 24 * 3600; public static final int MAX_TASK_TIMEOUT = 24 * 3600;
/**
* heartbeat threads number
*/
public static final int DEFAUL_WORKER_HEARTBEAT_THREAD_NUM = 1;
/**
* heartbeat interval
*/
public static final int DEFAULT_WORKER_HEARTBEAT_INTERVAL = 60;
/**
* worker fetch task number
*/
public static final int DEFAULT_WORKER_FETCH_TASK_NUM = 1;
/**
* worker execute threads number
*/
public static final int DEFAULT_WORKER_EXEC_THREAD_NUM = 10;
/** /**
* master cpu load * master cpu load
*/ */
@ -391,16 +316,6 @@ public final class Constants {
public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10; public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10;
/**
* master execute threads number
*/
public static final int DEFAULT_MASTER_EXEC_THREAD_NUM = 100;
/**
* default master concurrent task execute num
*/
public static final int DEFAULT_MASTER_TASK_EXEC_NUM = 20;
/** /**
* default log cache rows num,output when reach the number * default log cache rows num,output when reach the number
@ -408,33 +323,11 @@ public final class Constants {
public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16; public static final int DEFAULT_LOG_ROWS_NUM = 4 * 16;
/** /**
* log flush intervaloutput when reach the interval * log flush interval?output when reach the interval
*/ */
public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000; public static final int DEFAULT_LOG_FLUSH_INTERVAL = 1000;
/**
* default master heartbeat thread number
*/
public static final int DEFAULT_MASTER_HEARTBEAT_THREAD_NUM = 1;
/**
* default master heartbeat interval
*/
public static final int DEFAULT_MASTER_HEARTBEAT_INTERVAL = 60;
/**
* default master commit retry times
*/
public static final int DEFAULT_MASTER_COMMIT_RETRY_TIMES = 5;
/**
* default master commit retry interval
*/
public static final int DEFAULT_MASTER_COMMIT_RETRY_INTERVAL = 3000;
/** /**
* time unit secong to minutes * time unit secong to minutes
*/ */
@ -805,7 +698,6 @@ public final class Constants {
public static final String ALIAS = "alias"; public static final String ALIAS = "alias";
public static final String CONTENT = "content"; public static final String CONTENT = "content";
public static final String DEPENDENT_SPLIT = ":||"; public static final String DEPENDENT_SPLIT = ":||";
public static final String DEPENDENT_ALL = "ALL";
/** /**
@ -864,7 +756,7 @@ public final class Constants {
*/ */
public static final String HIVE_CONF = "hiveconf:"; public static final String HIVE_CONF = "hiveconf:";
//flink 任务 //flink ??
public static final String FLINK_YARN_CLUSTER = "yarn-cluster"; public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
public static final String FLINK_RUN_MODE = "-m"; public static final String FLINK_RUN_MODE = "-m";
public static final String FLINK_YARN_SLOT = "-ys"; public static final String FLINK_YARN_SLOT = "-ys";
@ -899,26 +791,20 @@ public final class Constants {
/** /**
* data total * data total
* 数据总数
*/ */
public static final String COUNT = "count"; public static final String COUNT = "count";
/** /**
* page size * page size
* 每页数据条数
*/ */
public static final String PAGE_SIZE = "pageSize"; public static final String PAGE_SIZE = "pageSize";
/** /**
* current page no * current page no
* 当前页码
*/ */
public static final String PAGE_NUMBER = "pageNo"; public static final String PAGE_NUMBER = "pageNo";
/**
* result
*/
public static final String RESULT = "result";
/** /**
* *

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -49,6 +49,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setHost(taskInstance.getHost());
return this; return this;
} }

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -60,7 +61,7 @@ public class ProcessUtils {
allowAmbiguousCommands = true; allowAmbiguousCommands = true;
String value = System.getProperty("jdk.lang.Process.allowAmbiguousCommands"); String value = System.getProperty("jdk.lang.Process.allowAmbiguousCommands");
if (value != null) { if (value != null) {
allowAmbiguousCommands = !"false".equalsIgnoreCase(value); allowAmbiguousCommands = !"false".equalsIgnoreCase(value);
} }
} }
if (allowAmbiguousCommands) { if (allowAmbiguousCommands) {
@ -68,7 +69,7 @@ public class ProcessUtils {
String executablePath = new File(cmd[0]).getPath(); String executablePath = new File(cmd[0]).getPath();
if (needsEscaping(VERIFICATION_LEGACY, executablePath)) { if (needsEscaping(VERIFICATION_LEGACY, executablePath)) {
executablePath = quoteString(executablePath); executablePath = quoteString(executablePath);
} }
cmdstr = createCommandLine( cmdstr = createCommandLine(
@ -81,7 +82,7 @@ public class ProcessUtils {
StringBuilder join = new StringBuilder(); StringBuilder join = new StringBuilder();
for (String s : cmd) { for (String s : cmd) {
join.append(s).append(' '); join.append(s).append(' ');
} }
cmd = getTokensFromCommand(join.toString()); cmd = getTokensFromCommand(join.toString());
@ -89,7 +90,7 @@ public class ProcessUtils {
// Check new executable name once more // Check new executable name once more
if (security != null) { if (security != null) {
security.checkExec(executablePath); security.checkExec(executablePath);
} }
} }
@ -147,7 +148,7 @@ public class ProcessUtils {
ArrayList<String> matchList = new ArrayList<>(8); ArrayList<String> matchList = new ArrayList<>(8);
Matcher regexMatcher = LazyPattern.PATTERN.matcher(command); Matcher regexMatcher = LazyPattern.PATTERN.matcher(command);
while (regexMatcher.find()) { while (regexMatcher.find()) {
matchList.add(regexMatcher.group()); matchList.add(regexMatcher.group());
} }
return matchList.toArray(new String[matchList.size()]); return matchList.toArray(new String[matchList.size()]);
} }
@ -323,9 +324,9 @@ public class ProcessUtils {
try { try {
int processId = taskExecutionContext.getProcessId(); int processId = taskExecutionContext.getProcessId();
if(processId == 0 ){ if(processId == 0 ){
logger.error("process kill failed, process id :{}, task id:{}", logger.error("process kill failed, process id :{}, task id:{}",
processId, taskExecutionContext.getTaskInstanceId()); processId, taskExecutionContext.getTaskInstanceId());
return ; return ;
} }
String cmd = String.format("sudo kill -9 %s", getPidsStr(processId)); String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
@ -379,7 +380,9 @@ public class ProcessUtils {
String log = null; String log = null;
try { try {
logClient = new LogClientService(); logClient = new LogClientService();
log = logClient.viewLog(taskExecutionContext.getHost(), Constants.RPC_PORT, taskExecutionContext.getLogPath()); log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
Constants.RPC_PORT,
taskExecutionContext.getLogPath());
} finally { } finally {
if(logClient != null){ if(logClient != null){
logClient.close(); logClient.close();

45
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -59,7 +59,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
public void start() { public void start() {
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
@ -71,7 +71,7 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
// check if fault tolerance is requiredfailure and tolerance // check if fault tolerance is required?failure and tolerance
if (getActiveMasterNum() == 1) { if (getActiveMasterNum() == 1) {
failoverWorker(null, true); failoverWorker(null, true);
failoverMaster(null); failoverMaster(null);
@ -146,8 +146,8 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception * @throws Exception exception
*/ */
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost)){ if(StringUtils.isEmpty(serverHost)){
return ; return ;
} }
switch (zkNodeType){ switch (zkNodeType){
case MASTER: case MASTER:
@ -217,7 +217,7 @@ public class ZKMasterClient extends AbstractZKClient {
/** /**
* task needs failover if task start before worker starts * task needs failover if task start before worker starts
* *
* @param taskInstance task instance * @param taskInstance task instance
* @return true if task instance need fail over * @return true if task instance need fail over
*/ */
@ -231,10 +231,10 @@ public class ZKMasterClient extends AbstractZKClient {
} }
// if the worker node exists in zookeeper, we must check the task starts after the worker // if the worker node exists in zookeeper, we must check the task starts after the worker
if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){ if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
//if task start after worker starts, there is no need to failover the task. //if task start after worker starts, there is no need to failover the task.
if(checkTaskAfterWorkerStart(taskInstance)){ if(checkTaskAfterWorkerStart(taskInstance)){
taskNeedFailover = false; taskNeedFailover = false;
} }
} }
return taskNeedFailover; return taskNeedFailover;
@ -247,15 +247,15 @@ public class ZKMasterClient extends AbstractZKClient {
* @return true if task instance start time after worker server start date * @return true if task instance start time after worker server start date
*/ */
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
if(StringUtils.isEmpty(taskInstance.getHost())){ if(StringUtils.isEmpty(taskInstance.getHost())){
return false; return false;
} }
Date workerServerStartDate = null; Date workerServerStartDate = null;
List<Server> workerServers = getServersList(ZKNodeType.WORKER); List<Server> workerServers = getServersList(ZKNodeType.WORKER);
for(Server workerServer : workerServers){ for(Server workerServer : workerServers){
if(workerServer.getHost().equals(taskInstance.getHost())){ if(workerServer.getHost().equals(taskInstance.getHost())){
workerServerStartDate = workerServer.getCreateTime(); workerServerStartDate = workerServer.getCreateTime();
break; break;
} }
} }
@ -271,7 +271,7 @@ public class ZKMasterClient extends AbstractZKClient {
* *
* 1. kill yarn job if there are yarn jobs in tasks. * 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover. * 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null * 3. failover all tasks when workerHost is null
* @param workerHost worker host * @param workerHost worker host
*/ */
@ -293,7 +293,7 @@ public class ZKMasterClient extends AbstractZKClient {
if(needCheckWorkerAlive){ if(needCheckWorkerAlive){
if(!checkTaskInstanceNeedFailover(taskInstance)){ if(!checkTaskInstanceNeedFailover(taskInstance)){
continue; continue;
} }
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
@ -304,7 +304,6 @@ public class ZKMasterClient extends AbstractZKClient {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(null)
.create(); .create();
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(taskExecutionContext);
@ -334,9 +333,9 @@ public class ZKMasterClient extends AbstractZKClient {
} }
public InterProcessMutex blockAcquireMutex() throws Exception { public InterProcessMutex blockAcquireMutex() throws Exception {
InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath()); InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
mutex.acquire(); mutex.acquire();
return mutex; return mutex;
} }
} }

58
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -38,56 +38,24 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/** /**
* check dead server or not , if dead, stop self * remove dead server by host
* * @param host host
* @param zNode node path * @param serverType serverType
* @param serverType master or worker prefix * @throws Exception
* @return true if not exists
* @throws Exception errors
*/ */
protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception{
//ip_sequenceno
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if(!isExisted(zNode) || isExisted(deadServerPath)){
return true;
}
return false;
}
public void removeDeadServerByHost(String host, String serverType) throws Exception { public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){ for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){ if(serverPath.startsWith(serverType+UNDERLINE+host)){
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server); super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host); logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
} }
} }
} }
/**
* create zookeeper path according the zk node type.
* @param zkNodeType zookeeper node type
* @return register zookeeper path
* @throws Exception
*/
private String createZNodePath(ZKNodeType zkNodeType, String host) throws Exception {
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
// create temporary sequence nodes for master znode
String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host;
super.persistEphemeral(registerPath, heartbeatZKInfo);
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath;
}
/** /**
* opType(add): if find dead server , then add to zk deadServerPath * opType(add): if find dead server , then add to zk deadServerPath
@ -326,7 +294,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
*/ */
protected String getHostByEventDataPath(String path) { protected String getHostByEventDataPath(String path) {
if(StringUtils.isEmpty(path)){ if(StringUtils.isEmpty(path)){
logger.error("empty path!"); logger.error("empty path!");
return ""; return "";
} }
String[] pathArray = path.split(SINGLE_SLASH); String[] pathArray = path.split(SINGLE_SLASH);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -39,7 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
*/ */
@Override @Override
protected void registerListener() { protected void registerListener() {
treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot()); treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes");
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try { try {
treeCache.start(); treeCache.start();

Loading…
Cancel
Save