From b3120a74d2656f7ad2054ba8245262551063b549 Mon Sep 17 00:00:00 2001 From: BoYiZhang <39816903+BoYiZhang@users.noreply.github.com> Date: Fri, 11 Dec 2020 15:35:40 +0800 Subject: [PATCH] [Feature-4138][Master] dispatch workgroup error add sleep time (#4139) * When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second * When there are tasks with assignment failure and the number of tasks in the current task queue is less than 10, sleep for 1 second * fix code smell & code style * fix code smell & code style Co-authored-by: zhanglong --- .../consumer/TaskPriorityQueueConsumer.java | 46 +- .../TaskPriorityQueueConsumerTest.java | 459 ++++++++++++++++-- 2 files changed, 456 insertions(+), 49 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 822e493076..89d3e97b8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -136,9 +137,17 @@ public class TaskPriorityQueueConsumer extends Thread { failedDispatchTasks.add(taskPriorityInfo); } } - for (String dispatchFailedTask : failedDispatchTasks) { - taskPriorityQueue.put(dispatchFailedTask); + if (!failedDispatchTasks.isEmpty()) { + for (String dispatchFailedTask : failedDispatchTasks) { + taskPriorityQueue.put(dispatchFailedTask); + } + // If there are tasks in a cycle that cannot find the worker group, + // sleep for 1 second + if (taskPriorityQueue.size() <= failedDispatchTasks.size()) { + TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); + } } + } catch (Exception e) { logger.error("dispatcher task error", e); } @@ -151,7 +160,7 @@ public class TaskPriorityQueueConsumer extends Thread { * @param taskInstanceId taskInstanceId * @return result */ - private boolean dispatch(int taskInstanceId) { + protected boolean dispatch(int taskInstanceId) { boolean result = false; try { TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); @@ -224,7 +233,6 @@ public class TaskPriorityQueueConsumer extends Thread { // SQL task if (taskType == TaskType.SQL) { setSQLTaskRelation(sqlTaskExecutionContext, taskNode); - } // DATAX task @@ -271,22 +279,22 @@ public class TaskPriorityQueueConsumer extends Thread { * @param dataxTaskExecutionContext dataxTaskExecutionContext * @param taskNode taskNode */ - private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) { + protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) { DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class); - DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); - DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); - if (dataSource != null) { + if (dbSource != null) { dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); - dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode()); + dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams()); } - if (dataTarget != null) { + if (dbTarget != null) { dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); - dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode()); + dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams()); } } @@ -374,7 +382,7 @@ public class TaskPriorityQueueConsumer extends Thread { * @param taskInstance taskInstance * @return result */ - private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { + protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { if (tenant == null) { logger.error("tenant not exists,process instance id : {},task instance id : {}", taskInstance.getProcessInstance().getId(), @@ -387,8 +395,8 @@ public class TaskPriorityQueueConsumer extends Thread { /** * get resource map key is full name and value is tenantCode */ - private Map getResourceFullNames(TaskNode taskNode) { - Map resourceMap = new HashMap<>(); + protected Map getResourceFullNames(TaskNode taskNode) { + Map resourcesMap = new HashMap<>(); AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()); if (baseParam != null) { @@ -400,7 +408,7 @@ public class TaskPriorityQueueConsumer extends Thread { if (CollectionUtils.isNotEmpty(oldVersionResources)) { oldVersionResources.forEach( - (t) -> resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) + (t) -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)) ); } @@ -413,12 +421,12 @@ public class TaskPriorityQueueConsumer extends Thread { List resources = processService.listResourceByIds(resourceIds); resources.forEach( - (t) -> resourceMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) + (t) -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)) ); } } } - return resourceMap; + return resourcesMap; } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index a9c6985fca..049e30e732 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -17,18 +17,22 @@ package org.apache.dolphinscheduler.server.master.consumer; -import java.util.Date; - import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -42,7 +46,15 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,9 +63,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, +@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class, CuratorZookeeperClient.class}) @@ -73,7 +84,7 @@ public class TaskPriorityQueueConsumerTest { private ExecutorDispatcher dispatcher; @Before - public void init(){ + public void init() { Tenant tenant = new Tenant(); tenant.setId(1); @@ -83,12 +94,11 @@ public class TaskPriorityQueueConsumerTest { tenant.setCreateTime(new Date()); tenant.setUpdateTime(new Date()); - Mockito.doReturn(tenant).when(processService).getTenantForProcess(1,2); + Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2); Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1); } - @Test public void testSHELLTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); @@ -97,12 +107,31 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false," + + "\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); processInstance.setTenantId(1); processInstance.setCommandType(CommandType.START_PROCESS); taskInstance.setProcessInstance(processInstance); @@ -113,11 +142,14 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + taskPriorityQueue.put("2_1_2_1_default"); - Thread.sleep(10000); - } + TimeUnit.SECONDS.sleep(10); + Assert.assertNotNull(taskInstance); + } @Test public void testSQLTask() throws Exception { @@ -127,7 +159,13 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}"); + taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\"," + + "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\"," + + "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[]," + + "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\"," + + "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -149,16 +187,20 @@ public class TaskPriorityQueueConsumerTest { dataSource.setName("sqlDatasource"); dataSource.setType(DbType.MYSQL); dataSource.setUserId(2); - dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\"," + + "\"database\":\"dolphinscheduler_qiaozhanwei\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\"," + + "\"user\":\"root\"," + + "\"password\":\"root@123\"}"); dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); - Thread.sleep(10000); + TimeUnit.SECONDS.sleep(10); + Assert.assertNotNull(taskInstance); } - @Test public void testDataxTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); @@ -167,7 +209,26 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\\":80,\\\"dataTarget\\\":80,\\\"sql\\\":\\\"SELECT dt,count FROM pv\\\",\\\"preStatements\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"DATAX\",\"workerGroup\":\"default\"}"); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," + + "\"forbidden\":false,\"id\":\"tasks-97625\"," + + "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\"," + + "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\"," + + " \\\"postStatements\\\":[]," + + " \\\"jobSpeedRecord\\\":1000," + + " \\\"customConfig\\\":0," + + " \\\"dtType\\\":\\\"MYSQL\\\"," + + " \\\"dsType\\\":\\\"MYSQL\\\"," + + " \\\"jobSpeedByte\\\":0," + + " \\\"dataSource\\\":80," + + " \\\"dataTarget\\\":80," + + " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\"," + + " \\\"preStatements\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"DATAX\"," + + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -184,21 +245,23 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); - - DataSource dataSource = new DataSource(); dataSource.setId(80); dataSource.setName("datax"); dataSource.setType(DbType.MYSQL); dataSource.setUserId(2); - dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\"," + + "\"database\":\"dolphinscheduler_qiaozhanwei\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\"," + + "\"user\":\"root\"," + + "\"password\":\"root@123\"}"); dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); Mockito.doReturn(dataSource).when(processService).findDataSourceById(80); - Thread.sleep(10000); + TimeUnit.SECONDS.sleep(10); + Assert.assertNotNull(taskInstance); } - @Test public void testSqoopTask() throws Exception { TaskInstance taskInstance = new TaskInstance(); @@ -207,7 +270,32 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\",\\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\",\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\",\\\"modelType\\\":\\\"import\\\",\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\",\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\",\\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\",\\\"localParams\\\":[],\\\"concurrency\\\":1}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQOOP\",\"workerGroup\":\"default\"}"); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," + + "\"forbidden\":false,\"id\":\"tasks-63634\"," + + "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\"," + + "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\"," + + " \\\"targetType\\\":\\\"HDFS\\\"," + + " \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\"," + + " \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\"," + + " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\"," + + " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\"," + + " \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\"," + + " \\\"modelType\\\":\\\"import\\\"," + + " \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\"," + + " \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\"," + + " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\"," + + " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\"," + + " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\"," + + " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\"," + + " \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\"," + + " \\\"localParams\\\":[],\\\"concurrency\\\":1}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SQOOP\"," + + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); @@ -224,37 +312,350 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); taskPriorityQueue.put("2_1_2_1_default"); - DataSource dataSource = new DataSource(); dataSource.setId(1); dataSource.setName("datax"); dataSource.setType(DbType.MYSQL); dataSource.setUserId(2); - dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\"," + + "\"database\":\"dolphinscheduler_qiaozhanwei\"," + + "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\"," + + "\"user\":\"root\"," + + "\"password\":\"root@123\"}"); dataSource.setCreateTime(new Date()); dataSource.setUpdateTime(new Date()); Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); - Thread.sleep(10000); + TimeUnit.SECONDS.sleep(10); + Assert.assertNotNull(taskInstance); } - @Test - public void testTaskInstanceIsFinalState(){ + public void testTaskInstanceIsFinalState() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); taskInstance.setTaskType("SHELL"); taskInstance.setProcessDefinitionId(1); taskInstance.setProcessInstanceId(1); taskInstance.setState(ExecutionStatus.KILL); - taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\"," + + "\"forbidden\":false,\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0,\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\"," + + "\"retryInterval\":1,\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"default\"}"); taskInstance.setProcessInstancePriority(Priority.MEDIUM); taskInstance.setWorkerGroup("default"); taskInstance.setExecutorId(2); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + + Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1); + Assert.assertNotNull(state); + } + + @Test + public void testNotFoundWorkerGroup() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"NoWorkGroup\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("NoWorkGroup"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + taskInstance.setState(ExecutionStatus.DELAY_EXECUTION); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + + taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + + TimeUnit.SECONDS.sleep(10); + + Assert.assertNotNull(taskInstance); + + } + + @Test + public void testDispatch() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"NoWorkGroup\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("NoWorkGroup"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + taskInstance.setState(ExecutionStatus.DELAY_EXECUTION); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + + boolean res = taskPriorityQueueConsumer.dispatch(1); + + Assert.assertFalse(res); + } + + @Test + public void testGetTaskExecutionContext() throws Exception { + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"NoWorkGroup\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("NoWorkGroup"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + taskInstance.setState(ExecutionStatus.DELAY_EXECUTION); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); + Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); + + TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1); + + Assert.assertNotNull(taskExecutionContext); + } + + @Test + public void testGetResourceFullNames() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"NoWorkGroup\"}"); + + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("NoWorkGroup"); + taskInstance.setExecutorId(2); + // task node + TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class); + + Map map = taskPriorityQueueConsumer.getResourceFullNames(taskNode); + + List resourcesList = new ArrayList(); + Resource resource = new Resource(); + resource.setFileName("fileName"); + resourcesList.add(resource); + + Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123}); + Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE); + Assert.assertNotNull(map); + + } + + @Test + public void testVerifyTenantIsNull() throws Exception { + Tenant tenant = null; + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + taskInstance.setProcessInstance(processInstance); + + boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance); + Assert.assertTrue(res); + + tenant = new Tenant(); + tenant.setId(1); + tenant.setTenantCode("journey"); + tenant.setDescription("journey"); + tenant.setQueueId(1); + tenant.setCreateTime(new Date()); + tenant.setUpdateTime(new Date()); + res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant,taskInstance); + Assert.assertFalse(res); + + } + + @Test + public void testSetDataxTaskRelation() throws Exception { + + DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); + TaskNode taskNode = new TaskNode(); + taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}"); + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSource.setConnectionParams(""); + dataSource.setType(DbType.MYSQL); + Mockito.doReturn(dataSource).when(processService).findDataSourceById(1); + + taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode); + + Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId()); + Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId()); + } + + @Test + public void testRun() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false," + + "\"depList\":[]," + + "\"dependence\":\"{}\"," + + "\"forbidden\":false," + + "\"id\":\"tasks-55201\"," + + "\"maxRetryTimes\":0," + + "\"name\":\"测试任务\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\"," + + "\"preTasks\":\"[]\"," + + "\"retryInterval\":1," + + "\"runFlag\":\"NORMAL\"," + + "\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0}," + + "\"timeout\":\"{\\\"enable\\\":false," + + "\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"SHELL\"," + + "\"workerGroup\":\"NoWorkGroup\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("NoWorkGroup"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + taskInstance.setState(ExecutionStatus.DELAY_EXECUTION); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - taskPriorityQueueConsumer.taskInstanceIsFinalState(1); + taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + + taskPriorityQueueConsumer.run(); + + TimeUnit.SECONDS.sleep(10); + Assert.assertNotEquals(-1,taskPriorityQueue.size()); + } @After @@ -262,6 +663,4 @@ public class TaskPriorityQueueConsumerTest { Stopper.stop(); } - - }