diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 1d944d0737..89782578cf 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -549,7 +549,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setCommandParam(JSONUtils.toJsonString(cmdParam)); return processService.createCommand(command); } else if (runMode == RunMode.RUN_MODE_PARALLEL) { - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefineId); // TODO: next pr change to code List listDate = new LinkedList<>(); if (!CollectionUtils.isEmpty(schedules)) { for (Schedule item : schedules) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index b28e89936d..65e2b10da2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -555,7 +555,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } // get the timing according to the process definition - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionId); if (!schedules.isEmpty() && schedules.size() > 1) { logger.warn("scheduler num is {},Greater than 1", schedules.size()); putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); @@ -630,7 +630,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setReleaseState(releaseState); processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new int[]{processDefinition.getId()} + new long[]{processDefinition.getCode()} ); for (Schedule schedule : scheduleList) { @@ -712,7 +712,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return DagDataSchedule */ public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) { - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId()); + List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode()); DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition)); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); @@ -821,7 +821,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Schedule schedule = dagDataSchedule.getSchedule(); if (null != schedule) { ProcessDefinition newProcessDefinition = processDefinitionMapper.queryByCode(processDefinition.getCode()); - schedule.setProcessDefinitionId(newProcessDefinition.getId()); + schedule.setProcessDefinitionCode(newProcessDefinition.getCode()); schedule.setUserId(loginUser.getId()); schedule.setCreateTime(now); schedule.setUpdateTime(now); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 350ab9a513..edc7074fdb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -140,7 +140,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe Date now = new Date(); scheduleObj.setProjectName(project.getName()); - scheduleObj.setProcessDefinitionId(processDefinition.getId()); + scheduleObj.setProcessDefinitionCode(processDefineCode); scheduleObj.setProcessDefinitionName(processDefinition.getName()); ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class); @@ -228,9 +228,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } - ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId()); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()); if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId()); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode()); return result; } @@ -326,9 +326,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); return result; } - ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId()); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode()); if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId()); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode()); return result; } @@ -342,7 +342,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } // check sub process definition release state List subProcessDefineIds = new ArrayList<>(); - processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds); + processService.recurseFindSubProcessId(processDefinition.getId(), subProcessDefineIds); Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]); if (!subProcessDefineIds.isEmpty()) { List subProcessDefinitionList = @@ -430,7 +430,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } Page page = new Page<>(pageNo, pageSize); - IPage scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page, processDefinition.getId(), + IPage scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 624fa6e124..2cae0513b0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -152,7 +152,7 @@ public class ExecutorServiceTest { @Test public void testNoComplement() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, cronTime, CommandType.START_PROCESS, null, null, @@ -170,7 +170,7 @@ public class ExecutorServiceTest { @Test public void testComplementWithStartNodeList() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, cronTime, CommandType.START_PROCESS, null, "n1,n2", @@ -188,7 +188,7 @@ public class ExecutorServiceTest { @Test public void testDateError() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, null, null, @@ -205,7 +205,7 @@ public class ExecutorServiceTest { @Test public void testSerial() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, null, null, @@ -222,7 +222,7 @@ public class ExecutorServiceTest { @Test public void testParallelWithOutSchedule() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, null, null, @@ -240,7 +240,7 @@ public class ExecutorServiceTest { @Test public void testParallelWithSchedule() { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList()); Map result = executorService.execProcessInstance(loginUser, projectCode, processDefinitionCode, cronTime, CommandType.COMPLEMENT_DATA, null, null, diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index c25fa59746..f6a6a7fb7d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.api.service; import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; @@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.DagData; @@ -55,10 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.http.entity.ContentType; - -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; @@ -70,7 +63,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import org.junit.Assert; @@ -80,8 +72,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.springframework.mock.web.MockMultipartFile; -import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -516,7 +506,7 @@ public class ProcessDefinitionServiceTest { List schedules = new ArrayList<>(); schedules.add(getSchedule()); schedules.add(getSchedule()); - Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); Map schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS)); @@ -527,7 +517,7 @@ public class ProcessDefinitionServiceTest { schedules.add(schedule); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); Map schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS)); @@ -537,7 +527,7 @@ public class ProcessDefinitionServiceTest { schedules.add(schedule); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, project.getName())).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); Map deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); @@ -950,7 +940,7 @@ public class ProcessDefinitionServiceTest { Date date = new Date(); Schedule schedule = new Schedule(); schedule.setId(46); - schedule.setProcessDefinitionId(1); + schedule.setProcessDefinitionCode(1); schedule.setStartTime(date); schedule.setEndTime(date); schedule.setCrontab("0 0 5 * * ? *"); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java index 7cd383a071..238d1d08fc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -70,6 +71,9 @@ public class SchedulerServiceTest { @Mock private ProjectMapper projectMapper; + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + @Mock private ProjectServiceImpl projectService; @@ -102,7 +106,7 @@ public class SchedulerServiceTest { Schedule schedule = new Schedule(); schedule.setId(1); - schedule.setProcessDefinitionId(1); + schedule.setProcessDefinitionCode(1); schedule.setReleaseState(ReleaseState.OFFLINE); List masterServers = new ArrayList<>(); @@ -113,7 +117,7 @@ public class SchedulerServiceTest { Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); - Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition); + Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition); //hash no auth result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE); @@ -128,10 +132,10 @@ public class SchedulerServiceTest { Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS)); //PROCESS_DEFINE_NOT_EXIST - schedule.setProcessDefinitionId(2); + schedule.setProcessDefinitionCode(2); result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS)); - schedule.setProcessDefinitionId(1); + schedule.setProcessDefinitionCode(1); // PROCESS_DEFINE_NOT_RELEASE result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java index 74ed5c1ee1..e1b4c90ea4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java @@ -39,10 +39,11 @@ public class Schedule { @TableId(value = "id", type = IdType.AUTO) private int id; + /** - * process definition id + * process definition code */ - private int processDefinitionId; + private long processDefinitionCode; /** * process definition name @@ -222,12 +223,12 @@ public class Schedule { this.releaseState = releaseState; } - public int getProcessDefinitionId() { - return processDefinitionId; + public long getProcessDefinitionCode() { + return processDefinitionCode; } - public void setProcessDefinitionId(int processDefinitionId) { - this.processDefinitionId = processDefinitionId; + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; } public String getProcessDefinitionName() { @@ -290,7 +291,7 @@ public class Schedule { public String toString() { return "Schedule{" + "id=" + id - + ", processDefinitionId=" + processDefinitionId + + ", processDefinitionCode=" + processDefinitionCode + ", processDefinitionName='" + processDefinitionName + '\'' + ", projectName='" + projectName + '\'' + ", description='" + definitionDescription + '\'' diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index 225677d152..37fae5d393 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -17,12 +17,14 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.Schedule; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; + import org.apache.ibatis.annotations.Param; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + /** * scheduler mapper interface */ @@ -31,13 +33,13 @@ public interface ScheduleMapper extends BaseMapper { /** * scheduler page * @param page page - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @param searchVal searchVal * @return scheduler IPage */ - IPage queryByProcessDefineIdPaging(IPage page, - @Param("processDefinitionId") int processDefinitionId, - @Param("searchVal") String searchVal); + IPage queryByProcessDefineCodePaging(IPage page, + @Param("processDefinitionCode") long processDefinitionCode, + @Param("searchVal") String searchVal); /** * query schedule list by project name @@ -47,24 +49,24 @@ public interface ScheduleMapper extends BaseMapper { List querySchedulerListByProjectName(@Param("projectName") String projectName); /** - * query schedule list by process definition ids - * @param processDefineIds processDefineIds + * query schedule list by process definition codes + * @param processDefineCodes processDefineCodes * @return schedule list */ - List selectAllByProcessDefineArray(@Param("processDefineIds") int[] processDefineIds); + List selectAllByProcessDefineArray(@Param("processDefineCodes") long[] processDefineCodes); /** - * query schedule list by process definition id - * @param processDefinitionId processDefinitionId + * query schedule list by process definition code + * @param processDefinitionCode processDefinitionCode * @return schedule list */ - List queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + List queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); /** - * query schedule list by process definition id - * @param processDefinitionId processDefinitionId + * query schedule list by process definition code + * @param processDefinitionCode processDefinitionCode * @return schedule list */ - List queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + List queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index 0ede680435..590e44746c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -72,8 +72,8 @@ td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time, sc.schedule_release_state, tu.user_name FROM t_ds_process_definition td - left join (select process_definition_id,release_state as schedule_release_state from t_ds_schedules group by - process_definition_id,release_state) sc on sc.process_definition_id = td.id + left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by + process_definition_code,release_state) sc on sc.process_definition_code = td.code left join t_ds_user tu on td.user_id = tu.id where td.project_code = #{projectCode} @@ -120,7 +120,7 @@ group by td.user_id,tu.user_name - + + - select from t_ds_schedules - where process_definition_id =#{processDefinitionId} + where process_definition_code = #{processDefinitionCode} - diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index b78113b66f..eeddaf56e8 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -22,7 +22,7 @@ diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java index e55aacaa6c..752eca4685 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapperTest.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.dao.mapper; +package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.ReleaseState; @@ -24,8 +24,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,8 +37,8 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @RunWith(SpringRunner.class) @SpringBootTest @@ -61,7 +63,7 @@ public class ScheduleMapperTest { * insert * @return Schedule */ - private Schedule insertOne(){ + private Schedule insertOne() { //insertOne Schedule schedule = new Schedule(); schedule.setStartTime(new Date()); @@ -80,7 +82,7 @@ public class ScheduleMapperTest { * test update */ @Test - public void testUpdate(){ + public void testUpdate() { //insertOne Schedule schedule = insertOne(); schedule.setCreateTime(new Date()); @@ -93,7 +95,7 @@ public class ScheduleMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { Schedule schedule = insertOne(); int delete = scheduleMapper.deleteById(schedule.getId()); Assert.assertEquals(delete, 1); @@ -138,18 +140,15 @@ public class ScheduleMapperTest { processDefinition.setUpdateTime(new Date()); processDefinitionMapper.insert(processDefinition); - Schedule schedule= insertOne(); + Schedule schedule = insertOne(); schedule.setUserId(user.getId()); - schedule.setProcessDefinitionId(processDefinition.getId()); + schedule.setProcessDefinitionCode(processDefinition.getCode()); scheduleMapper.insert(schedule); Page page = new Page(1,3); - IPage scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(page, - processDefinition.getId(), "" - ); + IPage scheduleIPage = scheduleMapper.queryByProcessDefineCodePaging(page, + processDefinition.getCode(), ""); Assert.assertNotEquals(scheduleIPage.getSize(), 0); - - } /** @@ -158,7 +157,6 @@ public class ScheduleMapperTest { @Test public void testQuerySchedulerListByProjectName() { - User user = new User(); user.setUserName("ut name"); userMapper.insert(user); @@ -181,9 +179,9 @@ public class ScheduleMapperTest { processDefinition.setUpdateTime(new Date()); processDefinitionMapper.insert(processDefinition); - Schedule schedule= insertOne(); + Schedule schedule = insertOne(); schedule.setUserId(user.getId()); - schedule.setProcessDefinitionId(processDefinition.getId()); + schedule.setProcessDefinitionCode(processDefinition.getCode()); scheduleMapper.insert(schedule); Page page = new Page(1,3); @@ -201,11 +199,11 @@ public class ScheduleMapperTest { public void testSelectAllByProcessDefineArray() { Schedule schedule = insertOne(); - schedule.setProcessDefinitionId(12345); + schedule.setProcessDefinitionCode(12345); schedule.setReleaseState(ReleaseState.ONLINE); scheduleMapper.updateById(schedule); - List schedules= scheduleMapper.selectAllByProcessDefineArray(new int[] {schedule.getProcessDefinitionId()}); + List schedules = scheduleMapper.selectAllByProcessDefineArray(new long[] {schedule.getProcessDefinitionCode()}); Assert.assertNotEquals(schedules.size(), 0); } @@ -215,10 +213,10 @@ public class ScheduleMapperTest { @Test public void queryByProcessDefinitionId() { Schedule schedule = insertOne(); - schedule.setProcessDefinitionId(12345); + schedule.setProcessDefinitionCode(12345); scheduleMapper.updateById(schedule); - List schedules= scheduleMapper.queryByProcessDefinitionId(schedule.getProcessDefinitionId()); + List schedules = scheduleMapper.queryByProcessDefinitionCode(schedule.getProcessDefinitionCode()); Assert.assertNotEquals(schedules.size(), 0); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java index 28e9dc2ba4..cc60ce4504 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapperTest.java @@ -112,7 +112,7 @@ public class WorkFlowLineageMapperTest { schedule.setWarningType(WarningType.NONE); schedule.setCreateTime(new Date()); schedule.setUpdateTime(new Date()); - schedule.setProcessDefinitionId(id); + schedule.setProcessDefinitionCode(id); scheduleMapper.insert(schedule); return schedule; } @@ -131,8 +131,9 @@ public class WorkFlowLineageMapperTest { public void testQueryCodeRelation() { ProcessTaskRelation processTaskRelation = insertOneProcessTaskRelation(); - List workFlowLineages = workFlowLineageMapper.queryCodeRelation(processTaskRelation.getPreTaskCode() - , processTaskRelation.getPreTaskVersion(), 11L, 1L); + List workFlowLineages = workFlowLineageMapper.queryCodeRelation( + processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion(), + processTaskRelation.getProcessDefinitionCode(), processTaskRelation.getProjectCode()); Assert.assertNotEquals(workFlowLineages.size(), 0); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 1863087fca..72f7b47eae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -255,8 +255,8 @@ public class MasterExecThread implements Runnable { processService.saveProcessInstance(processInstance); // get schedules - int processDefinitionId = processInstance.getProcessDefinition().getId(); - List schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( + processInstance.getProcessDefinitionCode()); List listDate = Lists.newLinkedList(); if (!CollectionUtils.isEmpty(schedules)) { for (Schedule schedule : schedules) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java index fbc4ed800d..196fb54e76 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -76,7 +76,7 @@ public class MasterExecThreadTest { private ProcessService processService; - private int processDefinitionId = 1; + private long processDefinitionCode = 1L; private MasterConfig config; @@ -104,6 +104,7 @@ public class MasterExecThreadTest { processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); processDefinition.setGlobalParamList(Collections.EMPTY_LIST); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); + Mockito.when(processInstance.getProcessDefinitionCode()).thenReturn(processDefinitionCode); masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config)); // prepareProcess init dag @@ -120,9 +121,9 @@ public class MasterExecThreadTest { * without schedule */ @Test - public void testParallelWithOutSchedule() throws ParseException { + public void testParallelWithOutSchedule() { try { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(zeroSchedulerList()); Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); @@ -140,12 +141,12 @@ public class MasterExecThreadTest { @Test public void testParallelWithSchedule() { try { - Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Mockito.when(processService.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode)).thenReturn(oneSchedulerList()); Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); method.setAccessible(true); method.invoke(masterExecThread); // one create save, and 9(1 to 20 step 2) for next save, and last day 31 no save - verify(processService, times(20)).saveProcessInstance(processInstance); + verify(processService, times(9)).saveProcessInstance(processInstance); } catch (Exception e) { Assert.fail(); } @@ -267,4 +268,4 @@ public class MasterExecThreadTest { return schedulerList; } -} \ No newline at end of file +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 83b5e8c0e4..01e4678b6e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ConditionType; -import org.apache.dolphinscheduler.common.enums.CycleEnum; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; @@ -64,7 +63,6 @@ import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeExceptio import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.CycleDependency; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ErrorCommand; @@ -108,11 +106,9 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; -import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import java.util.ArrayList; import java.util.Arrays; -import java.util.Calendar; import java.util.Date; import java.util.EnumMap; import java.util.HashMap; @@ -125,14 +121,12 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import com.cronutils.model.Cron; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -403,6 +397,16 @@ public class ProcessService { return processDefinition; } + /** + * find process define by code. + * + * @param processDefinitionCode processDefinitionCode + * @return process definition + */ + public ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode) { + return processDefineMapper.queryByCode(processDefinitionCode); + } + /** * delete work process instance by id * @@ -483,7 +487,6 @@ public class ProcessService { public void recurseFindSubProcessId(int parentId, List ids) { List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); - if (taskNodeList != null && !taskNodeList.isEmpty()) { for (TaskDefinition taskNode : taskNodeList) { @@ -1628,7 +1631,6 @@ public class ProcessService { taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams)); } - /** * convert integer list to string list * @@ -1657,13 +1659,13 @@ public class ProcessService { } /** - * query Schedule by processDefinitionId + * query Schedule by processDefinitionCode * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @see Schedule */ - public List queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) { - return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + public List queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode) { + return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); } /** @@ -1730,7 +1732,6 @@ public class ProcessService { ProcessInstance instance = processInstanceMapper.selectById(processInstanceId); instance.setState(executionStatus); return processInstanceMapper.updateById(instance); - } /** @@ -1785,98 +1786,13 @@ public class ProcessService { } /** - * find schedule list by process define id. + * find schedule list by process define codes. * - * @param ids ids + * @param codes codes * @return schedule list */ - public List selectAllByProcessDefineId(int[] ids) { - return scheduleMapper.selectAllByProcessDefineArray( - ids); - } - - /** - * get dependency cycle by work process define id and scheduler fire time - * - * @param masterId masterId - * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the task schedule is expected to trigger - * @return CycleDependency - * @throws Exception if error throws Exception - */ - public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception { - List list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime); - return !list.isEmpty() ? list.get(0) : null; - - } - - /** - * get dependency cycle list by work process define id list and scheduler fire time - * - * @param masterId masterId - * @param ids ids - * @param scheduledFireTime the time the task schedule is expected to trigger - * @return CycleDependency list - * @throws Exception if error throws Exception - */ - public List getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception { - List cycleDependencyList = new ArrayList<>(); - if (null == ids || ids.length == 0) { - logger.warn("ids[] is empty!is invalid!"); - return cycleDependencyList; - } - if (scheduledFireTime == null) { - logger.warn("scheduledFireTime is null!is invalid!"); - return cycleDependencyList; - } - - String strCrontab = ""; - CronExpression depCronExpression; - Cron depCron; - List list; - List schedules = this.selectAllByProcessDefineId(ids); - // for all scheduling information - for (Schedule depSchedule : schedules) { - strCrontab = depSchedule.getCrontab(); - depCronExpression = CronUtils.parse2CronExpression(strCrontab); - depCron = CronUtils.parse2Cron(strCrontab); - CycleEnum cycleEnum = CronUtils.getMiniCycle(depCron); - if (cycleEnum == null) { - logger.error("{} is not valid", strCrontab); - continue; - } - Calendar calendar = Calendar.getInstance(); - switch (cycleEnum) { - case HOUR: - calendar.add(Calendar.HOUR, -25); - break; - case DAY: - case WEEK: - calendar.add(Calendar.DATE, -32); - break; - case MONTH: - calendar.add(Calendar.MONTH, -13); - break; - default: - String cycleName = cycleEnum.name(); - logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleName); - continue; - } - Date start = calendar.getTime(); - - if (depSchedule.getProcessDefinitionId() == masterId) { - list = CronUtils.getSelfFireDateList(start, scheduledFireTime, depCronExpression); - } else { - list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression); - } - if (!list.isEmpty()) { - start = list.get(list.size() - 1); - CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum); - cycleDependencyList.add(dependency); - } - - } - return cycleDependencyList; + public List selectAllByProcessDefineCode(long[] codes) { + return scheduleMapper.selectAllByProcessDefineArray(codes); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index 2921ce2bba..bda8ad899f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -81,11 +81,11 @@ public class ProcessScheduleJob implements Job { return; } - ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId()); + ProcessDefinition processDefinition = getProcessService().findProcessDefinitionByCode(schedule.getProcessDefinitionCode()); // release state : online/offline ReleaseState releaseState = processDefinition.getReleaseState(); if (releaseState == ReleaseState.OFFLINE) { - logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId); + logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, processDefinition.getId()); return; } @@ -93,7 +93,7 @@ public class ProcessScheduleJob implements Job { command.setCommandType(CommandType.SCHEDULER); command.setExecutorId(schedule.getUserId()); command.setFailureStrategy(schedule.getFailureStrategy()); - command.setProcessDefinitionId(schedule.getProcessDefinitionId()); + //command.setProcessDefinitionId(schedule.getProcessDefinitionCode()); TODO next pr command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index a6e3f977e5..11bdc8dbfa 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -747,7 +747,7 @@ CREATE TABLE `t_ds_resources` ( DROP TABLE IF EXISTS `t_ds_schedules`; CREATE TABLE `t_ds_schedules` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', - `process_definition_id` int(11) NOT NULL COMMENT 'process definition id', + `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code', `start_time` datetime NOT NULL COMMENT 'start time', `end_time` datetime NOT NULL COMMENT 'end time', `timezone_id` varchar(40) DEFAULT NULL COMMENT 'timezoneId', diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index 88cf1431e9..eb97912562 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -611,7 +611,7 @@ CREATE TABLE t_ds_resources ( DROP TABLE IF EXISTS t_ds_schedules; CREATE TABLE t_ds_schedules ( id int NOT NULL , - process_definition_id int NOT NULL , + process_definition_code bigint NOT NULL , start_time timestamp NOT NULL , end_time timestamp NOT NULL , timezone_id varchar(40) default NULL ,