dailidong 5 years ago
parent
commit
aa191c294c
  1. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java
  3. 9
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  4. 3
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java
  5. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java
  6. 24
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
  7. 7
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
  8. 3
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
  9. 61
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  10. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java
  11. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
  12. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java
  13. 50
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java
  14. 140
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
  15. 229
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  16. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
  17. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java
  18. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java
  19. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java
  20. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java
  21. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java
  22. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java
  23. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java
  24. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java
  25. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
  26. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  27. 55
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
  28. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
  29. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java
  30. 64
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  31. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
  32. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
  33. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
  34. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
  35. 157
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java
  36. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  37. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
  38. 5
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml
  39. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  40. 77
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java
  41. 97
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
  42. 50
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java
  43. 21
      dolphinscheduler-server/pom.xml
  44. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  45. 54
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  46. 79
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java
  47. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  48. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  49. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  50. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
  51. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
  52. 88
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  53. 154
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
  54. 44
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java
  55. 9
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  56. 22
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  57. 3
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  58. 50
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue
  59. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue
  60. 2
      dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue
  61. 12
      dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue
  62. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue
  63. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue
  64. 12
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/index.vue
  65. 12
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
  66. 2
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue
  67. 12
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue
  68. 6
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue
  69. 10
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue
  70. 4
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue
  71. 12
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/index.vue
  72. 6
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/_source/list.vue
  73. 12
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/index.vue
  74. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue
  75. 2
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue
  76. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue
  77. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/index.vue
  78. 2
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue
  79. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/index.vue
  80. 12
      dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue
  81. 12
      dolphinscheduler-ui/src/js/conf/home/pages/user/pages/token/index.vue
  82. 5
      dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js
  83. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  84. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  85. 5
      pom.xml

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -56,7 +56,7 @@ public class AlertServer {
logger.info("alert server ready start ");
while (Stopper.isRunning()){
try {
Thread.sleep(Constants.ALERT_SCAN_INTERVEL);
Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
Thread.currentThread().interrupt();

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java

@ -32,7 +32,7 @@ import java.util.Map;
* Enterprise WeChat Manager
*/
public class EnterpriseWeChatManager {
private static final Logger logger = LoggerFactory.getLogger(MsgManager.class);
private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
/**
* Enterprise We Chat send
* @param alert the alert

9
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -65,7 +65,7 @@ public class AlertSender{
users = alertDao.listUserByAlertgroupId(alert.getAlertGroupId());
// receiving group list
List<String> receviersList = new ArrayList<String>();
List<String> receviersList = new ArrayList<>();
for(User user:users){
receviersList.add(user.getEmail());
}
@ -77,7 +77,7 @@ public class AlertSender{
}
// copy list
List<String> receviersCcList = new ArrayList<String>();
List<String> receviersCcList = new ArrayList<>();
// Custom Copier
@ -101,6 +101,11 @@ public class AlertSender{
}else if (alert.getAlertType() == AlertType.SMS){
retMaps = emailManager.send(getReciversForSMS(users), alert.getTitle(), alert.getContent(),alert.getShowType());
alert.setInfo(retMaps);
} else {
logger.error("AlertType is not defined. code: {}, descp: {}",
alert.getAlertType().getCode(),
alert.getAlertType().getDescp());
return;
}
//send flag

3
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/template/AlertTemplate.java

@ -34,6 +34,9 @@ public interface AlertTemplate {
/**
* default showAll is true
* @param content alert message content
* @param showType show type
* @return a message from a specified alert template
*/
default String getMessageFromTemplate(String content,ShowType showType){
return getMessageFromTemplate(content,showType,true);

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/Constants.java

@ -133,7 +133,7 @@ public class Constants {
public static final String TH_END = "</th>";
public static final int ALERT_SCAN_INTERVEL = 5000;
public static final int ALERT_SCAN_INTERVAL = 5000;
public static final String MARKDOWN_QUOTE = ">";

24
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java

@ -201,22 +201,22 @@ public class EnterpriseWeChatUtils {
public static String markdownTable(String title,String content){
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
StringBuilder contents = new StringBuilder(200);
for (LinkedHashMap mapItems : mapItemsList){
Set<Map.Entry<String, String>> entries = mapItems.entrySet();
if (null != mapItemsList) {
for (LinkedHashMap mapItems : mapItemsList){
Set<Map.Entry<String, String>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER));
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
while (iterator.hasNext()){
StringBuilder t = new StringBuilder(String.format("`%s`%s",title,Constants.MARKDOWN_ENTER));
while (iterator.hasNext()){
Map.Entry<String, String> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
Map.Entry<String, String> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
}
contents.append(t);
}
contents.append(t);
}
return contents.toString();
}

7
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java

@ -96,7 +96,7 @@ public class MailUtils {
return retMap;
}
receivers.removeIf((from) -> (StringUtils.isEmpty(from)));
receivers.removeIf(StringUtils::isEmpty);
if (showType == ShowType.TABLE || showType == ShowType.TEXT){
// send email
@ -185,7 +185,7 @@ public class MailUtils {
/**
* get MimeMessage
* @param receivers
* @param receivers receivers
* @return the MimeMessage
* @throws MessagingException
*/
@ -229,8 +229,7 @@ public class MailUtils {
}
};
Session session = Session.getInstance(props, auth);
return session;
return Session.getInstance(props, auth);
}
/**

3
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java

@ -205,8 +205,7 @@ public class PropertyUtils {
return null;
}
try {
String[] propertyArray = value.split(splitStr);
return propertyArray;
return value.split(splitStr);
} catch (PatternSyntaxException e) {
logger.info(e.getMessage(),e);
}

61
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -497,31 +499,58 @@ public class ExecutorService extends BaseService{
}
}
if ( start == null || end == null) {
return 0;
}
if(commandType == CommandType.COMPLEMENT_DATA){
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
if(runMode == RunMode.RUN_MODE_SERIAL){
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJson(cmdParam));
return processDao.createCommand(command);
}else if (runMode == RunMode.RUN_MODE_PARALLEL){
int runCunt = 0;
while(!start.after(end)){
runCunt += 1;
if(null != start && null != end && start.before(end)){
if(runMode == RunMode.RUN_MODE_SERIAL){
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
return processDao.createCommand(command);
}else if (runMode == RunMode.RUN_MODE_PARALLEL){
List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefineId);
List<Date> listDate = new LinkedList<>();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule item : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(item.getCrontab(), start, end);
listDate.addAll(list);
}
}
if(!CollectionUtils.isEmpty(listDate)){
// loop by schedule date
for (Date date : listDate) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(date));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(date));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
}
return listDate.size();
}else{
// loop by day
int runCunt = 0;
while(!start.after(end)) {
runCunt += 1;
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.dateToString(start));
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(start));
command.setCommandParam(JSONUtils.toJson(cmdParam));
processDao.createCommand(command);
start = DateUtils.getSomeDay(start, 1);
}
return runCunt;
}
}
return runCunt;
}else{
logger.error("there is not vaild schedule date for the process definition: id:{},date:{}",
processDefineId, schedule);
}
}else{
command.setCommandParam(JSONUtils.toJson(cmdParam));
return processDao.createCommand(command);
}
return 0;
}
}
/**

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java

@ -139,12 +139,16 @@ public class SessionService extends BaseService{
* @param loginUser login user
*/
public void signOut(String ip, User loginUser) {
/**
* query session by user id and ip
*/
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip);
try {
/**
* query session by user id and ip
*/
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(),ip);
//delete session
sessionMapper.deleteById(session.getId());
//delete session
sessionMapper.deleteById(session.getId());
}catch (Exception e){
logger.warn("userId : {} , ip : {} , find more one session",loginUser.getId(),ip);
}
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java

@ -57,7 +57,7 @@ public class FileUtils {
Files.copy(file.getInputStream(), Paths.get(destFilename));
} catch (IOException e) {
logger.error(String.format("failed to copy file , {} is empty file", file.getOriginalFilename()), e);
logger.error("failed to copy file , {} is empty file", file.getOriginalFilename(), e);
}
}

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FourLetterWordMain.java

@ -59,23 +59,22 @@ public class FourLetterWordMain {
*/
public static String send4LetterWord(String host, int port, String cmd, int timeout)
throws IOException {
LOG.info("connecting to " + host + " " + port);
Socket sock = new Socket();
LOG.info("connecting to {} {}", host, port);
InetSocketAddress hostaddress= host != null ? new InetSocketAddress(host, port) :
new InetSocketAddress(InetAddress.getByName(null), port);
BufferedReader reader = null;
try {
try (Socket sock = new Socket();
OutputStream outstream = sock.getOutputStream();
BufferedReader reader =
new BufferedReader(
new InputStreamReader(sock.getInputStream()))) {
sock.setSoTimeout(timeout);
sock.connect(hostaddress, timeout);
OutputStream outstream = sock.getOutputStream();
outstream.write(cmd.getBytes());
outstream.flush();
// this replicates NC - close the output stream before reading
sock.shutdownOutput();
reader =
new BufferedReader(
new InputStreamReader(sock.getInputStream()));
StringBuilder sb = new StringBuilder();
String line;
while((line = reader.readLine()) != null) {
@ -84,11 +83,6 @@ public class FourLetterWordMain {
return sb.toString();
} catch (SocketTimeoutException e) {
throw new IOException("Exception while executing four letter word: " + cmd, e);
} finally {
sock.close();
if (reader != null) {
reader.close();
}
}
}
}

50
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BaseDAGServiceTest {
@Test
public void testProcessInstance2DAG(){
ProcessInstance processInstance = new ProcessInstance();
processInstance.setProcessInstanceJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-61567\"," +
"\"name\":\"开始\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SHELL\",\"id\":\"tasks-6-3ug5ej\",\"name\":\"结束\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[\"开始\"]}],\"tenantId\":-1,\"timeout\":0}");
DAG<String, TaskNode, TaskNodeRelation> relationDAG = BaseDAGService.processInstance2DAG(processInstance);
Assert.assertTrue(relationDAG.containsNode("开始"));
}
}

140
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.mock.web.MockCookie;
import org.springframework.mock.web.MockHttpServletRequest;
import javax.servlet.http.Cookie;
import java.util.HashMap;
import java.util.Map;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"sun.security.*", "javax.net.*"})
@PrepareForTest({HadoopUtils.class})
public class BaseServiceTest {
private static final Logger logger = LoggerFactory.getLogger(BaseServiceTest.class);
private BaseService baseService;
@Mock
private HadoopUtils hadoopUtils;
@Before
public void setUp() {
baseService = new BaseService();
}
@Test
public void testIsAdmin(){
User user = new User();
user.setUserType(UserType.ADMIN_USER);
//ADMIN_USER
boolean isAdmin = baseService.isAdmin(user);
Assert.assertTrue(isAdmin);
//GENERAL_USER
user.setUserType(UserType.GENERAL_USER);
isAdmin = baseService.isAdmin(user);
Assert.assertFalse(isAdmin);
}
@Test
public void testPutMsg(){
Map<String, Object> result = new HashMap<>();
baseService.putMsg(result, Status.SUCCESS);
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
//has params
baseService.putMsg(result, Status.PROJECT_NOT_FOUNT,"test");
}
@Test
public void testPutMsgTwo(){
Result result = new Result();
baseService.putMsg(result, Status.SUCCESS);
Assert.assertEquals(Status.SUCCESS.getMsg(),result.getMsg());
//has params
baseService.putMsg(result,Status.PROJECT_NOT_FOUNT,"test");
}
@Test
public void testGetCookie(){
MockHttpServletRequest request = new MockHttpServletRequest();
MockCookie mockCookie = new MockCookie("userId","1");
request.setCookies(mockCookie);
//cookie is not null
Cookie cookie = BaseService.getCookie(request,"userId");
Assert.assertNotNull(cookie);
//cookie is null
cookie = BaseService.getCookie(request,"userName");
Assert.assertNull(cookie);
}
@Test
public void testCreateTenantDirIfNotExists(){
PowerMockito.mockStatic(HadoopUtils.class);
PowerMockito.when(HadoopUtils.getInstance()).thenReturn(hadoopUtils);
try {
baseService.createTenantDirIfNotExists("test");
} catch (Exception e) {
Assert.assertTrue(false);
logger.error("CreateTenantDirIfNotExists error ",e);
e.printStackTrace();
}
}
@Test
public void testHasPerm(){
User user = new User();
user.setId(1);
//create user
boolean hasPerm = baseService.hasPerm(user,1);
Assert.assertTrue(hasPerm);
//admin
user.setId(2);
user.setUserType(UserType.ADMIN_USER);
hasPerm = baseService.hasPerm(user,1);
Assert.assertTrue(hasPerm);
}
}

229
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.text.ParseException;
import java.util.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
/**
* test for ExecutorService
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class ExecutorService2Test {
@InjectMocks
private ExecutorService executorService;
@Mock
private ProcessDao processDao;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
private int processDefinitionId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private User loginUser = new User();
private String projectName = "projectName";
private Project project = new Project();
private String cronTime;
@Before
public void init(){
// user
loginUser.setId(userId);
// processDefinition
processDefinition.setId(processDefinitionId);
processDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
// project
project.setName(projectName);
// cronRangeTime
cronTime = "2020-01-01 00:00:00,2020-01-31 23:00:00";
// mock
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.selectById(processDefinitionId)).thenReturn(processDefinition);
Mockito.when(processDao.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processDao.createCommand(any(Command.class))).thenReturn(1);
}
/**
* not complement
* @throws ParseException
*/
@Test
public void testNoComplement() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.START_PROCESS,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* date error
* @throws ParseException
*/
@Test
public void testDateError() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, "2020-01-31 23:00:00,2020-01-01 00:00:00", CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processDao, times(0)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* serial
* @throws ParseException
*/
@Test
public void testSerial() throws ParseException {
try {
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(1)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* without schedule
* @throws ParseException
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(31)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
/**
* with schedule
* @throws ParseException
*/
@Test
public void testParallelWithSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processDao, times(16)).createCommand(any(Command.class));
}catch (Exception e){
Assert.assertTrue(false);
}
}
private List<Schedule> zeroSchedulerList(){
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList(){
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
private Map<String, Object> checkProjectAndAuth(){
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
}

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* Authorization type
*/
public enum AuthorizationType {
/**
* 0 RESOURCE_FILE;
* 1 DATASOURCE;
* 2 UDF;
*/
RESOURCE_FILE(0, "resource file"),
DATASOURCE(1, "data source"),
UDF(2, "udf function");
AuthorizationType(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +59,7 @@ public class ClickHouseDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,7 +62,7 @@ public class HiveDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +57,7 @@ public class MySQLDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("com.mysql.jdbc.Driver");
Class.forName(Constants.COM_MYSQL_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +59,7 @@ public class OracleDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
Class.forName(Constants.COM_ORACLE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,7 +61,7 @@ public class PostgreDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.postgresql.Driver");
Class.forName(Constants.ORG_POSTGRESQL_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +55,7 @@ public class SQLServerDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.common.job.db;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +64,7 @@ public class SparkDataSource extends BaseDataSource {
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java

@ -35,13 +35,12 @@ public class DateInterval {
@Override
public boolean equals(Object obj) {
try{
DateInterval dateInterval = (DateInterval) obj;
return startTime.equals(dateInterval.getStartTime()) &&
endTime.equals(dateInterval.getEndTime());
}catch (Exception e){
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DateInterval that = (DateInterval) o;
return startTime.equals(that.startTime) &&
endTime.equals(that.endTime);
}
public Date getStartTime() {

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java

@ -25,7 +25,7 @@ public class Stopper {
private static volatile AtomicBoolean signal = new AtomicBoolean(false);
public static final boolean isStoped(){
public static final boolean isStopped(){
return signal.get();
}

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -119,7 +119,9 @@ public class HadoopUtils implements Closeable {
fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
}else{
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
throw new RuntimeException("property:{} can not to be empty, please set!");
throw new RuntimeException(
String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
);
}
}else{
logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
@ -219,10 +221,12 @@ public class HadoopUtils implements Closeable {
return null;
}
FSDataInputStream in = fs.open(new Path(hdfsFilePath));
BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
/**

55
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java

@ -115,21 +115,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}
}
/**
* create zookeeper path according the zk node type.
* @param zkNodeType zookeeper node type
* @return register zookeeper path
* @throws Exception
*/
private String createZNodePath(ZKNodeType zkNodeType) throws Exception {
private String createZNodePath(ZKNodeType zkNodeType, String host) throws Exception {
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = ResInfo.getHeartBeatInfo(new Date());
// create temporary sequence nodes for master znode
String parentPath = getZNodeParentPath(zkNodeType);
String serverPathPrefix = parentPath + "/" + OSUtils.getHost();
String registerPath = serverPathPrefix + UNDERLINE;
super.persistEphemeral(registerPath, heartbeatZKInfo);
String registerPath= getZNodeParentPath(zkNodeType) + SINGLE_SLASH + host;
super.persistEphemeral(registerPath, heartbeatZKInfo);
logger.info("register {} node {} success" , zkNodeType.toString(), registerPath);
return registerPath;
}
@ -148,7 +146,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
zkNodeType.toString(), host);
return registerPath;
}
registerPath = createZNodePath(zkNodeType);
registerPath = createZNodePath(zkNodeType, host);
// handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
@ -166,25 +164,19 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
//ip_sequenceno
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){
String[] ipAndSeqNo = ipSeqNo.split(UNDERLINE);
String ip = ipAndSeqNo[0];
removeDeadServerByHost(ip, type);
removeDeadServerByHost(host, type);
}else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath,(type + UNDERLINE + ipSeqNo));
super.persist(deadServerPath,(type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
@ -285,21 +277,13 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){
if(hostKey.startsWith(host + UNDERLINE)){
if(hostKey.startsWith(host)){
return true;
}
}
return false;
}
/**
* get zkclient
* @return zookeeper client
*/
public CuratorFramework getZkClient() {
return zkClient;
}
/**
*
* @return get worker node parent path
@ -436,19 +420,22 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator{
}
/**
* get host ip, string format: masterParentPath/ip_000001/value
* get host ip, string format: masterParentPath/ip
* @param path path
* @return host ip, string format: masterParentPath/ip_000001/value
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
int startIndex = path.lastIndexOf("/")+1;
int endIndex = path.lastIndexOf("_");
if(startIndex >= endIndex){
logger.error("parse ip error");
if(StringUtils.isEmpty(path)){
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){
logger.error("parse ip error: {}", path);
return "";
}
return path.substring(startIndex, endIndex);
return pathArray[pathArray.length - 1];
}
/**
* acquire zk lock

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

@ -77,6 +77,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return treeCache;
}
@Override
public void close() {
treeCache.close();
try {

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java

@ -98,7 +98,7 @@ public class ThreadUtilsTest {
public void testStopper() {
assertTrue(Stopper.isRunning());
Stopper.stop();
assertTrue(Stopper.isStoped());
assertTrue(Stopper.isStopped());
}
/**

64
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang.ArrayUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
@ -25,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.ArrayUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
@ -462,12 +463,9 @@ public class ProcessDao {
return null;
}
if(null == tenant){
if(tenant == null){
User user = userMapper.selectById(userId);
if (null != user) {
tenant = tenantMapper.queryById(user.getTenantId());
}
tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
@ -974,6 +972,9 @@ public class ProcessDao {
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
@ -1460,6 +1461,15 @@ public class ProcessDao {
return scheduleMapper.selectById(id);
}
/**
* query Schedule by processDefinitionId
* @param processDefinitionId processDefinitionId
* @see Schedule
*/
public List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(int processDefinitionId) {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
}
/**
* query need failover process instance
* @param host host
@ -1770,5 +1780,47 @@ public class ProcessDao {
return projectIdList;
}
/**
* list unauthorized udf function
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){
List<T> resultList = new ArrayList<T>();
if (!ArrayUtils.isEmpty(needChecks)) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){
case RESOURCE_FILE:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
}
resultList.addAll(originResSet);
}
return resultList;
}
/**
* get user by user id
* @param userId user id
* @return User
*/
public User getUserById(int userId){
return userMapper.queryDetailsById(userId);
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java

@ -77,4 +77,13 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
List<DataSource> listAllDataSourceByType(@Param("type") Integer type);
/**
* list authorized UDF function
* @param userId userId
* @param dataSourceIds data source id array
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java

@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
/**
* list authorized resource
* @param userId userId
* @param resNames resource names
* @return resource list
*/
<T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java

@ -60,4 +60,11 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
*/
List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
/**
* query schedule list by process definition id
* @param processDefinitionId
* @return
*/
List<Schedule> queryReleaseSchedulerListByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java

@ -78,5 +78,12 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
List<UdfFunc> queryAuthedUdfFunc(@Param("userId") int userId);
/**
* list authorized UDF function
* @param userId userId
* @param udfIds UDF function id array
* @return UDF function list
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
}

157
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
import java.util.List;
public class PermissionCheck<T> {
/**
* logger
*/
private Logger logger;
/**
* Authorization Type
*/
private AuthorizationType authorizationType;
/**
* Authorization Type
*/
private ProcessDao processDao;
/**
* need check array
*/
private T[] needChecks;
/**
* user id
*/
private int userId;
/**
* permission check
* @param authorizationType authorization type
* @param processDao process dao
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) {
this.authorizationType = authorizationType;
this.processDao = processDao;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
* @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
this.logger = logger;
}
public AuthorizationType getAuthorizationType() {
return authorizationType;
}
public void setAuthorizationType(AuthorizationType authorizationType) {
this.authorizationType = authorizationType;
}
public ProcessDao getProcessDao() {
return processDao;
}
public void setProcessDao(ProcessDao processDao) {
this.processDao = processDao;
}
public T[] getNeedChecks() {
return needChecks;
}
public void setNeedChecks(T[] needChecks) {
this.needChecks = needChecks;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
/**
* has permission
* @return true if has permission
*/
public boolean hasPermission(){
try {
checkPermission();
return true;
} catch (Exception e) {
return false;
}
}
/**
* check permission
* @throws Exception exception
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
User user = processDao.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){
List<T> unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString());
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -96,7 +96,13 @@ public class DagHelper {
for (String startNodeName : startNodeList) {
TaskNode startNode = findNodeByName(taskNodeList, startNodeName);
List<TaskNode> childNodeList = new ArrayList<>();
if (TaskDependType.TASK_POST == taskDependType) {
if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",
startNodeName,
taskNodeList
);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
childNodeList = getFlowNodeListPost(startNode, taskNodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) {
childNodeList = getFlowNodeListPre(startNode, recoveryNodeNameList, taskNodeList);
@ -129,7 +135,6 @@ public class DagHelper {
if (null != depList && null != startNode && depList.contains(startNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList));
}
}
resultList.add(startNode);
return resultList;

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml

@ -74,6 +74,19 @@
from t_ds_datasource
where type = #{type}
</select>
<select id="listAuthorizedDataSource" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select *
from t_ds_datasource
where
id in (select datasource_id from t_ds_relation_datasource_user where user_id=#{userId}
union select id as datasource_id from t_ds_datasource where user_id=#{userId})
<if test="dataSourceIds != null and dataSourceIds != ''">
and id in
<foreach collection="dataSourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

5
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.xml

@ -55,4 +55,9 @@
from t_ds_schedules
where process_definition_id =#{processDefinitionId}
</select>
<select id="queryReleaseSchedulerListByProcessDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select *
from t_ds_schedules
where process_definition_id =#{processDefinitionId} and release_state = 1
</select>
</mapper>

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -74,4 +74,17 @@
WHERE u.id = rel.udf_id
AND rel.user_id = #{userId}
</select>
<select id="listAuthorizedUdfFunc" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where
id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="udfIds != null and udfIds != ''">
and id in
<foreach collection="udfIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

77
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DatasourceUser;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -34,7 +36,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import static org.hamcrest.Matchers.*;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
/**
@ -58,6 +62,9 @@ public class DataSourceMapperTest {
@Autowired
DataSourceUserMapper dataSourceUserMapper;
@Autowired
private UserMapper userMapper;
/**
* test insert
*/
@ -244,6 +251,33 @@ public class DataSourceMapperTest {
}
}
@Test
public void testListAuthorizedDataSource(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create data source
DataSource dataSource = createDataSource(generalUser1.getId(), "ds-1");
DataSource unauthorizdDataSource = createDataSource(generalUser2.getId(), "ds-2");
//data source ids
Integer[] dataSourceIds = new Integer[]{dataSource.getId(),unauthorizdDataSource.getId()};
List<DataSource> authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertEquals(generalUser1.getId(),dataSource.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdDataSource.getUserId());
Assert.assertFalse(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
//authorize object unauthorizdDataSource to generalUser1
createUserDataSource(generalUser1, unauthorizdDataSource);
authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertTrue(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
}
/**
* create datasource relation
* @param userId
@ -289,7 +323,6 @@ public class DataSourceMapperTest {
return dataSourceMap;
}
/**
* create datasource
* @return datasource
@ -330,5 +363,41 @@ public class DataSourceMapperTest {
return dataSource;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create the relation of user and data source
* @param user user
* @param dataSource data source
* @return DatasourceUser
*/
private DatasourceUser createUserDataSource(User user,DataSource dataSource){
DatasourceUser datasourceUser = new DatasourceUser();
datasourceUser.setDatasourceId(dataSource.getId());
datasourceUser.setUserId(user.getId());
datasourceUser.setPerm(7);
datasourceUser.setCreateTime(DateUtils.getCurrentDate());
datasourceUser.setUpdateTime(DateUtils.getCurrentDate());
dataSourceUserMapper.insert(datasourceUser);
return datasourceUser;
}
}

97
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java

@ -17,22 +17,36 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class ResourceMapperTest {
@Autowired
@ -61,6 +75,59 @@ public class ResourceMapperTest {
return resource;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("ut resource %s",user.getUserName()));
resource.setType(ResourceType.FILE);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create resource user
* @return ResourcesUser
*/
private ResourcesUser createResourcesUser(Resource resource,User user){
//insertOne
ResourcesUser resourcesUser = new ResourcesUser();
resourcesUser.setCreateTime(new Date());
resourcesUser.setUpdateTime(new Date());
resourcesUser.setUserId(user.getId());
resourcesUser.setResourcesId(resource.getId());
resourceUserMapper.insert(resourcesUser);
return resourcesUser;
}
@Test
public void testInsert(){
Resource resource = insertOne();
assertNotNull(resource.getId());
assertThat(resource.getId(),greaterThan(0));
}
/**
* test update
*/
@ -230,4 +297,30 @@ public class ResourceMapperTest {
resourceMapper.deleteById(resource.getId());
}
@Test
public void testListAuthorizedResource(){
// create a general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
// create one resource
Resource resource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser2);
// need download resources
String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertEquals(generalUser2.getId(),resource.getUserId());
Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
// authorize object unauthorizedResource to generalUser
createResourcesUser(unauthorizedResource,generalUser2);
List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
}
}

50
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java

@ -29,13 +29,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class UdfFuncMapperTest {
@Autowired
@ -133,6 +140,23 @@ public class UdfFuncMapperTest {
return udfUser;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* test update
*/
@ -268,4 +292,30 @@ public class UdfFuncMapperTest {
udfUserMapper.deleteById(udfUser.getId());
Assert.assertNotEquals(udfFuncList.size(), 0);
}
@Test
public void testListAuthorizedUdfFunc(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create udf function
UdfFunc udfFunc = insertOne(generalUser1);
UdfFunc unauthorizdUdfFunc = insertOne(generalUser2);
//udf function ids
Integer[] udfFuncIds = new Integer[]{udfFunc.getId(),unauthorizdUdfFunc.getId()};
List<UdfFunc> authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertEquals(generalUser1.getId(),udfFunc.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdUdfFunc.getUserId());
Assert.assertFalse(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
//authorize object unauthorizdUdfFunc to generalUser1
insertOneUDFUser(generalUser1,unauthorizdUdfFunc);
authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
}
}

21
dolphinscheduler-server/pom.xml

@ -111,6 +111,27 @@
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -173,7 +173,7 @@ public class MasterServer implements IStoppable {
try {
//execute only once
if(Stopper.isStoped()){
if(Stopper.isStopped()){
return;
}

54
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@ -29,10 +30,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.utils.ScheduleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -203,10 +206,30 @@ public class MasterExecThread implements Runnable {
Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
processDao.saveProcessInstance(processInstance);
Date scheduleDate = processInstance.getScheduleTime();
if(scheduleDate == null){
scheduleDate = startDate;
// get schedules
int processDefinitionId = processInstance.getProcessDefinitionId();
List<Schedule> schedules = processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
List<Date> listDate = Lists.newLinkedList();
if(!CollectionUtils.isEmpty(schedules)){
for (Schedule schedule : schedules) {
List<Date> list = ScheduleUtils.getRecentTriggerTime(schedule.getCrontab(), startDate, endDate);
listDate.addAll(list);
}
}
// get first fire date
Iterator<Date> iterator = null;
Date scheduleDate = null;
if(!CollectionUtils.isEmpty(listDate)) {
iterator = listDate.iterator();
scheduleDate = iterator.next();
processInstance.setScheduleTime(scheduleDate);
processDao.updateProcessInstance(processInstance);
}else{
scheduleDate = processInstance.getScheduleTime();
if(scheduleDate == null){
scheduleDate = startDate;
}
}
while(Stopper.isRunning()){
@ -231,12 +254,23 @@ public class MasterExecThread implements Runnable {
break;
}
// current process instance sucess ,next execute
scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
if(scheduleDate.after(endDate)){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
// current process instance success ,next execute
if(null == iterator){
// loop by day
scheduleDate = DateUtils.getSomeDay(scheduleDate, 1);
if(scheduleDate.after(endDate)){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
}
}else{
// loop by schedule date
if(!iterator.hasNext()){
// all success
logger.info("process {} complement completely!", processInstance.getId());
break;
}
scheduleDate = iterator.next();
}
logger.info("process {} start to complement {} data",
@ -541,7 +575,7 @@ public class MasterExecThread implements Runnable {
private DependResult isTaskDepsComplete(String taskName) {
Collection<String> startNodes = dag.getBeginNode();
// ff the vertex returns true directly
// if the vertex returns true directly
if(startNodes.contains(taskName)){
return DependResult.SUCCESS;
}

79
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ScheduleUtils.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* ScheduleUtils
*/
public class ScheduleUtils {
private static final Logger logger = LoggerFactory.getLogger(ScheduleUtils.class);
/**
* Get the execution time of the time interval
* @param cron
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, Date from, Date to) {
return getRecentTriggerTime(cron, Integer.MAX_VALUE, from, to);
}
/**
* Get the execution time of the time interval
* @param cron
* @param size
* @param from
* @param to
* @return
*/
public static List<Date> getRecentTriggerTime(String cron, int size, Date from, Date to) {
List list = new LinkedList<Date>();
if(to.before(from)){
logger.error("schedule date from:{} must before date to:{}!", from, to);
return list;
}
try {
CronTriggerImpl trigger = new CronTriggerImpl();
trigger.setCronExpression(cron);
trigger.setStartTime(from);
trigger.setEndTime(to);
trigger.computeFirstFireTime(null);
for (int i = 0; i < size; i++) {
Date schedule = trigger.getNextFireTime();
if(null == schedule){
break;
}
list.add(schedule);
trigger.triggered(null);
}
} catch (ParseException e) {
logger.error("cron:{} error:{}", cron, e.getMessage());
}
return java.util.Collections.unmodifiableList(list);
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -201,7 +201,7 @@ public class WorkerServer implements IStoppable {
try {
//execute only once
if(Stopper.isStoped()){
if(Stopper.isStopped()){
return;
}

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -21,6 +21,7 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -42,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
@ -94,12 +95,15 @@ public class TaskScheduleThread implements Runnable {
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
copyHdfsToLocal(processDao,
downloadResource(
taskInstance.getExecutePath(),
createProjectResFiles(taskNode),
resourceFiles,
logger);
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
@ -204,8 +208,8 @@ public class TaskScheduleThread implements Runnable {
}
/**
* get task log path
* @return
* get task log path
* @return log path
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
@ -294,14 +298,14 @@ public class TaskScheduleThread implements Runnable {
}
/**
* copy hdfs file to local
* download resource file
*
* @param processDao
* @param execLocalPath
* @param projectRes
* @param logger
*/
private void copyHdfsToLocal(ProcessDao processDao, String execLocalPath, List<String> projectRes, Logger logger) throws IOException {
private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
checkDownloadPermission(projectRes);
for (String res : projectRes) {
File resFile = new File(execLocalPath, res);
if (!resFile.exists()) {
@ -321,4 +325,16 @@ public class TaskScheduleThread implements Runnable {
}
}
}
/**
* check download resource permission
* @param projectRes resource name list
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger);
permissionCheck.checkPermission();
}
}

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -30,10 +30,7 @@ import org.slf4j.Logger;
import java.io.*;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -207,7 +204,14 @@ public abstract class AbstractCommandExecutor {
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// setting up user to run commands
processBuilder.command("sudo", "-u", tenantCode, commandType(), commandFile);
List<String> command = new LinkedList<>();
command.add("sudo");
command.add("-u");
command.add(tenantCode);
command.add(commandInterpreter());
command.addAll(commandOptions());
command.add(commandFile);
processBuilder.command(command);
process = processBuilder.start();
@ -559,9 +563,11 @@ public abstract class AbstractCommandExecutor {
}
}
protected List<String> commandOptions() {
return Collections.emptyList();
}
protected abstract String buildCommandFilePath();
protected abstract String commandType();
protected abstract String commandInterpreter();
protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java

@ -26,6 +26,7 @@ import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
@ -108,12 +109,22 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
}
}
/**
* get command options
* @return command options list
*/
@Override
protected List<String> commandOptions() {
// unbuffered binary stdout and stderr
return Collections.singletonList("-u");
}
/**
* get python home
* @return python home
*/
@Override
protected String commandType() {
protected String commandInterpreter() {
String pythonHome = getPythonHome(envFile);
if (StringUtils.isEmpty(pythonHome)){
return PYTHON;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java

@ -74,7 +74,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
* @return command type
*/
@Override
protected String commandType() {
protected String commandInterpreter() {
return SH;
}

88
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.UdfType;
@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -101,7 +105,7 @@ public class SqlTask extends AbstractTask {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info(sqlParameters.toString());
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
@ -119,13 +123,6 @@ public class SqlTask extends AbstractTask {
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
if (null == dataSource){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -133,6 +130,12 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection con = null;
List<String> createFuncs = null;
try {
@ -164,6 +167,8 @@ public class SqlTask extends AbstractTask {
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
// check udf permission
checkUdfPermission(ArrayUtils.toObject(idsArray));
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
@ -284,12 +289,12 @@ public class SqlTask extends AbstractTask {
}
}
try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds)) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, mainSqlBinds);
ResultSet resultSet = stmt.executeQuery()) {
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
JSONArray resultJSONArray = new JSONArray();
ResultSet resultSet = stmt.executeQuery();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
@ -300,11 +305,10 @@ public class SqlTask extends AbstractTask {
}
resultJSONArray.add(mapOfColValues);
}
resultSet.close();
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
// if there is a result set
if (resultJSONArray.size() > 0) {
if ( !resultJSONArray.isEmpty() ) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
@ -332,6 +336,12 @@ public class SqlTask extends AbstractTask {
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
} finally {
try {
connection.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
return connection;
}
@ -344,22 +354,23 @@ public class SqlTask extends AbstractTask {
* @throws Exception
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
// is the timeout set
boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) {
if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
}
logger.info("prepare statement replace sql : {} ",stmt.toString());
return stmt;
}
/**
@ -447,6 +458,35 @@ public class SqlTask extends AbstractTask {
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}
logger.info(logPrint.toString());
logger.info("Sql Params are {}", logPrint);
}
/**
* check udf function permission
* @param udfFunIds udf functions
* @return if has download permission return true else false
*/
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF,processDao,udfFunIds,userId,logger);
permissionCheckUdf.checkPermission();
}
/**
* check data source permission
* @param dataSourceId data source id
* @return if has download permission return true else false
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger);
permissionCheckDataSource.checkPermission();
}
}

154
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterExecThread;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.ParseException;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mock;
/**
* test for MasterExecThread
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({MasterExecThread.class})
public class MasterExecThreadTest {
private MasterExecThread masterExecThread;
private ProcessInstance processInstance;
private ProcessDao processDao;
private int processDefinitionId = 1;
private MasterConfig config;
private ApplicationContext applicationContext;
@Before
public void init() throws Exception{
processDao = mock(ProcessDao.class);
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
config.setMasterExecTaskNum(1);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO);
Mockito.when(processInstance.getScheduleTime()).thenReturn(DateUtils.stringToDate("2020-01-01 00:00:00"));
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00");
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, "2020-01-31 23:00:00");
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONObject.toJSONString(cmdParam));
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setGlobalParamMap(Collections.EMPTY_MAP);
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processDao));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(masterExecThread, new DAG());
PowerMockito.doNothing().when(masterExecThread, "executeProcess");
PowerMockito.doNothing().when(masterExecThread, "postHandle");
PowerMockito.doNothing().when(masterExecThread, "prepareProcess");
PowerMockito.doNothing().when(masterExecThread, "runProcess");
PowerMockito.doNothing().when(masterExecThread, "endProcess");
}
/**
* without schedule
* @throws ParseException
*/
@Test
public void testParallelWithOutSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(zeroSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 1-30 for next save, and last day 31 no save
verify(processDao, times(31)).saveProcessInstance(processInstance);
}catch (Exception e){
e.printStackTrace();
Assert.assertTrue(false);
}
}
/**
* with schedule
* @throws ParseException
*/
@Test
public void testParallelWithSchedule() throws ParseException {
try{
Mockito.when(processDao.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId)).thenReturn(oneSchedulerList());
Method method = MasterExecThread.class.getDeclaredMethod("executeComplementProcess");
method.setAccessible(true);
method.invoke(masterExecThread);
// one create save, and 15(1 to 31 step 2) for next save, and last day 31 no save
verify(processDao, times(16)).saveProcessInstance(processInstance);
}catch (Exception e){
Assert.assertTrue(false);
}
}
private List<Schedule> zeroSchedulerList(){
return Collections.EMPTY_LIST;
}
private List<Schedule> oneSchedulerList(){
List<Schedule> schedulerList = new LinkedList<>();
Schedule schedule = new Schedule();
schedule.setCrontab("0 0 0 1/2 * ?");
schedulerList.add(schedule);
return schedulerList;
}
}

44
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ScheduleUtilsTest.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.junit.Test;
import java.util.Date;
import static org.junit.Assert.assertEquals;
/**
* Test ScheduleUtils
*/
public class ScheduleUtilsTest {
/**
* Test the getRecentTriggerTime method
*/
@Test
public void testGetRecentTriggerTime() {
Date from = DateUtils.stringToDate("2020-01-01 00:00:00");
Date to = DateUtils.stringToDate("2020-01-31 01:00:00");
// test date
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", to, from).size());
// test error cron
assertEquals(0, ScheduleUtils.getRecentTriggerTime("0 0 0 * *", from, to).size());
// test cron
assertEquals(31, ScheduleUtils.getRecentTriggerTime("0 0 0 * * ? ", from, to).size());
}
}

9
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -159,6 +159,9 @@
border-radius: 0 3px 0 0;
.ans-btn-text {
color: #337ab7;
.ans-icon {
font-size: 16px;
}
}
.assist-btn {
position: absolute;
@ -206,7 +209,7 @@
color: #333;
}
&.active {
background: #e1e2e3;
// background: #e1e2e3;
i {
color: #2d8cf0;
}
@ -234,7 +237,9 @@
border-radius: 3px 3px 0px 0px;
}
}
#screen {
margin-right: 5px;
}
.v-modal-custom-log {
z-index: 101;
}

22
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -61,20 +61,28 @@
<span v-if="name" class="copy-name" @click="_copyName" :data-clipboard-text="name"><em class="ans-icon-copy" data-container="body" data-toggle="tooltip" :title="$t('Copy name')" ></em></span>
</div>
<div class="save-btn">
<div class="operation" style="vertical-align: middle;">
<div class="operation" style="vertical-align: middle;">
<a href="javascript:"
v-for="(item,$index) in toolOperList"
:class="_operationClass(item)"
:id="item.code"
:key="$index"
@click="_ckOperation(item,$event)">
<em :class="item.icon" data-toggle="tooltip" :title="item.description" ></em>
<x-button type="text" data-container="body" :icon="item.icon" v-tooltip.light="item.desc"></x-button>
</a>
</div>
<x-button type="text" icon="ans-icon-triangle-solid-right" @click="dagAutomaticLayout"></x-button>
<x-button
type="primary"
v-tooltip.light="$t('Format DAG')"
icon="ans-icon-triangle-solid-right"
size="xsmall"
data-container="body"
v-if="type === 'instance'"
style="vertical-align: middle;"
@click="dagAutomaticLayout">
</x-button>
<x-button
data-toggle="tooltip"
:title="$t('Refresh DAG status')"
v-tooltip.light="$t('Refresh DAG status')"
data-container="body"
style="vertical-align: middle;"
icon="ans-icon-refresh"
@ -189,10 +197,6 @@
Dag.backfill(true)
if (this.type === 'instance') {
this._getTaskState(false).then(res => {})
// Round robin acquisition status
this.setIntervalP = setInterval(() => {
this._getTaskState(true).then(res => {})
}, 90000)
}
} else {
Dag.create()

3
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -513,7 +513,8 @@
this.workerGroupId = o.workerGroupId
}
this.params = o.params || {};
this.params = o.params || {}
this.dependence = o.dependence || {}
}
this.isContentBox = true

50
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue

@ -22,7 +22,7 @@
</x-option>
</x-select>
<x-select filterable :style="{width:isInstance ? '450px' : '450px'}" :disabled="isDetails" v-model="el.definitionId" @on-change="_onChangeDefinitionId">
<x-option v-for="item in definitionList" :key="item.value" :value="item.value" :label="item.label">
<x-option v-for="item in el.definitionList" :key="item.value" :value="item.value" :label="item.label">
</x-option>
</x-select>
<x-select filterable :style="{width:isInstance ? '450px' : '450px'}" :disabled="isDetails" v-model="el.depTasks">
@ -64,7 +64,6 @@
data () {
return {
list: [],
definitionList: [],
projectList: [],
cycleList: cycleList,
isInstance: false,
@ -88,16 +87,19 @@
_add () {
// btn loading
this.isLoading = true
// dependItemList index
let is = (value) => _.some(this.dependItemList, { definitionId: value })
let noArr = _.filter(this.definitionList, v => !is(v.value))
let value = noArr[0] && noArr[0].value || null
let val = value || this.definitionList[0].value
// add task list
let projectId = this.projectList[0].value
this._getDependItemList(val).then(depTasksList => {
this.$nextTick(() => {
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(val, depTasksList,projectId)))
this._getProcessByProjectId(projectId).then(definitionList => {
// dependItemList index
let is = (value) => _.some(this.dependItemList, { definitionId: value })
let noArr = _.filter(definitionList, v => !is(v.value))
let value = noArr[0] && noArr[0].value || null
let val = value || definitionList[0].value
this._getDependItemList(val).then(depTasksList => {
this.$nextTick(() => {
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(val, definitionList, depTasksList, projectId)))
})
})
})
// remove tooltip
@ -131,25 +133,25 @@
*/
_getProcessList () {
return new Promise((resolve, reject) => {
this.definitionList = _.map(_.cloneDeep(this.store.state.dag.processListS), v => {
let definitionList = _.map(_.cloneDeep(this.store.state.dag.processListS), v => {
return {
value: v.id,
label: v.name
}
})
resolve()
resolve(definitionList)
})
},
_getProcessByProjectId (id) {
return new Promise((resolve, reject) => {
this.store.dispatch('dag/getProcessByProjectId', { projectId: id }).then(res => {
this.definitionList = _.map(_.cloneDeep(res), v => {
let definitionList = _.map(_.cloneDeep(res), v => {
return {
value: v.id,
label: v.name
}
})
resolve(res)
resolve(definitionList)
})
})
},
@ -175,7 +177,7 @@
_onChangeProjectId ({ value }) {
this._getProcessByProjectId(value).then(definitionList => {
/*this.$set(this.dependItemList, this.itemIndex, this._dlOldParams(value, definitionList, item))*/
let definitionId = definitionList[0].id
let definitionId = definitionList[0].value
this._getDependItemList(definitionId).then(depTasksList => {
let item = this.dependItemList[this.itemIndex]
// init set depTasks All
@ -192,7 +194,7 @@
// init set depTasks All
item.depTasks = 'ALL'
// set dependItemList item data
this.$set(this.dependItemList, this.itemIndex, this._rtOldParams(value, depTasksList, item))
this.$set(this.dependItemList, this.itemIndex, this._rtOldParams(value, item.definitionList, depTasksList, item))
})
},
_onChangeCycle ({ value }) {
@ -200,10 +202,12 @@
this.$set(this.dependItemList[this.itemIndex], 'dateValue', list[0].value)
this.$set(this.dependItemList[this.itemIndex], 'dateValueList', list)
},
_rtNewParams (value, depTasksList,projectId) {
_rtNewParams (value, definitionList, depTasksList, projectId) {
return {
projectId: projectId,
definitionId: value,
// dependItem need private definitionList
definitionList: definitionList,
depTasks: 'ALL',
depTasksList: depTasksList,
cycle: 'day',
@ -212,10 +216,12 @@
state: ''
}
},
_rtOldParams (value,depTasksList, item) {
_rtOldParams (value, definitionList, depTasksList, item) {
return {
projectId: item.projectId,
definitionId: value,
// dependItem need private definitionList
definitionList: definitionList,
depTasks: item.depTasks || 'ALL',
depTasksList: depTasksList,
cycle: item.cycle,
@ -254,12 +260,12 @@
this.isInstance = this.router.history.current.name === 'projects-instance-details'
// get processlist
this._getProjectList().then(() => {
let projectId = this.projectList[0].value
if (!this.dependItemList.length) {
let projectId = this.projectList[0].value
this._getProcessByProjectId(projectId).then(definitionList => {
let value = this.definitionList[0].value
let value = definitionList[0].value
this._getDependItemList(value).then(depTasksList => {
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(value, depTasksList,projectId)))
this.$emit('dependItemListEvent', _.concat(this.dependItemList, this._rtNewParams(value, definitionList, depTasksList, projectId)))
})
})
} else {
@ -269,7 +275,7 @@
this._getDependItemList(ids, false).then(res => {
_.map(this.dependItemList, (v, i) => {
this._getProcessByProjectId(v.projectId).then(definitionList => {
this.$set(this.dependItemList, i, this._rtOldParams(v.definitionId, ['ALL'].concat(_.map(res[v.definitionId] || [], v => v.name)), v))
this.$set(this.dependItemList, i, this._rtOldParams(v.definitionId, definitionList, ['ALL'].concat(_.map(res[v.definitionId] || [], v => v.name)), v))
})
})
})

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/sqlType.vue

@ -20,7 +20,7 @@
v-model="sqlTypeId"
:disabled="isDetails"
@on-change="_handleSqlTypeChanged"
style="width: 90px;">
style="width: 120px;">
<x-option
v-for="city in sqlTypeList"
:key="city.id"

2
dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/_source/list.vue

@ -64,7 +64,7 @@
</m-tooltips-JSON>
</td>
<td>
<span v-if="item.note" class="ellipsis" v-tooltip.large.top.start="{text: item.note, maxWidth: '500px'}">{{item.note}}</span>
<span v-if="item.note" class="ellipsis" v-tooltip.large.top.start.light="{text: item.note, maxWidth: '500px'}">{{item.note}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue

@ -124,10 +124,14 @@
_getList (flag) {
this.isLoading = !flag
this.getDatasourcesListP(this.searchParams).then(res => {
this.datasourcesList = []
this.datasourcesList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.datasourcesList = []
this.datasourcesList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue

@ -69,7 +69,7 @@
<span v-else>-</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/start.vue

@ -141,7 +141,7 @@
</div>
<div class="clearfix list">
<div class="text">
{{$t('Date')}}
{{$t('Schedule date')}}
</div>
<div class="cont">
<x-datepicker

12
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/index.vue

@ -100,10 +100,14 @@
_getList (flag) {
this.isLoading = !flag
this.getProcessListP(this.searchParams).then(res => {
this.processListP = []
this.processListP = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.processListP = []
this.processListP = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

12
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue

@ -105,10 +105,14 @@
_getProcessInstanceListP (flag) {
this.isLoading = !flag
this.getProcessInstance(this.searchParams).then(res => {
this.processInstanceList = []
this.processInstanceList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.processInstanceList = []
this.processInstanceList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

2
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue

@ -66,7 +66,7 @@
<span>{{item.instRunningCount}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/list/index.vue

@ -110,10 +110,14 @@
_getList (flag) {
this.isLoading = !flag
this.getProjectsList(this.searchParams).then(res => {
this.projectsList = []
this.projectsList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.projectsList = []
this.projectsList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

6
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/_source/list.vue

@ -46,13 +46,13 @@
<span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.alias, maxWidth: '500px'}">
<a href="javascript:" class="links" @click="_go(item)">{{item.alias}}</a>
</span>
</td>
<td><span class="ellipsis">{{item.fileName}}</span></td>
<td><span class="ellipsis" v-tooltip.large.top.start.light="{text: item.fileName, maxWidth: '500px'}">{{item.fileName}}</span></td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

10
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/list/index.vue

@ -91,9 +91,13 @@
_getList (flag) {
this.isLoading = !flag
this.getResourcesListP(this.searchParams).then(res => {
this.fileResourcesList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.fileResourcesList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

4
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/_source/list.vue

@ -55,7 +55,7 @@ v-ps<template>
<span>{{$index + 1}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.funcName, maxWidth: '500px'}">
<a href="javascript:" class="links">{{item.funcName}}</a>
</span>
</td>
@ -67,7 +67,7 @@ v-ps<template>
<span>{{item.type}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/function/index.vue

@ -109,10 +109,14 @@
_getList (flag) {
this.isLoading = !flag
this.getUdfFuncListP(this.searchParams).then(res => {
this.udfFuncList = []
this.udfFuncList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.udfFuncList = []
this.udfFuncList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

6
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/_source/list.vue

@ -49,16 +49,16 @@
<span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
</td>
<td>
<span class="ellipsis">
<span class="ellipsis" v-tooltip.large.top.start.light="{text: item.alias, maxWidth: '500px'}">
<a href="javascript:" class="links" >{{item.alias}}</a>
</span>
</td>
<td><span class="ellipsis">{{item.fileName}}</span></td>
<td><span class="ellipsis" v-tooltip.large.top.start.light="{text: item.fileName, maxWidth: '500px'}">{{item.fileName}}</span></td>
<td>
<span>{{_rtSize(item.size)}}</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/udf/pages/resource/index.vue

@ -96,10 +96,14 @@
_getList (flag) {
this.isLoading = !flag
this.getResourcesListP(this.searchParams).then(res => {
this.udfResourcesList = []
this.udfResourcesList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.udfResourcesList = []
this.udfResourcesList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue

@ -118,10 +118,14 @@
_getList (flag) {
this.isLoading = !flag
this.getQueueListP(this.searchParams).then(res => {
this.queueList = []
this.queueList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.queueList = []
this.queueList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

2
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue

@ -59,7 +59,7 @@
</span>
</td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue

@ -122,10 +122,14 @@
_getList (flag) {
this.isLoading = !flag
this.getTenantListP(this.searchParams).then(res => {
this.tenementList = []
this.tenementList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.tenementList = []
this.tenementList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/index.vue

@ -120,10 +120,14 @@
_getList (flag) {
this.isLoading = !flag
this.getUsersListP(this.searchParams).then(res => {
this.userList = []
this.userList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.userList = []
this.userList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

2
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue

@ -52,7 +52,7 @@
</td>
<td><span>{{item.groupType === 'EMAIL' ? `${$t('Email')}` : `${$t('SMS')}`}}</span></td>
<td>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-if="item.description" class="ellipsis" v-tooltip.large.top.start.light="{text: item.description, maxWidth: '500px'}">{{item.description}}</span>
<span v-else>-</span>
</td>
<td>

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/index.vue

@ -122,10 +122,14 @@
_getList (flag) {
this.isLoading = !flag
this.getAlertgroupP(this.searchParams).then(res => {
this.alertgroupList = []
this.alertgroupList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.alertgroupList = []
this.alertgroupList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

12
dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue

@ -121,10 +121,14 @@
_getList (flag) {
this.isLoading = !flag
this.getWorkerGroups(this.searchParams).then(res => {
this.workerGroupList = []
this.workerGroupList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.workerGroupList = []
this.workerGroupList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

12
dolphinscheduler-ui/src/js/conf/home/pages/user/pages/token/index.vue

@ -122,10 +122,14 @@
_getList (flag) {
this.isLoading = !flag
this.getTokenListP(this.searchParams).then(res => {
this.tokenList = []
this.tokenList = res.totalList
this.total = res.total
this.isLoading = false
if(this.searchParams.pageNo>1 && res.totalList.length == 0) {
this.searchParams.pageNo = this.searchParams.pageNo -1
} else {
this.tokenList = []
this.tokenList = res.totalList
this.total = res.total
this.isLoading = false
}
}).catch(e => {
this.isLoading = false
})

5
dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js

@ -147,6 +147,11 @@ export default {
let processInstanceJson = JSON.parse(res.data.processInstanceJson)
// tasks info
state.tasks = processInstanceJson.tasks
// tasks cache
state.cacheTasks = {}
processInstanceJson.tasks.forEach(v => {
state.cacheTasks[v.id] = v
})
// global params
state.globalParams = processInstanceJson.globalParams
// timeout

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -361,6 +361,7 @@ export default {
'Recipient': 'Recipient',
'Cc': 'Cc',
'Whether it is a complement process?': 'Whether it is a complement process?',
'Schedule date': 'Schedule date',
'Mode of execution': 'Mode of execution',
'Serial execution': 'Serial execution',
'Parallel execution': 'Parallel execution',
@ -374,6 +375,7 @@ export default {
'All_1': 'All',
'Toolbar': 'Toolbar',
'View variables': 'View variables',
'Format DAG': 'Format DAG',
'Refresh DAG status': 'Refresh DAG status',
'Return_1': 'Return',
'Please enter format': 'Please enter format',

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -356,6 +356,7 @@ export default {
'Recipient': '收件人',
'Cc': '抄送人',
'Whether it is a complement process?': '是否补数',
'Schedule date': '调度日期',
'Mode of execution': '执行方式',
'Serial execution': '串行执行',
'Parallel execution': '并行执行',
@ -369,6 +370,7 @@ export default {
'All_1': '成功或失败都发',
'Toolbar': '工具栏',
'View variables': '查看变量',
'Format DAG': '格式化DAG',
'Refresh DAG status': '刷新DAG状态',
'Return_1': '返回上一节点',
'Please enter format': '请输入格式为',

5
pom.xml

@ -694,6 +694,9 @@
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/api/service/ResourcesServiceTest.java</include>
<include>**/api/service/ExecutorService2Test.java</include>
<include>**/api/service/BaseServiceTest.java</include>
<include>**/api/service/BaseDAGServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include>
<include>**/alert/utils/JSONUtilsTest.java</include>
@ -701,6 +704,8 @@
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ScheduleUtilsTest.java</include>
<include>**/server/master/MasterExecThreadTest.java</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/AlertMapperTest.java</include>

Loading…
Cancel
Save