Browse Source

Merge branch 'dev' into dev

pull/3/MERGE
dailidong 5 years ago committed by GitHub
parent
commit
c60b7b7752
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      .github/workflows/ci_ut.yml
  2. 21
      dolphinscheduler-alert/pom.xml
  3. 4
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  4. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
  5. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java
  6. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/JSONUtils.java
  7. 11
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
  8. 4
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
  9. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ServiceModelToSwagger2MapperImpl.java
  10. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java
  11. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataAnalysisController.java
  12. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
  13. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoginController.java
  14. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  15. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  16. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
  17. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java
  18. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java
  19. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AlertGroupService.java
  20. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java
  21. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java
  22. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  23. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  24. 497
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  25. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  26. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  27. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java
  28. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  29. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
  30. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  31. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  32. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  33. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java
  34. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java
  35. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java
  36. 9
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java
  37. 294
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  38. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SessionServiceTest.java
  39. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java
  40. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/DB2ServerDataSource.java
  41. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java
  42. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java
  43. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java
  44. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java
  45. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java
  46. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java
  47. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java
  48. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java
  49. 37
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
  50. 26
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ArrayUtils.java
  51. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java
  52. 398
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
  53. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/EnumUtils.java
  54. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  55. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HttpUtils.java
  56. 45
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java
  57. 30
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  58. 174
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java
  59. 170
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java
  60. 8
      dolphinscheduler-dao/pom.xml
  61. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  62. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  63. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
  64. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  65. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  66. 42
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java
  67. 39
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PostgrePerformance.java
  68. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  69. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  70. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java
  71. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  72. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
  73. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java
  74. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  75. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  76. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
  77. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  78. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  79. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  80. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  81. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  82. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  83. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java
  84. 1
      dolphinscheduler-ui/pom.xml
  85. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  86. 21
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  87. 90
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  88. 19
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/http.vue
  89. 17
      dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js
  90. 2
      dolphinscheduler-ui/src/js/conf/home/store/dag/state.js
  91. 1
      dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
  92. 16
      pom.xml

11
.github/workflows/ci_ut.yml

@ -15,7 +15,7 @@
# limitations under the License.
#
on: ["pull_request"]
on: [push, "pull_request"]
env:
DOCKER_DIR: ./docker
LOG_DIR: /tmp/dolphinscheduler
@ -49,11 +49,12 @@ jobs:
export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
mvn test -Dmaven.test.skip=false cobertura:cobertura
CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
- name: Run SonarCloud analysis
- name: Run SonarCloud Analysis
run: >
mvn clean --batch-mode
verify
org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.1.1688:sonar
mvn verify --batch-mode
org.sonarsource.scanner.maven:sonar-maven-plugin:3.6.1.1688:sonar
-Dsonar.junit.reportPaths=target/cobertura
-Dmaven.test.skip=true
-Dsonar.host.url=https://sonarcloud.io
-Dsonar.organization=apache
-Dsonar.projectKey=apache-dolphinscheduler

21
dolphinscheduler-alert/pom.xml

@ -67,21 +67,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -92,12 +77,6 @@
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!--excel poi-->
<dependency>
<groupId>org.apache.poi</groupId>

4
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -16,14 +16,14 @@
*/
package org.apache.dolphinscheduler.alert.runner;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.alert.manager.EmailManager;
import org.apache.dolphinscheduler.alert.manager.EnterpriseWeChatManager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.User;

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java

@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Alert;
import com.alibaba.fastjson.JSON;
import com.google.common.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
public class FuncUtils {

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/JSONUtils.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.alert.utils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

11
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java

@ -21,11 +21,11 @@ import freemarker.cache.StringTemplateLoader;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.HtmlEmail;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.IOUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;
@ -35,7 +35,6 @@ import javax.mail.internet.*;
import java.io.*;
import java.util.*;
import static org.apache.dolphinscheduler.alert.utils.PropertyUtils.getInt;
/**
@ -422,8 +421,8 @@ public class MailUtils {
* @param e the exception
*/
private static void handleException(Collection<String> receivers, Map<String, Object> retMap, Exception e) {
logger.error("Send email to {} failed", StringUtils.join(",", receivers), e);
retMap.put(Constants.MESSAGE, "Send email to {" + StringUtils.join(",", receivers) + "} failed," + e.toString());
logger.error("Send email to {} failed {}", receivers, e);
retMap.put(Constants.MESSAGE, "Send email to {" + StringUtils.join(receivers, ",") + "} failed," + e.toString());
}
/**

4
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java

@ -16,8 +16,8 @@
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.IOUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ServiceModelToSwagger2MapperImpl.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.api.configuration;
import com.google.common.collect.Multimap;
import io.swagger.models.*;
import io.swagger.models.parameters.Parameter;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.MessageSource;
@ -159,7 +159,8 @@ public class ServiceModelToSwagger2MapperImpl extends ServiceModelToSwagger2Mapp
Iterator<String> it = from.getTags().iterator();
while(it.hasNext()){
String tag = it.next();
list.add(StringUtils.isNotBlank(tag) ? messageSource.getMessage(tag, null, tag, locale) : " ");
list.add(
StringUtils.isNotBlank(tag) ? messageSource.getMessage(tag, null, tag, locale) : " ");
}
operation.setTags(list);

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java

@ -20,8 +20,8 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.commons.lang3.StringUtils;
import javax.servlet.http.HttpServletRequest;
import java.text.MessageFormat;
@ -67,7 +67,7 @@ public class BaseController {
public static String getClientIpAddress(HttpServletRequest request) {
String clientIp = request.getHeader(HTTP_X_FORWARDED_FOR);
if (StringUtils.isNotEmpty(clientIp) && !StringUtils.equalsIgnoreCase(HTTP_HEADER_UNKNOWN, clientIp)) {
if (StringUtils.isNotEmpty(clientIp) && !clientIp.equalsIgnoreCase(HTTP_HEADER_UNKNOWN)) {
int index = clientIp.indexOf(COMMA);
if (index != -1) {
return clientIp.substring(0, index);
@ -77,7 +77,7 @@ public class BaseController {
}
clientIp = request.getHeader(HTTP_X_REAL_IP);
if (StringUtils.isNotEmpty(clientIp) && !StringUtils.equalsIgnoreCase(HTTP_HEADER_UNKNOWN, clientIp)) {
if (StringUtils.isNotEmpty(clientIp) && !clientIp.equalsIgnoreCase(HTTP_HEADER_UNKNOWN)) {
return clientIp;
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataAnalysisController.java

@ -188,7 +188,7 @@ public class DataAnalysisController extends BaseController{
public Result countQueueState(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value="projectId", required=false, defaultValue = "0") int projectId){
try{
logger.info("count command state, user:{}, start date: {}, end date:{}, project id {}",
logger.info("count command state, user:{}, project id {}",
loginUser.getUserName(), projectId);
Map<String, Object> result = dataAnalysisService.countQueueState(loginUser, projectId);
return returnDataList(result);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

@ -461,7 +461,7 @@ public class DataSourceController extends BaseController {
@GetMapping(value="/kerberos-startup-state")
@ResponseStatus(HttpStatus.OK)
public Result getKerberosStartupState(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser){
logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName());
logger.info("login user {}", loginUser.getUserName());
try{
// if upload resource is HDFS and kerberos startup is true , else false
return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoginController.java

@ -22,10 +22,10 @@ import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -16,12 +16,12 @@
*/
package org.apache.dolphinscheduler.api.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import org.slf4j.Logger;
@ -460,8 +460,8 @@ public class ProcessDefinitionController extends BaseController{
}
}
if(deleteFailedIdList.size() > 0){
putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
if(!deleteFailedIdList.isEmpty()){
putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList,","));
}else{
putMsg(result, Status.SUCCESS);
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -16,7 +16,6 @@
*/
package org.apache.dolphinscheduler.api.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import org.slf4j.Logger;
@ -390,7 +390,7 @@ public class ProcessInstanceController extends BaseController{
}
}
if(deleteFailedIdList.size() > 0){
putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList,","));
}else{
putMsg(result, Status.SUCCESS);
}

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java

