Browse Source

refactor-worker merge to dev bug fix

pull/2/head
qiaozhanwei 5 years ago
parent
commit
94713cef79
  1. 12
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
  2. 29
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  3. 5
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
  4. 49
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/BaseTaskQueueTest.java
  5. 229
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskQueueZKImplTest.java
  6. 176
      pom.xml

12
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

@ -65,11 +65,8 @@ public class DataxTaskTest {
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
<<<<<<< HEAD
private TaskExecutionContext taskExecutionContext; private TaskExecutionContext taskExecutionContext;
=======
private TaskProps props = new TaskProps(); private TaskProps props = new TaskProps();
>>>>>>> remotes/upstream/dev
@Before @Before
public void before() public void before()
@ -82,19 +79,14 @@ public class DataxTaskTest {
springApplicationContext.setApplicationContext(applicationContext); springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
<<<<<<< HEAD
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setExecutePath("/tmp"); props.setExecutePath("/tmp");
=======
props.setTaskDir("/tmp");
>>>>>>> remotes/upstream/dev
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstanceId(1); props.setTaskInstanceId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
<<<<<<< HEAD
props.setTaskParams( props.setTaskParams(
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); "{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
@ -117,10 +109,8 @@ public class DataxTaskTest {
dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger)); dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger));
dataxTask.init(); dataxTask.init();
=======
props.setCmdTypeIfComplement(START_PROCESS); props.setCmdTypeIfComplement(START_PROCESS);
setTaskParems(0); setTaskParems(0);
>>>>>>> remotes/upstream/dev
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
@ -142,7 +132,7 @@ public class DataxTaskTest {
} }
dataxTask = PowerMockito.spy(new DataxTask(props, logger)); dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger));
dataxTask.init(); dataxTask.init();
} }

