From c21d25edf2b89cbf1c19db0d2da311503652a185 Mon Sep 17 00:00:00 2001 From: elonlo Date: Fri, 3 Jan 2020 18:37:39 +0800 Subject: [PATCH 1/2] Fixed tasks_queue and tasks_kill did not exist in zookeeper #1696 --- .../common/queue/TaskQueueZkImpl.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 45c6122341..d022c5e9be 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -37,8 +37,27 @@ public class TaskQueueZkImpl implements ITaskQueue { private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); + private final ZookeeperOperator zookeeperOperator; + @Autowired - private ZookeeperOperator zookeeperOperator; + public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) { + this.zookeeperOperator = zookeeperOperator; + + try { + String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); + + for(String key : new String[]{tasksQueuePath,tasksCancelPath}){ + if(!zookeeperOperator.isExisted(key)){ + zookeeperOperator.persist(key, ""); + logger.info("create tasks queue parent node success : {}", key); + } + } + } catch (Exception e) { + logger.error("create tasks queue parent node failure", e); + } + } + /** * get all tasks from tasks queue From cae656c1bc230e9097a94ab88b6cef379cbb51e0 Mon Sep 17 00:00:00 2001 From: elonlo Date: Fri, 3 Jan 2020 19:25:10 +0800 Subject: [PATCH 2/2] uniform naming --- .../common/queue/TaskQueueZkImpl.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index d022c5e9be..f4e865e4f5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -45,10 +45,10 @@ public class TaskQueueZkImpl implements ITaskQueue { try { String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); - String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); + String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); - for(String key : new String[]{tasksQueuePath,tasksCancelPath}){ - if(!zookeeperOperator.isExisted(key)){ + for (String key : new String[]{tasksQueuePath,tasksKillPath}){ + if (!zookeeperOperator.isExisted(key)){ zookeeperOperator.persist(key, ""); logger.info("create tasks queue parent node success : {}", key); } @@ -340,20 +340,20 @@ public class TaskQueueZkImpl implements ITaskQueue { public void delete(){ try { String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); - String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); + String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); - for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){ - if(zookeeperOperator.isExisted(taskQueuePath)){ - List list = zookeeperOperator.getChildrenKeys(taskQueuePath); + for (String key : new String[]{tasksQueuePath,tasksKillPath}){ + if (zookeeperOperator.isExisted(key)){ + List list = zookeeperOperator.getChildrenKeys(key); for (String task : list) { - zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task); - logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task); + zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task); + logger.info("delete task from tasks queue : {}/{} ", key, task); } } } } catch (Exception e) { - logger.error("delete all tasks in tasks queue failure",e); + logger.error("delete all tasks in tasks queue failure", e); } }