|
|
|
@ -45,10 +45,10 @@ public class TaskQueueZkImpl implements ITaskQueue {
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); |
|
|
|
|
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); |
|
|
|
|
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); |
|
|
|
|
|
|
|
|
|
for(String key : new String[]{tasksQueuePath,tasksCancelPath}){ |
|
|
|
|
if(!zookeeperOperator.isExisted(key)){ |
|
|
|
|
for (String key : new String[]{tasksQueuePath,tasksKillPath}){ |
|
|
|
|
if (!zookeeperOperator.isExisted(key)){ |
|
|
|
|
zookeeperOperator.persist(key, ""); |
|
|
|
|
logger.info("create tasks queue parent node success : {}", key); |
|
|
|
|
} |
|
|
|
@ -340,20 +340,20 @@ public class TaskQueueZkImpl implements ITaskQueue {
|
|
|
|
|
public void delete(){ |
|
|
|
|
try { |
|
|
|
|
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); |
|
|
|
|
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); |
|
|
|
|
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); |
|
|
|
|
|
|
|
|
|
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ |
|
|
|
|
if(zookeeperOperator.isExisted(taskQueuePath)){ |
|
|
|
|
List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath); |
|
|
|
|
for (String key : new String[]{tasksQueuePath,tasksKillPath}){ |
|
|
|
|
if (zookeeperOperator.isExisted(key)){ |
|
|
|
|
List<String> list = zookeeperOperator.getChildrenKeys(key); |
|
|
|
|
for (String task : list) { |
|
|
|
|
zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task); |
|
|
|
|
logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task); |
|
|
|
|
zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task); |
|
|
|
|
logger.info("delete task from tasks queue : {}/{} ", key, task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("delete all tasks in tasks queue failure",e); |
|
|
|
|
logger.error("delete all tasks in tasks queue failure", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|