Browse Source

add taskResponseTest

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
6c1a2ebeab
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  2. 97
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -138,7 +138,7 @@ public class TaskResponseService {
case ACK:
try {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && ExecutionStatus.RUNNING_EXECUTION.getCode() >= taskInstance.getState().getCode()) {
if (taskInstance != null && ExecutionStatus.SUCCESS.getCode() != taskInstance.getState().getCode()) {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),

97
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -14,55 +14,80 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor.queue;
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Date;
import io.netty.channel.Channel;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class,
CuratorZookeeperClient.class})
@RunWith(MockitoJUnitRunner.class)
public class TaskResponseServiceTest {
@Autowired
private TaskResponseService taskResponseService;
@Mock(name = "processService")
private ProcessService processService;
@Test
public void testAdd(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(),
"", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
Assert.assertTrue(taskResponseService.getEventQueue().size() == 1);
try {
Thread.sleep(10);
} catch (InterruptedException ignore) {
}
//after sleep, inner worker will take the event
Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
@InjectMocks
TaskResponseService taskResponseService;
@Mock
private Channel channel;
private TaskResponseEvent ackEvent;
private TaskResponseEvent resultEvent;
private TaskInstance taskInstance;
@Before
public void before() {
taskResponseService.start();
ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,
new Date(),
"127.*.*.*",
"path",
"logPath",
22,
channel);
resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS,
new Date(),
1,
"ids",
22,
"varPol",
channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
}
@Test
public void testStop(){
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION, new Date(),
"", "", "", 1,null);
taskResponseService.addResponse(taskResponseEvent);
public void testAddResponse() {
Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
taskResponseService.addResponse(ackEvent);
taskResponseService.addResponse(resultEvent);
}
@After
public void after() {
taskResponseService.stop();
Assert.assertTrue(taskResponseService.getEventQueue().size() == 0);
}
}

Loading…
Cancel
Save