@ -267,11 +267,12 @@ public class ProjectController extends BaseController {
})
@PostMapping(value="/import-definition")
public Result importProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("file") MultipartFile file){
@RequestParam("file") MultipartFile file,
@RequestParam("projectName") String projectName){
try{
logger.info("import process definition by id, login user:{}",
loginUser.getUserName());
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser,file);
logger.info("import process definition by id, login user:{}, project: {}",
loginUser.getUserName(), projectName);
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser, file, projectName);
return returnDataList(result);
}catch (Exception e){
logger.error(IMPORT_PROCESS_DEFINE_ERROR.getMsg(),e);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java

@ -137,7 +137,7 @@ public class TenantController extends BaseController{
@GetMapping(value="/list")
@ResponseStatus(HttpStatus.OK)
public Result queryTenantlist(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser){
logger.info("login user {}, query tenant list");
logger.info("login user {}, query tenant list", loginUser.getUserName());
try{
Map<String, Object> result = tenantService.queryTenantList(loginUser);
return returnDataList(result);

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/UsersController.java

@ -324,7 +324,7 @@ public class UsersController extends BaseController{
@GetMapping(value="/get-user-info")
@ResponseStatus(HttpStatus.OK)
public Result getUserInfo(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser){
logger.info("login user {},get user info : {}", loginUser.getUserName());
logger.info("login user {},get user info", loginUser.getUserName());
try{
Map<String, Object> result = usersService.getUserInfo(loginUser);
return returnDataList(result);
@ -344,7 +344,7 @@ public class UsersController extends BaseController{
@GetMapping(value="/list")
@ResponseStatus(HttpStatus.OK)
public Result listUser(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser){
logger.info("login user {}, user list");
logger.info("login user {}, user list", loginUser.getUserName());
try{
Map<String, Object> result = usersService.queryAllGeneralUsers(loginUser);
return returnDataList(result);
@ -364,7 +364,7 @@ public class UsersController extends BaseController{
@GetMapping(value="/list-all")
@ResponseStatus(HttpStatus.OK)
public Result listAll(@RequestAttribute(value = Constants.SESSION_USER) User loginUser){
logger.info("login user {}, user list");
logger.info("login user {}, user list", loginUser.getUserName());
try{
Map<String, Object> result = usersService.queryUserList(loginUser);
return returnDataList(result);

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AlertGroupService.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.UserAlertGroup;
@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java

@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.commons.lang3.StringUtils;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
@ -104,7 +104,7 @@ public class BaseService {
Cookie[] cookies = request.getCookies();
if (cookies != null && cookies.length > 0) {
for (Cookie cookie : cookies) {
if (StringUtils.equalsIgnoreCase(name, cookie.getName())) {
if (StringUtils.isNotEmpty(name) && name.equalsIgnoreCase(cookie.getName())) {
return cookie;
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java

@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -23,12 +23,12 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -20,9 +20,9 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.log.LogClient;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

497
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -16,9 +16,16 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@ -31,15 +38,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;
@ -56,8 +56,10 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
@ -482,50 +484,21 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param response response
*/
public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) {
//export project info
Project project = projectMapper.queryByName(projectName);
//check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus == Status.SUCCESS) {
//get workflow info
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
if (processDefinition != null) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
if (taskNode.get("type") != null && taskNode.get("type") != "") {
String taskType = taskNode.getString("type");
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
if (dataSource != null) {
sqlParameters.put("datasourceName", dataSource.getName());
}
taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
int definitionId = dependentItem.getInteger("definitionId");
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if(definition != null){
dependentItem.put("projectName",definition.getProjectName());
dependentItem.put("definitionName",definition.getName());
}
}
}
taskNode.put("dependence", dependentParameters);
}
}
}
}
jsonObject.put("tasks", jsonArray);
processDefinition.setProcessDefinitionJson(jsonObject.toString());
if (null != processDefinition) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
Map<String, Object> row = new LinkedHashMap<>();
row.put("projectName", processDefinition.getProjectName());
@ -535,8 +508,9 @@ public class ProcessDefinitionService extends BaseDAGService {
row.put("processDefinitionLocations", processDefinition.getLocations());
row.put("processDefinitionConnects", processDefinition.getConnects());
//schedule info
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (schedules.size() > 0) {
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
row.put("scheduleWarningType", schedule.getWarningType());
row.put("scheduleWarningGroupId", schedule.getWarningGroupId());
@ -556,6 +530,8 @@ public class ProcessDefinitionService extends BaseDAGService {
}
}
//create workflow json file
String rowsJson = JSONUtils.toJsonString(row);
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json");
@ -564,38 +540,136 @@ public class ProcessDefinitionService extends BaseDAGService {
try {
out = response.getOutputStream();
buff = new BufferedOutputStream(out);
buff.write(rowsJson.getBytes("UTF-8"));
buff.write(rowsJson.getBytes(StandardCharsets.UTF_8));
buff.flush();
buff.close();
} catch (IOException e) {
e.printStackTrace();
logger.warn("export process fail", e);
}finally {
try {
buff.close();
out.close();
} catch (Exception e) {
e.printStackTrace();
if (null != buff) {
try {
buff.close();
} catch (Exception e) {
logger.warn("export process buffer not close", e);
}
}
if (null != out) {
try {
out.close();
} catch (Exception e) {
logger.warn("export process output stream not close", e);
}
}
}
}
}
}
/**
* correct task param which has datasource or dependent
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
*/
public String addTaskNodeSpecialParam(String processDefinitionJson) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
if(checkTaskHasDataSource(taskType)){
// add sqlParameters
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
if (null != dataSource) {
sqlParameters.put("datasourceName", dataSource.getName());
}
taskNode.put("params", sqlParameters);
}else if(checkTaskHasDependent(taskType)){
// add dependent param
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(null != dependentParameters){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
int definitionId = dependentItem.getInteger("definitionId");
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if(null != definition){
dependentItem.put("projectName",definition.getProjectName());
dependentItem.put("definitionName",definition.getName());
}
}
}
taskNode.put("dependence", dependentParameters);
}
}
}
}
jsonObject.put("tasks", jsonArray);
return jsonObject.toString();
}
/**
* check task if has dependent
* @param taskType task type
* @return if task has dependent return true else false
*/
private boolean checkTaskHasDependent(String taskType) {
return taskType.equals(TaskType.DEPENDENT.name());
}
/**
* check task if has data source info
* @param taskType task type
* @return if task has data source return true else false
*/
private boolean checkTaskHasDataSource(String taskType) {
return taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name());
}
/**
* check task if has sub process
* @param taskType task type
* @return if task has sub process return true else false
*/
private boolean checkTaskHasSubProcess(String taskType) {
return taskType.equals(TaskType.SUB_PROCESS.name());
}
/**
* import process definition
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
* @return
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file) {
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5);
JSONObject json = null;
try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), "UTF-8" )) {
JSONObject json;
//read workflow json
try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), StandardCharsets.UTF_8)) {
BufferedReader streamReader = new BufferedReader(inputStreamReader);
StringBuilder respomseStrBuilder = new StringBuilder();
String inputStr = "";
String inputStr;
while ((inputStr = streamReader.readLine())!= null){
respomseStrBuilder.append( inputStr );
}
json = JSONObject.parseObject( respomseStrBuilder.toString() );
if(json != null){
String projectName = null;
if(null != json){
String originProjectName = null;
String processDefinitionName = null;
String processDefinitionJson = null;
String processDefinitionDesc = null;
@ -613,151 +687,254 @@ public class ProcessDefinitionService extends BaseDAGService {
String scheduleWorkerGroupId = null;
String scheduleWorkerGroupName = null;
if (ObjectUtils.allNotNull(json.get("projectName"))) {
projectName = json.get("projectName").toString();
if (Objects.nonNull(json.get("projectName"))) {
originProjectName = json.get("projectName").toString();
} else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
return result;
}
if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) {
if (Objects.nonNull(json.get("processDefinitionName"))) {
processDefinitionName = json.get("processDefinitionName").toString();
} else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
return result;
}
if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) {
if (Objects.nonNull(json.get("processDefinitionJson"))) {
processDefinitionJson = json.get("processDefinitionJson").toString();
} else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
return result;
}
if (ObjectUtils.allNotNull(json.get("processDefinitionDescription"))) {
if (Objects.nonNull(json.get("processDefinitionDescription"))) {
processDefinitionDesc = json.get("processDefinitionDescription").toString();
}
if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) {
if (Objects.nonNull(json.get("processDefinitionLocations"))) {
processDefinitionLocations = json.get("processDefinitionLocations").toString();
}
if (ObjectUtils.allNotNull(json.get("processDefinitionConnects"))) {
if (Objects.nonNull(json.get("processDefinitionConnects"))) {
processDefinitionConnects = json.get("processDefinitionConnects").toString();
}
Project project = projectMapper.queryByName(projectName);
if(project != null){
processDefinitionName = recursionProcessDefinitionName(project.getId(), processDefinitionName, 1);
}
//check user access for org project
Project originProject = projectMapper.queryByName(originProjectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, originProject, originProjectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int j = 0; j < jsonArray.size(); j++) {
JSONObject taskNode = jsonArray.getJSONObject(j);
String taskType = taskNode.getString("type");
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())) {
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
if (dataSources.size() > 0) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int h = 0; h < dependTaskList.size(); h++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(h);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName"));
if(dependentItemProject != null){
ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
if(definition != null){
dependentItem.put("projectId",dependentItemProject.getId());
dependentItem.put("definitionId",definition.getId());
if (resultStatus == Status.SUCCESS) {
//use currentProjectName to query
Project targetProject = projectMapper.queryByName(currentProjectName);
if(null != targetProject){
processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1);
}
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int j = 0; j < jsonArray.size(); j++) {
JSONObject taskNode = jsonArray.getJSONObject(j);
String taskType = taskNode.getString("type");
if(checkTaskHasDataSource(taskType)) {
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
if (!dataSources.isEmpty()) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
taskNode.put("params", sqlParameters);
}else if(checkTaskHasDependent(taskType)){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int h = 0; h < dependTaskList.size(); h++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(h);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName"));
if(dependentItemProject != null){
ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
if(definition != null){
dependentItem.put("projectId",dependentItemProject.getId());
dependentItem.put("definitionId",definition.getId());
}
}
}
}
taskNode.put("dependence", dependentParameters);
}
taskNode.put("dependence", dependentParameters);
}
}
}
jsonObject.put("tasks", jsonArray);
Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,projectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
Integer processDefinitionId = null;
if (ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId"))) {
processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
}
if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
Date now = new Date();
Schedule scheduleObj = new Schedule();
scheduleObj.setProjectName(projectName);
scheduleObj.setProcessDefinitionId(processDefinitionId);
scheduleObj.setProcessDefinitionName(processDefinitionName);
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setUserName(loginUser.getUserName());
scheduleCrontab = json.get("scheduleCrontab").toString();
scheduleObj.setCrontab(scheduleCrontab);
if (ObjectUtils.allNotNull(json.get("scheduleStartTime"))) {
scheduleStartTime = json.get("scheduleStartTime").toString();
scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
}
if (ObjectUtils.allNotNull(json.get("scheduleEndTime"))) {
scheduleEndTime = json.get("scheduleEndTime").toString();
scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
}
if (ObjectUtils.allNotNull(json.get("scheduleWarningType"))) {
scheduleWarningType = json.get("scheduleWarningType").toString();
scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
}
if (ObjectUtils.allNotNull(json.get("scheduleWarningGroupId"))) {
scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString();
scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
}
if (ObjectUtils.allNotNull(json.get("scheduleFailureStrategy"))) {
scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString();
scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
}
if (ObjectUtils.allNotNull(json.get("scheduleReleaseState"))) {
scheduleReleaseState = json.get("scheduleReleaseState").toString();
scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
//recursive sub-process parameter correction map key for old process id value for new process id
Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
List<Object> subProcessList = jsonArray.stream()
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type")))
.collect(Collectors.toList());
if (!subProcessList.isEmpty()) {
importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap);
}
if (ObjectUtils.allNotNull(json.get("scheduleProcessInstancePriority"))) {
scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString();
scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
jsonObject.put("tasks", jsonArray);
Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,currentProjectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
Integer processDefinitionId = null;
if (Objects.nonNull(createProcessDefinitionResult.get("processDefinitionId"))) {
processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
}
if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupId"))) {
scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString();
if(scheduleWorkerGroupId != null){
scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
}else{
if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
if(workerGroups.size() > 0){
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
if (Objects.nonNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
Date now = new Date();
Schedule scheduleObj = new Schedule();
scheduleObj.setProjectName(currentProjectName);
scheduleObj.setProcessDefinitionId(processDefinitionId);
scheduleObj.setProcessDefinitionName(processDefinitionName);
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setUserName(loginUser.getUserName());
scheduleCrontab = json.get("scheduleCrontab").toString();
scheduleObj.setCrontab(scheduleCrontab);
if (Objects.nonNull(json.get("scheduleStartTime"))) {
scheduleStartTime = json.get("scheduleStartTime").toString();
scheduleObj.setStartTime(DateUtils.stringToDate(scheduleStartTime));
}
if (Objects.nonNull(json.get("scheduleEndTime"))) {
scheduleEndTime = json.get("scheduleEndTime").toString();
scheduleObj.setEndTime(DateUtils.stringToDate(scheduleEndTime));
}
if (Objects.nonNull(json.get("scheduleWarningType"))) {
scheduleWarningType = json.get("scheduleWarningType").toString();
scheduleObj.setWarningType(WarningType.valueOf(scheduleWarningType));
}
if (Objects.nonNull(json.get("scheduleWarningGroupId"))) {
scheduleWarningGroupId = json.get("scheduleWarningGroupId").toString();
scheduleObj.setWarningGroupId(Integer.parseInt(scheduleWarningGroupId));
}
if (Objects.nonNull(json.get("scheduleFailureStrategy"))) {
scheduleFailureStrategy = json.get("scheduleFailureStrategy").toString();
scheduleObj.setFailureStrategy(FailureStrategy.valueOf(scheduleFailureStrategy));
}
if (Objects.nonNull(json.get("scheduleReleaseState"))) {
scheduleReleaseState = json.get("scheduleReleaseState").toString();
scheduleObj.setReleaseState(ReleaseState.valueOf(scheduleReleaseState));
}
if (Objects.nonNull(json.get("scheduleProcessInstancePriority"))) {
scheduleProcessInstancePriority = json.get("scheduleProcessInstancePriority").toString();
scheduleObj.setProcessInstancePriority(Priority.valueOf(scheduleProcessInstancePriority));
}
if (Objects.nonNull(json.get("scheduleWorkerGroupId"))) {
scheduleWorkerGroupId = json.get("scheduleWorkerGroupId").toString();
if(scheduleWorkerGroupId != null){
scheduleObj.setWorkerGroupId(Integer.parseInt(scheduleWorkerGroupId));
}else{
if (Objects.nonNull(json.get("scheduleWorkerGroupName"))) {
scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
if(!workerGroups.isEmpty()){
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
}
}
}
}
scheduleMapper.insert(scheduleObj);
}
scheduleMapper.insert(scheduleObj);
putMsg(result, Status.SUCCESS);
return result;
}
}else{
putMsg(result, Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR);
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
return result;
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check import process has sub process
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
*/
public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) {
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
String taskType = taskNode.getString("type");
if (checkTaskHasSubProcess(taskType)) {
//get sub process info
JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params"));
Integer subProcessId = subParams.getInteger("processDefinitionId");
ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId);
String subProcessJson = subProcess.getProcessDefinitionJson();
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) {
JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
List<Object> subProcessList = subJsonArray.stream()
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
.collect(Collectors.toList());
if (!subProcessList.isEmpty()) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
//sub process processId correct
if (!subProcessIdMap.isEmpty()) {
for (Map.Entry<Integer, Integer> entry : subProcessIdMap.entrySet()) {
String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey();
String newSubProcessId = "\"processDefinitionId\":" + entry.getValue();
subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
}
subProcessIdMap.clear();
}
}
//if sub-process recursion
Date now = new Date();
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition();
processDefine.setName(subProcess.getName());
processDefine.setVersion(subProcess.getVersion());
processDefine.setReleaseState(subProcess.getReleaseState());
processDefine.setProjectId(targetProject.getId());
processDefine.setUserId(loginUser.getId());
processDefine.setProcessDefinitionJson(subProcessJson);
processDefine.setDescription(subProcess.getDescription());
processDefine.setLocations(subProcess.getLocations());
processDefine.setConnects(subProcess.getConnects());
processDefine.setTimeout(subProcess.getTimeout());
processDefine.setTenantId(subProcess.getTenantId());
processDefine.setGlobalParams(subProcess.getGlobalParams());
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag());
processDefine.setReceivers(subProcess.getReceivers());
processDefine.setReceiversCc(subProcess.getReceiversCc());
processDefineMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
subParams.put("processDefinitionId", newSubProcessDefine.getId());
taskNode.put("params", subParams);
}
}
}
}
}
/**

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.dao.ProcessDao;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;
@ -195,12 +194,11 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(),processInstance.getEndTime()));
}
Set<String> exclusionSet = new HashSet<String>(){{
add(Constants.CLASS);
add("locations");
add("connects");
add("processInstanceJson");
}};
Set<String> exclusionSet = new HashSet<String>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("locations");
exclusionSet.add("connects");
exclusionSet.add("processInstanceJson");
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
pageInfo.setTotalCount((int) processInstanceList.getTotal());
@ -490,13 +488,14 @@ public class ProcessInstanceService extends BaseDAGService {
}
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId);
//process instance priority
int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
if (processInstance == null) {
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//process instance priority
int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
// delete zk queue
if (CollectionUtils.isNotEmpty(taskInstanceList)){
for (TaskInstance taskInstance : taskInstanceList){

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.server.quartz.QuartzExecutors;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -374,12 +374,12 @@ public class SchedulerService extends BaseService {
try {
switch (scheduleStatus) {
case ONLINE: {
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}, port: {}", project.getId(), processDefinition.getId(), masterServers);
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), id);
break;
}
case OFFLINE: {
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}, port: {}", project.getId(), processDefinition.getId(), masterServers);
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SessionService.java

@ -139,7 +139,6 @@ public class SessionService extends BaseService{
* @param loginUser login user
*/
public void signOut(String ip, User loginUser) {
try {
/**
* query session by user id and ip
*/
@ -147,8 +146,5 @@ public class SessionService extends BaseService{
//delete session
sessionMapper.deleteById(session.getId());
}catch (Exception e){
}
}
}

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -32,7 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -116,10 +116,9 @@ public class TaskInstanceService extends BaseService {
page, project.getId(), processInstanceId, searchVal, taskName, statusArray, host, start, end
);
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
Set<String> exclusionSet = new HashSet<String>(){{
add(Constants.CLASS);
add("taskJson");
}};
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("taskJson");
List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
for(TaskInstance taskInstance : taskInstanceList){
taskInstance.setDuration(DateUtils.differSec(taskInstance.getStartTime(),

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java

@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -29,7 +29,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@ -27,7 +28,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.commons.lang.StringUtils;
import java.text.MessageFormat;
import java.util.HashMap;
@ -148,7 +148,7 @@ public class CheckUtils {
* @return true if regex pattern is right, otherwise return false
*/
private static boolean regexChecks(String str, Pattern pattern) {
if (org.apache.commons.lang3.StringUtils.isEmpty(str)) {
if (StringUtils.isEmpty(str)) {
return false;
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZooKeeperState.java

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.api.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java

@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.api.utils;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/AbstractControllerTest.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.api.controller;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.service.SessionService;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.commons.lang3.StringUtils;
import org.junit.*;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@ -32,8 +32,6 @@ import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
@Ignore
@RunWith(SpringRunner.class)

9
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java

@ -18,11 +18,12 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.time.DateUtils;
import java.util.Calendar;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
@ -152,7 +153,7 @@ public class AccessTokenServiceTest {
accessToken.setId(1);
accessToken.setUserId(1);
accessToken.setToken("AccessTokenServiceTest");
Date date = DateUtils.addDays(new Date(),30);
Date date = DateUtils.add(new Date(),Calendar.DAY_OF_MONTH, 30);
accessToken.setExpireTime(date);
return accessToken;
}
@ -175,7 +176,7 @@ public class AccessTokenServiceTest {
* @return
*/
private String getDate(){
Date date = DateUtils.addDays(new Date(),30);
return org.apache.dolphinscheduler.common.utils.DateUtils.dateToString(date);
Date date = DateUtils.add(new Date(), Calendar.DAY_OF_MONTH, 30);
return DateUtils.dateToString(date);
}
}

294
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,38 +16,80 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.http.entity.ContentType;
import org.json.JSONException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessDefinitionServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceTest.class);
@Autowired
@InjectMocks
ProcessDefinitionService processDefinitionService;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
@Test
public void queryProccessDefinitionList() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProccessDefinitionList(loginUser,"project_test1");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
@ -55,10 +97,20 @@ public class ProcessDefinitionServiceTest {
@Test
public void queryProcessDefinitionListPagingTest() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionListPaging(loginUser, "project_test1", "",1, 5,0);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
@ -67,13 +119,243 @@ public class ProcessDefinitionServiceTest {
@Test
public void deleteProcessDefinitionByIdTest() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionById(loginUser, "li_sql_test", 6);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 6);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
/**
* add datasource param and dependent when export process
* @throws JSONException
*/
@Test
public void testAddTaskNodeSpecialParam() throws JSONException {
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource());
Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition());
String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
"\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" +
",\"localParams\":[],\"connParams\":\"\"," +
"\"preStatements\":[],\"postStatements\":[]}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
"\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
"\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
"\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," +
"\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," +
"\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
}
/**
* import sub process test
*/
@Test
public void testImportSubProcess() {
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
Project testProject = getProject("test");
//Recursive subprocess sub2 process in sub1 process and sub1process in top process
String topProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-38634\",\"name\":\"shell1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}," +
"{\"type\":\"SUB_PROCESS\",\"id\":\"tasks-44207\",\"name\":\"shell-4\"," +
"\"params\":{\"processDefinitionId\":39},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
"\"dependence\":{},\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[\"shell1\"]}],\"tenantId\":1,\"timeout\":0}";
String sub1ProcessJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-84090\"," +
"\"name\":\"shell-4\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-4\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SUB_PROCESS\"," +
"\"id\":\"tasks-87364\",\"name\":\"shell-5\"," +
"\"params\":{\"processDefinitionId\":46},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{}," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[\"shell-4\"]}],\"tenantId\":1,\"timeout\":0}";
String sub2ProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
JSONObject jsonObject = JSONUtils.parseObject(topProcessJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
String originSubJson = jsonArray.toString();
Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
ProcessDefinition shellDefinition1 = new ProcessDefinition();
shellDefinition1.setId(39);
shellDefinition1.setName("shell-4");
shellDefinition1.setProjectId(2);
shellDefinition1.setProcessDefinitionJson(sub1ProcessJson);
ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(46);
shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(sub2ProcessJson);
Mockito.when(processDefineMapper.queryByDefineId(39)).thenReturn(shellDefinition1);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-5")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2);
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap);
String correctSubJson = jsonArray.toString();
Assert.assertEquals(originSubJson, correctSubJson);
}
@Test
public void testImportProcessDefinitionById() throws IOException {
String processJson = "{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," +
"\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," +
"\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," +
"\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," +
"\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," +
"\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," +
"\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," +
"{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":-1," +
"\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," +
"\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," +
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," +
"\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," +
"\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," +
"\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," +
"\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," +
"\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}";
String subProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson);
File file = new File("/tmp/task.json");
FileInputStream fileInputStream = new FileInputStream("/tmp/task.json");
MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(),
ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream);
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
String currentProjectName = "testProject";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS, currentProjectName);
ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(46);
shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(subProcessJson);
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
//import process
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
boolean delete = file.delete();
Assert.assertTrue(delete);
}
/**
* get mock datasource
* @return DataSource
*/
private DataSource getDataSource(){
DataSource dataSource = new DataSource();
dataSource.setId(2);
dataSource.setName("test");
return dataSource;
}
/**
* get mock processDefinition
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46);
processDefinition.setName("testProject");
processDefinition.setProjectId(2);
return processDefinition;
}
/**
* get mock Project
* @param projectName projectName
* @return Project
*/
private Project getProject(String projectName){
Project project = new Project();
project.setId(1);
project.setName(projectName);
project.setUserId(1);
return project;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
}

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SessionServiceTest.java

@ -16,10 +16,11 @@
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import java.util.Calendar;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Session;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.SessionMapper;
@ -121,6 +122,9 @@ public class SessionServiceTest {
String ip = "127.0.0.1";
User user = new User();
user.setId(userId);
Mockito.when(sessionMapper.queryByUserIdAndIp(userId,ip)).thenReturn(getSession());
sessionService.signOut(ip ,user);
}
@ -130,7 +134,7 @@ public class SessionServiceTest {
Session session = new Session();
session.setId(sessionId);
session.setIp("127.0.0.1");
session.setLastLoginTime(DateUtils.addDays(new Date(),40));
session.setLastLoginTime(DateUtils.add(new Date(), Calendar.DAY_OF_MONTH, 40));
session.setUserId(1);
return session;
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/ClickHouseDataSource.java

@ -66,7 +66,6 @@ public class ClickHouseDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("ClickHouse datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/DB2ServerDataSource.java

@ -66,7 +66,6 @@ public class DB2ServerDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("DB2 Server datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/HiveDataSource.java

@ -69,7 +69,6 @@ public class HiveDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("hive datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/MySQLDataSource.java

@ -64,7 +64,6 @@ public class MySQLDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("Mysql datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/OracleDataSource.java

@ -66,7 +66,6 @@ public class OracleDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("Oracle datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/PostgreDataSource.java

@ -68,7 +68,6 @@ public class PostgreDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("Postgre datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SQLServerDataSource.java

@ -62,7 +62,6 @@ public class SQLServerDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("SQL Server datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/job/db/SparkDataSource.java

@ -71,7 +71,6 @@ public class SparkDataSource extends BaseDataSource {
con.close();
} catch (SQLException e) {
logger.error("Spark datasource try conn close conn error", e);
throw e;
}
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DateInterval.java

@ -59,4 +59,5 @@ public class DateInterval {
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java

@ -53,6 +53,7 @@ public class TaskNodeRelation {
}
@Override
public boolean equals(Object o){
if (!(o instanceof TaskNodeRelation)) {
return false;

37
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@ -37,8 +37,27 @@ public class TaskQueueZkImpl implements ITaskQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
private final ZookeeperOperator zookeeperOperator;
@Autowired
private ZookeeperOperator zookeeperOperator;
public TaskQueueZkImpl(ZookeeperOperator zookeeperOperator) {
this.zookeeperOperator = zookeeperOperator;
try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for (String key : new String[]{tasksQueuePath,tasksKillPath}){
if (!zookeeperOperator.isExisted(key)){
zookeeperOperator.persist(key, "");
logger.info("create tasks queue parent node success : {}", key);
}
}
} catch (Exception e) {
logger.error("create tasks queue parent node failure", e);
}
}
/**
* get all tasks from tasks queue
@ -321,20 +340,20 @@ public class TaskQueueZkImpl implements ITaskQueue {
public void delete(){
try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
String tasksKillPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
if(zookeeperOperator.isExisted(taskQueuePath)){
List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath);
for (String key : new String[]{tasksQueuePath,tasksKillPath}){
if (zookeeperOperator.isExisted(key)){
List<String> list = zookeeperOperator.getChildrenKeys(key);
for (String task : list) {
zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task);
logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task);
zookeeperOperator.remove(key + Constants.SINGLE_SLASH + task);
logger.info("delete task from tasks queue : {}/{} ", key, task);
}
}
}
} catch (Exception e) {
logger.error("delete all tasks in tasks queue failure",e);
logger.error("delete all tasks in tasks queue failure", e);
}
}
@ -344,7 +363,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
* @return
*/
public String getTasksPath(String key){
return "/dolphinscheduler" + Constants.SINGLE_SLASH + key;
return zookeeperOperator.getZookeeperConfig().getDsRoot() + Constants.SINGLE_SLASH + key;
}
}

26
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ArrayUtils.java

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
public class ArrayUtils {
public static boolean isEmpty(final int[] array) {
return array == null || array.length == 0;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java

@ -51,7 +51,6 @@ public class ConnectionUtils {
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e);
} finally {
try {
if (stmt != null) {
@ -60,7 +59,6 @@ public class ConnectionUtils {
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e);
} finally {
try {
if (conn != null) {
@ -69,7 +67,6 @@ public class ConnectionUtils {
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e);
}
}
}

398
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java

File diff suppressed because one or more lines are too long

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/EnumUtils.java

@ -30,4 +30,16 @@ public class EnumUtils {
return null;
}
}
public static <E extends Enum<E>> boolean isValidEnum(final Class<E> enumClass, final String enumName) {
if (enumName == null) {
return false;
}
try {
Enum.valueOf(enumClass, enumName);
return true;
} catch (final IllegalArgumentException ex) {
return false;
}
}
}

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -116,9 +116,9 @@ public class HadoopUtils implements Closeable {
if(StringUtils.isNotBlank(defaultFSProp)){
Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
configuration.set(Constants.FS_DEFAULTFS,defaultFSProp);
fsRelatedProps.entrySet().stream().forEach(entry -> configuration.set(entry.getKey(), entry.getValue()));
fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
}else{
logger.error("property:{} can not to be empty, please set!");
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
throw new RuntimeException("property:{} can not to be empty, please set!");
}
}else{
@ -337,7 +337,6 @@ public class HadoopUtils implements Closeable {
* @throws Exception errors
*/
public FileStatus[] listFileStatus(String filePath)throws Exception{
Path path = new Path(filePath);
try {
return fs.listStatus(new Path(filePath));
} catch (IOException e) {

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HttpUtils.java

@ -67,7 +67,7 @@ public class HttpUtils {
logger.warn("http entity is null");
}
}else{
logger.error("htt get:{} response status code is not 200!");
logger.error("http get:{} response status code is not 200!", response.getStatusLine().getStatusCode());
}
}catch (Exception e){
logger.error(e.getMessage(),e);

45
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
public class IOUtils {
public static void closeQuietly(InputStream fis){
if(fis != null){
try {
fis.close();
} catch (IOException ignore) {
}
}
}
public static void closeQuietly(InputStreamReader reader){
if(reader != null){
try {
reader.close();
} catch (IOException ignore) {
}
}
}
}

30
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.common.utils;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.regex.Pattern;
@ -122,4 +124,32 @@ public class StringUtils {
}
return false;
}
public static String join(final Iterable<?> iterable, final String separator){
Iterator<?> iterator = iterable.iterator();
if (iterator == null) {
return null;
}
if (!iterator.hasNext()) {
return EMPTY;
}
final Object first = iterator.next();
if (!iterable.iterator().hasNext()) {
return Objects.toString(first, "");
}
final StringBuilder buf = new StringBuilder(64);
if (first != null) {
buf.append(first);
}
while (iterator.hasNext()) {
if (separator != null) {
buf.append(separator);
}
final Object obj = iterator.next();
if (obj != null) {
buf.append(obj);
}
}
return buf.toString();
}
}

174
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DependentUtilsTest.java

@ -34,18 +34,73 @@ public class DependentUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ShellExecutorTest.class);
@Test
public void getDependResultForRelation() {
//failed
DependentRelation dependentRelation = DependentRelation.AND;
List<DependResult> dependResultList = new ArrayList<>();
dependResultList.add(DependResult.FAILED);
dependResultList.add(DependResult.SUCCESS);
DependResult result = DependentUtils.getDependResultForRelation( dependentRelation, dependResultList);
DependResult result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.FAILED);
//waiting
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.WAITING);
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.WAITING);
//success
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.SUCCESS);
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.SUCCESS);
//one success
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.SUCCESS);
//one failed
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.FAILED);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.FAILED);
//or success
dependentRelation = DependentRelation.OR;
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.FAILED);
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.SUCCESS);
//waiting
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.WAITING);
dependResultList.add(DependResult.FAILED);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.WAITING);
Assert.assertEquals(DependentUtils.getDependResultForRelation( dependentRelation, dependResultList),
DependResult.SUCCESS);
//success
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.SUCCESS);
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.SUCCESS);
//one success
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.SUCCESS);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.SUCCESS);
//one failed
dependResultList = new ArrayList<>();
dependResultList.add(DependResult.FAILED);
result = DependentUtils.getDependResultForRelation(dependentRelation, dependResultList);
Assert.assertEquals(result, DependResult.FAILED);
}
@Test
@ -101,6 +156,115 @@ public class DependentUtilsTest {
Assert.assertEquals(dateIntervals.get(0), monthHead);
Assert.assertEquals(dateIntervals.get(dateIntervals.size() - 1), monthThis);
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-04 10:00:00"), "last1Hour");
DateInterval expect = new DateInterval(DateUtils.stringToDate("2019-02-04 09:00:00"),
DateUtils.getEndOfHour(DateUtils.stringToDate("2019-02-04 09:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-04 10:00:00"), "last2Hours");
expect = new DateInterval(DateUtils.stringToDate("2019-02-04 08:00:00"),
DateUtils.getEndOfHour(DateUtils.stringToDate("2019-02-04 08:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-04 10:00:00"), "last3Hours");
expect = new DateInterval(DateUtils.stringToDate("2019-02-04 07:00:00"),
DateUtils.getEndOfHour(DateUtils.stringToDate("2019-02-04 07:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
dateValue = "last3Days";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-02-07 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-07 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
dateValue = "last7Days";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-02-03 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-03 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
dateValue = "lastWeek";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-28 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-28 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
expect = new DateInterval(DateUtils.stringToDate("2019-02-03 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-03 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(6));
Assert.assertEquals(7, dateIntervals.size());
dateValue = "lastMonday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-28 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-28 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastTuesday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-29 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-29 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastWednesday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-30 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-30 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastThursday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-31 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-31 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastFriday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-02-01 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-01 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastSaturday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-02-02 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-02 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastSunday";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-02-03 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-02-03 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastMonth";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-01 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-01 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
expect = new DateInterval(DateUtils.stringToDate("2019-01-31 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-31 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(30));
Assert.assertEquals(31, dateIntervals.size());
dateValue = "lastMonthBegin";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-01 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-01 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
dateValue = "lastMonthEnd";
dateIntervals = DependentUtils.getDateIntervalList(DateUtils.stringToDate("2019-02-10 07:00:00"), dateValue);
expect = new DateInterval(DateUtils.stringToDate("2019-01-31 00:00:00"),
DateUtils.getEndOfDay(DateUtils.stringToDate("2019-01-31 00:00:00")));
Assert.assertEquals(expect, dateIntervals.get(0));
Assert.assertEquals(1, dateIntervals.size());
}
@Test
@ -197,4 +361,4 @@ public class DependentUtilsTest {
Assert.assertEquals(dateIntervals.get(30), di2);
}
}
}

170
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
public class PreconditionsTest {
public static final Logger logger = LoggerFactory.getLogger(PreconditionsTest.class);
/**
* Test checkNotNull
*/
@Test
public void testCheckNotNull() throws Exception {
String testReference = "test reference";
//test reference is not null
Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference));
Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"reference is null"));
Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"%s is null",testReference));
//test reference is null
try {
Preconditions.checkNotNull(null);
} catch (NullPointerException ex) {
assertNull(ex.getMessage());
}
try {
Preconditions.checkNotNull("");
} catch (NullPointerException ex) {
assertNull(ex.getMessage());
}
//test reference is null ,expect contains errorMessage
try {
Preconditions.checkNotNull(null,"reference is null");
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString("reference is null"));
}
try {
Preconditions.checkNotNull("","reference is null");
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString("reference is null"));
}
//test reference is null ,expect contains errorMessageTemplate and errorMessageArgs
try {
Preconditions.checkNotNull(null,"%s is null",testReference);
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString(testReference + " is null"));
}
try {
Preconditions.checkNotNull("","%s is null",testReference);
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString(testReference + " is null"));
}
}
/**
* Test checkArgument
*/
@Test
public void testCheckArgument() throws Exception {
int argument = 100;
//boolean condition is true
Preconditions.checkArgument(argument > 0 && argument < 200);
//boolean condition is false
try {
Preconditions.checkArgument(argument > 0 && argument < 50);
} catch (IllegalArgumentException ex) {
assertNull(ex.getMessage());
}
//boolean condition is false ,expect contains errorMessage
try {
Preconditions.checkArgument(argument > 300, "argument is error");
} catch (IllegalArgumentException ex) {
assertThat(ex.getMessage(), containsString("argument is error"));
}
//boolean condition is false,expect contains errorMessageTemplate and errorMessageArgs
try {
Preconditions.checkArgument(argument > 0 && argument < 99, "argument %s is error",argument);
} catch (IllegalArgumentException ex) {
assertThat(ex.getMessage(), containsString( "argument " + argument + " is error"));
}
}
/**
* Test checkState
*/
@Test
public void testCheckState() throws Exception {
int state = 1;
//boolean condition is true
Preconditions.checkState(state == 1);
Preconditions.checkState(state > -1);
//boolean condition is false
try {
Preconditions.checkState(state > 2);
} catch (IllegalStateException ex) {
assertNull(ex.getMessage());
}
//boolean condition is false ,expect contains errorMessage
try {
Preconditions.checkState(state < 1, "state is error");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), containsString("state is error"));
}
//boolean condition is false,expect contains errorMessageTemplate and errorMessageArgs
try {
Preconditions.checkState(state < -1 , "state %s is error",state);
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), containsString( "state " + state + " is error"));
}
}
/**
* Test checkElementIndex
*/
@Test
public void testCheckElementIndex() throws Exception {
int index = 2;
int size = 30;
//boolean condition is true
Preconditions.checkElementIndex(index, size);
//boolean condition is false
try {
Preconditions.checkElementIndex(-1, 10);
} catch (IndexOutOfBoundsException ex) {
assertThat(ex.getMessage(), containsString("Index: -1, Size: 10"));
}
//boolean condition is false ,expect contains errorMessage
try {
Preconditions.checkElementIndex(100, 50, "index is greater than size");
} catch (IndexOutOfBoundsException ex) {
assertThat(ex.getMessage(), containsString("index is greater than size Index: 100, Size: 50"));
}
}
}

