From 23ba035182a30700a3589f4bdd1212b523e89ea4 Mon Sep 17 00:00:00 2001 From: Tboy Date: Fri, 27 Mar 2020 22:03:09 +0800 Subject: [PATCH] add TaskResponseServiceTest (#2325) * let quartz use the same datasource * move master/worker config from dao.properties to each config add master/worker registry test * move mybatis config from application.properties to SpringConnectionFactory * move mybatis-plus config from application.properties to SpringConnectionFactory * refactor TaskCallbackService * add ZookeeperNodeManagerTest * add NettyExecutorManagerTest * refactor TaskKillProcessor * add RandomSelectorTest, RoundRobinSelectorTest, TaskCallbackServiceTest * add RoundRobinHostManagerTest, ExecutorDispatcherTest * refactor task response service * add TaskResponseServiceTest --- .../processor/queue/TaskResponseService.java | 100 +++++++++++------- .../queue/TaskResponseServiceTest.java | 66 ++++++++++++ 2 files changed, 129 insertions(+), 37 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index c471c9c787..b9772ca523 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -25,6 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -37,12 +40,12 @@ public class TaskResponseService { /** * logger */ - private static final Logger logger = LoggerFactory.getLogger(TaskResponseService.class); + private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class); /** * attemptQueue */ - private final BlockingQueue attemptQueue = new LinkedBlockingQueue<>(5000); + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(5000); /** @@ -51,12 +54,29 @@ public class TaskResponseService { @Autowired private ProcessService processService; + /** + * task response worker + */ + private Thread taskResponseWorker; + @PostConstruct - public void init(){ - TaskWorker taskWorker = new TaskWorker(); - taskWorker.setName("TaskWorkerThread"); - taskWorker.start(); + public void start(){ + this.taskResponseWorker = new TaskResponseWorker(); + this.taskResponseWorker.setName("TaskResponseWorker"); + this.taskResponseWorker.start(); + } + + @PreDestroy + public void stop(){ + this.taskResponseWorker.interrupt(); + if(!eventQueue.isEmpty()){ + List remainEvents = new ArrayList<>(eventQueue.size()); + eventQueue.drainTo(remainEvents); + for(TaskResponseEvent event : remainEvents){ + this.persist(event); + } + } } /** @@ -66,7 +86,7 @@ public class TaskResponseService { */ public void addResponse(TaskResponseEvent taskResponseEvent){ try { - attemptQueue.put(taskResponseEvent); + eventQueue.put(taskResponseEvent); } catch (InterruptedException e) { logger.error("put task : {} error :{}", taskResponseEvent,e); } @@ -76,7 +96,7 @@ public class TaskResponseService { /** * task worker thread */ - class TaskWorker extends Thread { + class TaskResponseWorker extends Thread { @Override public void run() { @@ -84,41 +104,47 @@ public class TaskResponseService { while (Stopper.isRunning()){ try { // if not task , blocking here - TaskResponseEvent taskResponseEvent = attemptQueue.take(); + TaskResponseEvent taskResponseEvent = eventQueue.take(); persist(taskResponseEvent); - - }catch (Exception e){ + } catch (InterruptedException e){ + break; + } catch (Exception e){ logger.error("persist task error",e); } } + logger.info("TaskResponseWorker stopped"); } + } - /** - * persist taskResponseEvent - * @param taskResponseEvent taskResponseEvent - */ - private void persist(TaskResponseEvent taskResponseEvent){ - TaskResponseEvent.Event event = taskResponseEvent.getEvent(); - - switch (event){ - case ACK: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); - break; - case RESULT: - processService.changeTaskState(taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId()); - break; - default: - throw new IllegalArgumentException("invalid event type : " + event); - } + /** + * persist taskResponseEvent + * @param taskResponseEvent taskResponseEvent + */ + private void persist(TaskResponseEvent taskResponseEvent){ + TaskResponseEvent.Event event = taskResponseEvent.getEvent(); + + switch (event){ + case ACK: + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath(), + taskResponseEvent.getTaskInstanceId()); + break; + case RESULT: + processService.changeTaskState(taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getTaskInstanceId()); + break; + default: + throw new IllegalArgumentException("invalid event type : " + event); } } + + public BlockingQueue getEventQueue() { + return eventQueue; + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java new file mode 100644 index 0000000000..dcba83271c --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.processor.queue; + + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.Date; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class}) +public class TaskResponseServiceTest { + + @Autowired + private TaskResponseService taskResponseService; + + @Test + public void testAdd(){ + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(), + "", "", "", 1); + taskResponseService.addResponse(taskResponseEvent); + Assert.assertTrue(taskResponseService.getEventQueue().size() == 1); + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } + //after sleep, inner worker will take the event + Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); + } + + @Test + public void testStop(){ + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXEUTION, new Date(), + "", "", "", 1); + taskResponseService.addResponse(taskResponseEvent); + taskResponseService.stop(); + Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); + } +}