diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index a41498e258..58a37c2f41 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -56,7 +56,7 @@ public class AlertServer { logger.info("alert server ready start "); while (Stopper.isRunning()){ try { - Thread.sleep(Constants.ALERT_SCAN_INTERVEL); + Thread.sleep(Constants.ALERT_SCAN_INTERVAL); } catch (InterruptedException e) { logger.error(e.getMessage(),e); Thread.currentThread().interrupt(); diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java index 510d73b9f7..9bcad56c24 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java @@ -32,7 +32,7 @@ import java.util.Map; * Enterprise WeChat Manager */ public class EnterpriseWeChatManager { - private static final Logger logger = LoggerFactory.getLogger(MsgManager.class); + private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class); /** * Enterprise We Chat send * @param alert the alert diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java index 718ef50948..5feb36b60f 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java @@ -65,7 +65,7 @@ public class AlertSender{ users = alertDao.listUserByAlertgroupId(alert.getAlertGroupId()); // receiving group list - List receviersList = new ArrayList(); + List receviersList = new ArrayList<>(); for(User user:users){ receviersList.add(user.getEmail()); } @@ -77,7 +77,7 @@ public class AlertSender{ } // copy list - List receviersCcList = new ArrayList(); + List receviersCcList = new ArrayList<>(); // Custom Copier @@ -101,6 +101,11 @@ public class AlertSender{ }else if (alert.getAlertType() == AlertType.SMS){ retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType()); alert.setInfo(retMaps); + } else { + logger.error("AlertType is not defined. code: {}, descp: {}", + alert.getAlertType().getCode(), + alert.getAlertType().getDescp()); + return; } //send flag diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java index cc74ff71ee..81b5e65f27 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java @@ -34,6 +34,9 @@ public interface AlertTemplate { /** * default showAll is true + * @param content alert message content + * @param showType show type + * @return a message from a specified alert template */ default String getMessageFromTemplate(String content,ShowType showType){ return getMessageFromTemplate(content,showType,true); diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java index f96873bdde..55c0f9ffbf 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java @@ -133,7 +133,7 @@ public class Constants { public static final String TH_END = ""; - public static final int ALERT_SCAN_INTERVEL = 5000; + public static final int ALERT_SCAN_INTERVAL = 5000; public static final String MARKDOWN_QUOTE = ">"; diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java index a4c3720581..ff8822421a 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java @@ -201,22 +201,22 @@ public class EnterpriseWeChatUtils { public static String markdownTable(String title,String content){ List mapItemsList = JSONUtils.toList(content, LinkedHashMap.class); StringBuilder contents = new StringBuilder(200); - for (LinkedHashMap mapItems : mapItemsList){ - Set> entries = mapItems.entrySet(); + if (null != mapItemsList) { + for (LinkedHashMap mapItems : mapItemsList){ + Set> entries = mapItems.entrySet(); + Iterator> iterator = entries.iterator(); + StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER)); - Iterator> iterator = entries.iterator(); + while (iterator.hasNext()){ - StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER)); - while (iterator.hasNext()){ - - Map.Entry entry = iterator.next(); - t.append(Constants.MARKDOWN_QUOTE); - t.append(entry.getKey()).append(":").append(entry.getValue()); - t.append(Constants.MARKDOWN_ENTER); + Map.Entry entry = iterator.next(); + t.append(Constants.MARKDOWN_QUOTE); + t.append(entry.getKey()).append(":").append(entry.getValue()); + t.append(Constants.MARKDOWN_ENTER); + } + contents.append(t); } - - contents.append(t); } return contents.toString(); } diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java index e911817354..7ebe6a7863 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java @@ -96,7 +96,7 @@ public class MailUtils { return retMap; } - receivers.removeIf((from) -> (StringUtils.isEmpty(from))); + receivers.removeIf(StringUtils::isEmpty); if (showType == ShowType.TABLE || showType == ShowType.TEXT){ // send email @@ -185,7 +185,7 @@ public class MailUtils { /** * get MimeMessage - * @param receivers + * @param receivers receivers * @return the MimeMessage * @throws MessagingException */ @@ -229,8 +229,7 @@ public class MailUtils { } }; - Session session = Session.getInstance(props, auth); - return session; + return Session.getInstance(props, auth); } /** diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java index 4367fbb4a8..c2f479d101 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java @@ -205,8 +205,7 @@ public class PropertyUtils { return null; } try { - String[] propertyArray = value.split(splitStr); - return propertyArray; + return value.split(splitStr); } catch (PatternSyntaxException e) { logger.info(e.getMessage(),e); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 576f0c3eba..8fa4c013c7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -497,31 +499,58 @@ public class ExecutorService extends BaseService{ } } + if ( start == null || end == null) { + return 0; + } + if(commandType == CommandType.COMPLEMENT_DATA){ runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode; - if(runMode == RunMode.RUN_MODE_SERIAL){ - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); - command.setCommandParam(JSONUtils.toJson(cmdParam)); - return processDao.createCommand(command); - }else if (runMode == RunMode.RUN_MODE_PARALLEL){ - int runCunt = 0; - while(!start.after(end)){ - runCunt += 1; + if(null != start && null != end && start.before(end)){ + if(runMode == RunMode.RUN_MODE_SERIAL){ cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); - cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end)); command.setCommandParam(JSONUtils.toJson(cmdParam)); - processDao.createCommand(command); - start = DateUtils.getSomeDay(start, 1); + return processDao.createCommand(command); + }else if (runMode == RunMode.RUN_MODE_PARALLEL){ + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId); + List listDate = new LinkedList<>(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule item : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end); + listDate.addAll(list); + } + } + if(!CollectionUtils.isEmpty(listDate)){ + // loop by schedule date + for (Date date : listDate) { + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + } + return listDate.size(); + }else{ + // loop by day + int runCunt = 0; + while(!start.after(end)) { + runCunt += 1; + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start)); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start)); + command.setCommandParam(JSONUtils.toJson(cmdParam)); + processDao.createCommand(command); + start = DateUtils.getSomeDay(start, 1); + } + return runCunt; + } } - return runCunt; + }else{ + logger.error("there is not vaild schedule date for the process definition: id:{},date:{}", + processDefineId, schedule); } }else{ command.setCommandParam(JSONUtils.toJson(cmdParam)); return processDao.createCommand(command); - } - - return 0; + } } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java index f3f66d730d..2588dd0e65 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java @@ -139,12 +139,16 @@ public class SessionService extends BaseService{ * @param loginUser login user */ public void signOut(String ip, User loginUser) { - /** - * query session by user id and ip - */ - Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip); + try { + /** + * query session by user id and ip + */ + Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip); - //delete session - sessionMapper.deleteById(session.getId()); + //delete session + sessionMapper.deleteById(session.getId()); + }catch (Exception e){ + logger.warn("userId : {} , ip : {} , find more one session",loginUser.getId(),ip); + } } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java index f88d26164b..355f6c954e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java @@ -57,7 +57,7 @@ public class FileUtils { Files.copy(file.getInputStream(), Paths.get(destFilename)); } catch (IOException e) { - logger.error(String.format("failed to copy file , {} is empty file", file.getOriginalFilename()), e); + logger.error("failed to copy file , {} is empty file", file.getOriginalFilename(), e); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java index d41830eff5..b04e773aea 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java @@ -59,23 +59,22 @@ public class FourLetterWordMain { */ public static String send4LetterWord(String host, int port, String cmd, int timeout) throws IOException { - LOG.info("connecting to " + host + " " + port); - Socket sock = new Socket(); + LOG.info("connecting to {} {}", host, port); InetSocketAddress hostaddress= host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(InetAddress.getByName(null), port); - BufferedReader reader = null; - try { + + try (Socket sock = new Socket(); + OutputStream outstream = sock.getOutputStream(); + BufferedReader reader = + new BufferedReader( + new InputStreamReader(sock.getInputStream()))) { sock.setSoTimeout(timeout); sock.connect(hostaddress, timeout); - OutputStream outstream = sock.getOutputStream(); outstream.write(cmd.getBytes()); outstream.flush(); // this replicates NC - close the output stream before reading sock.shutdownOutput(); - reader = - new BufferedReader( - new InputStreamReader(sock.getInputStream())); StringBuilder sb = new StringBuilder(); String line; while((line = reader.readLine()) != null) { @@ -84,11 +83,6 @@ public class FourLetterWordMain { return sb.toString(); } catch (SocketTimeoutException e) { throw new IOException("Exception while executing four letter word: " + cmd, e); - } finally { - sock.close(); - if (reader != null) { - reader.close(); - } } } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java new file mode 100644 index 0000000000..bb6e3882fe --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class BaseDAGServiceTest { + + @Test + public void testProcessInstance2DAG(){ + + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setProcessInstanceJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-61567\"," + + "\"name\":\"开始\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + + "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + + "\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SHELL\",\"id\":\"tasks-6-3ug5ej\",\"name\":\"结束\"," + + "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"},\"description\":\"\"," + + "\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + + "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + + "\"workerGroupId\":-1,\"preTasks\":[\"开始\"]}],\"tenantId\":-1,\"timeout\":0}"); + + DAG relationDAG = BaseDAGService.processInstance2DAG(processInstance); + + Assert.assertTrue(relationDAG.containsNode("开始")); + + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java new file mode 100644 index 0000000000..02086a8259 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.apache.dolphinscheduler.dao.entity.User; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.mock.web.MockCookie; +import org.springframework.mock.web.MockHttpServletRequest; + +import javax.servlet.http.Cookie; +import java.util.HashMap; +import java.util.Map; + +@RunWith(PowerMockRunner.class) +@PowerMockIgnore({"sun.security.*", "javax.net.*"}) +@PrepareForTest({HadoopUtils.class}) +public class BaseServiceTest { + + private static final Logger logger = LoggerFactory.getLogger(BaseServiceTest.class); + + private BaseService baseService; + + @Mock + private HadoopUtils hadoopUtils; + + @Before + public void setUp() { + baseService = new BaseService(); + } + + @Test + public void testIsAdmin(){ + + User user = new User(); + user.setUserType(UserType.ADMIN_USER); + //ADMIN_USER + boolean isAdmin = baseService.isAdmin(user); + Assert.assertTrue(isAdmin); + //GENERAL_USER + user.setUserType(UserType.GENERAL_USER); + isAdmin = baseService.isAdmin(user); + Assert.assertFalse(isAdmin); + + } + + @Test + public void testPutMsg(){ + + Map result = new HashMap<>(); + baseService.putMsg(result, Status.SUCCESS); + Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + //has params + baseService.putMsg(result, Status.PROJECT_NOT_FOUNT,"test"); + + } + @Test + public void testPutMsgTwo(){ + + Result result = new Result(); + baseService.putMsg(result, Status.SUCCESS); + Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg()); + //has params + baseService.putMsg(result,Status.PROJECT_NOT_FOUNT,"test"); + } + @Test + public void testGetCookie(){ + + MockHttpServletRequest request = new MockHttpServletRequest(); + MockCookie mockCookie = new MockCookie("userId","1"); + request.setCookies(mockCookie); + //cookie is not null + Cookie cookie = BaseService.getCookie(request,"userId"); + Assert.assertNotNull(cookie); + //cookie is null + cookie = BaseService.getCookie(request,"userName"); + Assert.assertNull(cookie); + + } + @Test + public void testCreateTenantDirIfNotExists(){ + + PowerMockito.mockStatic(HadoopUtils.class); + PowerMockito.when(HadoopUtils.getInstance()).thenReturn(hadoopUtils); + + try { + baseService.createTenantDirIfNotExists("test"); + } catch (Exception e) { + Assert.assertTrue(false); + logger.error("CreateTenantDirIfNotExists error ",e); + e.printStackTrace(); + } + + } + @Test + public void testHasPerm(){ + + User user = new User(); + user.setId(1); + //create user + boolean hasPerm = baseService.hasPerm(user,1); + Assert.assertTrue(hasPerm); + + //admin + user.setId(2); + user.setUserType(UserType.ADMIN_USER); + hasPerm = baseService.hasPerm(user,1); + Assert.assertTrue(hasPerm); + + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java new file mode 100644 index 0000000000..b4f3e7e31f --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.service; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.text.ParseException; +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; + +/** + * test for ExecutorService + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class ExecutorService2Test { + + @InjectMocks + private ExecutorService executorService; + + @Mock + private ProcessDao processDao; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectService projectService; + + private int processDefinitionId = 1; + + private int tenantId = 1; + + private int userId = 1; + + private ProcessDefinition processDefinition = new ProcessDefinition(); + + private User loginUser = new User(); + + private String projectName = "projectName"; + + private Project project = new Project(); + + private String cronTime; + + @Before + public void init(){ + // user + loginUser.setId(userId); + + // processDefinition + processDefinition.setId(processDefinitionId); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setTenantId(tenantId); + processDefinition.setUserId(userId); + + // project + project.setName(projectName); + + // cronRangeTime + cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00"; + + // mock + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth()); + Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition); + Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); + Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1); + } + + /** + * not complement + * @throws ParseException + */ + @Test + public void testNoComplement() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.START_PROCESS, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * date error + * @throws ParseException + */ + @Test + public void testDateError() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); + verify(processDao, times(0)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * serial + * @throws ParseException + */ + @Test + public void testSerial() throws ParseException { + try { + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_SERIAL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(1)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(31)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Map result = executorService.execProcessInstance(loginUser, projectName, + processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + "", "", RunMode.RUN_MODE_PARALLEL, + Priority.LOW, 0, 110); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + verify(processDao, times(16)).createCommand(any(Command.class)); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } + + private Map checkProjectAndAuth(){ + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); + return result; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java new file mode 100644 index 0000000000..1c371e799e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +/** + * Authorization type + */ +public enum AuthorizationType { + /** + * 0 RESOURCE_FILE; + * 1 DATASOURCE; + * 2 UDF; + */ + RESOURCE_FILE(0, "resource file"), + DATASOURCE(1, "data source"), + UDF(2, "udf function"); + + AuthorizationType(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java index 457ddb0a1d..fe76497ff8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public class ClickHouseDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); + Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java index 101efae793..cddedd1f73 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +62,7 @@ public class HiveDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); + Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), ""); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java index 3cf2b2ce8c..fa149e67e2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public class MySQLDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("com.mysql.jdbc.Driver"); + Class.forName(Constants.COM_MYSQL_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java index 551c7823cb..c3dc3a96df 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ public class OracleDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("oracle.jdbc.driver.OracleDriver"); + Class.forName(Constants.COM_ORACLE_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java index 5241b4c7ef..4989e7681e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,7 @@ public class PostgreDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.postgresql.Driver"); + Class.forName(Constants.ORG_POSTGRESQL_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java index fe398eb88b..8554992efc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class SQLServerDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java index eb455124de..5d10c63e5d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.job.db; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ public class SparkDataSource extends BaseDataSource { public void isConnectable() throws Exception { Connection con = null; try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); + Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER); con = DriverManager.getConnection(getJdbcUrl(), getUser(), ""); } finally { if (con != null) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java index 3519d5c535..9dc2f34cc1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java @@ -35,13 +35,12 @@ public class DateInterval { @Override public boolean equals(Object obj) { - try{ - DateInterval dateInterval = (DateInterval) obj; - return startTime.equals(dateInterval.getStartTime()) && - endTime.equals(dateInterval.getEndTime()); - }catch (Exception e){ - return false; - } + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DateInterval that = (DateInterval) o; + return startTime.equals(that.startTime) && + endTime.equals(that.endTime); + } public Date getStartTime() { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java index 1c58f879a0..cad6914cb8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java @@ -25,7 +25,7 @@ public class Stopper { private static volatile AtomicBoolean signal = new AtomicBoolean(false); - public static final boolean isStoped(){ + public static final boolean isStopped(){ return signal.get(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index afc3c44825..541281f793 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -119,7 +119,9 @@ public class HadoopUtils implements Closeable { fsRelatedProps.forEach((key, value) -> configuration.set(key, value)); }else{ logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS ); - throw new RuntimeException("property:{} can not to be empty, please set!"); + throw new RuntimeException( + String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS) + ); } }else{ logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS); @@ -219,10 +221,12 @@ public class HadoopUtils implements Closeable { return null; } - FSDataInputStream in = fs.open(new Path(hdfsFilePath)); - BufferedReader br = new BufferedReader(new InputStreamReader(in)); - Stream stream = br.lines().skip(skipLineNums).limit(limit); - return stream.collect(Collectors.toList()); + try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){ + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + Stream stream = br.lines().skip(skipLineNums).limit(limit); + return stream.collect(Collectors.toList()); + } + } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java index a7e1334ad1..f62e106680 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java @@ -115,21 +115,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ } } - /** * create zookeeper path according the zk node type. * @param zkNodeType zookeeper node type * @return register zookeeper path * @throws Exception */ - private String createZNodePath(ZKNodeType zkNodeType) throws Exception { + private String createZNodePath(ZKNodeType zkNodeType, String host) throws Exception { // specify the format of stored data in ZK nodes String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date()); // create temporary sequence nodes for master znode - String parentPath = getZNodeParentPath(zkNodeType); - String serverPathPrefix = parentPath + "/" + OSUtils.getHost(); - String registerPath = serverPathPrefix + UNDERLINE; - super.persistEphemeral(registerPath, heartbeatZKInfo); + String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host; + + super.persistEphemeral(registerPath, heartbeatZKInfo); logger.info("register {} node {} success" , zkNodeType.toString(), registerPath); return registerPath; } @@ -148,7 +146,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ zkNodeType.toString(), host); return registerPath; } - registerPath = createZNodePath(zkNodeType); + registerPath = createZNodePath(zkNodeType, host); // handle dead server handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP); @@ -166,25 +164,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ * @throws Exception errors */ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { - //ip_sequenceno - String[] zNodesPath = zNode.split("\\/"); - String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - + String host = getHostByEventDataPath(zNode); String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; - //check server restart, if restart , dead server path in zk should be delete if(opType.equals(DELETE_ZK_OP)){ - String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE); - String ip = ipAndSeqNo[0]; - removeDeadServerByHost(ip, type); + removeDeadServerByHost(host, type); }else if(opType.equals(ADD_ZK_OP)){ - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; if(!super.isExisted(deadServerPath)){ //add dead server info to zk dead server path : /dead-servers/ - super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo)); + super.persist(deadServerPath,(type + UNDERLINE + host)); logger.info("{} server dead , and {} added to zk dead server path success" , zkNodeType.toString(), zNode); @@ -285,21 +277,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ } Map serverMaps = getServerMaps(zkNodeType); for(String hostKey : serverMaps.keySet()){ - if(hostKey.startsWith(host + UNDERLINE)){ + if(hostKey.startsWith(host)){ return true; } } return false; } - /** - * get zkclient - * @return zookeeper client - */ - public CuratorFramework getZkClient() { - return zkClient; - } - /** * * @return get worker node parent path @@ -436,19 +420,22 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{ } /** - * get host ip, string format: masterParentPath/ip_000001/value + * get host ip, string format: masterParentPath/ip * @param path path - * @return host ip, string format: masterParentPath/ip_000001/value + * @return host ip, string format: masterParentPath/ip */ protected String getHostByEventDataPath(String path) { - int startIndex = path.lastIndexOf("/")+1; - int endIndex = path.lastIndexOf("_"); - - if(startIndex >= endIndex){ - logger.error("parse ip error"); + if(StringUtils.isEmpty(path)){ + logger.error("empty path!"); + return ""; + } + String[] pathArray = path.split(SINGLE_SLASH); + if(pathArray.length < 1){ + logger.error("parse ip error: {}", path); return ""; } - return path.substring(startIndex, endIndex); + return pathArray[pathArray.length - 1]; + } /** * acquire zk lock diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java index daec765315..5aa25552d7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java @@ -77,6 +77,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { return treeCache; } + @Override public void close() { treeCache.close(); try { diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java index 52c8031e75..2c76f40c0b 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java @@ -98,7 +98,7 @@ public class ThreadUtilsTest { public void testStopper() { assertTrue(Stopper.isRunning()); Stopper.stop(); - assertTrue(Stopper.isStoped()); + assertTrue(Stopper.isStopped()); } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index a45fbfff6f..820b2fdaf4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao; import com.alibaba.fastjson.JSONObject; import com.cronutils.model.Cron; +import org.apache.commons.lang.ArrayUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.model.DateInterval; @@ -25,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.queue.ITaskQueue; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; -import org.apache.dolphinscheduler.common.utils.ArrayUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.*; /** @@ -462,12 +463,9 @@ public class ProcessDao { return null; } - if(null == tenant){ + if(tenant == null){ User user = userMapper.selectById(userId); - - if (null != user) { - tenant = tenantMapper.queryById(user.getTenantId()); - } + tenant = tenantMapper.queryById(user.getTenantId()); } return tenant; } @@ -974,6 +972,9 @@ public class ProcessDao { public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ + if(taskInstance.isSubProcess()){ + return true; + } if(taskInstance.getState().typeIsFinished()){ logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); return true; @@ -1460,6 +1461,15 @@ public class ProcessDao { return scheduleMapper.selectById(id); } + /** + * query Schedule by processDefinitionId + * @param processDefinitionId processDefinitionId + * @see Schedule + */ + public List queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) { + return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + } + /** * query need failover process instance * @param host host @@ -1770,5 +1780,47 @@ public class ProcessDao { return projectIdList; } + /** + * list unauthorized udf function + * @param userId user id + * @param needChecks data source id array + * @return unauthorized udf function list + */ + public List listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){ + List resultList = new ArrayList(); + + if (!ArrayUtils.isEmpty(needChecks)) { + Set originResSet = new HashSet(Arrays.asList(needChecks)); + + switch (authorizationType){ + case RESOURCE_FILE: + Set authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet()); + originResSet.removeAll(authorizedResources); + break; + case DATASOURCE: + Set authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedDatasources); + break; + case UDF: + Set authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet()); + originResSet.removeAll(authorizedUdfs); + break; + } + + resultList.addAll(originResSet); + } + + return resultList; + } + + /** + * get user by user id + * @param userId user id + * @return User + */ + public User getUserById(int userId){ + return userMapper.queryDetailsById(userId); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java index 2b4944dd61..f95fbc7a4d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java @@ -77,4 +77,13 @@ public interface DataSourceMapper extends BaseMapper { List listAllDataSourceByType(@Param("type") Integer type); + /** + * list authorized UDF function + * @param userId userId + * @param dataSourceIds data source id array + * @return UDF function list + */ + List listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds); + + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java index 4e3d9c3f45..cf65e5d08a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java @@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper { * @return tenant code */ String queryTenantCodeByResourceName(@Param("resName") String resName); + + /** + * list authorized resource + * @param userId userId + * @param resNames resource names + * @return resource list + */ + List listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java index 3a1d125f48..8a49c8ff4f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java @@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper { */ List queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + /** + * query schedule list by process definition id + * @param processDefinitionId + * @return + */ + List queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId); + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java index 03ad58da86..5a8734233c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java @@ -78,5 +78,12 @@ public interface UdfFuncMapper extends BaseMapper { */ List queryAuthedUdfFunc(@Param("userId") int userId); + /** + * list authorized UDF function + * @param userId userId + * @param udfIds UDF function id array + * @return UDF function list + */ + List listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java new file mode 100644 index 0000000000..63d4c1c8af --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.permission; + +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.User; +import org.slf4j.Logger; + +import java.util.List; + +public class PermissionCheck { + /** + * logger + */ + private Logger logger; + /** + * Authorization Type + */ + private AuthorizationType authorizationType; + + /** + * Authorization Type + */ + private ProcessDao processDao; + + /** + * need check array + */ + private T[] needChecks; + + /** + * user id + */ + private int userId; + + /** + * permission check + * @param authorizationType authorization type + * @param processDao process dao + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) { + this.authorizationType = authorizationType; + this.processDao = processDao; + } + + /** + * permission check + * @param authorizationType + * @param processDao + * @param needChecks + * @param userId + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) { + this.authorizationType = authorizationType; + this.processDao = processDao; + this.needChecks = needChecks; + this.userId = userId; + } + + /** + * permission check + * @param authorizationType + * @param processDao + * @param needChecks + * @param userId + * @param logger + */ + public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) { + this.authorizationType = authorizationType; + this.processDao = processDao; + this.needChecks = needChecks; + this.userId = userId; + this.logger = logger; + } + + public AuthorizationType getAuthorizationType() { + return authorizationType; + } + + public void setAuthorizationType(AuthorizationType authorizationType) { + this.authorizationType = authorizationType; + } + + public ProcessDao getProcessDao() { + return processDao; + } + + public void setProcessDao(ProcessDao processDao) { + this.processDao = processDao; + } + + public T[] getNeedChecks() { + return needChecks; + } + + public void setNeedChecks(T[] needChecks) { + this.needChecks = needChecks; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + /** + * has permission + * @return true if has permission + */ + public boolean hasPermission(){ + try { + checkPermission(); + return true; + } catch (Exception e) { + return false; + } + } + + /** + * check permission + * @throws Exception exception + */ + public void checkPermission() throws Exception{ + if(this.needChecks.length > 0){ + // get user type in order to judge whether the user is admin + User user = processDao.getUserById(userId); + if (user.getUserType() != UserType.ADMIN_USER){ + List unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType); + // if exist unauthorized resource + if(CollectionUtils.isNotEmpty(unauthorizedList)){ + logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString()); + throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0))); + } + } + } + } + +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index 26d0f1e8e2..ac38ddd2e8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -96,7 +96,13 @@ public class DagHelper { for (String startNodeName : startNodeList) { TaskNode startNode = findNodeByName(taskNodeList, startNodeName); List childNodeList = new ArrayList<>(); - if (TaskDependType.TASK_POST == taskDependType) { + if (startNode == null) { + logger.error("start node name [{}] is not in task node list [{}] ", + startNodeName, + taskNodeList + ); + continue; + } else if (TaskDependType.TASK_POST == taskDependType) { childNodeList = getFlowNodeListPost(startNode, taskNodeList); } else if (TaskDependType.TASK_PRE == taskDependType) { childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList); @@ -129,7 +135,6 @@ public class DagHelper { if (null != depList && null != startNode && depList.contains(startNode.getName())) { resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList)); } - } resultList.add(startNode); return resultList; diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml index b296d5fc3e..15536ae652 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml @@ -74,6 +74,19 @@ from t_ds_datasource where type = #{type} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml index 402c864251..ddae96a509 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml @@ -55,4 +55,9 @@ from t_ds_schedules where process_definition_id =#{processDefinitionId} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml index 8a041babf0..0aa10607c4 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml @@ -74,4 +74,17 @@ WHERE u.id = rel.udf_id AND rel.user_id = #{userId} + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java index c826236239..92df6cc45c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java @@ -17,12 +17,14 @@ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DatasourceUser; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,7 +36,9 @@ import org.springframework.transaction.annotation.Transactional; import java.util.*; -import static org.hamcrest.Matchers.*; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; /** @@ -58,6 +62,9 @@ public class DataSourceMapperTest { @Autowired DataSourceUserMapper dataSourceUserMapper; + @Autowired + private UserMapper userMapper; + /** * test insert */ @@ -244,6 +251,33 @@ public class DataSourceMapperTest { } } + @Test + public void testListAuthorizedDataSource(){ + //create general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + + //create data source + DataSource dataSource = createDataSource(generalUser1.getId(), "ds-1"); + DataSource unauthorizdDataSource = createDataSource(generalUser2.getId(), "ds-2"); + + + //data source ids + Integer[] dataSourceIds = new Integer[]{dataSource.getId(),unauthorizdDataSource.getId()}; + + List authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds); + + Assert.assertEquals(generalUser1.getId(),dataSource.getUserId()); + Assert.assertNotEquals(generalUser1.getId(),unauthorizdDataSource.getUserId()); + Assert.assertFalse(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds))); + + //authorize object unauthorizdDataSource to generalUser1 + createUserDataSource(generalUser1, unauthorizdDataSource); + authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds); + + Assert.assertTrue(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds))); + } + /** * create datasource relation * @param userId @@ -289,7 +323,6 @@ public class DataSourceMapperTest { return dataSourceMap; } - /** * create datasource * @return datasource @@ -330,5 +363,41 @@ public class DataSourceMapperTest { return dataSource; } + /** + * create general user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + + /** + * create the relation of user and data source + * @param user user + * @param dataSource data source + * @return DatasourceUser + */ + private DatasourceUser createUserDataSource(User user,DataSource dataSource){ + DatasourceUser datasourceUser = new DatasourceUser(); + + datasourceUser.setDatasourceId(dataSource.getId()); + datasourceUser.setUserId(user.getId()); + datasourceUser.setPerm(7); + datasourceUser.setCreateTime(DateUtils.getCurrentDate()); + datasourceUser.setUpdateTime(DateUtils.getCurrentDate()); + + dataSourceUserMapper.insert(datasourceUser); + return datasourceUser; + } + } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index 7c0101612c..aaf5129c02 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -17,22 +17,36 @@ package org.apache.dolphinscheduler.dao.mapper; -import org.apache.dolphinscheduler.common.enums.ResourceType; -import org.apache.dolphinscheduler.dao.entity.*; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.ResourcesUser; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import java.util.Arrays; import java.util.Date; import java.util.List; +import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + @RunWith(SpringRunner.class) @SpringBootTest +@Transactional +@Rollback(true) public class ResourceMapperTest { @Autowired @@ -61,6 +75,59 @@ public class ResourceMapperTest { return resource; } + /** + * create resource by user + * @param user user + * @return Resource + */ + private Resource createResource(User user){ + //insertOne + Resource resource = new Resource(); + resource.setAlias(String.format("ut resource %s",user.getUserName())); + resource.setType(ResourceType.FILE); + resource.setUserId(user.getId()); + resourceMapper.insert(resource); + return resource; + } + + /** + * create user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + + /** + * create resource user + * @return ResourcesUser + */ + private ResourcesUser createResourcesUser(Resource resource,User user){ + //insertOne + ResourcesUser resourcesUser = new ResourcesUser(); + resourcesUser.setCreateTime(new Date()); + resourcesUser.setUpdateTime(new Date()); + resourcesUser.setUserId(user.getId()); + resourcesUser.setResourcesId(resource.getId()); + resourceUserMapper.insert(resourcesUser); + return resourcesUser; + } + + @Test + public void testInsert(){ + Resource resource = insertOne(); + assertNotNull(resource.getId()); + assertThat(resource.getId(),greaterThan(0)); + } /** * test update */ @@ -230,4 +297,30 @@ public class ResourceMapperTest { resourceMapper.deleteById(resource.getId()); } + + @Test + public void testListAuthorizedResource(){ + // create a general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + // create one resource + Resource resource = createResource(generalUser2); + Resource unauthorizedResource = createResource(generalUser2); + + // need download resources + String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()}; + + List resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); + + Assert.assertEquals(generalUser2.getId(),resource.getUserId()); + Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + + + + // authorize object unauthorizedResource to generalUser + createResourcesUser(unauthorizedResource,generalUser2); + List authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames); + Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames))); + + } } \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java index d608c841c7..0dd06484d8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java @@ -29,13 +29,20 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; +import java.util.Arrays; import java.util.Date; import java.util.List; +import static java.util.stream.Collectors.toList; + @RunWith(SpringRunner.class) @SpringBootTest +@Transactional +@Rollback(true) public class UdfFuncMapperTest { @Autowired @@ -133,6 +140,23 @@ public class UdfFuncMapperTest { return udfUser; } + /** + * create general user + * @return User + */ + private User createGeneralUser(String userName){ + User user = new User(); + user.setUserName(userName); + user.setUserPassword("1"); + user.setEmail("xx@123.com"); + user.setUserType(UserType.GENERAL_USER); + user.setCreateTime(new Date()); + user.setTenantId(1); + user.setUpdateTime(new Date()); + userMapper.insert(user); + return user; + } + /** * test update */ @@ -268,4 +292,30 @@ public class UdfFuncMapperTest { udfUserMapper.deleteById(udfUser.getId()); Assert.assertNotEquals(udfFuncList.size(), 0); } + + @Test + public void testListAuthorizedUdfFunc(){ + //create general user + User generalUser1 = createGeneralUser("user1"); + User generalUser2 = createGeneralUser("user2"); + + //create udf function + UdfFunc udfFunc = insertOne(generalUser1); + UdfFunc unauthorizdUdfFunc = insertOne(generalUser2); + + //udf function ids + Integer[] udfFuncIds = new Integer[]{udfFunc.getId(),unauthorizdUdfFunc.getId()}; + + List authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds); + + Assert.assertEquals(generalUser1.getId(),udfFunc.getUserId()); + Assert.assertNotEquals(generalUser1.getId(),unauthorizdUdfFunc.getUserId()); + Assert.assertFalse(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds))); + + + //authorize object unauthorizdUdfFunc to generalUser1 + insertOneUDFUser(generalUser1,unauthorizdUdfFunc); + authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds); + Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds))); + } } \ No newline at end of file diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index 2ccc880a41..751fd919a8 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -111,6 +111,27 @@ dolphinscheduler-alert + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + + org.mockito + mockito-core + + + + + org.mockito + mockito-core + test + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 2e52b83464..65c5607af7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -173,7 +173,7 @@ public class MasterServer implements IStoppable { try { //execute only once - if(Stopper.isStoped()){ + if(Stopper.isStopped()){ return; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index a91f8c17e6..84b1114b84 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; @@ -29,10 +30,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.AlertManager; +import org.apache.dolphinscheduler.server.utils.ScheduleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,10 +206,30 @@ public class MasterExecThread implements Runnable { Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); processDao.saveProcessInstance(processInstance); - Date scheduleDate = processInstance.getScheduleTime(); - if(scheduleDate == null){ - scheduleDate = startDate; + // get schedules + int processDefinitionId = processInstance.getProcessDefinitionId(); + List schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId); + List listDate = Lists.newLinkedList(); + if(!CollectionUtils.isEmpty(schedules)){ + for (Schedule schedule : schedules) { + List list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate); + listDate.addAll(list); + } + } + // get first fire date + Iterator iterator = null; + Date scheduleDate = null; + if(!CollectionUtils.isEmpty(listDate)) { + iterator = listDate.iterator(); + scheduleDate = iterator.next(); + processInstance.setScheduleTime(scheduleDate); + processDao.updateProcessInstance(processInstance); + }else{ + scheduleDate = processInstance.getScheduleTime(); + if(scheduleDate == null){ + scheduleDate = startDate; + } } while(Stopper.isRunning()){ @@ -231,12 +254,23 @@ public class MasterExecThread implements Runnable { break; } - // current process instance sucess ,next execute - scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); - if(scheduleDate.after(endDate)){ - // all success - logger.info("process {} complement completely!", processInstance.getId()); - break; + // current process instance success ,next execute + if(null == iterator){ + // loop by day + scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); + if(scheduleDate.after(endDate)){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + }else{ + // loop by schedule date + if(!iterator.hasNext()){ + // all success + logger.info("process {} complement completely!", processInstance.getId()); + break; + } + scheduleDate = iterator.next(); } logger.info("process {} start to complement {} data", @@ -541,7 +575,7 @@ public class MasterExecThread implements Runnable { private DependResult isTaskDepsComplete(String taskName) { Collection startNodes = dag.getBeginNode(); - // ff the vertex returns true directly + // if the vertex returns true directly if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java new file mode 100644 index 0000000000..11730b9545 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import org.quartz.impl.triggers.CronTriggerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +/** + * ScheduleUtils + */ +public class ScheduleUtils { + + private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class); + + /** + * Get the execution time of the time interval + * @param cron + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, Date from, Date to) { + return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to); + } + + /** + * Get the execution time of the time interval + * @param cron + * @param size + * @param from + * @param to + * @return + */ + public static List getRecentTriggerTime(String cron, int size, Date from, Date to) { + List list = new LinkedList(); + if(to.before(from)){ + logger.error("schedule date from:{} must before date to:{}!", from, to); + return list; + } + try { + CronTriggerImpl trigger = new CronTriggerImpl(); + trigger.setCronExpression(cron); + trigger.setStartTime(from); + trigger.setEndTime(to); + trigger.computeFirstFireTime(null); + for (int i = 0; i < size; i++) { + Date schedule = trigger.getNextFireTime(); + if(null == schedule){ + break; + } + list.add(schedule); + trigger.triggered(null); + } + } catch (ParseException e) { + logger.error("cron:{} error:{}", cron, e.getMessage()); + } + return java.util.Collections.unmodifiableList(list); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index b3301d1825..d270880408 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -201,7 +201,7 @@ public class WorkerServer implements IStoppable { try { //execute only once - if(Stopper.isStoped()){ + if(Stopper.isStopped()){ return; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index bb7a773d48..5f66c3477d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -21,6 +21,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.permission.PermissionCheck; import org.apache.dolphinscheduler.server.utils.LoggerUtils; import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -42,7 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.util.*; import java.util.stream.Collectors; @@ -94,12 +95,15 @@ public class TaskScheduleThread implements Runnable { // task node TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); + // get resource files + List resourceFiles = createProjectResFiles(taskNode); // copy hdfs/minio file to local - copyHdfsToLocal(processDao, + downloadResource( taskInstance.getExecutePath(), - createProjectResFiles(taskNode), + resourceFiles, logger); + // get process instance according to tak instance ProcessInstance processInstance = taskInstance.getProcessInstance(); @@ -204,8 +208,8 @@ public class TaskScheduleThread implements Runnable { } /** - * get task log path - * @return + * get task log path + * @return log path */ private String getTaskLogPath() { String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) @@ -294,14 +298,14 @@ public class TaskScheduleThread implements Runnable { } /** - * copy hdfs file to local + * download resource file * - * @param processDao * @param execLocalPath * @param projectRes * @param logger */ - private void copyHdfsToLocal(ProcessDao processDao, String execLocalPath, List projectRes, Logger logger) throws IOException { + private void downloadResource(String execLocalPath, List projectRes, Logger logger) throws Exception { + checkDownloadPermission(projectRes); for (String res : projectRes) { File resFile = new File(execLocalPath, res); if (!resFile.exists()) { @@ -321,4 +325,16 @@ public class TaskScheduleThread implements Runnable { } } } + + /** + * check download resource permission + * @param projectRes resource name list + * @throws Exception exception + */ + private void checkDownloadPermission(List projectRes) throws Exception { + int userId = taskInstance.getProcessInstance().getExecutorId(); + String[] resNames = projectRes.toArray(new String[projectRes.size()]); + PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger); + permissionCheck.checkPermission(); + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 7a14223b95..8774186bc5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -30,10 +30,7 @@ import org.slf4j.Logger; import java.io.*; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -207,7 +204,14 @@ public abstract class AbstractCommandExecutor { // merge error information to standard output stream processBuilder.redirectErrorStream(true); // setting up user to run commands - processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile); + List command = new LinkedList<>(); + command.add("sudo"); + command.add("-u"); + command.add(tenantCode); + command.add(commandInterpreter()); + command.addAll(commandOptions()); + command.add(commandFile); + processBuilder.command(command); process = processBuilder.start(); @@ -559,9 +563,11 @@ public abstract class AbstractCommandExecutor { } } - + protected List commandOptions() { + return Collections.emptyList(); + } protected abstract String buildCommandFilePath(); - protected abstract String commandType(); + protected abstract String commandInterpreter(); protected abstract boolean checkFindApp(String line); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index c1ff89d4cf..a673134488 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -26,6 +26,7 @@ import java.io.*; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.function.Consumer; @@ -108,12 +109,22 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { } } + /** + * get command options + * @return command options list + */ + @Override + protected List commandOptions() { + // unbuffered binary stdout and stderr + return Collections.singletonList("-u"); + } + /** * get python home * @return python home */ @Override - protected String commandType() { + protected String commandInterpreter() { String pythonHome = getPythonHome(envFile); if (StringUtils.isEmpty(pythonHome)){ return PYTHON; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index d1a7fa2258..db46d0d856 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -74,7 +74,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @return command type */ @Override - protected String commandType() { + protected String commandInterpreter() { return SH; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index bacb3f22eb..a2f07c8be3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.alert.utils.MailUtils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ShowType; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.permission.PermissionCheck; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -101,7 +105,7 @@ public class SqlTask extends AbstractTask { // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); - logger.info(sqlParameters.toString()); + logger.info("Full sql parameters: {}", sqlParameters); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", sqlParameters.getType(), sqlParameters.getDatasource(), @@ -119,13 +123,6 @@ public class SqlTask extends AbstractTask { } dataSource= processDao.findDataSourceById(sqlParameters.getDatasource()); - - if (null == dataSource){ - logger.error("datasource not exists"); - exitStatusCode = -1; - return; - } - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", dataSource.getName(), dataSource.getType(), @@ -133,6 +130,12 @@ public class SqlTask extends AbstractTask { dataSource.getUserId(), dataSource.getConnectionParams()); + if (dataSource == null){ + logger.error("datasource not exists"); + exitStatusCode = -1; + return; + } + Connection con = null; List createFuncs = null; try { @@ -164,6 +167,8 @@ public class SqlTask extends AbstractTask { for(int i=0;i udfFuncList = processDao.queryUdfFunListByids(idsArray); createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger); } @@ -284,12 +289,12 @@ public class SqlTask extends AbstractTask { } } - try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds)) { + try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds); + ResultSet resultSet = stmt.executeQuery()) { // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send JSONArray resultJSONArray = new JSONArray(); - ResultSet resultSet = stmt.executeQuery(); ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); @@ -300,11 +305,10 @@ public class SqlTask extends AbstractTask { } resultJSONArray.add(mapOfColValues); } - resultSet.close(); logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); // if there is a result set - if (resultJSONArray.size() > 0) { + if ( !resultJSONArray.isEmpty() ) { if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); @@ -332,6 +336,12 @@ public class SqlTask extends AbstractTask { } catch (Exception e) { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); + } finally { + try { + connection.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } } return connection; } @@ -344,22 +354,23 @@ public class SqlTask extends AbstractTask { * @throws Exception */ private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { - PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); // is the timeout set boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; - if(timeoutFlag){ - stmt.setQueryTimeout(taskProps.getTaskTimeout()); - } - Map params = sqlBinds.getParamsMap(); - if(params != null) { - for (Map.Entry entry : params.entrySet()) { - Property prop = entry.getValue(); - ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { + if(timeoutFlag){ + stmt.setQueryTimeout(taskProps.getTaskTimeout()); } + Map params = sqlBinds.getParamsMap(); + if(params != null) { + for (Map.Entry entry : params.entrySet()) { + Property prop = entry.getValue(); + ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + } + } + logger.info("prepare statement replace sql : {} ", stmt); + return stmt; } - logger.info("prepare statement replace sql : {} ",stmt.toString()); - return stmt; } /** @@ -447,6 +458,35 @@ public class SqlTask extends AbstractTask { for(int i=1;i<=sqlParamsMap.size();i++){ logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")"); } - logger.info(logPrint.toString()); + logger.info("Sql Params are {}", logPrint); + } + + /** + * check udf function permission + * @param udfFunIds udf functions + * @return if has download permission return true else false + */ + private void checkUdfPermission(Integer[] udfFunIds) throws Exception{ + // process instance + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + int userId = processInstance.getExecutorId(); + + PermissionCheck permissionCheckUdf = new PermissionCheck(AuthorizationType.UDF,processDao,udfFunIds,userId,logger); + permissionCheckUdf.checkPermission(); } + + /** + * check data source permission + * @param dataSourceId data source id + * @return if has download permission return true else false + */ + private void checkDataSourcePermission(int dataSourceId) throws Exception{ + // process instance + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + int userId = processInstance.getExecutorId(); + + PermissionCheck permissionCheckDataSource = new PermissionCheck(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger); + permissionCheckDataSource.checkPermission(); + } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java new file mode 100644 index 0000000000..6f31e66213 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.MasterExecThread; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.context.ApplicationContext; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.text.ParseException; +import java.util.*; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * test for MasterExecThread + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({MasterExecThread.class}) +public class MasterExecThreadTest { + + private MasterExecThread masterExecThread; + + private ProcessInstance processInstance; + + private ProcessDao processDao; + + private int processDefinitionId = 1; + + private MasterConfig config; + + private ApplicationContext applicationContext; + + @Before + public void init() throws Exception{ + processDao = mock(ProcessDao.class); + + applicationContext = mock(ApplicationContext.class); + config = new MasterConfig(); + config.setMasterExecTaskNum(1); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + + processInstance = mock(ProcessInstance.class); + Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId); + Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS); + Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString()); + Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO); + Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00")); + Map cmdParam = new HashMap<>(); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); + cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00"); + Mockito.when(processInstance.getCommandParam()).thenReturn(JSONObject.toJSONString(cmdParam)); + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setGlobalParamMap(Collections.EMPTY_MAP); + processDefinition.setGlobalParamList(Collections.EMPTY_LIST); + Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); + + masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao)); + // prepareProcess init dag + Field dag = MasterExecThread.class.getDeclaredField("dag"); + dag.setAccessible(true); + dag.set(masterExecThread, new DAG()); + PowerMockito.doNothing().when(masterExecThread, "executeProcess"); + PowerMockito.doNothing().when(masterExecThread, "postHandle"); + PowerMockito.doNothing().when(masterExecThread, "prepareProcess"); + PowerMockito.doNothing().when(masterExecThread, "runProcess"); + PowerMockito.doNothing().when(masterExecThread, "endProcess"); + } + + /** + * without schedule + * @throws ParseException + */ + @Test + public void testParallelWithOutSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 1-30 for next save, and last day 31 no save + verify(processDao, times(31)).saveProcessInstance(processInstance); + }catch (Exception e){ + e.printStackTrace(); + Assert.assertTrue(false); + } + } + + /** + * with schedule + * @throws ParseException + */ + @Test + public void testParallelWithSchedule() throws ParseException { + try{ + Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList()); + Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess"); + method.setAccessible(true); + method.invoke(masterExecThread); + // one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save + verify(processDao, times(16)).saveProcessInstance(processInstance); + }catch (Exception e){ + Assert.assertTrue(false); + } + } + + private List zeroSchedulerList(){ + return Collections.EMPTY_LIST; + } + + private List oneSchedulerList(){ + List schedulerList = new LinkedList<>(); + Schedule schedule = new Schedule(); + schedule.setCrontab("0 0 0 1/2 * ?"); + schedulerList.add(schedule); + return schedulerList; + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java new file mode 100644 index 0000000000..4fbbdab70f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.junit.Test; +import java.util.Date; +import static org.junit.Assert.assertEquals; + +/** + * Test ScheduleUtils + */ +public class ScheduleUtilsTest { + + /** + * Test the getRecentTriggerTime method + */ + @Test + public void testGetRecentTriggerTime() { + Date from = DateUtils.stringToDate("2020-01-01 00:00:00"); + Date to = DateUtils.stringToDate("2020-01-31 01:00:00"); + // test date + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size()); + // test error cron + assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size()); + // test cron + assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index a63157d809..fbb4f418d0 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -159,6 +159,9 @@ border-radius: 0 3px 0 0; .ans-btn-text { color: #337ab7; + .ans-icon { + font-size: 16px; + } } .assist-btn { position: absolute; @@ -206,7 +209,7 @@ color: #333; } &.active { - background: #e1e2e3; + // background: #e1e2e3; i { color: #2d8cf0; } @@ -234,7 +237,9 @@ border-radius: 3px 3px 0px 0px; } } - +#screen { + margin-right: 5px; +} .v-modal-custom-log { z-index: 101; } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index feb756dabe..40b6d85198 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -61,20 +61,28 @@
-
+ - + + {}) - // Round robin acquisition status - this.setIntervalP = setInterval(() => { - this._getTaskState(true).then(res => {}) - }, 90000) } } else { Dag.create() diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 43ed9a1b59..3f009eb75c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -513,7 +513,8 @@ this.workerGroupId = o.workerGroupId } - this.params = o.params || {}; + this.params = o.params || {} + this.dependence = o.dependence || {} } this.isContentBox = true diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue index d6c865716a..abec923af5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue @@ -22,7 +22,7 @@ - + @@ -64,7 +64,6 @@ data () { return { list: [], - definitionList: [], projectList: [], cycleList: cycleList, isInstance: false, @@ -88,16 +87,19 @@ _add () { // btn loading this.isLoading = true - // dependItemList index - let is = (value) => _.some(this.dependItemList, { definitionId: value }) - let noArr = _.filter(this.definitionList, v => !is(v.value)) - let value = noArr[0] && noArr[0].value || null - let val = value || this.definitionList[0].value + // add task list let projectId = this.projectList[0].value - this._getDependItemList(val).then(depTasksList => { - this.$nextTick(() => { - this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(val, depTasksList,projectId))) + this._getProcessByProjectId(projectId).then(definitionList => { + // dependItemList index + let is = (value) => _.some(this.dependItemList, { definitionId: value }) + let noArr = _.filter(definitionList, v => !is(v.value)) + let value = noArr[0] && noArr[0].value || null + let val = value || definitionList[0].value + this._getDependItemList(val).then(depTasksList => { + this.$nextTick(() => { + this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(val, definitionList, depTasksList, projectId))) + }) }) }) // remove tooltip @@ -131,25 +133,25 @@ */ _getProcessList () { return new Promise((resolve, reject) => { - this.definitionList = _.map(_.cloneDeep(this.store.state.dag.processListS), v => { + let definitionList = _.map(_.cloneDeep(this.store.state.dag.processListS), v => { return { value: v.id, label: v.name } }) - resolve() + resolve(definitionList) }) }, _getProcessByProjectId (id) { return new Promise((resolve, reject) => { this.store.dispatch('dag/getProcessByProjectId', { projectId: id }).then(res => { - this.definitionList = _.map(_.cloneDeep(res), v => { + let definitionList = _.map(_.cloneDeep(res), v => { return { value: v.id, label: v.name } }) - resolve(res) + resolve(definitionList) }) }) }, @@ -175,7 +177,7 @@ _onChangeProjectId ({ value }) { this._getProcessByProjectId(value).then(definitionList => { /*this.$set(this.dependItemList, this.itemIndex, this._dlOldParams(value, definitionList, item))*/ - let definitionId = definitionList[0].id + let definitionId = definitionList[0].value this._getDependItemList(definitionId).then(depTasksList => { let item = this.dependItemList[this.itemIndex] // init set depTasks All @@ -192,7 +194,7 @@ // init set depTasks All item.depTasks = 'ALL' // set dependItemList item data - this.$set(this.dependItemList, this.itemIndex, this._rtOldParams(value, depTasksList, item)) + this.$set(this.dependItemList, this.itemIndex, this._rtOldParams(value, item.definitionList, depTasksList, item)) }) }, _onChangeCycle ({ value }) { @@ -200,10 +202,12 @@ this.$set(this.dependItemList[this.itemIndex], 'dateValue', list[0].value) this.$set(this.dependItemList[this.itemIndex], 'dateValueList', list) }, - _rtNewParams (value, depTasksList,projectId) { + _rtNewParams (value, definitionList, depTasksList, projectId) { return { projectId: projectId, definitionId: value, + // dependItem need private definitionList + definitionList: definitionList, depTasks: 'ALL', depTasksList: depTasksList, cycle: 'day', @@ -212,10 +216,12 @@ state: '' } }, - _rtOldParams (value,depTasksList, item) { + _rtOldParams (value, definitionList, depTasksList, item) { return { projectId: item.projectId, definitionId: value, + // dependItem need private definitionList + definitionList: definitionList, depTasks: item.depTasks || 'ALL', depTasksList: depTasksList, cycle: item.cycle, @@ -254,12 +260,12 @@ this.isInstance = this.router.history.current.name === 'projects-instance-details' // get processlist this._getProjectList().then(() => { - let projectId = this.projectList[0].value if (!this.dependItemList.length) { + let projectId = this.projectList[0].value this._getProcessByProjectId(projectId).then(definitionList => { - let value = this.definitionList[0].value + let value = definitionList[0].value this._getDependItemList(value).then(depTasksList => { - this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(value, depTasksList,projectId))) + this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(value, definitionList, depTasksList, projectId))) }) }) } else { @@ -269,7 +275,7 @@ this._getDependItemList(ids, false).then(res => { _.map(this.dependItemList, (v, i) => { this._getProcessByProjectId(v.projectId).then(definitionList => { - this.$set(this.dependItemList, i, this._rtOldParams(v.definitionId, ['ALL'].concat(_.map(res[v.definitionId] || [], v => v.name)), v)) + this.$set(this.dependItemList, i, this._rtOldParams(v.definitionId, definitionList, ['ALL'].concat(_.map(res[v.definitionId] || [], v => v.name)), v)) }) }) }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue index 49fb4a2525..ecb7d2c855 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue @@ -20,7 +20,7 @@ v-model="sqlTypeId" :disabled="isDetails" @on-change="_handleSqlTypeChanged" - style="width: 90px;"> + style="width: 120px;"> - {{item.note}} + {{item.note}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue index 19640ccd38..e8435589ad 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue @@ -124,10 +124,14 @@ _getList (flag) { this.isLoading = !flag this.getDatasourcesListP(this.searchParams).then(res => { - this.datasourcesList = [] - this.datasourcesList = res.totalList - this.total = res.total - this.isLoading = false + if(this.searchParams.pageNo>1 && res.totalList.length == 0) { + this.searchParams.pageNo = this.searchParams.pageNo -1 + } else { + this.datasourcesList = [] + this.datasourcesList = res.totalList + this.total = res.total + this.isLoading = false + } }).catch(e => { this.isLoading = false }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index 7a9a165873..5550864176 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -69,7 +69,7 @@ - - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue index 8672c41295..0a22e22535 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue @@ -141,7 +141,7 @@
- {{$t('Date')}} + {{$t('Schedule date')}}
{ - this.processListP = [] - this.processListP = res.totalList - this.total = res.total - this.isLoading = false + if(this.searchParams.pageNo>1 && res.totalList.length == 0) { + this.searchParams.pageNo = this.searchParams.pageNo -1 + } else { + this.processListP = [] + this.processListP = res.totalList + this.total = res.total + this.isLoading = false + } }).catch(e => { this.isLoading = false }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue index 4bf169b854..7bcf9ac26b 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue @@ -105,10 +105,14 @@ _getProcessInstanceListP (flag) { this.isLoading = !flag this.getProcessInstance(this.searchParams).then(res => { - this.processInstanceList = [] - this.processInstanceList = res.totalList - this.total = res.total - this.isLoading = false + if(this.searchParams.pageNo>1 && res.totalList.length == 0) { + this.searchParams.pageNo = this.searchParams.pageNo -1 + } else { + this.processInstanceList = [] + this.processInstanceList = res.totalList + this.total = res.total + this.isLoading = false + } }).catch(e => { this.isLoading = false }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue index bf5bc12690..8acee8e453 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue @@ -66,7 +66,7 @@ {{item.instRunningCount}} - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue index 74121f5f5a..d9828c030e 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue @@ -110,10 +110,14 @@ _getList (flag) { this.isLoading = !flag this.getProjectsList(this.searchParams).then(res => { - this.projectsList = [] - this.projectsList = res.totalList - this.total = res.total - this.isLoading = false + if(this.searchParams.pageNo>1 && res.totalList.length == 0) { + this.searchParams.pageNo = this.searchParams.pageNo -1 + } else { + this.projectsList = [] + this.projectsList = res.totalList + this.total = res.total + this.isLoading = false + } }).catch(e => { this.isLoading = false }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue index 513d0d1321..783eeadab5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue @@ -46,13 +46,13 @@ {{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}} - + {{item.alias}} - {{item.fileName}} + {{item.fileName}} - {{item.description}} + {{item.description}} - diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue index de7f773866..8f1f939307 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue @@ -91,9 +91,13 @@ _getList (flag) { this.isLoading = !flag this.getResourcesListP(this.searchParams).then(res => { - this.fileResourcesList = res.totalList - this.total = res.total - this.isLoading = false + if(this.searchParams.pageNo>1 && res.totalList.length == 0) { + this.searchParams.pageNo = this.searchParams.pageNo -1 + } else { + this.fileResourcesList = res.totalList + this.total = res.total + this.isLoading = false + } }).catch(e => { this.isLoading = false }) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue index febb215741..5f788f8223 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue @@ -55,7 +55,7 @@ v-ps