8
dolphinscheduler-dao/pom.xml

@ -108,14 +108,6 @@
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.ShowType;

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -18,20 +18,19 @@ package org.apache.dolphinscheduler.dao;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.ArrayUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.cron.CronUtils;
@ -458,9 +457,12 @@ public class ProcessDao {
if(tenantId >= 0){
tenant = tenantMapper.queryById(tenantId);
}
if(tenant == null){
if(null == tenant){
User user = userMapper.selectById(userId);
tenant = tenantMapper.queryById(user.getTenantId());
if (null != user) {
tenant = tenantMapper.queryById(user.getTenantId());
}
}
return tenant;
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java

@ -20,11 +20,11 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -17,14 +17,13 @@
package org.apache.dolphinscheduler.dao.upgrade;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -156,7 +155,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
throw new RuntimeException(e.getMessage(),e);
} catch (Exception e) {
try {
conn.rollback();
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
}
@ -313,7 +314,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
throw new RuntimeException(e.getMessage(),e);
} catch (SQLException e) {
try {
conn.rollback();
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
}
@ -321,7 +324,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
throw new RuntimeException(e.getMessage(),e);
} catch (Exception e) {
try {
conn.rollback();
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(),e1);
}

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -24,9 +24,9 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -78,17 +78,17 @@ public class DagHelper {
List<String> startNodeList = startNodeNameList;
if(taskDependType != TaskDependType.TASK_POST
&& startNodeList.size() == 0){
&& CollectionUtils.isEmpty(startNodeList)){
logger.error("start node list is empty! cannot continue run the process ");
return destFlowNodeList;
}
List<TaskNode> destTaskNodeList = new ArrayList<>();
List<TaskNode> tmpTaskNodeList = new ArrayList<>();
if (taskDependType == TaskDependType.TASK_POST
&& recoveryNodeNameList.size() > 0) {
&& CollectionUtils.isNotEmpty(recoveryNodeNameList)) {
startNodeList = recoveryNodeNameList;
}
if (startNodeList == null || startNodeList.size() == 0) {
if (CollectionUtils.isEmpty(startNodeList)) {
// no special designation start nodes
tmpTaskNodeList = taskNodeList;
} else {
@ -126,10 +126,8 @@ public class DagHelper {
List<TaskNode> resultList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
List<String> depList = taskNode.getDepList();
if (depList != null) {
if (depList.contains(startNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList));
}
if (null != depList && null != startNode && depList.contains(startNode.getName())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList));
}
}
@ -149,9 +147,12 @@ public class DagHelper {
List<TaskNode> resultList = new ArrayList<>();
List<String> depList = startNode.getDepList();
resultList.add(startNode);
if (depList == null || depList.size() == 0) {
List<String> depList = new ArrayList<>();
if (null != startNode) {
depList = startNode.getDepList();
resultList.add(startNode);
}
if (CollectionUtils.isEmpty(depList)) {
return resultList;
}
for (String depNodeName : depList) {
@ -180,7 +181,10 @@ public class DagHelper {
TaskDependType depNodeType) throws Exception {
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> taskNodeList = new ArrayList<>();
if (null != processData) {
taskNodeList = processData.getTasks();
}
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(taskNodeList, startNodeNameList, recoveryNodeNameList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
@ -201,7 +205,10 @@ public class DagHelper {
Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
List<TaskNode> taskNodeList = new ArrayList<>();
if (null != processData) {
taskNodeList = processData.getTasks();
}
for(TaskNode node : taskNodeList){
if(node.isForbidden()){
forbidTaskNodeMap.putIfAbsent(node.getName(), node);

42
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/MysqlPerformance.java

@ -17,12 +17,7 @@
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.dao.MonitorDBDao.VARIABLE_NAME;
import java.sql.Connection;
import java.sql.ResultSet;
@ -30,7 +25,12 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import static org.apache.dolphinscheduler.dao.MonitorDBDao.VARIABLE_NAME;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* mysql performance
@ -55,26 +55,28 @@ public class MysqlPerformance extends BaseDBPerformance{
try{
pstmt = conn.createStatement();
ResultSet rs1 = pstmt.executeQuery("show global variables");
while(rs1.next()){
if(rs1.getString(VARIABLE_NAME).toUpperCase().equals("MAX_CONNECTIONS")){
monitorRecord.setMaxConnections( Long.parseLong(rs1.getString("value")));
try (ResultSet rs1 = pstmt.executeQuery("show global variables")) {
while(rs1.next()){
if(rs1.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_CONNECTIONS")){
monitorRecord.setMaxConnections( Long.parseLong(rs1.getString("value")));
}
}
}
ResultSet rs2 = pstmt.executeQuery("show global status");
while(rs2.next()){
if(rs2.getString(VARIABLE_NAME).toUpperCase().equals("MAX_USED_CONNECTIONS")){
monitorRecord.setMaxUsedConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).toUpperCase().equals("THREADS_CONNECTED")){
monitorRecord.setThreadsConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).toUpperCase().equals("THREADS_RUNNING")){
monitorRecord.setThreadsRunningConnections(Long.parseLong(rs2.getString("value")));
try (ResultSet rs2 = pstmt.executeQuery("show global status")) {
while(rs2.next()){
if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("MAX_USED_CONNECTIONS")){
monitorRecord.setMaxUsedConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_CONNECTED")){
monitorRecord.setThreadsConnections(Long.parseLong(rs2.getString("value")));
}else if(rs2.getString(VARIABLE_NAME).equalsIgnoreCase("THREADS_RUNNING")){
monitorRecord.setThreadsRunningConnections(Long.parseLong(rs2.getString("value")));
}
}
}
}catch (Exception e) {
monitorRecord.setState(Flag.NO);
logger.error("SQLException " + e);
logger.error("SQLException ", e);
}finally {
try {
if (pstmt != null) {

39
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PostgrePerformance.java

@ -16,6 +16,12 @@
*/
package org.apache.dolphinscheduler.dao.utils;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
@ -23,12 +29,6 @@ import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Date;
/**
* postgresql performance
*/
@ -50,26 +50,27 @@ public class PostgrePerformance extends BaseDBPerformance {
Statement pstmt= null;
try{
pstmt = conn.createStatement();
ResultSet rs1 = pstmt.executeQuery("select count(*) from pg_stat_activity;");
while(rs1.next()){
monitorRecord.setThreadsConnections(rs1.getInt("count"));
break;
try (ResultSet rs1 = pstmt.executeQuery("select count(*) from pg_stat_activity;")) {
if(rs1.next()){
monitorRecord.setThreadsConnections(rs1.getInt("count"));
}
}
ResultSet rs2 = pstmt.executeQuery("show max_connections");
while(rs2.next()){
monitorRecord.setMaxConnections( rs2.getInt("max_connections"));
break;
try (ResultSet rs2 = pstmt.executeQuery("show max_connections")) {
if(rs2.next()){
monitorRecord.setMaxConnections( rs2.getInt("max_connections"));
}
}
ResultSet rs3 = pstmt.executeQuery("select count(*) from pg_stat_activity pg where pg.state = 'active';");
while(rs3.next()){
monitorRecord.setThreadsRunningConnections(rs3.getInt("count"));
break;
try (ResultSet rs3 = pstmt.executeQuery("select count(*) from pg_stat_activity pg where pg.state = 'active';")) {
if(rs3.next()){
monitorRecord.setThreadsRunningConnections(rs3.getInt("count"));
}
}
}catch (Exception e) {
monitorRecord.setState(Flag.NO);
logger.error("SQLException " + e);
logger.error("SQLException ", e);
}finally {
try {
if (pstmt != null) {

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -16,7 +16,6 @@
*/
package org.apache.dolphinscheduler.server.master;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
@ -24,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/monitor/AbstractMonitor.java

@ -16,8 +16,8 @@
*/
package org.apache.dolphinscheduler.server.monitor;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java

@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.text.MessageFormat;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/log/SensitiveDataConverter.java

@ -20,8 +20,8 @@ package org.apache.dolphinscheduler.server.worker.log;
import ch.qos.logback.classic.pattern.MessageConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.SensitiveLogUtil;
import java.util.regex.Matcher;

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -16,16 +16,12 @@
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -20,11 +20,11 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.io.*;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java

@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
@ -27,7 +28,6 @@ import org.apache.dolphinscheduler.server.worker.task.python.PythonTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.commons.lang3.EnumUtils;
import org.slf4j.Logger;
/**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.ArrayList;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.http;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.enums.HttpParametersType;
@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.ParamUtils;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import java.util.ArrayList;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -16,13 +16,13 @@
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ShowType;
@ -33,10 +31,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlBinds;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@ -124,6 +119,13 @@ public class SqlTask extends AbstractTask {
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
if (null == dataSource){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -131,12 +133,6 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection con = null;
List<String> createFuncs = null;
try {
@ -182,7 +178,7 @@ public class SqlTask extends AbstractTask {
try {
con.close();
} catch (SQLException e) {
throw e;
logger.error(e.getMessage(),e);
}
}
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java

@ -16,12 +16,12 @@
*/
package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

1
dolphinscheduler-ui/pom.xml

@ -31,6 +31,7 @@
<properties>
<node.version>v12.12.0</node.version>
<npm.version>6.11.3</npm.version>
<sonar.sources>src</sonar.sources>
</properties>
<profiles>
<profile>

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -309,13 +309,13 @@ svg path:hover {
//min-width: calc(100% - 220px);
width: 8000px;
height: 5000px;
position: relative;
svg:not(:root){
z-index: 11;
}
}
.jtk-demo-canvas {
position: relative;
height: 100%;
}

21
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -156,12 +156,12 @@
},
methods: {
...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState']),
...mapMutations('dag', ['addTasks', 'resetParams', 'setIsEditDag', 'setName']),
...mapMutations('dag', ['addTasks', 'cacheTasks', 'resetParams', 'setIsEditDag', 'setName']),
// DAG automatic layout
dagAutomaticLayout() {
$('#canvas').html('')
// Destroy round robin
Dag.init({
dag: this,
@ -187,6 +187,13 @@
})
if (this.tasks.length) {
Dag.backfill(true)
if (this.type === 'instance') {
this._getTaskState(false).then(res => {})
// Round robin acquisition status
this.setIntervalP = setInterval(() => {
this._getTaskState(true).then(res => {})
}, 90000)
}
} else {
Dag.create()
}
@ -488,6 +495,14 @@
removeNodesEvent(fromThis)
}, 100)
},
/**
* Cache the item
* @param item
* @param fromThis
*/
cacheTaskInfo({item, fromThis}) {
self.cacheTasks(item)
},
close ({ flag, fromThis }) {
// Edit status does not allow deletion of nodes
if (flag) {

90
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -16,7 +16,7 @@
*/
<template>
<div class="form-model-model" v-clickoutside="_handleClose">
<div class="title-box">
<div class="title-box">
<span class="name">{{$t('Current node settings')}}</span>
<span class="go-subtask">
<!-- Component can't pop up box to do component processing -->
@ -184,6 +184,7 @@
<m-http
v-if="taskType === 'HTTP'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
ref="HTTP"
:backfill-item="backfillItem">
</m-http>
@ -333,6 +334,31 @@
_onParams (o) {
this.params = Object.assign(this.params, {}, o)
},
_onCacheParams (o) {
this.params = Object.assign(this.params, {}, o)
this._cacheItem()
},
_cacheItem () {
this.$emit('cacheTaskInfo', {
item: {
type: this.taskType,
id: this.id,
name: this.name,
params: this.params,
description: this.description,
runFlag: this.runFlag,
dependence: this.dependence,
maxRetryTimes: this.maxRetryTimes,
retryInterval: this.retryInterval,
timeout: this.timeout,
taskInstancePriority: this.taskInstancePriority,
workerGroupId: this.workerGroupId
},
fromThis: this
})
},
/**
* verification name
*/
@ -431,29 +457,43 @@
}
},
watch: {
/**
* Watch the item change, cache the value it changes
**/
_item (val) {
this._cacheItem()
}
},
created () {
// Unbind copy and paste events
JSP.removePaste()
// Backfill data
let taskList = this.store.state.dag.tasks
//fillback use cacheTasks
let cacheTasks = this.store.state.dag.cacheTasks
let o = {}
if (taskList.length) {
taskList.forEach(v => {
if (v.id === this.id) {
o = v
this.backfillItem = v
if (cacheTasks[this.id]) {
o = cacheTasks[this.id]
this.backfillItem = cacheTasks[this.id]
} else {
if (taskList.length) {
taskList.forEach(v => {
if (v.id === this.id) {
o = v
this.backfillItem = v
}
})
}
})
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
this.name = o.name
this.taskInstancePriority = o.taskInstancePriority
this.runFlag = o.runFlag || 'NORMAL'
this.description = o.description
this.maxRetryTimes = o.maxRetryTimes
this.retryInterval = o.retryInterval
}
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
this.name = o.name
this.taskInstancePriority = o.taskInstancePriority
this.runFlag = o.runFlag || 'NORMAL'
this.description = o.description
this.maxRetryTimes = o.maxRetryTimes
this.retryInterval = o.retryInterval
// If the workergroup has been deleted, set the default workergroup
var hasMatch = false;
@ -471,7 +511,6 @@
this.workerGroupId = o.workerGroupId
}
}
}
this.isContentBox = true
},
@ -490,6 +529,23 @@
*/
_isGoSubProcess () {
return this.taskType === 'SUB_PROCESS' && this.name
},
//Define the item model
_item () {
return {
type: this.taskType,
id: this.id,
name: this.name,
description: this.description,
runFlag: this.runFlag,
dependence: this.dependence,
maxRetryTimes: this.maxRetryTimes,
retryInterval: this.retryInterval,
timeout: this.timeout,
taskInstancePriority: this.taskInstancePriority,
workerGroupId: this.workerGroupId
}
}
},
components: {

19
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/http.vue

@ -164,7 +164,26 @@
return true
}
},
computed: {
cacheParams () {
return {
localParams: this.localParams,
httpParams: this.httpParams,
url: this.url,
httpMethod: this.httpMethod,
httpCheckCondition: this.httpCheckCondition,
condition: this.condition
}
}
},
watch: {
/**
* Watch the cacheParams
* @param val
*/
cacheParams (val) {
this.$emit('on-cache-params', val);
}
},
created () {
let o = this.backfillItem

17
dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js

@ -125,6 +125,11 @@ export default {
} else {
state.tasks.push(payload)
}
if (state.cacheTasks[payload.id]) {
state.cacheTasks[payload.id] = Object.assign(state.cacheTasks[payload.id], {}, payload)
} else {
state.cacheTasks[payload.id] = payload;
}
let dom = $(`#${payload.id}`)
state.locations[payload.id] = _.assign(state.locations[payload.id], {
name: dom.find('.name-p').text(),
@ -132,5 +137,17 @@ export default {
x: parseInt(dom.css('left'), 10),
y: parseInt(dom.css('top'), 10)
})
},
/**
* Cache the input
* @param state
* @param payload
*/
cacheTasks (state, payload) {
if (state.cacheTasks[payload.id]) {
state.cacheTasks[payload.id] = Object.assign(state.cacheTasks[payload.id], {}, payload)
} else {
state.cacheTasks[payload.id] = payload;
}
}
}

2
dolphinscheduler-ui/src/js/conf/home/store/dag/state.js

@ -29,6 +29,8 @@ export default {
globalParams: [],
// Node information
tasks: [],
// Node cache information, cache the previous input
cacheTasks: {},
// Timeout alarm
timeout: 0,
// tenant id

1
dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue

@ -141,6 +141,7 @@
let self = this
let formData = new FormData()
formData.append('file', this.file)
formData.append('projectName',this.store.state.dag.projectName)
io.post(`projects/import-definition`, res => {
this.$message.success(res.msg)
resolve()

16
pom.xml

@ -116,6 +116,8 @@
<cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
<mockito.version>2.21.0</mockito.version>
<powermock.version>2.0.2</powermock.version>
<jasper-runtime.version>5.5.23</jasper-runtime.version>
<servlet-api.version>2.5</servlet-api.version>
</properties>
<dependencyManagement>
@ -516,6 +518,17 @@
<version>${jcip.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
<version>${jasper-runtime.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet-api.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -661,6 +674,7 @@
<configuration>
<includes>
<include>**/common/utils/*.java</include>
<include>**/common/threadutils/*.java</include>
<include>**/common/graph/*.java</include>
<include>**/common/queue/*.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
@ -675,6 +689,7 @@
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/AlertGroupServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include>
@ -808,6 +823,7 @@
<check>
</check>
<aggregate>true</aggregate>
<outputDirectory>./target/cobertura</outputDirectory>
<encoding>${project.build.sourceEncoding}</encoding>
<quiet>true</quiet>
<format>xml</format>

Loading…
Cancel
Save