|
|
|
/*
|
|
|
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
* contributor license agreements. See the NOTICE file distributed with
|
|
|
|
* this work for additional information regarding copyright ownership.
|
|
|
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
* (the "License"); you may not use this file except in compliance with
|
|
|
|
* the License. You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
package org.apache.dolphinscheduler.server.zk;
|
|
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
|
|
|
|
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
|
|
|
|
import org.apache.dolphinscheduler.common.Constants;
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
|
|
|
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
|
|
|
|
import org.apache.dolphinscheduler.common.model.Server;
|
#2499 bug fix (#2505)
* dispatch task fail will set task status failed
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* #2486 bug fix
* host and workergroup compatible
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* #2499 bug fix
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
|
|
|
import org.apache.dolphinscheduler.common.utils.OSUtils;
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
|
|
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
|
|
|
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
|
|
|
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
|
|
|
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
|
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
|
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
import java.util.Date;
|
|
|
|
import java.util.List;
|
|
|
|
|
#2499 bug fix (#2505)
* dispatch task fail will set task status failed
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* #2486 bug fix
* host and workergroup compatible
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* #2499 bug fix
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
import static org.apache.dolphinscheduler.common.Constants.*;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
* zookeeper master client
|
|
|
|
*
|
|
|
|
* single instance
|
|
|
|
*/
|
|
|
|
@Component
|
|
|
|
public class ZKMasterClient extends AbstractZKClient {
|
|
|
|
|
|
|
|
/**
|
|
|
|
* logger
|
|
|
|
*/
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(ZKMasterClient.class);
|
|
|
|
|
|
|
|
/**
|
|
|
|
* process service
|
|
|
|
*/
|
|
|
|
@Autowired
|
|
|
|
private ProcessService processService;
|
|
|
|
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
public void start() {
|
|
|
|
|
|
|
|
InterProcessMutex mutex = null;
|
|
|
|
try {
|
|
|
|
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
|
|
|
|
String znodeLock = getMasterStartUpLockPath();
|
|
|
|
mutex = new InterProcessMutex(getZkClient(), znodeLock);
|
|
|
|
mutex.acquire();
|
|
|
|
|
|
|
|
// init system znode
|
|
|
|
this.initSystemZNode();
|
|
|
|
|
#2499 bug fix (#2505)
* dispatch task fail will set task status failed
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* #2486 bug fix
* host and workergroup compatible
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* #2499 bug fix
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){
|
|
|
|
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// self tolerant
|
|
|
|
if (getActiveMasterNum() == 1) {
|
|
|
|
failoverWorker(null, true);
|
|
|
|
failoverMaster(null);
|
|
|
|
}
|
|
|
|
|
|
|
|
}catch (Exception e){
|
|
|
|
logger.error("master start up exception",e);
|
|
|
|
}finally {
|
|
|
|
releaseMutex(mutex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void close(){
|
|
|
|
super.close();
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* handle path events that this class cares about
|
|
|
|
* @param client zkClient
|
|
|
|
* @param event path event
|
|
|
|
* @param path zk path
|
|
|
|
*/
|
|
|
|
@Override
|
|
|
|
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
|
|
|
|
//monitor master
|
|
|
|
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){
|
|
|
|
handleMasterEvent(event,path);
|
|
|
|
}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
|
|
|
|
//monitor worker
|
|
|
|
handleWorkerEvent(event,path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* remove zookeeper node path
|
|
|
|
*
|
|
|
|
* @param path zookeeper node path
|
|
|
|
* @param zkNodeType zookeeper node type
|
|
|
|
* @param failover is failover
|
|
|
|
*/
|
|
|
|
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
|
|
|
|
logger.info("{} node deleted : {}", zkNodeType.toString(), path);
|
|
|
|
InterProcessMutex mutex = null;
|
|
|
|
try {
|
|
|
|
String failoverPath = getFailoverLockPath(zkNodeType);
|
|
|
|
// create a distributed lock
|
|
|
|
mutex = new InterProcessMutex(getZkClient(), failoverPath);
|
|
|
|
mutex.acquire();
|
|
|
|
|
|
|
|
String serverHost = getHostByEventDataPath(path);
|
|
|
|
// handle dead server
|
|
|
|
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
|
|
|
|
//failover server
|
|
|
|
if(failover){
|
|
|
|
failoverServerWhenDown(serverHost, zkNodeType);
|
|
|
|
}
|
|
|
|
}catch (Exception e){
|
|
|
|
logger.error("{} server failover failed.", zkNodeType.toString());
|
|
|
|
logger.error("failover exception ",e);
|
|
|
|
}
|
|
|
|
finally {
|
|
|
|
releaseMutex(mutex);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* failover server when server down
|
|
|
|
*
|
|
|
|
* @param serverHost server host
|
|
|
|
* @param zkNodeType zookeeper node type
|
|
|
|
* @throws Exception exception
|
|
|
|
*/
|
|
|
|
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
|
#2499 buf fix (#2518)
* dispatch task fail will set task status failed
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* 1,task status statistics and process status statistics bug fix (#2357)
2,worker group bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* send mail error, #2466 bug fix
* #2486 bug fix
* host and workergroup compatible
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* EnterpriseWeChatUtils modify
* #2499 bug fix
* add comment
* revert comment
* revert comment
* #2499 buf fix
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
return ;
|
|
|
|
}
|
|
|
|
switch (zkNodeType){
|
|
|
|
case MASTER:
|
|
|
|
failoverMaster(serverHost);
|
|
|
|
break;
|
|
|
|
case WORKER:
|
|
|
|
failoverWorker(serverHost, true);
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* get failover lock path
|
|
|
|
*
|
|
|
|
* @param zkNodeType zookeeper node type
|
|
|
|
* @return fail over lock path
|
|
|
|
*/
|
|
|
|
private String getFailoverLockPath(ZKNodeType zkNodeType){
|
|
|
|
|
|
|
|
switch (zkNodeType){
|
|
|
|
case MASTER:
|
|
|
|
return getMasterFailoverLockPath();
|
|
|
|
case WORKER:
|
|
|
|
return getWorkerFailoverLockPath();
|
|
|
|
default:
|
|
|
|
return "";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* monitor master
|
|
|
|
* @param event event
|
|
|
|
* @param path path
|
|
|
|
*/
|
|
|
|
public void handleMasterEvent(TreeCacheEvent event, String path){
|
|
|
|
switch (event.getType()) {
|
|
|
|
case NODE_ADDED:
|
|
|
|
logger.info("master node added : {}", path);
|
|
|
|
break;
|
|
|
|
case NODE_REMOVED:
|
|
|
|
removeZKNodePath(path, ZKNodeType.MASTER, true);
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* monitor worker
|
|
|
|
* @param event event
|
|
|
|
* @param path path
|
|
|
|
*/
|
|
|
|
public void handleWorkerEvent(TreeCacheEvent event, String path){
|
|
|
|
switch (event.getType()) {
|
|
|
|
case NODE_ADDED:
|
|
|
|
logger.info("worker node added : {}", path);
|
|
|
|
break;
|
|
|
|
case NODE_REMOVED:
|
|
|
|
logger.info("worker node deleted : {}", path);
|
|
|
|
removeZKNodePath(path, ZKNodeType.WORKER, true);
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* task needs failover if task start before worker starts
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
*
|
|
|
|
* @param taskInstance task instance
|
|
|
|
* @return true if task instance need fail over
|
|
|
|
*/
|
|
|
|
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
|
|
|
|
|
|
|
|
boolean taskNeedFailover = true;
|
|
|
|
|
|
|
|
//now no host will execute this task instance,so no need to failover the task
|
|
|
|
if(taskInstance.getHost() == null){
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// if the worker node exists in zookeeper, we must check the task starts after the worker
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
|
|
|
|
//if task start after worker starts, there is no need to failover the task.
|
|
|
|
if(checkTaskAfterWorkerStart(taskInstance)){
|
|
|
|
taskNeedFailover = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return taskNeedFailover;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* check task start after the worker server starts.
|
|
|
|
*
|
|
|
|
* @param taskInstance task instance
|
|
|
|
* @return true if task instance start time after worker server start date
|
|
|
|
*/
|
|
|
|
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
if(StringUtils.isEmpty(taskInstance.getHost())){
|
|
|
|
return false;
|
|
|
|
}
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
Date workerServerStartDate = null;
|
|
|
|
List<Server> workerServers = getServersList(ZKNodeType.WORKER);
|
|
|
|
for(Server workerServer : workerServers){
|
|
|
|
if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
workerServerStartDate = workerServer.getCreateTime();
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if(workerServerStartDate != null){
|
|
|
|
return taskInstance.getStartTime().after(workerServerStartDate);
|
|
|
|
}else{
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* failover worker tasks
|
|
|
|
*
|
|
|
|
* 1. kill yarn job if there are yarn jobs in tasks.
|
|
|
|
* 2. change task state from running to need failover.
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
* 3. failover all tasks when workerHost is null
|
|
|
|
* @param workerHost worker host
|
|
|
|
*/
|
|
|
|
|
|
|
|
/**
|
|
|
|
* failover worker tasks
|
|
|
|
*
|
|
|
|
* 1. kill yarn job if there are yarn jobs in tasks.
|
|
|
|
* 2. change task state from running to need failover.
|
|
|
|
* 3. failover all tasks when workerHost is null
|
|
|
|
* @param workerHost worker host
|
|
|
|
* @param needCheckWorkerAlive need check worker alive
|
|
|
|
* @throws Exception exception
|
|
|
|
*/
|
|
|
|
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
|
|
|
|
logger.info("start worker[{}] failover ...", workerHost);
|
|
|
|
|
|
|
|
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
|
|
|
|
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
|
|
|
|
if(needCheckWorkerAlive){
|
|
|
|
if(!checkTaskInstanceNeedFailover(taskInstance)){
|
|
|
|
continue;
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
|
|
|
|
if(processInstance != null){
|
|
|
|
taskInstance.setProcessInstance(processInstance);
|
|
|
|
}
|
|
|
|
|
|
|
|
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
|
|
|
|
.buildTaskInstanceRelatedInfo(taskInstance)
|
|
|
|
.buildProcessInstanceRelatedInfo(processInstance)
|
|
|
|
.create();
|
|
|
|
// only kill yarn job if exists , the local thread has exited
|
|
|
|
ProcessUtils.killYarnJob(taskExecutionContext);
|
|
|
|
|
|
|
|
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
|
|
|
|
processService.saveTaskInstance(taskInstance);
|
|
|
|
}
|
|
|
|
logger.info("end worker[{}] failover ...", workerHost);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* failover master tasks
|
|
|
|
*
|
|
|
|
* @param masterHost master host
|
|
|
|
*/
|
|
|
|
private void failoverMaster(String masterHost) {
|
|
|
|
logger.info("start master failover ...");
|
|
|
|
|
|
|
|
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
|
|
|
|
|
|
|
|
//updateProcessInstance host is null and insert into command
|
|
|
|
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
|
|
|
|
if(Constants.NULL.equals(processInstance.getHost()) ){
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
processService.processNeedFailoverProcessInstances(processInstance);
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.info("master failover end");
|
|
|
|
}
|
|
|
|
|
|
|
|
public InterProcessMutex blockAcquireMutex() throws Exception {
|
worker fault tolerance modify (#2212)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
4 years ago
|
|
|
InterProcessMutex mutex = new InterProcessMutex(getZkClient(), getMasterLockPath());
|
|
|
|
mutex.acquire();
|
|
|
|
return mutex;
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|