diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 576f0c3eba..c1689c5bec 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -499,22 +501,47 @@ public class ExecutorService extends BaseService{ if(commandType == CommandType.COMPLEMENT_DATA){ runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; - if(runMode == RunMode.RUN_MODE_SERIAL){ - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJson(cmdParam)); - return processDao.createCommand(command); - }else if (runMode == RunMode.RUN_MODE_PARALLEL){ - int runCunt = 0; - while(!start.after(end)){ - runCunt += 1; + if(null != start && null != end && start.before(end)){ + if(runMode == RunMode.RUN_MODE_SERIAL){ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); command.setCommandParam(JSONUtils.toJson(cmdParam)); - processDao.createCommand(command); - start = DateUtils.getSomeDay(start, 1); + return processDao.createCommand(command); + }else if (runMode == RunMode.RUN_MODE_PARALLEL){ + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); + List listDate = new LinkedList<>(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule item : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end); + listDate.addAll(list); + } + } + if(!CollectionUtils.isEmpty(listDate)){ + // loop by schedule date + for (Date date : listDate) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + } + return listDate.size(); + }else{ + // loop by day + int runCunt = 0; + while(!start.after(end)) { + runCunt += 1; + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + start = DateUtils.getSomeDay(start, 1); + } + return runCunt; + } } - return runCunt; + }else{ + logger.error("there is not vaild schedule date for the process definition: id:{},date:{}", + processDefineId, schedule); } }else{ command.setCommandParam(JSONUtils.toJson(cmdParam)); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java index f3f66d730d..2588dd0e65 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java @@ -139,12 +139,16 @@ public class SessionService extends BaseService{ * @param loginUser login user */ public void signOut(String ip, User loginUser) { - /** - * query session by user id and ip - */ - Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip); + try { + /** + * query session by user id and ip + */ + Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip); - //delete session - sessionMapper.deleteById(session.getId()); + //delete session + sessionMapper.deleteById(session.getId()); + }catch (Exception e){ + logger.warn("userId : {} , ip : {} , find more one session",loginUser.getId(),ip); + } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java new file mode 100644 index 0000000000..b4f3e7e31f --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.text.ParseException; +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + +/** + * test for ExecutorService + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class ExecutorService2Test { + + @InjectMocks + private ExecutorService executorService; + + @Mock + private ProcessDao processDao; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectService projectService; + + private int processDefinitionId = 1; + + private int tenantId = 1; + + private int userId = 1; + + private ProcessDefinition processDefinition = new ProcessDefinition(); + + private User loginUser = new User(); + + private String projectName = "projectName"; + + private Project project = new Project(); + + private String cronTime; + + @Before + public void init(){ + // user + loginUser.setId(userId); + + // processDefinition + processDefinition.setId(processDefinitionId); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setTenantId(tenantId); + processDefinition.setUserId(userId); + + // project + project.setName(projectName); + + // cronRangeTime + cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; + + // mock + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth()); + Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); + Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); + Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1); + } + + /** + * not complement + * @throws ParseException + */ + @Test + public void testNoComplement() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.START_PROCESS, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * date error + * @throws ParseException + */ + @Test + public void testDateError() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); + verify(processDao, times(0)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * serial + * @throws ParseException + */ + @Test + public void testSerial() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(31)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(16)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } + + private Map checkProjectAndAuth(){ + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java new file mode 100644 index 0000000000..1c371e799e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * Authorization type + */ +public enum AuthorizationType { + /** + * 0 RESOURCE_FILE; + * 1 DATASOURCE; + * 2 UDF; + */ + RESOURCE_FILE(0, "resource file"), + DATASOURCE(1, "data source"), + UDF(2, "udf function"); + + AuthorizationType(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java index 457ddb0a1d..fe76497ff8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public class ClickHouseDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java index 101efae793..cddedd1f73 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ public class HiveDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); + Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), ""); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java index 3cf2b2ce8c..fa149e67e2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public class MySQLDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("com.mysql.jdbc.Driver"); + Class.forName(Constants.COM_MYSQL_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java index 551c7823cb..c3dc3a96df 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public class OracleDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("oracle.jdbc.driver.OracleDriver"); + Class.forName(Constants.COM_ORACLE_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java index 5241b4c7ef..4989e7681e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,7 @@ public class PostgreDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.postgresql.Driver"); + Class.forName(Constants.ORG_POSTGRESQL_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java index fe398eb88b..8554992efc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class SQLServerDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java index eb455124de..5d10c63e5d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ public class SparkDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); + Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), ""); } finally { if (con != null) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index a45fbfff6f..820b2fdaf4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao; import com.alibaba.fastjson.JSONObject; import com.cronutils.model.Cron; +import org.apache.commons.lang.ArrayUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.model.DateInterval; @@ -25,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.queue.ITaskQueue; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; -import org.apache.dolphinscheduler.common.utils.ArrayUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.*; /** @@ -462,12 +463,9 @@ public class ProcessDao { return null; } - if(null == tenant){ + if(tenant == null){ User user = userMapper.selectById(userId); - - if (null != user) { - tenant = tenantMapper.queryById(user.getTenantId()); - } + tenant = tenantMapper.queryById(user.getTenantId()); } return tenant; } @@ -974,6 +972,9 @@ public class ProcessDao { public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ + if(taskInstance.isSubProcess()){ + return true; + } if(taskInstance.getState().typeIsFinished()){ logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); return true; @@ -1460,6 +1461,15 @@ public class ProcessDao { return scheduleMapper.selectById(id); } + /** + * query Schedule by processDefinitionId + * @param processDefinitionId processDefinitionId + * @see Schedule + */ + public List queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) { + return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + } + /** * query need failover process instance * @param host host @@ -1770,5 +1780,47 @@ public class ProcessDao { return projectIdList; } + /** + * list unauthorized udf function + * @param userId user id + * @param needChecks data source id array + * @return unauthorized udf function list + */ + public List listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){ + List resultList = new ArrayList(); + + if (!ArrayUtils.isEmpty(needChecks)) { + Set originResSet = new HashSet(Arrays.asList(needChecks)); + + switch (authorizationType){ + case RESOURCE_FILE: + Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet()); + originResSet.removeAll(authorizedResources); + break; + case DATASOURCE: + Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedDatasources); + break; + case UDF: + Set authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedUdfs); + break; + } + + resultList.addAll(originResSet); + } + + return resultList; + } + + /** + * get user by user id + * @param userId user id + * @return User + */ + public User getUserById(int userId){ + return userMapper.queryDetailsById(userId); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java index 2b4944dd61..f95fbc7a4d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java @@ -77,4 +77,13 @@ public interface DataSourceMapper extends BaseMapper { List listAllDataSourceByType(@Param("type") Integer type); + /** + * list authorized UDF function + * @param userId userId + * @param dataSourceIds data source id array + * @return UDF function list + */ + List listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds); + + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java index 4e3d9c3f45..cf65e5d08a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java @@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper { * @return tenant code */ String queryTenantCodeByResourceName(@Param("resName") String resName); + + /** + * list authorized resource + * @param userId userId + * @param resNames resource names + * @return resource list + */ + List listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index 3a1d125f48..8a49c8ff4f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper { */ List queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + /** + * query schedule list by process definition id + * @param processDefinitionId + * @return + */ + List queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java index 03ad58da86..5a8734233c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java @@ -78,5 +78,12 @@ public interface UdfFuncMapper extends BaseMapper { */ List queryAuthedUdfFunc(@Param("userId") int userId); + /** + * list authorized UDF function + * @param userId userId + * @param udfIds UDF function id array + * @return UDF function list + */ + List listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java new file mode 100644 index 0000000000..63d4c1c8af --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.permission; + +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.User; +import org.slf4j.Logger; + +import java.util.List; + +public class PermissionCheck { + /** + * logger + */ + private Logger logger; + /** + * Authorization Type + */ + private AuthorizationType authorizationType; + + /** + * Authorization Type + */ + private ProcessDao processDao; + + /** + * need check array + */ + private T[] needChecks; + + /** + * user id + */ + private int userId; + + /** + * permission check + * @param authorizationType authorization type + * @param processDao process dao + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) { + this.authorizationType = authorizationType; + this.processDao = processDao; + } + + /** + * permission check + * @param authorizationType + * @param processDao + * @param needChecks + * @param userId + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) { + this.authorizationType = authorizationType; + this.processDao = processDao; + this.needChecks = needChecks; + this.userId = userId; + } + + /** + * permission check + * @param authorizationType + * @param processDao + * @param needChecks + * @param userId + * @param logger + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) { + this.authorizationType = authorizationType; + this.processDao = processDao; + this.needChecks = needChecks; + this.userId = userId; + this.logger = logger; + } + + public AuthorizationType getAuthorizationType() { + return authorizationType; + } + + public void setAuthorizationType(AuthorizationType authorizationType) { + this.authorizationType = authorizationType; + } + + public ProcessDao getProcessDao() { + return processDao; + } + + public void setProcessDao(ProcessDao processDao) { + this.processDao = processDao; + } + + public T[] getNeedChecks() { + return needChecks; + } + + public void setNeedChecks(T[] needChecks) { + this.needChecks = needChecks; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + /** + * has permission + * @return true if has permission + */ + public boolean hasPermission(){ + try { + checkPermission(); + return true; + } catch (Exception e) { + return false; + } + } + + /** + * check permission + * @throws Exception exception + */ + public void checkPermission() throws Exception{ + if(this.needChecks.length > 0){ + // get user type in order to judge whether the user is admin + User user = processDao.getUserById(userId); + if (user.getUserType() != UserType.ADMIN_USER){ + List unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType); + // if exist unauthorized resource + if(CollectionUtils.isNotEmpty(unauthorizedList)){ + logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString()); + throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0))); + } + } + } + } + +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml index b296d5fc3e..15536ae652 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml @@ -74,6 +74,19 @@ from t_ds_datasource where type = #{type} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml index 402c864251..ddae96a509 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml @@ -55,4 +55,9 @@ from t_ds_schedules where process_definition_id =#{processDefinitionId} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml index 8a041babf0..0aa10607c4 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml @@ -74,4 +74,17 @@ WHERE u.id = rel.udf_id AND rel.user_id = #{userId} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java index c826236239..92df6cc45c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java @@ -17,12 +17,14 @@ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DatasourceUser; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,7 +36,9 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; -import static org.hamcrest.Matchers.*; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; /** @@ -58,6 +62,9 @@ public class DataSourceMapperTest { @Autowired DataSourceUserMapper dataSourceUserMapper; + @Autowired + private UserMapper userMapper; + /** * test insert */ @@ -244,6 +251,33 @@ public class DataSourceMapperTest { } } + @Test + public void testListAuthorizedDataSource(){ + //create general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + + //create data source + DataSource dataSource = createDataSource(generalUser1.getId(), "ds-1"); + DataSource unauthorizdDataSource = createDataSource(generalUser2.getId(), "ds-2"); + + + //data source ids + Integer[] dataSourceIds = new Integer[]{dataSource.getId(),unauthorizdDataSource.getId()}; + + List authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds); + + Assert.assertEquals(generalUser1.getId(),dataSource.getUserId()); + Assert.assertNotEquals(generalUser1.getId(),unauthorizdDataSource.getUserId()); + Assert.assertFalse(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds))); + + //authorize object unauthorizdDataSource to generalUser1 + createUserDataSource(generalUser1, unauthorizdDataSource); + authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds); + + Assert.assertTrue(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds))); + } + /** * create datasource relation * @param userId @@ -289,7 +323,6 @@ public class DataSourceMapperTest { return dataSourceMap; } - /** * create datasource * @return datasource @@ -330,5 +363,41 @@ public class DataSourceMapperTest { return dataSource; } + /** + * create general user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + + /** + * create the relation of user and data source + * @param user user + * @param dataSource data source + * @return DatasourceUser + */ + private DatasourceUser createUserDataSource(User user,DataSource dataSource){ + DatasourceUser datasourceUser = new DatasourceUser(); + + datasourceUser.setDatasourceId(dataSource.getId()); + datasourceUser.setUserId(user.getId()); + datasourceUser.setPerm(7); + datasourceUser.setCreateTime(DateUtils.getCurrentDate()); + datasourceUser.setUpdateTime(DateUtils.getCurrentDate()); + + dataSourceUserMapper.insert(datasourceUser); + return datasourceUser; + } + } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index 7c0101612c..aaf5129c02 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -17,22 +17,36 @@ package org.apache.dolphinscheduler.dao.mapper; -import org.apache.dolphinscheduler.common.enums.ResourceType; -import org.apache.dolphinscheduler.dao.entity.*; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.ResourcesUser; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import java.util.Arrays; import java.util.Date; import java.util.List; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + @RunWith(SpringRunner.class) @SpringBootTest +@Transactional +@Rollback(true) public class ResourceMapperTest { @Autowired @@ -61,6 +75,59 @@ public class ResourceMapperTest { return resource; } + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createResource(User user){ + //insertOne + Resource resource = new Resource(); + resource.setAlias(String.format("ut resource %s",user.getUserName())); + resource.setType(ResourceType.FILE); + resource.setUserId(user.getId()); + resourceMapper.insert(resource); + return resource; + } + + /** + * create user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + + /** + * create resource user + * @return ResourcesUser + */ + private ResourcesUser createResourcesUser(Resource resource,User user){ + //insertOne + ResourcesUser resourcesUser = new ResourcesUser(); + resourcesUser.setCreateTime(new Date()); + resourcesUser.setUpdateTime(new Date()); + resourcesUser.setUserId(user.getId()); + resourcesUser.setResourcesId(resource.getId()); + resourceUserMapper.insert(resourcesUser); + return resourcesUser; + } + + @Test + public void testInsert(){ + Resource resource = insertOne(); + assertNotNull(resource.getId()); + assertThat(resource.getId(),greaterThan(0)); + } /** * test update */ @@ -230,4 +297,30 @@ public class ResourceMapperTest { resourceMapper.deleteById(resource.getId()); } + + @Test + public void testListAuthorizedResource(){ + // create a general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + // create one resource + Resource resource = createResource(generalUser2); + Resource unauthorizedResource = createResource(generalUser2); + + // need download resources + String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()}; + + List resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); + + Assert.assertEquals(generalUser2.getId(),resource.getUserId()); + Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + + + + // authorize object unauthorizedResource to generalUser + createResourcesUser(unauthorizedResource,generalUser2); + List authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); + Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + + } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java index d608c841c7..0dd06484d8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java @@ -29,13 +29,20 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import java.util.Arrays; import java.util.Date; import java.util.List; +import static java.util.stream.Collectors.toList; + @RunWith(SpringRunner.class) @SpringBootTest +@Transactional +@Rollback(true) public class UdfFuncMapperTest { @Autowired @@ -133,6 +140,23 @@ public class UdfFuncMapperTest { return udfUser; } + /** + * create general user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + /** * test update */ @@ -268,4 +292,30 @@ public class UdfFuncMapperTest { udfUserMapper.deleteById(udfUser.getId()); Assert.assertNotEquals(udfFuncList.size(), 0); } + + @Test + public void testListAuthorizedUdfFunc(){ + //create general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + + //create udf function + UdfFunc udfFunc = insertOne(generalUser1); + UdfFunc unauthorizdUdfFunc = insertOne(generalUser2); + + //udf function ids + Integer[] udfFuncIds = new Integer[]{udfFunc.getId(),unauthorizdUdfFunc.getId()}; + + List authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds); + + Assert.assertEquals(generalUser1.getId(),udfFunc.getUserId()); + Assert.assertNotEquals(generalUser1.getId(),unauthorizdUdfFunc.getUserId()); + Assert.assertFalse(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds))); + + + //authorize object unauthorizdUdfFunc to generalUser1 + insertOneUDFUser(generalUser1,unauthorizdUdfFunc); + authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds); + Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds))); + } } \ No newline at end of file diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 2ccc880a41..751fd919a8 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -111,6 +111,27 @@ dolphinscheduler-alert + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + + org.mockito + mockito-core + + + + + org.mockito + mockito-core + test + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index a91f8c17e6..6c147e2628 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -29,10 +30,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,10 +206,30 @@ public class MasterExecThread implements Runnable { Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); processDao.saveProcessInstance(processInstance); - Date scheduleDate = processInstance.getScheduleTime(); - if(scheduleDate == null){ - scheduleDate = startDate; + // get schedules + int processDefinitionId = processInstance.getProcessDefinitionId(); + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + List listDate = Lists.newLinkedList(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule schedule : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate); + listDate.addAll(list); + } + } + // get first fire date + Iterator iterator = null; + Date scheduleDate = null; + if(!CollectionUtils.isEmpty(listDate)) { + iterator = listDate.iterator(); + scheduleDate = iterator.next(); + processInstance.setScheduleTime(scheduleDate); + processDao.updateProcessInstance(processInstance); + }else{ + scheduleDate = processInstance.getScheduleTime(); + if(scheduleDate == null){ + scheduleDate = startDate; + } } while(Stopper.isRunning()){ @@ -232,11 +255,22 @@ public class MasterExecThread implements Runnable { } // current process instance sucess ,next execute - scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); - if(scheduleDate.after(endDate)){ - // all success - logger.info("process {} complement completely!", processInstance.getId()); - break; + if(null == iterator){ + // loop by day + scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); + if(scheduleDate.after(endDate)){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + }else{ + // loop by schedule date + if(!iterator.hasNext()){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + scheduleDate = iterator.next(); } logger.info("process {} start to complement {} data", diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java new file mode 100644 index 0000000000..11730b9545 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import org.quartz.impl.triggers.CronTriggerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +/** + * ScheduleUtils + */ +public class ScheduleUtils { + + private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class); + + /** + * Get the execution time of the time interval + * @param cron + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, Date from, Date to) { + return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to); + } + + /** + * Get the execution time of the time interval + * @param cron + * @param size + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, int size, Date from, Date to) { + List list = new LinkedList(); + if(to.before(from)){ + logger.error("schedule date from:{} must before date to:{}!", from, to); + return list; + } + try { + CronTriggerImpl trigger = new CronTriggerImpl(); + trigger.setCronExpression(cron); + trigger.setStartTime(from); + trigger.setEndTime(to); + trigger.computeFirstFireTime(null); + for (int i = 0; i < size; i++) { + Date schedule = trigger.getNextFireTime(); + if(null == schedule){ + break; + } + list.add(schedule); + trigger.triggered(null); + } + } catch (ParseException e) { + logger.error("cron:{} error:{}", cron, e.getMessage()); + } + return java.util.Collections.unmodifiableList(list); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index bb7a773d48..5f66c3477d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -21,6 +21,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.permission.PermissionCheck; import org.apache.dolphinscheduler.server.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -42,7 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.util.*; import java.util.stream.Collectors; @@ -94,12 +95,15 @@ public class TaskScheduleThread implements Runnable { // task node TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + // get resource files + List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local - copyHdfsToLocal(processDao, + downloadResource( taskInstance.getExecutePath(), - createProjectResFiles(taskNode), + resourceFiles, logger); + // get process instance according to tak instance ProcessInstance processInstance = taskInstance.getProcessInstance(); @@ -204,8 +208,8 @@ public class TaskScheduleThread implements Runnable { } /** - * get task log path - * @return + * get task log path + * @return log path */ private String getTaskLogPath() { String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) @@ -294,14 +298,14 @@ public class TaskScheduleThread implements Runnable { } /** - * copy hdfs file to local + * download resource file * - * @param processDao * @param execLocalPath * @param projectRes * @param logger */ - private void copyHdfsToLocal(ProcessDao processDao, String execLocalPath, List projectRes, Logger logger) throws IOException { + private void downloadResource(String execLocalPath, List projectRes, Logger logger) throws Exception { + checkDownloadPermission(projectRes); for (String res : projectRes) { File resFile = new File(execLocalPath, res); if (!resFile.exists()) { @@ -321,4 +325,16 @@ public class TaskScheduleThread implements Runnable { } } } + + /** + * check download resource permission + * @param projectRes resource name list + * @throws Exception exception + */ + private void checkDownloadPermission(List projectRes) throws Exception { + int userId = taskInstance.getProcessInstance().getExecutorId(); + String[] resNames = projectRes.toArray(new String[projectRes.size()]); + PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger); + permissionCheck.checkPermission(); + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 7a14223b95..11934dea4c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -207,7 +207,7 @@ public abstract class AbstractCommandExecutor { // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands - processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile); + processBuilder.command("sudo", "-u", tenantCode, commandInterpreter(), commandFile); process = processBuilder.start(); @@ -561,7 +561,7 @@ public abstract class AbstractCommandExecutor { protected abstract String buildCommandFilePath(); - protected abstract String commandType(); + protected abstract String commandInterpreter(); protected abstract boolean checkFindApp(String line); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index c1ff89d4cf..c943e5d9d7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -113,7 +113,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * @return python home */ @Override - protected String commandType() { + protected String commandInterpreter() { String pythonHome = getPythonHome(envFile); if (StringUtils.isEmpty(pythonHome)){ return PYTHON; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index d1a7fa2258..db46d0d856 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -74,7 +74,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @return command type */ @Override - protected String commandType() { + protected String commandInterpreter() { return SH; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index bacb3f22eb..e8a97fecc5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ShowType; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.permission.PermissionCheck; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -119,13 +123,6 @@ public class SqlTask extends AbstractTask { } dataSource= processDao.findDataSourceById(sqlParameters.getDatasource()); - - if (null == dataSource){ - logger.error("datasource not exists"); - exitStatusCode = -1; - return; - } - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", dataSource.getName(), dataSource.getType(), @@ -133,6 +130,12 @@ public class SqlTask extends AbstractTask { dataSource.getUserId(), dataSource.getConnectionParams()); + if (dataSource == null){ + logger.error("datasource not exists"); + exitStatusCode = -1; + return; + } + Connection con = null; List createFuncs = null; try { @@ -164,6 +167,8 @@ public class SqlTask extends AbstractTask { for(int i=0;i udfFuncList = processDao.queryUdfFunListByids(idsArray); createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger); } @@ -449,4 +454,33 @@ public class SqlTask extends AbstractTask { } logger.info(logPrint.toString()); } + + /** + * check udf function permission + * @param udfFunIds udf functions + * @return if has download permission return true else false + */ + private void checkUdfPermission(Integer[] udfFunIds) throws Exception{ + // process instance + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + int userId = processInstance.getExecutorId(); + + PermissionCheck permissionCheckUdf = new PermissionCheck(AuthorizationType.UDF,processDao,udfFunIds,userId,logger); + permissionCheckUdf.checkPermission(); + } + + /** + * check data source permission + * @param dataSourceId data source id + * @return if has download permission return true else false + */ + private void checkDataSourcePermission(int dataSourceId) throws Exception{ + // process instance + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + int userId = processInstance.getExecutorId(); + + PermissionCheck permissionCheckDataSource = new PermissionCheck(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger); + permissionCheckDataSource.checkPermission(); + } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java new file mode 100644 index 0000000000..6f31e66213 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.MasterExecThread; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.context.ApplicationContext; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.text.ParseException; +import java.util.*; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * test for MasterExecThread + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({MasterExecThread.class}) +public class MasterExecThreadTest { + + private MasterExecThread masterExecThread; + + private ProcessInstance processInstance; + + private ProcessDao processDao; + + private int processDefinitionId = 1; + + private MasterConfig config; + + private ApplicationContext applicationContext; + + @Before + public void init() throws Exception{ + processDao = mock(ProcessDao.class); + + applicationContext = mock(ApplicationContext.class); + config = new MasterConfig(); + config.setMasterExecTaskNum(1); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + + processInstance = mock(ProcessInstance.class); + Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId); + Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); + Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); + Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO); + Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); + Map cmdParam = new HashMap<>(); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00"); + Mockito.when(processInstance.getCommandParam()).thenReturn(JSONObject.toJSONString(cmdParam)); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); + processDefinition.setGlobalParamList(Collections.EMPTY_LIST); + Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); + + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao)); + // prepareProcess init dag + Field dag = MasterExecThread.class.getDeclaredField("dag"); + dag.setAccessible(true); + dag.set(masterExecThread, new DAG()); + PowerMockito.doNothing().when(masterExecThread, "executeProcess"); + PowerMockito.doNothing().when(masterExecThread, "postHandle"); + PowerMockito.doNothing().when(masterExecThread, "prepareProcess"); + PowerMockito.doNothing().when(masterExecThread, "runProcess"); + PowerMockito.doNothing().when(masterExecThread, "endProcess"); + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 1-30 for next save, and last day 31 no save + verify(processDao, times(31)).saveProcessInstance(processInstance); + }catch (Exception e){ + e.printStackTrace(); + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save + verify(processDao, times(16)).saveProcessInstance(processInstance); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java new file mode 100644 index 0000000000..4fbbdab70f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.junit.Test; +import java.util.Date; +import static org.junit.Assert.assertEquals; + +/** + * Test ScheduleUtils + */ +public class ScheduleUtilsTest { + + /** + * Test the getRecentTriggerTime method + */ + @Test + public void testGetRecentTriggerTime() { + Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date to = DateUtils.stringToDate("2020-01-31 01:00:00"); + // test date + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size()); + // test error cron + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size()); + // test cron + assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index a63157d809..fbb4f418d0 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -159,6 +159,9 @@ border-radius: 0 3px 0 0; .ans-btn-text { color: #337ab7; + .ans-icon { + font-size: 16px; + } } .assist-btn { position: absolute; @@ -206,7 +209,7 @@ color: #333; } &.active { - background: #e1e2e3; + // background: #e1e2e3; i { color: #2d8cf0; } @@ -234,7 +237,9 @@ border-radius: 3px 3px 0px 0px; } } - +#screen { + margin-right: 5px; +} .v-modal-custom-log { z-index: 101; } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index feb756dabe..40b6d85198 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -61,20 +61,28 @@
-
+ - + + {}) - // Round robin acquisition status - this.setIntervalP = setInterval(() => { - this._getTaskState(true).then(res => {}) - }, 90000) } } else { Dag.create() diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue index ef6f83912f..a496fe8991 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue @@ -64,7 +64,7 @@ - {{item.note}} + {{item.note}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index 7a9a165873..5550864176 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -69,7 +69,7 @@ - - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue index 8672c41295..0a22e22535 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue @@ -141,7 +141,7 @@
- {{$t('Date')}} + {{$t('Schedule date')}}
{{item.instRunningCount}} - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue index 513d0d1321..c50d392bdb 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue @@ -52,7 +52,7 @@ {{item.fileName}} - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue index febb215741..b0c9b5ac7b 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue @@ -67,7 +67,7 @@ v-ps