From 1c77708a89a6dfd6a726fff74c9ff66ad3247778 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E4=BD=9B=E7=88=B7?= <31914013+lfyee@users.noreply.github.com> Date: Thu, 16 Jan 2020 15:33:00 +0800 Subject: [PATCH] Supplementary data by schedule (#1830) * Supplementary data by schedule * fix sonar check bug * fix code duplicated blocks * ut * loop by day * MasterExecThread test * test add licene Co-authored-by: dailidong --- .../api/service/ExecutorService.java | 53 +++- .../api/service/ExecutorService2Test.java | 229 ++++++++++++++++++ .../dolphinscheduler/dao/ProcessDao.java | 9 + .../dao/mapper/ScheduleMapper.java | 7 + .../dao/mapper/ScheduleMapper.xml | 5 + dolphinscheduler-server/pom.xml | 21 ++ .../master/runner/MasterExecThread.java | 50 +++- .../server/utils/ScheduleUtils.java | 79 ++++++ .../server/master/MasterExecThreadTest.java | 154 ++++++++++++ .../server/utils/ScheduleUtilsTest.java | 44 ++++ .../definition/pages/list/_source/start.vue | 2 +- .../src/js/module/i18n/locale/en_US.js | 1 + .../src/js/module/i18n/locale/zh_CN.js | 1 + pom.xml | 3 + 14 files changed, 636 insertions(+), 22 deletions(-) create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 576f0c3eba..c1689c5bec 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -499,22 +501,47 @@ public class ExecutorService extends BaseService{ if(commandType == CommandType.COMPLEMENT_DATA){ runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; - if(runMode == RunMode.RUN_MODE_SERIAL){ - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJson(cmdParam)); - return processDao.createCommand(command); - }else if (runMode == RunMode.RUN_MODE_PARALLEL){ - int runCunt = 0; - while(!start.after(end)){ - runCunt += 1; + if(null != start && null != end && start.before(end)){ + if(runMode == RunMode.RUN_MODE_SERIAL){ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); command.setCommandParam(JSONUtils.toJson(cmdParam)); - processDao.createCommand(command); - start = DateUtils.getSomeDay(start, 1); + return processDao.createCommand(command); + }else if (runMode == RunMode.RUN_MODE_PARALLEL){ + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); + List listDate = new LinkedList<>(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule item : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end); + listDate.addAll(list); + } + } + if(!CollectionUtils.isEmpty(listDate)){ + // loop by schedule date + for (Date date : listDate) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + } + return listDate.size(); + }else{ + // loop by day + int runCunt = 0; + while(!start.after(end)) { + runCunt += 1; + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + start = DateUtils.getSomeDay(start, 1); + } + return runCunt; + } } - return runCunt; + }else{ + logger.error("there is not vaild schedule date for the process definition: id:{},date:{}", + processDefineId, schedule); } }else{ command.setCommandParam(JSONUtils.toJson(cmdParam)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java new file mode 100644 index 0000000000..b4f3e7e31f --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.text.ParseException; +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + +/** + * test for ExecutorService + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class ExecutorService2Test { + + @InjectMocks + private ExecutorService executorService; + + @Mock + private ProcessDao processDao; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectService projectService; + + private int processDefinitionId = 1; + + private int tenantId = 1; + + private int userId = 1; + + private ProcessDefinition processDefinition = new ProcessDefinition(); + + private User loginUser = new User(); + + private String projectName = "projectName"; + + private Project project = new Project(); + + private String cronTime; + + @Before + public void init(){ + // user + loginUser.setId(userId); + + // processDefinition + processDefinition.setId(processDefinitionId); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setTenantId(tenantId); + processDefinition.setUserId(userId); + + // project + project.setName(projectName); + + // cronRangeTime + cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; + + // mock + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth()); + Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); + Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); + Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1); + } + + /** + * not complement + * @throws ParseException + */ + @Test + public void testNoComplement() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.START_PROCESS, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * date error + * @throws ParseException + */ + @Test + public void testDateError() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); + verify(processDao, times(0)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * serial + * @throws ParseException + */ + @Test + public void testSerial() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(31)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(16)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } + + private Map checkProjectAndAuth(){ + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } +} \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 76e9d0b2ff..820b2fdaf4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -1461,6 +1461,15 @@ public class ProcessDao { return scheduleMapper.selectById(id); } + /** + * query Schedule by processDefinitionId + * @param processDefinitionId processDefinitionId + * @see Schedule + */ + public List queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) { + return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + } + /** * query need failover process instance * @param host host diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index 3a1d125f48..8a49c8ff4f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper { */ List queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + /** + * query schedule list by process definition id + * @param processDefinitionId + * @return + */ + List queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml index 402c864251..ddae96a509 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml @@ -55,4 +55,9 @@ from t_ds_schedules where process_definition_id =#{processDefinitionId} + \ No newline at end of file diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 2ccc880a41..751fd919a8 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -111,6 +111,27 @@ dolphinscheduler-alert + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + + org.mockito + mockito-core + + + + + org.mockito + mockito-core + test + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index a91f8c17e6..6c147e2628 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -29,10 +30,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,10 +206,30 @@ public class MasterExecThread implements Runnable { Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); processDao.saveProcessInstance(processInstance); - Date scheduleDate = processInstance.getScheduleTime(); - if(scheduleDate == null){ - scheduleDate = startDate; + // get schedules + int processDefinitionId = processInstance.getProcessDefinitionId(); + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + List listDate = Lists.newLinkedList(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule schedule : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate); + listDate.addAll(list); + } + } + // get first fire date + Iterator iterator = null; + Date scheduleDate = null; + if(!CollectionUtils.isEmpty(listDate)) { + iterator = listDate.iterator(); + scheduleDate = iterator.next(); + processInstance.setScheduleTime(scheduleDate); + processDao.updateProcessInstance(processInstance); + }else{ + scheduleDate = processInstance.getScheduleTime(); + if(scheduleDate == null){ + scheduleDate = startDate; + } } while(Stopper.isRunning()){ @@ -232,11 +255,22 @@ public class MasterExecThread implements Runnable { } // current process instance sucess ,next execute - scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); - if(scheduleDate.after(endDate)){ - // all success - logger.info("process {} complement completely!", processInstance.getId()); - break; + if(null == iterator){ + // loop by day + scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); + if(scheduleDate.after(endDate)){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + }else{ + // loop by schedule date + if(!iterator.hasNext()){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + scheduleDate = iterator.next(); } logger.info("process {} start to complement {} data", diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java new file mode 100644 index 0000000000..11730b9545 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import org.quartz.impl.triggers.CronTriggerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +/** + * ScheduleUtils + */ +public class ScheduleUtils { + + private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class); + + /** + * Get the execution time of the time interval + * @param cron + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, Date from, Date to) { + return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to); + } + + /** + * Get the execution time of the time interval + * @param cron + * @param size + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, int size, Date from, Date to) { + List list = new LinkedList(); + if(to.before(from)){ + logger.error("schedule date from:{} must before date to:{}!", from, to); + return list; + } + try { + CronTriggerImpl trigger = new CronTriggerImpl(); + trigger.setCronExpression(cron); + trigger.setStartTime(from); + trigger.setEndTime(to); + trigger.computeFirstFireTime(null); + for (int i = 0; i < size; i++) { + Date schedule = trigger.getNextFireTime(); + if(null == schedule){ + break; + } + list.add(schedule); + trigger.triggered(null); + } + } catch (ParseException e) { + logger.error("cron:{} error:{}", cron, e.getMessage()); + } + return java.util.Collections.unmodifiableList(list); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java new file mode 100644 index 0000000000..6f31e66213 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.MasterExecThread; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.context.ApplicationContext; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.text.ParseException; +import java.util.*; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * test for MasterExecThread + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({MasterExecThread.class}) +public class MasterExecThreadTest { + + private MasterExecThread masterExecThread; + + private ProcessInstance processInstance; + + private ProcessDao processDao; + + private int processDefinitionId = 1; + + private MasterConfig config; + + private ApplicationContext applicationContext; + + @Before + public void init() throws Exception{ + processDao = mock(ProcessDao.class); + + applicationContext = mock(ApplicationContext.class); + config = new MasterConfig(); + config.setMasterExecTaskNum(1); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + + processInstance = mock(ProcessInstance.class); + Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId); + Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); + Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); + Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO); + Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); + Map cmdParam = new HashMap<>(); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00"); + Mockito.when(processInstance.getCommandParam()).thenReturn(JSONObject.toJSONString(cmdParam)); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); + processDefinition.setGlobalParamList(Collections.EMPTY_LIST); + Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); + + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao)); + // prepareProcess init dag + Field dag = MasterExecThread.class.getDeclaredField("dag"); + dag.setAccessible(true); + dag.set(masterExecThread, new DAG()); + PowerMockito.doNothing().when(masterExecThread, "executeProcess"); + PowerMockito.doNothing().when(masterExecThread, "postHandle"); + PowerMockito.doNothing().when(masterExecThread, "prepareProcess"); + PowerMockito.doNothing().when(masterExecThread, "runProcess"); + PowerMockito.doNothing().when(masterExecThread, "endProcess"); + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 1-30 for next save, and last day 31 no save + verify(processDao, times(31)).saveProcessInstance(processInstance); + }catch (Exception e){ + e.printStackTrace(); + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save + verify(processDao, times(16)).saveProcessInstance(processInstance); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java new file mode 100644 index 0000000000..4fbbdab70f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.junit.Test; +import java.util.Date; +import static org.junit.Assert.assertEquals; + +/** + * Test ScheduleUtils + */ +public class ScheduleUtilsTest { + + /** + * Test the getRecentTriggerTime method + */ + @Test + public void testGetRecentTriggerTime() { + Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date to = DateUtils.stringToDate("2020-01-31 01:00:00"); + // test date + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size()); + // test error cron + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size()); + // test cron + assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue index 8672c41295..0a22e22535 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue @@ -141,7 +141,7 @@
- {{$t('Date')}} + {{$t('Schedule date')}}
**/api/service/ProcessDefinitionServiceTest.java **/api/service/UdfFuncServiceTest.java **/api/service/ResourcesServiceTest.java + **/api/service/ExecutorService2Test.java **/api/service/BaseServiceTest.java **/api/service/BaseDAGServiceTest.java **/alert/utils/ExcelUtilsTest.java @@ -703,6 +704,8 @@ **/server/utils/SparkArgsUtilsTest.java **/server/utils/FlinkArgsUtilsTest.java **/server/utils/ParamUtilsTest.java + **/server/utils/ScheduleUtilsTest.java + **/server/master/MasterExecThreadTest.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/AlertMapperTest.java