From 9fe5c3717ded3e42ccde854d5ba23a685c23508d Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Fri, 18 Dec 2020 17:28:37 +0800 Subject: [PATCH] [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250) * [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. * [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. * [Fix-4222][Master]Remove useless import * [Fix-4222][Master]Reformat code style * [Fix-4222][Master]Reformat code style * [Fix-4222][Master]Reformat code style * [Fix-4222][Master]add PeerTaskInstancePriorityQueueTest * [Fix-4222][Master]Fix code smell * [Fix-4222][Master]Reformat code style * [Fix-4222][Master]Fix code smell Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> --- .../consumer/TaskPriorityQueueConsumer.java | 2 +- .../master/runner/MasterExecThread.java | 72 ++++++---- .../TaskPriorityQueueException.java | 44 ++++++ .../queue/PeerTaskInstancePriorityQueue.java | 127 ++++++++++++++++++ .../service/queue/TaskPriorityQueue.java | 21 ++- .../service/queue/TaskPriorityQueueImpl.java | 38 +++--- .../PeerTaskInstancePriorityQueueTest.java | 89 ++++++++++++ pom.xml | 1 + 8 files changed, 342 insertions(+), 52 deletions(-) create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java create mode 100644 dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 89d3e97b8e..a407e4ea90 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread { * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 66de0863e2..e68f9d5937 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; +import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.commons.io.FileUtils; @@ -120,9 +121,9 @@ public class MasterExecThread implements Runnable { private Map completeTaskList = new ConcurrentHashMap<>(); /** - * ready to submit task list + * ready to submit task queue */ - private Map readyToSubmitTaskList = new ConcurrentHashMap<>(); + private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); /** * depend failed task map @@ -178,8 +179,8 @@ public class MasterExecThread implements Runnable { /** * constructor of MasterExecThread * - * @param processInstance processInstance - * @param processService processService + * @param processInstance processInstance + * @param processService processService * @param nettyRemotingClient nettyRemotingClient */ public MasterExecThread(ProcessInstance processInstance @@ -483,7 +484,7 @@ public class MasterExecThread implements Runnable { * encapsulation task * * @param processInstance process instance - * @param nodeName node name + * @param nodeName node name * @return TaskInstance */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, @@ -563,7 +564,7 @@ public class MasterExecThread implements Runnable { // if previous node success , post node submit for (TaskInstance task : taskInstances) { - if (readyToSubmitTaskList.containsKey(task.getName())) { + if (readyToSubmitTaskQueue.contains(task)) { continue; } @@ -699,7 +700,7 @@ public class MasterExecThread implements Runnable { return true; } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { - return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0; + return readyToSubmitTaskQueue.size() == 0 || activeTaskNode.size() == 0; } } return false; @@ -731,7 +732,7 @@ public class MasterExecThread implements Runnable { List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) || !isComplementEnd() - || readyToSubmitTaskList.size() > 0) { + || readyToSubmitTaskQueue.size() > 0) { return ExecutionStatus.PAUSE; } else { return ExecutionStatus.SUCCESS; @@ -782,7 +783,7 @@ public class MasterExecThread implements Runnable { // success if (state == ExecutionStatus.RUNNING_EXECUTION) { List killTasks = getCompleteTaskByState(ExecutionStatus.KILL); - if (readyToSubmitTaskList.size() > 0) { + if (readyToSubmitTaskQueue.size() > 0) { //tasks currently pending submission, no retries, indicating that depend is waiting to complete return ExecutionStatus.RUNNING_EXECUTION; } else if (CollectionUtils.isNotEmpty(killTasks)) { @@ -804,8 +805,8 @@ public class MasterExecThread implements Runnable { boolean result = false; - for (String taskName : readyToSubmitTaskList.keySet()) { - TaskInstance task = readyToSubmitTaskList.get(taskName); + for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) { + TaskInstance task = iter.next(); if (task.getState().typeIsFailure()) { result = true; break; @@ -872,7 +873,11 @@ public class MasterExecThread implements Runnable { */ private void addTaskToStandByList(TaskInstance taskInstance) { logger.info("add task to stand by list: {}", taskInstance.getName()); - readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance); + try { + readyToSubmitTaskQueue.put(taskInstance); + } catch (Exception e) { + logger.error("add task instance to readyToSubmitTaskQueue error"); + } } /** @@ -882,7 +887,11 @@ public class MasterExecThread implements Runnable { */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { logger.info("remove task from stand by list: {}", taskInstance.getName()); - readyToSubmitTaskList.remove(taskInstance.getName()); + try { + readyToSubmitTaskQueue.remove(taskInstance); + } catch (Exception e) { + logger.error("remove task instance from readyToSubmitTaskQueue error"); + } } /** @@ -891,8 +900,8 @@ public class MasterExecThread implements Runnable { * @return Boolean whether has retry task in standby */ private boolean hasRetryTaskInStandBy() { - for (Map.Entry entry : readyToSubmitTaskList.entrySet()) { - if (entry.getValue().getState().typeIsFailure()) { + for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) { + if (iter.next().getState().typeIsFailure()) { return true; } } @@ -1083,20 +1092,25 @@ public class MasterExecThread implements Runnable { * handling the list of tasks to be submitted */ private void submitStandByTask() { - for (Map.Entry entry : readyToSubmitTaskList.entrySet()) { - TaskInstance task = entry.getValue(); - DependResult dependResult = getDependResultForTask(task); - if (DependResult.SUCCESS == dependResult) { - if (retryTaskIntervalOverTime(task)) { - submitTaskExec(task); + try { + int length = readyToSubmitTaskQueue.size(); + for (int i = 0; i < length; i++) { + TaskInstance task = readyToSubmitTaskQueue.peek(); + DependResult dependResult = getDependResultForTask(task); + if (DependResult.SUCCESS == dependResult) { + if (retryTaskIntervalOverTime(task)) { + submitTaskExec(task); + removeTaskFromStandbyList(task); + } + } else if (DependResult.FAILED == dependResult) { + // if the dependency fails, the current node is not submitted and the state changes to failure. + dependFailedTask.put(task.getName(), task); removeTaskFromStandbyList(task); + logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); } - } else if (DependResult.FAILED == dependResult) { - // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTask.put(entry.getKey(), task); - removeTaskFromStandbyList(task); - logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); } + } catch (Exception e) { + logger.error("submit standby task error", e); } } @@ -1185,9 +1199,9 @@ public class MasterExecThread implements Runnable { * generate flow dag * * @param processDefinitionJson process definition json - * @param startNodeNameList start node name list - * @param recoveryNodeNameList recovery node name list - * @param depNodeType depend node type + * @param startNodeNameList start node name list + * @param recoveryNodeNameList recovery node name list + * @param depNodeType depend node type * @return ProcessDag process dag * @throws Exception exception */ diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java new file mode 100644 index 0000000000..30a72144bb --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.exceptions; + +/** + * task priority queue exception + */ +public class TaskPriorityQueueException extends Exception { + + /** + * Construct a new runtime exception with the detail message + * + * @param message message + */ + public TaskPriorityQueueException(String message) { + super(message); + } + + /** + * Construct a new runtime exception with the detail message and cause + * + * @param message message + * @param cause cause + */ + public TaskPriorityQueueException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java new file mode 100644 index 0000000000..d7a902550f --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.queue; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; + +/** + * Task instances priority queue implementation + * All the task instances are in the same process instance. + */ +public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { + /** + * queue size + */ + private static final Integer QUEUE_MAX_SIZE = 3000; + + /** + * queue + */ + private PriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + + /** + * put task instance to priority queue + * + * @param taskInstance taskInstance + * @throws TaskPriorityQueueException + */ + public void put(TaskInstance taskInstance) throws TaskPriorityQueueException { + queue.add(taskInstance); + } + + /** + * take task info + * + * @return task instance + * @throws TaskPriorityQueueException + */ + @Override + public TaskInstance take() throws TaskPriorityQueueException { + return queue.poll(); + } + + /** + * peek taskInfo + * + * @return task instance + */ + public TaskInstance peek() { + return queue.peek(); + } + + /** + * queue size + * + * @return size + */ + public int size() { + return queue.size(); + } + + /** + * whether contains the task instance + * + * @param taskInstance task instance + * @return true is contains + */ + public boolean contains(TaskInstance taskInstance) { + return queue.contains(taskInstance); + } + + /** + * remove task + * + * @param taskInstance task instance + * @return true if remove success + */ + public boolean remove(TaskInstance taskInstance) { + return queue.remove(taskInstance); + } + + /** + * get iterator + * + * @return Iterator + */ + public Iterator iterator() { + return queue.iterator(); + } + + /** + * TaskInfoComparator + */ + private class TaskInfoComparator implements Comparator { + + /** + * compare o1 o2 + * + * @param o1 o1 + * @param o2 o2 + * @return compare result + */ + @Override + public int compare(TaskInstance o1, TaskInstance o2) { + return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority()); + } + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 3ad9aef6c5..14c6b382d4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -14,31 +14,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.queue; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; -public interface TaskPriorityQueue { +/** + * task priority queue + * @param + */ +public interface TaskPriorityQueue { /** * put task info * * @param taskInfo taskInfo - * @throws Exception + * @throws TaskPriorityQueueException */ - void put(String taskInfo) throws Exception; + void put(T taskInfo) throws TaskPriorityQueueException; /** * take taskInfo + * * @return taskInfo - * @throws Exception + * @throws TaskPriorityQueueException */ - String take()throws Exception; + T take() throws TaskPriorityQueueException, InterruptedException; /** * size * * @return size - * @throws Exception + * @throws TaskPriorityQueueException */ - int size() throws Exception; + int size() throws TaskPriorityQueueException; } \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 0a0fb1b9b0..aefad8499c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -14,22 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.queue; +import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import org.springframework.stereotype.Service; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; -import java.util.*; +import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; -import static org.apache.dolphinscheduler.common.Constants.*; +import org.springframework.stereotype.Service; + + /** * A singleton of a task queue implemented with zookeeper * tasks queue implementation */ @Service -public class TaskPriorityQueueImpl implements TaskPriorityQueue { +public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue size */ @@ -44,40 +49,43 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * put task takePriorityInfo * * @param taskPriorityInfo takePriorityInfo - * @throws Exception + * @throws TaskPriorityQueueException */ @Override - public void put(String taskPriorityInfo) throws Exception { + public void put(String taskPriorityInfo) throws TaskPriorityQueueException { queue.put(taskPriorityInfo); } /** * take taskInfo + * * @return taskInfo - * @throws Exception + * @throws TaskPriorityQueueException */ @Override - public String take() throws Exception { + public String take() throws TaskPriorityQueueException, InterruptedException { return queue.take(); } /** * queue size + * * @return size - * @throws Exception + * @throws TaskPriorityQueueException */ @Override - public int size() throws Exception { + public int size() throws TaskPriorityQueueException { return queue.size(); } /** * TaskInfoComparator */ - private class TaskInfoComparator implements Comparator{ + private class TaskInfoComparator implements Comparator { /** * compare o1 o2 + * * @param o1 o1 * @param o2 o2 * @return compare result @@ -87,15 +95,15 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { String s1 = o1; String s2 = o2; String[] s1Array = s1.split(UNDERLINE); - if(s1Array.length > TASK_INFO_LENGTH){ + if (s1Array.length > TASK_INFO_LENGTH) { // warning: if this length > 5, need to be changed - s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) ); + s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE)); } String[] s2Array = s2.split(UNDERLINE); - if(s2Array.length > TASK_INFO_LENGTH){ + if (s2Array.length > TASK_INFO_LENGTH) { // warning: if this length > 5, need to be changed - s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) ); + s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE)); } return s1.compareTo(s2); diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java new file mode 100644 index 0000000000..cf39d57b8b --- /dev/null +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue; + +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Task instances priority queue implementation + * All the task instances are in the same process instance. + */ +public class PeerTaskInstancePriorityQueueTest { + + @Test + public void testPut() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); + TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); + queue.put(taskInstanceHigPriority); + queue.put(taskInstanceMediumPriority); + Assert.assertEquals(2,queue.size()); + } + + @Test + public void testPeek() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + int peekBeforeLength = queue.size(); + queue.peek(); + Assert.assertEquals(peekBeforeLength,queue.size()); + + } + + @Test + public void testTake() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + int peekBeforeLength = queue.size(); + queue.take(); + Assert.assertTrue(queue.size() < peekBeforeLength); + } + + /** + * get queue + * + * @return queue + * @throws Exception + */ + private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); + TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); + queue.put(taskInstanceHigPriority); + queue.put(taskInstanceMediumPriority); + return queue; + } + + /** + * create task instance + * + * @param name name + * @param priority priority + * @return + */ + private TaskInstance createTaskInstance(String name, Priority priority) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setName(name); + taskInstance.setTaskInstancePriority(priority); + return taskInstance; + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5163ae7df7..4eda05354a 100644 --- a/pom.xml +++ b/pom.xml @@ -862,6 +862,7 @@ **/service/zk/ZKServerTest.java **/service/zk/CuratorZookeeperClientTest.java **/service/queue/TaskUpdateQueueTest.java + **/service/queue/PeerTaskInstancePriorityQueueTest.java **/dao/mapper/DataSourceUserMapperTest.java