diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index de746b05aa..2678532b99 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -138,7 +138,7 @@ public class TaskResponseService { case ACK: try { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null && ExecutionStatus.RUNNING_EXECUTION.getCode() >= taskInstance.getState().getCode()) { + if (taskInstance != null && ExecutionStatus.SUCCESS.getCode() != taskInstance.getState().getCode()) { processService.changeTaskState(taskInstance, taskResponseEvent.getState(), taskResponseEvent.getStartTime(), taskResponseEvent.getWorkerAddress(), diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index a9fdb58520..06bac173c7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -14,55 +14,80 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.processor.queue; +package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; -import org.junit.Assert; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.Date; + +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; -import java.util.Date; +import io.netty.channel.Channel; -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class, - CuratorZookeeperClient.class}) +@RunWith(MockitoJUnitRunner.class) public class TaskResponseServiceTest { - @Autowired - private TaskResponseService taskResponseService; + @Mock(name = "processService") + private ProcessService processService; - @Test - public void testAdd(){ - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1,null); - taskResponseService.addResponse(taskResponseEvent); - Assert.assertTrue(taskResponseService.getEventQueue().size() == 1); - try { - Thread.sleep(10); - } catch (InterruptedException ignore) { - } - //after sleep, inner worker will take the event - Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); + @InjectMocks + TaskResponseService taskResponseService; + + @Mock + private Channel channel; + + private TaskResponseEvent ackEvent; + + private TaskResponseEvent resultEvent; + + private TaskInstance taskInstance; + + @Before + public void before() { + taskResponseService.start(); + + ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, + new Date(), + "127.*.*.*", + "path", + "logPath", + 22, + channel); + + resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS, + new Date(), + 1, + "ids", + 22, + "varPol", + channel); + + taskInstance = new TaskInstance(); + taskInstance.setId(22); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); } @Test - public void testStop(){ - TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(), - "", "", "", 1,null); - taskResponseService.addResponse(taskResponseEvent); + public void testAddResponse() { + Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); + Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null); + taskResponseService.addResponse(ackEvent); + taskResponseService.addResponse(resultEvent); + } + + @After + public void after() { taskResponseService.stop(); - Assert.assertTrue(taskResponseService.getEventQueue().size() == 0); } + }