Browse Source

[Feature-4138][Master]Cherry pick from dev to dispatch add sleep when dispatch task to work error (#4211)

* [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when dispatch task to work error

* [Feature-4138][Master]Cherry pick from dev to dispatch add sleep when dispatch task to work error

Co-authored-by: BoYiZhang <39816903+BoYiZhang@users.noreply.github.com>
lgcareer 4 years ago committed by GitHub
parent
commit
542ce2ef85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 450
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -19,11 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -34,7 +30,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -53,6 +48,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -118,9 +114,16 @@ public class TaskPriorityQueueConsumer extends Thread{
failedDispatchTasks.add(taskPriorityInfo);
}
}
if (!failedDispatchTasks.isEmpty()) {
for (String dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If there are tasks in a cycle that cannot find the worker group,
// sleep for 1 second
if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}catch (Exception e){
logger.error("dispatcher task error",e);
}
@ -134,7 +137,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskInstanceId taskInstanceId
* @return result
*/
private boolean dispatch(int taskInstanceId){
protected boolean dispatch(int taskInstanceId) {
boolean result = false;
try {
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
@ -255,8 +258,8 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskNode taskNode
*/
private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskNode.getParams(), DataxParameters.class);
DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
@ -371,7 +374,7 @@ public class TaskPriorityQueueConsumer extends Thread{
/**
* get resource map key is full name and value is tenantCode
*/
private Map<String,String> getResourceFullNames(TaskNode taskNode) {
protected Map<String, String> getResourceFullNames(TaskNode taskNode) {
Map<String, String> resourceMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());

450
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@ -34,6 +36,8 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -42,7 +46,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@RunWith(SpringJUnit4ClassRunner.class)
@ -70,18 +78,16 @@ public class TaskPriorityQueueConsumerTest {
Tenant tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("journey");
tenant.setTenantName("journey");
tenant.setDescription("journey");
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant);
Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default");
Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
}
@Test
public void testSHELLTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@ -90,12 +96,31 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,"
+ "\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
@ -105,12 +130,15 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueue.put("2_1_2_1_default");
Thread.sleep(10000);
}
TimeUnit.SECONDS.sleep(10);
Assert.assertNotNull(taskInstance);
}
@Test
public void testSQLTask() throws Exception {
@ -120,7 +148,13 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\","
+ "\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\","
+ "\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],"
+ "\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\","
+ "\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -134,8 +168,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource();
@ -143,16 +176,20 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setName("sqlDatasource");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ "\"user\":\"root\","
+ "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
Thread.sleep(10000);
TimeUnit.SECONDS.sleep(10);
Assert.assertNotNull(taskInstance);
}
@Test
public void testDataxTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@ -161,7 +198,26 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\\":80,\\\"dataTarget\\\":80,\\\"sql\\\":\\\"SELECT dt,count FROM pv\\\",\\\"preStatements\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"DATAX\",\"workerGroup\":\"default\"}");
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-97625\","
+ "\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\","
+ "\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\","
+ " \\\"postStatements\\\":[],"
+ " \\\"jobSpeedRecord\\\":1000,"
+ " \\\"customConfig\\\":0,"
+ " \\\"dtType\\\":\\\"MYSQL\\\","
+ " \\\"dsType\\\":\\\"MYSQL\\\","
+ " \\\"jobSpeedByte\\\":0,"
+ " \\\"dataSource\\\":80,"
+ " \\\"dataTarget\\\":80,"
+ " \\\"sql\\\":\\\"SELECT dt,count FROM pv\\\","
+ " \\\"preStatements\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"DATAX\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -175,27 +231,26 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource();
dataSource.setId(80);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ "\"user\":\"root\","
+ "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource);
Thread.sleep(10000);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(80);
TimeUnit.SECONDS.sleep(10);
Assert.assertNotNull(taskInstance);
}
@Test
public void testSqoopTask() throws Exception {
TaskInstance taskInstance = new TaskInstance();
@ -204,7 +259,32 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\",\\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\",\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\",\\\"modelType\\\":\\\"import\\\",\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\",\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\",\\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\",\\\"localParams\\\":[],\\\"concurrency\\\":1}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQOOP\",\"workerGroup\":\"default\"}");
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-63634\","
+ "\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\","
+ "\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\","
+ " \\\"targetType\\\":\\\"HDFS\\\","
+ " \\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\","
+ " \\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\","
+ " \\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\","
+ " \\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\","
+ " \\\"modelType\\\":\\\"import\\\","
+ " \\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\","
+ " \\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\","
+ " \\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\","
+ " \\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\","
+ " \\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\","
+ " \\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\","
+ " \\\"localParams\\\":[],\\\"concurrency\\\":1}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SQOOP\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
@ -218,25 +298,26 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
taskPriorityQueue.put("2_1_2_1_default");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setName("datax");
dataSource.setType(DbType.MYSQL);
dataSource.setUserId(2);
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\","
+ "\"database\":\"dolphinscheduler_qiaozhanwei\","
+ "\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\","
+ "\"user\":\"root\","
+ "\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
Thread.sleep(10000);
TimeUnit.SECONDS.sleep(10);
Assert.assertNotNull(taskInstance);
}
@Test
public void testTaskInstanceIsFinalState() {
TaskInstance taskInstance = new TaskInstance();
@ -245,16 +326,303 @@ public class TaskPriorityQueueConsumerTest {
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\","
+ "\"forbidden\":false,\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\","
+ "\"retryInterval\":1,\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"default\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("default");
taskInstance.setExecutorId(2);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
Boolean state = taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
Assert.assertNotNull(state);
}
@Test
public void testNotFoundWorkerGroup() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
TimeUnit.SECONDS.sleep(10);
Assert.assertNotNull(taskInstance);
}
@Test
public void testDispatch() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
boolean res = taskPriorityQueueConsumer.dispatch(1);
Assert.assertFalse(res);
}
@Test
public void testGetTaskExecutionContext() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance);
taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
Assert.assertNotNull(taskExecutionContext);
}
@Test
public void testGetResourceFullNames() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[{\\\"id\\\":123},{\\\"res\\\":\\\"/data/file\\\"}]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskNode);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
resource.setFileName("fileName");
resourcesList.add(resource);
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
Assert.assertNotNull(map);
}
@Test
public void testSetDataxTaskRelation() throws Exception {
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
TaskNode taskNode = new TaskNode();
taskNode.setParams("{\"dataSource\":1,\"dataTarget\":1}");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setConnectionParams("");
dataSource.setType(DbType.MYSQL);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext,taskNode);
Assert.assertEquals(1,dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1,dataxTaskExecutionContext.getDataTargetId());
}
@Test
public void testRun() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,"
+ "\"depList\":[],"
+ "\"dependence\":\"{}\","
+ "\"forbidden\":false,"
+ "\"id\":\"tasks-55201\","
+ "\"maxRetryTimes\":0,"
+ "\"name\":\"测试任务\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\","
+ "\"preTasks\":\"[]\","
+ "\"retryInterval\":1,"
+ "\"runFlag\":\"NORMAL\","
+ "\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},"
+ "\"timeout\":\"{\\\"enable\\\":false,"
+ "\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"SHELL\","
+ "\"workerGroup\":\"NoWorkGroup\"}");
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectId(1);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
taskPriorityQueueConsumer.run();
TimeUnit.SECONDS.sleep(10);
Assert.assertNotEquals(-1,taskPriorityQueue.size());
}
@After
public void close() {
Stopper.stop();
}
}

Loading…
Cancel
Save