29
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -54,9 +55,12 @@ public class ShellTaskTest {
private ShellCommandExecutor shellCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private TaskExecutionContext taskExecutionContext;
@Before @Before
public void before() throws Exception { public void before() throws Exception {
taskExecutionContext = new TaskExecutionContext();
PowerMockito.mockStatic(OSUtils.class); PowerMockito.mockStatic(OSUtils.class);
processService = PowerMockito.mock(ProcessService.class); processService = PowerMockito.mock(ProcessService.class);
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
@ -67,23 +71,22 @@ public class ShellTaskTest {
PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}"); props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}");
shellTask = new ShellTask(props, logger); shellTask = new ShellTask(taskExecutionContext, logger);
shellTask.init(); shellTask.init();
PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.%s", props.getTaskDir(), props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
PowerMockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
PowerMockito.when(shellCommandExecutor.run("")).thenReturn(null);
} }
private DataSource getDataSource() { private DataSource getDataSource() {
@ -112,11 +115,9 @@ public class ShellTaskTest {
public void testShellTask() public void testShellTask()
throws Exception { throws Exception {
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
ShellTask shellTaskTest = new ShellTask(props, logger); ShellTask shellTaskTest = new ShellTask(taskExecutionContext, logger);
Assert.assertNotNull(shellTaskTest); Assert.assertNotNull(shellTaskTest);
} }
@ -137,15 +138,13 @@ public class ShellTaskTest {
@Test @Test
public void testInitException() { public void testInitException() {
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
props.setTaskParams("{\"rawScript\": \"\"}"); props.setTaskParams("{\"rawScript\": \"\"}");
ShellTask shellTask = new ShellTask(props, logger); ShellTask shellTask = new ShellTask(taskExecutionContext, logger);
try { try {
shellTask.init(); shellTask.init();
} catch (Exception e) { } catch (Exception e) {
@ -178,9 +177,7 @@ public class ShellTaskTest {
try { try {
PowerMockito.when(OSUtils.isWindows()).thenReturn(false); PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
@ -188,7 +185,7 @@ public class ShellTaskTest {
props.setScheduleTime(new Date()); props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS); props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}"); props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(props, logger); ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger);
shellTask1.init(); shellTask1.init();
shellTask1.handle(); shellTask1.handle();
Assert.assertTrue(true); Assert.assertTrue(true);
@ -208,9 +205,7 @@ public class ShellTaskTest {
try { try {
Assume.assumeTrue(OSUtils.isWindows()); Assume.assumeTrue(OSUtils.isWindows());
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
@ -218,7 +213,7 @@ public class ShellTaskTest {
props.setScheduleTime(new Date()); props.setScheduleTime(new Date());
props.setCmdTypeIfComplement(CommandType.START_PROCESS); props.setCmdTypeIfComplement(CommandType.START_PROCESS);
props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}"); props.setTaskParams("{\"rawScript\": \" echo ${test}\", \"localParams\": [{\"prop\":\"test\", \"direct\":\"IN\", \"type\":\"VARCHAR\", \"value\":\"123\"}]}");
ShellTask shellTask1 = new ShellTask(props, logger); ShellTask shellTask1 = new ShellTask(taskExecutionContext, logger);
shellTask1.init(); shellTask1.init();
shellTask1.handle(); shellTask1.handle();
Assert.assertTrue(true); Assert.assertTrue(true);

5
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java

@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -58,16 +59,14 @@ public class SqoopTaskTest {
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
props.setTaskTimeout(0); props.setTaskTimeout(0);
props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
sqoopTask = new SqoopTask(props,logger); sqoopTask = new SqoopTask(new TaskExecutionContext(),logger);
sqoopTask.init(); sqoopTask.init();
} }

49
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/BaseTaskQueueTest.java

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.service.zk.ZKServer;
import org.junit.*;
/**
* base task queue test for only start zk server once
*/
@Ignore
public class BaseTaskQueueTest {
protected static ITaskQueue tasksQueue = null;
@BeforeClass
public static void setup() {
ZKServer.start();
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
}
@AfterClass
public static void tearDown() {
tasksQueue.delete();
ZKServer.stop();
}
@Test
public void tasksQueueNotNull(){
Assert.assertNotNull(tasksQueue);
}
}

229
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskQueueZKImplTest.java

@ -1,229 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.*;
/**
* task queue test
*/
@Ignore
public class TaskQueueZKImplTest extends BaseTaskQueueTest {
@Before
public void before(){
//clear all data
tasksQueue.delete();
}
@After
public void after(){
//clear all data
tasksQueue.delete();
}
/**
* test take out all the elements
*/
@Test
public void getAllTasks(){
//add
init();
// get all
List<String> allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertEquals(2, allTasks.size());
//delete all
tasksQueue.delete();
allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertEquals(0, allTasks.size());
}
@Test
public void hasTask(){
init();
boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertTrue(hasTask);
//delete all
tasksQueue.delete();
hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
assertFalse(hasTask);
}
/**
* test check task exists in the task queue or not
*/
@Test
public void checkTaskExists(){
String task= "1_0_1_1_-1";
//add
init();
// check Exist true
boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
assertTrue(taskExists);
//remove task
tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
// check Exist false
taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task);
assertFalse(taskExists);
}
/**
* test add element to the queue
*/
@Test
public void add(){
//add
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + IpUtils.ipToLong(OSUtils.getHost()) + 10);
List<String> tasks = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() <= 0){
return;
}
//pop
String node1 = tasks.get(0);
assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost()));
}
/**
* test element pops out of the queue
*/
@Test
public void poll(){
//add
init();
List<String> taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2);
assertEquals(2, taskList.size());
assertEquals("0_1_1_1_-1", taskList.get(0));
assertEquals("1_0_1_1_-1", taskList.get(1));
}
/**
* test remove element from queue
*/
@Test
public void removeNode(){
String task = "1_0_1_1_-1";
//add
init();
tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task));
}
/**
* test add an element to the set
*/
@Test
public void sadd(){
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(1, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
}
/**
* test delete the value corresponding to the key in the set
*/
@Test
public void srem(){
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(1, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
//remove and get size
tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
assertEquals(0, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
}
/**
* test gets all the elements of the set based on the key
*/
@Test
public void smembers(){
//first init
assertEquals(0, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
//add
String task = "1_0_1_1_-1";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(1, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
//add
task = "0_1_1_1_";
tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task);
//check size
assertEquals(2, tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size());
}
/**
* init data
*/
private void init(){
//add
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1");
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1");
}
/**
* test one million data from zookeeper queue
*/
@Ignore
@Test
public void extremeTest(){
int total = 30 * 10000;
for(int i = 0; i < total; i++) {
for(int j = 0; j < total; j++) {
//${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, j == 0 ? 0 : j + new Random().nextInt(100));
tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, formatTask);
}
}
String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals("0", node1);
}
}

176
pom.xml

@ -683,53 +683,141 @@
<version>${maven-surefire-plugin.version}</version> <version>${maven-surefire-plugin.version}</version>
<configuration> <configuration>
<includes> <includes>
<include>**/common/utils/*.java</include> <include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/common/utils/process/ProcessBuilderForWin32Test.java</include> <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/common/utils/process/ProcessEnvironmentForWin32Test.java</include> <include>**/alert/utils/EnterpriseWeChatUtilsTest.java</include>
<include>**/common/utils/process/ProcessImplForWin32Test.java</include> <include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/common/log/*.java</include> <include>**/alert/utils/FuncUtilsTest.java</include>
<include>**/common/threadutils/*.java</include> <include>**/alert/utils/JSONUtilsTest.java</include>
<include>**/common/graph/*.java</include> <include>**/alert/utils/MailUtilsTest.java</include>
<include>**/common/queue/*.java</include> <include>**/alert/utils/PropertyUtilsTest.java</include>
<include>**/common/task/FlinkParametersTest.java</include> <include>**/api/controller/AccessTokenControllerTest.java</include>
<include>**/common/task/SqoopParameterEntityTest.java</include> <include>**/api/controller/AlertGroupControllerTest.java</include>
<include>**/api/utils/CheckUtilsTest.java</include> <include>**/api/controller/DataAnalysisControllerTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
<include>**/api/utils/FourLetterWordTest.java</include>
<include>**/api/utils/exportprocess/DataSourceParamTest.java</include>
<include>**/api/utils/exportprocess/DependentParamTest.java</include>
<include>**/api/enums/*.java</include>
<include>**/api/controller/DataSourceControllerTest.java</include> <include>**/api/controller/DataSourceControllerTest.java</include>
<include>**/api/controller/ExecutorControllerTest.java</include>
<include>**/api/controller/LoggerControllerTest.java</include>
<include>**/api/controller/LoginControllerTest.java</include>
<include>**/api/controller/MonitorControllerTest.java</include>
<include>**/api/controller/ProcessDefinitionControllerTest.java</include>
<include>**/api/controller/ProcessInstanceControllerTest.java</include>
<include>**/api/controller/ProjectControllerTest.java</include>
<include>**/api/controller/QueueControllerTest.java</include>
<include>**/api/controller/ResourcesControllerTest.java</include>
<include>**/api/controller/SchedulerControllerTest.java</include>
<include>**/api/controller/TaskInstanceControllerTest.java</include>
<include>**/api/controller/TaskRecordControllerTest.java</include>
<include>**/api/controller/TenantControllerTest.java</include>
<include>**/api/controller/UsersControllerTest.java</include>
<include>**/api/controller/WorkerGroupControllerTest.java</include>
<include>**/api/dto/resources/filter/ResourceFilterTest.java</include>
<include>**/api/dto/resources/visitor/ResourceTreeVisitorTest.java</include>
<include>**/api/enums/testGetEnum.java</include>
<include>**/api/enums/StatusTest.java</include>
<include>**/api/interceptor/LoginHandlerInterceptorTest.java</include>
<include>**/api/security/PasswordAuthenticatorTest.java</include>
<include>**/api/security/SecurityConfigTest.java</include>
<include>**/api/service/AccessTokenServiceTest.java</include> <include>**/api/service/AccessTokenServiceTest.java</include>
<include>**/api/service/QueueServiceTest.java</include>
<include>**/api/service/MonitorServiceTest.java</include>
<include>**/api/service/SessionServiceTest.java</include>
<include>**/api/service/UsersServiceTest.java</include>
<include>**/api/service/TenantServiceTest.java</include>
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/AlertGroupServiceTest.java</include> <include>**/api/service/AlertGroupServiceTest.java</include>
<include>**/api/service/UserAlertGroupServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/api/service/ResourcesServiceTest.java</include>
<include>**/api/service/ExecutorService2Test.java</include>
<include>**/api/service/BaseServiceTest.java</include>
<include>**/api/service/BaseDAGServiceTest.java</include> <include>**/api/service/BaseDAGServiceTest.java</include>
<include>**/api/service/LoggerServiceTest.java</include> <include>**/api/service/BaseServiceTest.java</include>
<include>**/api/service/DataAnalysisServiceTest.java</include> <include>**/api/service/DataAnalysisServiceTest.java</include>
<include>**/api/service/DataSourceServiceTest.java</include>
<include>**/api/service/ExecutorService2Test.java</include>
<include>**/api/service/ExecutorServiceTest.java</include>
<include>**/api/service/LoggerServiceTest.java</include>
<include>**/api/service/MonitorServiceTest.java</include>
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/ProcessInstanceServiceTest.java</include> <include>**/api/service/ProcessInstanceServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
<include>**/api/service/QueueServiceTest.java</include>
<include>**/api/service/ResourcesServiceTest.java</include>
<include>**/api/service/SchedulerServiceTest.java</include>
<include>**/api/service/SessionServiceTest.java</include>
<include>**/api/service/TaskInstanceServiceTest.java</include> <include>**/api/service/TaskInstanceServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include> <include>**/api/service/TenantServiceTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include> <include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/alert/utils/JSONUtilsTest.java</include> <include>**/api/service/UserAlertGroupServiceTest.java</include>
<include>**/alert/utils/PropertyUtilsTest.java</include> <include>**/api/service/UsersServiceTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include> <include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include> <include>**/api/utils/exportprocess/DataSourceParamTest.java</include>
<include>**/api/utils/exportprocess/DependentParamTest.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
<include>**/api/utils/FourLetterWordMainTest.java</include>
<include>**/api/utils/ZookeeperMonitorUtilsTest.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/HttpClientTest.java</include>
<include>**/common/graph/DAGTest.java</include>
<include>**/common/os/OshiTest.java</include>
<include>**/common/os/OSUtilsTest.java</include>
<include>**/common/shell/ShellExecutorTest.java</include>
<include>**/common/task/EntityTestUtils.java</include>
<include>**/common/task/FlinkParametersTest.java</include>
<include>**/common/task/SqoopParameterEntityTest.java</include>
<include>**/common/threadutils/ThreadPoolExecutorsTest.java</include>
<include>**/common/threadutils/ThreadUtilsTest.java</include>
<include>**/common/utils/placeholder/TimePlaceholderUtilsTest.java</include>
<include>**/common/utils/process/ProcessBuilderForWin32Test.java</include>
<include>**/common/utils/process/ProcessEnvironmentForWin32Test.java</include>
<include>**/common/utils/process/ProcessImplForWin32Test.java</include>
<include>**/common/utils/CollectionUtilsTest.java</include>
<include>**/common/utils/CommonUtilsTest.java</include>
<include>**/common/utils/DateUtilsTest.java</include>
<include>**/common/utils/DependentUtilsTest.java</include>
<include>**/common/utils/EncryptionUtilsTest.java</include>
<include>**/common/utils/FileUtilsTest.java</include>
<include>**/common/utils/HadoopUtilsTest.java</include>
<include>**/common/utils/IpUtilsTest.java</include>
<include>**/common/utils/JSONUtilsTest.java</include>
<include>**/common/utils/LoggerUtilsTest.java</include>
<include>**/common/utils/OSUtilsTest.java</include>
<include>**/common/utils/ParameterUtilsTest.java</include>
<include>**/common/utils/PreconditionsTest.java</include>
<include>**/common/utils/PropertyUtilsTest.java</include>
<include>**/common/utils/SchemaUtilsTest.java</include>
<include>**/common/utils/ScriptRunnerTest.java</include>
<include>**/common/utils/SensitiveLogUtilsTest.java</include>
<include>**/common/utils/StringTest.java</include>
<include>**/common/utils/StringUtilsTest.java</include>
<include>**/common/utils/TaskParametersUtilsTest.java</include>
<include>**/common/ConstantsTest.java</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include> <include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include> <include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/AlertMapperTest.java</include> <include>**/dao/mapper/AlertMapperTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/mapper/ConnectionFactoryTest.java</include>
<include>**/dao/mapper/DataSourceMapperTest.java</include> <include>**/dao/mapper/DataSourceMapperTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<include>**/dao/mapper/ErrorCommandMapperTest.java</include>
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>
<include>**/dao/mapper/ProcessInstanceMapMapperTest.java</include>
<include>**/dao/mapper/ProcessInstanceMapperTest.java</include>
<include>**/dao/mapper/ProjectMapperTest.java</include>
<include>**/dao/mapper/ProjectUserMapperTest.java</include>
<include>**/dao/mapper/QueueMapperTest.java</include>
<include>**/dao/mapper/ResourceMapperTest.java</include>
<include>**/dao/mapper/ResourceUserMapperTest.java</include>
<include>**/dao/mapper/ScheduleMapperTest.java</include>
<include>**/dao/mapper/SessionMapperTest.java</include>
<include>**/dao/mapper/TaskInstanceMapperTest.java</include>
<include>**/dao/mapper/TenantMapperTest.java</include>
<include>**/dao/mapper/UdfFuncMapperTest.java</include>
<include>**/dao/mapper/UDFUserMapperTest.java</include>
<include>**/dao/mapper/UserAlertGroupMapperTest.java</include>
<include>**/dao/mapper/UserMapperTest.java</include>
<include>**/dao/mapper/WorkerGroupMapperTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
<include>**/remote/FastJsonSerializerTest.java</include>
<include>**/remote/NettyRemotingClientTest.java</include>
<include>**/remote/ResponseFutureTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include> <include>**/server/log/MasterLogFilterTest.java</include>
<include>**/server/log/SensitiveDataConverterTest.java</include> <include>**/server/log/SensitiveDataConverterTest.java</include>
<include>**/server/log/TaskLogDiscriminatorTest.java</include> <include>**/server/log/TaskLogDiscriminatorTest.java</include>
@ -761,20 +849,16 @@
<include>**/server/worker/task/dependent/DependentTaskTest.java</include> <include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include> <include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include> <include>**/server/worker/task/EnvFileTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/entity/TaskInstanceTest.java</include>
<include>**/dao/cron/CronUtilsTest.java</include>
<include>**/dao/utils/DagHelperTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include> <include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include> <include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include> <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/worker/EnvFileTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/dao/datasource/BaseDataSourceTest.java</include> <include>**/service/zk/ZKServerTest.java</include>
<include>**/alert/utils/MailUtilsTest.java</include> <include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
</includes> </includes>
<!-- <skip>true</skip> --> <!-- <skip>true</skip> -->
<argLine>-Xmx2048m</argLine> <argLine>-Xmx2048m</argLine>

Loading…
Cancel
Save