From 7a100086b7cf0d86456446150811989ce54939b1 Mon Sep 17 00:00:00 2001 From: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Date: Tue, 28 Apr 2020 17:57:30 +0800 Subject: [PATCH 1/2] Increase dataX environment variable, sslTrust default value (#2555) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group * Increase dataX environment variable, sslTrust default value Co-authored-by: chenxingchun <438044805@qq.com> --- .../src/main/resources/config/install_config.conf | 3 ++- script/env/dolphinscheduler_env.sh | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/resources/config/install_config.conf b/dolphinscheduler-server/src/main/resources/config/install_config.conf index 4671be7371..cba117e048 100644 --- a/dolphinscheduler-server/src/main/resources/config/install_config.conf +++ b/dolphinscheduler-server/src/main/resources/config/install_config.conf @@ -63,7 +63,8 @@ mailPassword="xxxxxxxxxx" # TLS mail protocol support starttlsEnable="false" -sslTrust="xxxxxxxxxx" +#note: sslTrust is the same as mailServerHost +sslTrust="smtp.exmail.qq.com" # SSL mail protocol support # note: The SSL protocol is enabled by default. diff --git a/script/env/dolphinscheduler_env.sh b/script/env/dolphinscheduler_env.sh index e5b99e2857..066f379875 100644 --- a/script/env/dolphinscheduler_env.sh +++ b/script/env/dolphinscheduler_env.sh @@ -23,4 +23,6 @@ export PYTHON_HOME=/opt/soft/python export JAVA_HOME=/opt/soft/java export HIVE_HOME=/opt/soft/hive export FLINK_HOME=/opt/soft/flink -export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$PATH +export DATAX_HOME=/opt/soft/dataX + +export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH From a12103b08cfca98301d8deee5515d7ba9462a46d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 28 Apr 2020 18:39:53 +0800 Subject: [PATCH 2/2] =?UTF-8?q?no=20valid=20worker=20group=EF=BC=8Cmaster?= =?UTF-8?q?=20can=20kill=20task=20directly=20(#2541)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * #2486 bug fix * host and workergroup compatible * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * #2499 bug fix * add comment * revert comment * revert comment * #2499 buf fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly Co-authored-by: qiaozhanwei --- .../consumer/TaskPriorityQueueConsumer.java | 16 +- .../master/runner/MasterTaskExecThread.java | 46 ++- .../TaskPriorityQueueConsumerTest.java | 262 ++++++++++++++++++ .../runner/MasterTaskExecThreadTest.java | 80 ++++++ .../server/registry/DependencyConfig.java | 8 + pom.xml | 2 + 6 files changed, 411 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 480d6657c2..3314789fdb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -120,19 +120,31 @@ public class TaskPriorityQueueConsumer extends Thread{ Boolean result = false; while (Stopper.isRunning()){ try { - result = dispatcher.dispatch(executionContext); + result = dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("dispatch error",e); ThreadUtils.sleep(SLEEP_TIME_MILLIS); } - if (result){ + if (result || taskInstanceIsFinalState(taskInstanceId)){ break; } } return result; } + + /** + * taskInstance is final state + * success,failure,kill,stop,pause,threadwaiting is final state + * @param taskInstanceId taskInstanceId + * @return taskInstance is final state + */ + public Boolean taskInstanceIsFinalState(int taskInstanceId){ + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + return taskInstance.getState().typeIsFinished(); + } + /** * get TaskExecutionContext * @param taskInstanceId taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 9986b07319..105584fe99 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; @@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; +import java.util.Set; /** @@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private NettyExecutorManager nettyExecutorManager; + + /** + * zookeeper register center + */ + private ZookeeperRegistryCenter zookeeperRegistryCenter; + /** * constructor of MasterTaskExecThread * @param taskInstance task instance @@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { super(taskInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); + this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class); } /** @@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } alreadyKilled = true; + String taskInstanceWorkerGroup = taskInstance.getWorkerGroup(); + + // not exists + if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){ + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + return; + } + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); killCommand.setTaskInstanceId(taskInstance.getId()); @@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { nettyExecutorManager.executeDirectly(executionContext); - logger.info("master add kill task :{} id:{} to kill queue", + logger.info("master kill taskInstance name :{} taskInstance id:{}", taskInstance.getName(), taskInstance.getId() ); } + /** + * whether exists valid worker group + * @param taskInstanceWorkerGroup taskInstanceWorkerGroup + * @return whether exists + */ + public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){ + Set workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly(); + // not worker group + if (CollectionUtils.isEmpty(workerGroups)){ + return false; + } + + // has worker group , but not taskInstance assigned worker group + if (!workerGroups.contains(taskInstanceWorkerGroup)){ + return false; + } + Set workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup); + if (CollectionUtils.isEmpty(workers)) { + return false; + } + return true; + } + /** * get task timeout parameter * @return TaskTimeoutParameter diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java new file mode 100644 index 0000000000..b6f118a734 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.consumer; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.Date; + + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) +public class TaskPriorityQueueConsumerTest { + + + @Autowired + private TaskPriorityQueue taskPriorityQueue; + + @Autowired + private TaskPriorityQueueConsumer taskPriorityQueueConsumer; + + @Autowired + private ProcessService processService; + + @Autowired + private ExecutorDispatcher dispatcher; + + @Before + public void init(){ + + Tenant tenant = new Tenant(); + tenant.setId(1); + tenant.setTenantCode("journey"); + tenant.setTenantName("journey"); + tenant.setDescription("journey"); + tenant.setQueueId(1); + tenant.setCreateTime(new Date()); + tenant.setUpdateTime(new Date()); + + Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant); + + Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default"); + } + + + @Test + public void testSHELLTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + Thread.sleep(10000); + } + + + @Test + public void testSQLTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SQL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSource.setName("sqlDatasource"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testDataxTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("DATAX"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\\":80,\\\"dataTarget\\\":80,\\\"sql\\\":\\\"SELECT dt,count FROM pv\\\",\\\"preStatements\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"DATAX\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + + + DataSource dataSource = new DataSource(); + dataSource.setId(80); + dataSource.setName("datax"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testSqoopTask() throws Exception { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SQOOP"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\",\\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\",\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\",\\\"modelType\\\":\\\"import\\\",\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\",\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\",\\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\",\\\"localParams\\\":[],\\\"concurrency\\\":1}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQOOP\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setTenantId(1); + processInstance.setCommandType(CommandType.START_PROCESS); + taskInstance.setProcessInstance(processInstance); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setUserId(2); + processDefinition.setProjectId(1); + taskInstance.setProcessDefine(processDefinition); + + Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance); + taskPriorityQueue.put("2_1_2_1_default"); + + + + DataSource dataSource = new DataSource(); + dataSource.setId(1); + dataSource.setName("datax"); + dataSource.setType(DbType.MYSQL); + dataSource.setUserId(2); + dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}"); + dataSource.setCreateTime(new Date()); + dataSource.setUpdateTime(new Date()); + + Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource); + + Thread.sleep(10000); + } + + + @Test + public void testTaskInstanceIsFinalState(){ + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskType("SHELL"); + taskInstance.setProcessDefinitionId(1); + taskInstance.setProcessInstanceId(1); + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}"); + taskInstance.setProcessInstancePriority(Priority.MEDIUM); + taskInstance.setWorkerGroup("default"); + taskInstance.setExecutorId(2); + + + Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance); + + taskPriorityQueueConsumer.taskInstanceIsFinalState(1); + } + + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java new file mode 100644 index 0000000000..ebddec4fb1 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import junit.framework.Assert; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.registry.DependencyConfig; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.util.HashSet; +import java.util.Set; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, + ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) +public class MasterTaskExecThreadTest { + + @Test + public void testExistsValidWorkerGroup1(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(null); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("default"); + } + @Test + public void testExistsValidWorkerGroup2(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Set workerGorups = new HashSet<>(); + workerGorups.add("test1"); + workerGorups.add("test2"); + + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("default"); + } + + @Test + public void testExistsValidWorkerGroup3(){ + ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class); + Set workerGorups = new HashSet<>(); + workerGorups.add("test1"); + + Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups); + Mockito.when(zookeeperRegistryCenter.getWorkerGroupNodesDirectly("test1")).thenReturn(workerGorups); + MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null); + masterTaskExecThread.existsValidWorkerGroup("test1"); + } + + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 0adea44cfd..93d2b03010 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.mockito.Mockito; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -144,4 +147,9 @@ public class DependencyConfig { public TaskResponseService taskResponseService(){ return Mockito.mock(TaskResponseService.class); } + + @Bean + public TaskPriorityQueue taskPriorityQueue(){ + return new TaskPriorityQueueImpl(); + } } diff --git a/pom.xml b/pom.xml index 0647724ed0..dad1e3696b 100644 --- a/pom.xml +++ b/pom.xml @@ -778,6 +778,8 @@ **/server/log/TaskLogDiscriminatorTest.java **/server/log/TaskLogFilterTest.java **/server/log/WorkerLogFilterTest.java + **/server/master/consumer/TaskPriorityQueueConsumerTest.java + **/server/master/runner/MasterTaskExecThreadTest.java **/server/master/dispatch/executor/NettyExecutorManagerTest.java **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java