Browse Source

[Feature][JsonSplit-api]merge code from dev to json_split_two (#5923)

* [BUG-#5678][Registry]fix registry init node miss (#5686)

* [Improvement][UI] Update the update time after the user information is successfully modified (#5684)

* improve

edit the userinfo success, but the updatetime is not the latest.

* Improved shell task execution result log information, adding process.waitFor() and process.exitValue() information to the original log (#5691)

Co-authored-by: shenglm <shenglm840722@126.com>

* [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)

* add globalParams new plan with varPool

* add unit test

* add python task varPoolParams


Co-authored-by: wangxj <wangxj31>

* Issue robot translation judgment changed to Chinese (#5694)



Co-authored-by: chenxingchun <438044805@qq.com>

* the update function should use post instead of get (#5703)

* enhance form verify (#5696)

* checkState only supports %s not {} (#5711)

* [Fix-5701]When deleting a user, the accessToken associated with the user should also be deleted (#5697)

* update

* fix the codestyle error

* fix the compile error

* support rollback

* [Fix-5699][UI] Fix update user error in user information (#5700)

* [Improvement] the automatically generated spi service name in alert-plugin is wrong (#5676)

* bug fix

the auto generated spi service can't be recongized



* include a  new method

* [Improvement-5622][project management] Modify the title (#5723)

* [Fix-5714] When updating the existing alarm instance, the creation time should't be updated (#5715)



* add a new init method.

* [Fix#5758] There are some problems in the api documentation that need to be improved (#5759)

* add the necessary parameters

* openapi improve

* fix code style error

* [FIX-#5721][master-server] Global params parameter missing (#5757)



Co-authored-by: wangxj <wangxj31>

* [Fix-5738][UI] The cancel button in the pop-up dialog of `batch copy` and `batch move`  doesn't work. (#5739)

* Update relatedItems.vue

* Update relatedItems.vue

* [Improvement#5741][Worker] Improve task process status log  (#5776)

* [Improvement-5773][server] need to support two parameters related to task (#5774)

* add some new parameter for task

* restore official properties

* improve imports

* modify a variable's name

Co-authored-by: jiang hua <jiang.hua@zhaopin.com.cn>

* [FIX-5786][Improvement][Server] When the Worker turns down, the MasterServer cannot handle the Remove event correctly and throws NPE

* [Improvement][Worker] Task log may be lost #5775 (#5783)

* [Imporvement #5725][CheckStyle] upgrade checkstyle file (#5789)

* [Imporvement #5725][CheckStyle] upgrade checkstyle file
  Upgrade checkstyle.xml to support checkstyle version 8.24+

* change ci checkstyle version

* [Fix-5795][Improvement][Server] The starttime field in the HttpTask log is not displayed as expected.  (#5796)

* improve timestamp format

make the startime in the log of httptask to be easier to read.


* fix bad code smell and update the note.

* [Imporvement #5621][job instance] start-time and end-time (#5621) (#5797)

·the list of workflow instances is sorted by start time and end time
·This closes #5621

* fix (#5803)

Co-authored-by: shuangbofu <fusb@tuya.com>

* fix: Remove duplicate "registryClient.close" method calls (#5805)

Co-authored-by: wen-hemin <wenhemin@apache.com>

* [Improvement][SPI] support load single plugin (#5794)

change load operation of 'registry.plugin.dir'

* [Improvement][Api Module] refactor registry client, remove spring annotation (#5814)

* fix: refactor registry client, remove spring annotation

* fix UT

* fix UT

* fix checkstyle

* fix UT

* fix UT

* fix UT

* fix: Rename RegistryCenterUtils method name

Co-authored-by: wen-hemin <wenhemin@apache.com>

* [Fix-5699][UI] Fix update user error in user information introduced by #5700 (#5735)

* [Fix-5726] When we used the UI page, we found some problems such as parameter validation, parameter update shows success but actually work (#5727)

* enhance the validation in UI

* enchance form verifaction

* simplify disable condition

* fix: Remove unused class (#5833)

Co-authored-by: wen-hemin <wenhemin@apache.com>

* [fix-5737] [Bug][Datasource] datsource other param check error (#5835)

Co-authored-by: wanggang <wanggy01@servyou.com.cn>

* [Fix-5719][K8s] Fix Ingress tls: got map expected array On TLS enabled On Kubernetes

[Fix-5719][K8s] Fix Ingress tls: got map expected array On TLS enabled On Kubernetes

* [Fix-5825][BUG][WEB] the resource tree in the process definition of latest dev branch can't display correctly (#5826)

* resoures-shows-error

* fix codestyle error

* add license header for new js

* fix codesmell

* [Improvement-5852][server] Support two parameters related to task for the rest of type of tasks. (#5867)

* provide two system parameters to support the rest of type of tasks

* provide two system parameters to support the rest of type of tasks

* improve test conversion

* [Improvement][Fix-5769][UI]When we try to delete the existing dag, the console in web browser would shows exception (#5770)

* fix bug

* cache the this variable

* Avoid self name

* fix code style compile error

* [Fix-5781][UT] Fix test coverage in sonar (#5817)

* build(UT): make jacoco running in offline-instrumentation

issue: #5781

* build(UT): remove the jacoco agent dependency in microbench

issue: #5781

* [Fix-5808][Server]  When we try to transfer data using datax between  different types of data sources, the worker will exit with ClassCastException (#5809)

* bug fix

* fix bug

* simplify the code format

* add a new parameter to make it easier to understand.

* [Fix-5830][Improvement][UI] Improve the selection style in dag edit dialog (#5829)

* improve the selection style

* update another file

* remove unnecessary css part.

* [Fix-5904][upgrade]fix dev branch upgrade mysql sql script error (#5821)

* fix dev branch upgrade mysql sql script error.

* Update naming convention.

* [Improvement][Api Module] refactor DataSourceParam and DependentParam, remove spring annotation (#5832)

* fix: refactor api utils class, remove spring annotation.

* fix: Optimization comments

Co-authored-by: wen-hemin <wenhemin@apache.com>

* correct the wrong annotion from zk queue implemented to java priority blocking queue (#5906)

Co-authored-by: ywang46 <ywang46@paypal.com>

Co-authored-by: Kirs <acm_master@163.com>
Co-authored-by: kyoty <echohlne@gmail.com>
Co-authored-by: ji04xiaogang <ji04xiaogang@163.com>
Co-authored-by: shenglm <shenglm840722@126.com>
Co-authored-by: wangxj3 <857234426@qq.com>
Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
Co-authored-by: chenxingchun <438044805@qq.com>
Co-authored-by: Shiwen Cheng <chengshiwen0103@gmail.com>
Co-authored-by: Jianchao Wang <akingchao@qq.com>
Co-authored-by: Tanvi Moharir <74228962+tanvimoharir@users.noreply.github.com>
Co-authored-by: Hua Jiang <jianghuachinacom@163.com>
Co-authored-by: jiang hua <jiang.hua@zhaopin.com.cn>
Co-authored-by: Wenjun Ruan <861923274@qq.com>
Co-authored-by: Tandoy <56899730+Tandoy@users.noreply.github.com>
Co-authored-by: 傅双波 <786183073@qq.com>
Co-authored-by: shuangbofu <fusb@tuya.com>
Co-authored-by: wen-hemin <39549317+wen-hemin@users.noreply.github.com>
Co-authored-by: wen-hemin <wenhemin@apache.com>
Co-authored-by: geosmart <geosmart@hotmail.com>
Co-authored-by: wanggang <wanggy01@servyou.com.cn>
Co-authored-by: AzureCN <colorazure@163.com>
Co-authored-by: 深刻 <tsund@qq.com>
Co-authored-by: zhuangchong <37063904+zhuangchong@users.noreply.github.com>
Co-authored-by: Yao WANG <Yao.MR.CN@gmail.com>
Co-authored-by: ywang46 <ywang46@paypal.com>
Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
7427dc6fd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .github/workflows/ci_ut.yml
  2. 2
      docker/kubernetes/dolphinscheduler/templates/ingress.yaml
  3. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/pom.xml
  4. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
  5. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/pom.xml
  6. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/pom.xml
  7. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/pom.xml
  8. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/pom.xml
  9. 7
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/pom.xml
  10. 7
      dolphinscheduler-alert/pom.xml
  11. 7
      dolphinscheduler-api/pom.xml
  12. 84
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java
  13. 114
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java
  14. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java
  15. 38
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java
  16. 84
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java
  17. 110
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java
  18. 7
      dolphinscheduler-common/pom.xml
  19. 13
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  20. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/datasource/AbstractDatasourceProcessor.java
  21. 8
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/datasource/DatasourceUtilTest.java
  22. 6
      dolphinscheduler-dao/pom.xml
  23. 7
      dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml
  24. 7
      dolphinscheduler-remote/pom.xml
  25. 6
      dolphinscheduler-server/pom.xml
  26. 73
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
  27. 54
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java
  28. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  29. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
  30. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  31. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
  32. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
  33. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  34. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  35. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
  36. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  37. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
  38. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
  39. 61
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
  40. 6
      dolphinscheduler-service/pom.xml
  41. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  42. 6
      dolphinscheduler-spi/pom.xml
  43. 5
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js
  44. 8
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  45. 44
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/resourceTree.js
  46. 39
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue
  47. 39
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue
  48. 38
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue
  49. 38
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue
  50. 40
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue
  51. 37
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/waterdrop.vue
  52. 5
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue
  53. 27
      pom.xml
  54. 4
      sql/dolphinscheduler_mysql.sql
  55. 4
      sql/dolphinscheduler_postgre.sql
  56. 10
      sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql
  57. 8
      sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql

2
.github/workflows/ci_ut.yml

@ -64,7 +64,7 @@ jobs:
- name: Compile - name: Compile
run: | run: |
export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx5g' export MAVEN_OPTS='-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx5g'
mvn test -B -Dmaven.test.skip=false mvn clean verify -B -Dmaven.test.skip=false
- name: Upload coverage report to codecov - name: Upload coverage report to codecov
run: | run: |
CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash) CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)

2
docker/kubernetes/dolphinscheduler/templates/ingress.yaml

@ -49,7 +49,7 @@ spec:
{{- end }} {{- end }}
{{- if .Values.ingress.tls.enabled }} {{- if .Values.ingress.tls.enabled }}
tls: tls:
hosts: - hosts:
- {{ .Values.ingress.host }} - {{ .Values.ingress.host }}
secretName: {{ .Values.ingress.tls.secretName }} secretName: {{ .Values.ingress.tls.secretName }}
{{- end }} {{- end }}

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/pom.xml

@ -68,6 +68,13 @@
<type>jar</type> <type>jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml

@ -118,6 +118,13 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/pom.xml

@ -68,6 +68,13 @@
<type>jar</type> <type>jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/pom.xml

@ -62,6 +62,13 @@
<type>jar</type> <type>jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/pom.xml

@ -64,6 +64,13 @@
<type>jar</type> <type>jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/pom.xml

@ -69,6 +69,13 @@
<type>jar</type> <type>jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/pom.xml

@ -64,6 +64,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-alert/pom.xml

@ -108,6 +108,13 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

7
dolphinscheduler-api/pom.xml

@ -247,5 +247,12 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

84
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* task node add datasource param strategy
*/
@Service
public class DataSourceParam implements ProcessAddTaskParam, InitializingBean {
private static final String PARAMS = "params";
@Autowired
private DataSourceMapper dataSourceMapper;
/**
* add datasource params
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addExportSpecialParam(JsonNode taskNode) {
// add sqlParameters
ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt());
if (null != dataSource) {
sqlParameters.put("datasourceName", dataSource.getName());
}
((ObjectNode)taskNode).set(PARAMS, sqlParameters);
return taskNode;
}
/**
* import process add datasource params
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addImportSpecialParam(JsonNode taskNode) {
ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS);
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText());
if (!dataSources.isEmpty()) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
((ObjectNode)taskNode).set(PARAMS, sqlParameters);
return taskNode;
}
/**
* put datasource strategy
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.SQL.getDesc(), this);
TaskNodeParamFactory.register(TaskType.PROCEDURE.getDesc(), this);
}
}

114
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java

@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* task node add dependent param strategy
*/
@Service
public class DependentParam implements ProcessAddTaskParam, InitializingBean {
private static final String DEPENDENCE = "dependence";
@Autowired
ProcessDefinitionMapper processDefineMapper;
@Autowired
ProjectMapper projectMapper;
/**
* add dependent param
* @param taskNode task node json object
* @return task node json object
*/
@Override
public JsonNode addExportSpecialParam(JsonNode taskNode) {
// add dependent param
ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
if (null != dependentParameters) {
ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JsonNode dependentTaskModel = dependTaskList.path(j);
ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
int definitionId = dependentItem.path("definitionId").asInt();
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if (null != definition) {
dependentItem.put("projectName", definition.getProjectName());
dependentItem.put("definitionName", definition.getName());
}
}
}
((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
}
return taskNode;
}
/**
* import process add dependent param
* @param taskNode task node json object
* @return
*/
@Override
public JsonNode addImportSpecialParam(JsonNode taskNode) {
ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText());
if(dependentParameters != null){
ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList");
for (int h = 0; h < dependTaskList.size(); h++) {
ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h);
ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
ObjectNode dependentItem = (ObjectNode) dependItemList.path(k);
Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText());
if(dependentItemProject != null){
ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getCode(),dependentItem.path("definitionName").asText());
if(definition != null){
dependentItem.put("projectId",dependentItemProject.getId());
dependentItem.put("definitionId",definition.getId());
}
}
}
}
((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters);
}
return taskNode;
}
/**
* put dependent strategy
*/
@Override
public void afterPropertiesSet() {
TaskNodeParamFactory.register(TaskType.DEPENDENT.getDesc(), this);
}
}

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import com.fasterxml.jackson.databind.JsonNode;
/**
* ProcessAddTaskParam
*/
public interface ProcessAddTaskParam {
/**
* add export task special param: sql task dependent task
* @param taskNode task node json object
* @return task node json object
*/
JsonNode addExportSpecialParam(JsonNode taskNode);
/**
* add task special param: sql task dependent task
* @param taskNode task node json object
* @return task node json object
*/
JsonNode addImportSpecialParam(JsonNode taskNode);
}

38
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* task node param factory
*/
public class TaskNodeParamFactory {
private static Map<String, ProcessAddTaskParam> taskServices = new ConcurrentHashMap<>();
public static ProcessAddTaskParam getByTaskType(String taskType){
return taskServices.get(taskType);
}
static void register(String taskType, ProcessAddTaskParam addSpecialTaskParam){
if (null != taskType) {
taskServices.put(taskType, addSpecialTaskParam);
}
}
}

84
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.json.JSONException;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DataSourceParamTest
*/
public class DataSourceParamTest extends AbstractControllerTest {
@Test
public void testAddExportDependentSpecialParam() throws JSONException {
String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\","
+ "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\","
+ "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\""
+ ",\"localParams\":[],\"connParams\":\"\","
+ "\"preStatements\":[],\"postStatements\":[]},"
+ "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\","
+ "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\","
+ "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,"
+ "\"preTasks\":[\"dependent\"]}";
ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode sql = addTaskParam.addExportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
}
}
@Test
public void testAddImportDependentSpecialParam() throws JSONException {
String sqlJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\","
+ "\"type\":\"SQL\",\"params\":{\"postStatements\":[],"
+ "\"connParams\":\"\",\"receiversCc\":\"\",\"udfs\":\"\","
+ "\"type\":\"MYSQL\",\"title\":\"\",\"sql\":\"show tables\",\""
+ "preStatements\":[],\"sqlType\":\"1\",\"receivers\":\"\",\"datasource\":1,"
+ "\"showType\":\"TABLE\",\"localParams\":[],\"datasourceName\":\"dsmetadata\"},\"timeout\""
+ ":{\"enable\":false,\"strategy\":\"\"},\"maxRetryTimes\":\"0\","
+ "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{},"
+ "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}";
ObjectNode taskNode = JSONUtils.parseObject(sqlJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode sql = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false);
}
}
}

110
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.utils.exportprocess;
import org.apache.dolphinscheduler.api.controller.AbstractControllerTest;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.json.JSONException;
import org.junit.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DependentParamTest
*/
public class DependentParamTest extends AbstractControllerTest {
@Test
public void testAddExportDependentSpecialParam() throws JSONException {
String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\","
+ "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\","
+ "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\","
+ "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}";
ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
}
String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\","
+ "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}";
ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty);
if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) {
String taskType = taskEmpty.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty);
JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false);
}
}
@Test
public void testAddImportDependentSpecialParam() throws JSONException {
String dependentJson = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"name\":\"dependent\","
+ "\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\","
+ "\"definitionName\":\"shell-1\",\"depTasks\":\"shell-1\",\"projectName\":\"test\","
+ "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}],"
+ "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
ObjectNode taskNode = JSONUtils.parseObject(dependentJson);
if (StringUtils.isNotEmpty(taskNode.path("type").asText())) {
String taskType = taskNode.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false);
}
String dependentEmpty = "{\"workerGroupId\":-1,\"description\":\"\",\"runFlag\":\"NORMAL\""
+ ",\"type\":\"DEPENDENT\",\"params\":{},\"timeout\":{\"enable\":false,"
+ "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}";
JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty);
if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) {
String taskType = taskNodeEmpty.path("type").asText();
ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType);
JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode);
JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false);
}
}
}

7
dolphinscheduler-common/pom.xml

@ -90,6 +90,13 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>commons-configuration</groupId> <groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId> <artifactId>commons-configuration</artifactId>

13
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -1095,4 +1095,17 @@ public final class Constants {
public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER")); public static final boolean DOCKER_MODE = StringUtils.isNotEmpty(System.getenv("DOCKER"));
public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT")); public static final boolean KUBERNETES_MODE = StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_HOST")) && StringUtils.isNotEmpty(System.getenv("KUBERNETES_SERVICE_PORT"));
/**
* task parameter keys
*/
public static final String TASK_PARAMS = "params";
public static final String TASK_PARAMS_DATASOURCE = "datasource";
public static final String TASK_PARAMS_DATASOURCE_NAME = "datasourceName";
public static final String TASK_DEPENDENCE = "dependence";
public static final String TASK_DEPENDENCE_DEPEND_TASK_LIST = "dependTaskList";
public static final String TASK_DEPENDENCE_DEPEND_ITEM_LIST = "dependItemList";
public static final String TASK_DEPENDENCE_PROJECT_ID = "projectId";
public static final String TASK_DEPENDENCE_PROJECT_NAME = "projectName";
public static final String TASK_DEPENDENCE_DEFINITION_ID = "definitionId";
public static final String TASK_DEPENDENCE_DEFINITION_NAME = "definitionName";
} }

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/datasource/AbstractDatasourceProcessor.java

@ -30,7 +30,7 @@ public abstract class AbstractDatasourceProcessor implements DatasourceProcessor
private static final Pattern DATABASE_PATTER = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$"); private static final Pattern DATABASE_PATTER = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$");
private static final Pattern PARAMS_PATTER = Pattern.compile("^[a-zA-Z0-9]+$"); private static final Pattern PARAMS_PATTER = Pattern.compile("^[a-zA-Z0-9\\-\\_\\/]+$");
@Override @Override
public void checkDatasourceParam(BaseDataSourceParamDTO baseDataSourceParamDTO) { public void checkDatasourceParam(BaseDataSourceParamDTO baseDataSourceParamDTO) {

8
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/datasource/DatasourceUtilTest.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.datasource; package org.apache.dolphinscheduler.common.datasource;
import java.util.HashMap;
import java.util.Map;
import org.apache.dolphinscheduler.common.datasource.mysql.MysqlConnectionParam; import org.apache.dolphinscheduler.common.datasource.mysql.MysqlConnectionParam;
import org.apache.dolphinscheduler.common.datasource.mysql.MysqlDatasourceParamDTO; import org.apache.dolphinscheduler.common.datasource.mysql.MysqlDatasourceParamDTO;
import org.apache.dolphinscheduler.common.datasource.mysql.MysqlDatasourceProcessor; import org.apache.dolphinscheduler.common.datasource.mysql.MysqlDatasourceProcessor;
@ -44,7 +46,11 @@ public class DatasourceUtilTest {
MysqlDatasourceParamDTO mysqlDatasourceParamDTO = new MysqlDatasourceParamDTO(); MysqlDatasourceParamDTO mysqlDatasourceParamDTO = new MysqlDatasourceParamDTO();
mysqlDatasourceParamDTO.setHost("localhost"); mysqlDatasourceParamDTO.setHost("localhost");
mysqlDatasourceParamDTO.setDatabase("default"); mysqlDatasourceParamDTO.setDatabase("default");
mysqlDatasourceParamDTO.setOther(null); Map<String, String> other = new HashMap<>();
other.put("serverTimezone", "Asia/Shanghai");
other.put("queryTimeout", "-1");
other.put("characterEncoding", "utf8");
mysqlDatasourceParamDTO.setOther(other);
DatasourceUtil.checkDatasourceParam(mysqlDatasourceParamDTO); DatasourceUtil.checkDatasourceParam(mysqlDatasourceParamDTO);
Assert.assertTrue(true); Assert.assertTrue(true);
} }

6
dolphinscheduler-dao/pom.xml

@ -48,6 +48,12 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId> <artifactId>mybatis-plus</artifactId>

7
dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml

@ -70,6 +70,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

7
dolphinscheduler-remote/pom.xml

@ -73,6 +73,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>

6
dolphinscheduler-server/pom.xml

@ -77,6 +77,12 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId> <artifactId>spring-test</artifactId>

73
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java

@ -43,69 +43,15 @@ public class ParamUtils {
/** /**
* parameter conversion * parameter conversion
* @param globalParams global params * Warning:
* @param globalParamsMap global params map * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
* @param localParams local params * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current
* @param commandType command type * situation is wrong. So I cannot modify the original logic.
* @param scheduleTime schedule time *
* @return global params
*/
public static Map<String,Property> convert(Map<String,Property> globalParams,
Map<String,String> globalParamsMap,
Map<String,Property> localParams,
Map<String,Property> varParams,
CommandType commandType,
Date scheduleTime) {
if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String,String> timeParams = BusinessTimeUtils
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
timeParams.putAll(globalParamsMap);
}
if (globalParams != null && localParams != null) {
localParams.putAll(globalParams);
globalParams = localParams;
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, timeParams);
property.setValue(val);
}
}
return globalParams;
}
/**
* parameter conversion
* @param taskExecutionContext the context of this task instance * @param taskExecutionContext the context of this task instance
* @param parameters the parameters * @param parameters the parameters
* @return global params * @return global params
*
*/ */
public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext); Preconditions.checkNotNull(taskExecutionContext);
@ -115,8 +61,11 @@ public class ParamUtils {
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime(); Date scheduleTime = taskExecutionContext.getScheduleTime();
// combining local and global parameters
Map<String,Property> localParams = parameters.getLocalParametersMap(); Map<String,Property> localParams = parameters.getLocalParametersMap();
Map<String,Property> varParams = parameters.getVarPoolMap();
if (globalParams == null && localParams == null) { if (globalParams == null && localParams == null) {
return null; return null;
} }
@ -141,6 +90,10 @@ public class ParamUtils {
} else if (globalParams == null && localParams != null) { } else if (globalParams == null && localParams != null) {
globalParams = localParams; globalParams = localParams;
} }
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator(); Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next(); Map.Entry<String, Property> en = iter.next();

54
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/RemoveZKNode.java

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan("org.apache.dolphinscheduler")
public class RemoveZKNode implements CommandLineRunner {
private static final Integer ARGS_LENGTH = 1;
private static final Logger logger = LoggerFactory.getLogger(RemoveZKNode.class);
/**
* zookeeper operator
*/
private RegistryClient registryClient = RegistryClient.getInstance();
public static void main(String[] args) {
new SpringApplicationBuilder(RemoveZKNode.class).web(WebApplicationType.NONE).run(args);
}
@Override
public void run(String... args) throws Exception {
if (args.length != ARGS_LENGTH) {
logger.error("Usage: <node>");
return;
}
registryClient.remove(args[0]);
registryClient.close();
}
}

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.datax;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
@ -154,13 +153,8 @@ public class DataxTask extends AbstractTask {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
// combining local and global parameters // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
dataXParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// run datax procesDataSourceService.s // run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap); String jsonFilePath = buildDataxJsonFile(paramsMap);
@ -258,7 +252,7 @@ public class DataxTask extends AbstractTask {
} }
ArrayNode urlArr = readerConn.putArray("jdbcUrl"); ArrayNode urlArr = readerConn.putArray("jdbcUrl");
urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataSourceCfg)); urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataSourceCfg));
readerConnArr.add(readerConn); readerConnArr.add(readerConn);
@ -276,7 +270,7 @@ public class DataxTask extends AbstractTask {
ArrayNode tableArr = writerConn.putArray("table"); ArrayNode tableArr = writerConn.putArray("table");
tableArr.add(dataXParameters.getTargetTable()); tableArr.add(dataXParameters.getTargetTable());
writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataTargetCfg)); writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
writerConnArr.add(writerConn); writerConnArr.add(writerConn);
ObjectNode writerParam = JSONUtils.createObjectNode(); ObjectNode writerParam = JSONUtils.createObjectNode();
@ -443,31 +437,31 @@ public class DataxTask extends AbstractTask {
} }
public String loadJvmEnv(DataxParameters dataXParameters) { public String loadJvmEnv(DataxParameters dataXParameters) {
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); int xms = Math.max(dataXParameters.getXms(), 1);
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); int xmx = Math.max(dataXParameters.getXmx(), 1);
return String.format(JVM_PARAM, xms, xmx); return String.format(JVM_PARAM, xms, xmx);
} }
/** /**
* parsing synchronized column names in SQL statements * parsing synchronized column names in SQL statements
* *
* @param dsType the database type of the data source * @param sourceType the database type of the data source
* @param dtType the database type of the data target * @param targetType the database type of the data target
* @param dataSourceCfg the database connection parameters of the data source * @param dataSourceCfg the database connection parameters of the data source
* @param sql sql for data synchronization * @param sql sql for data synchronization
* @return Keyword converted column names * @return Keyword converted column names
*/ */
private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseConnectionParam dataSourceCfg, String sql) { private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql); String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
if (columnNames == null || columnNames.length == 0) { if (columnNames == null || columnNames.length == 0) {
logger.info("try to execute sql analysis query column name"); logger.info("try to execute sql analysis query column name");
columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); columnNames = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
} }
notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
return DataxUtils.convertKeywordsColumns(dtType, columnNames); return DataxUtils.convertKeywordsColumns(targetType, columnNames);
} }
/** /**
@ -548,13 +542,13 @@ public class DataxTask extends AbstractTask {
* @param sql sql for data synchronization * @param sql sql for data synchronization
* @return column name array * @return column name array
*/ */
public String[] tryExecuteSqlResolveColumnNames(BaseConnectionParam baseDataSource, String sql) { public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
String[] columnNames; String[] columnNames;
sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql); sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
sql = sql.replace(";", ""); sql = sql.replace(";", "");
try ( try (
Connection connection = DatasourceUtil.getConnection(DbType.valueOf(dataXParameters.getDtType()), baseDataSource); Connection connection = DatasourceUtil.getConnection(sourceType, baseDataSource);
PreparedStatement stmt = connection.prepareStatement(sql); PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) { ResultSet resultSet = stmt.executeQuery()) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.flink; package org.apache.dolphinscheduler.server.worker.task.flink;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -80,13 +79,8 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs(); String args = flinkParameters.getMainArgs();
// replace placeholder // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
flinkParameters.getLocalParametersMap(),
flinkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
logger.info("param Map : {}", paramsMap); logger.info("param Map : {}", paramsMap);
if (paramsMap != null) { if (paramsMap != null) {

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.http; package org.apache.dolphinscheduler.server.worker.task.http;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.enums.HttpParametersType; import org.apache.dolphinscheduler.common.enums.HttpParametersType;
import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.HttpProperty;
@ -137,13 +136,9 @@ public class HttpTask extends AbstractTask {
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder(); RequestBuilder builder = createRequestBuilder();
// replace placeholder // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
httpParameters.getLocalParametersMap(),
httpParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
List<HttpProperty> httpPropertyList = new ArrayList<>(); List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) { if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
for (HttpProperty httpProperty : httpParameters.getHttpParams()) { for (HttpProperty httpProperty : httpParameters.getHttpParams()) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.task.mr; package org.apache.dolphinscheduler.server.worker.task.mr;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
@ -84,13 +83,8 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.setQueue(taskExecutionContext.getQueue()); mapreduceParameters.setQueue(taskExecutionContext.getQueue());
setMainJarName(); setMainJarName();
// replace placeholder // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(),
mapreduceParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null) { if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java

@ -30,7 +30,6 @@ import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.ConnectionParam; import org.apache.dolphinscheduler.common.datasource.ConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
@ -119,12 +118,7 @@ public class ProcedureTask extends AbstractTask {
connection = DatasourceUtil.getConnection(dbType, connectionParam); connection = DatasourceUtil.getConnection(dbType, connectionParam);
// combining local and global parameters // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
procedureParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// call method // call method
stmt = connection.prepareCall(procedureParameters.getMethod()); stmt = connection.prepareCall(procedureParameters.getMethod());

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task.python;
package org.apache.dolphinscheduler.server.worker.task.python;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.slf4j.Logger;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
/** /**
* python task * python task
*/ */
@ -115,13 +116,8 @@ public class PythonTask extends AbstractTask {
private String buildCommand() throws Exception { private String buildCommand() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
pythonParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
try { try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -21,7 +21,6 @@ import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
@ -42,10 +41,8 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -166,7 +163,7 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) { private String parseScript(String script) {
// combining local and global parameters // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters); Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) { if (taskExecutionContext.getScheduleTime() != null) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.spark; package org.apache.dolphinscheduler.server.worker.task.spark;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.SparkVersion; import org.apache.dolphinscheduler.common.enums.SparkVersion;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
@ -109,13 +108,8 @@ public class SparkTask extends AbstractYarnTask {
// other parameters // other parameters
args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
// replace placeholder // replace placeholder, and combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
taskExecutionContext.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
sparkParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
String command = null; String command = null;

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
@ -166,14 +165,8 @@ public class SqlTask extends AbstractTask {
Map<Integer, Property> sqlParamsMap = new HashMap<>(); Map<Integer, Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder(); StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
sqlParameters.getVarPoolMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// spell SQL according to the final user-defined variable // spell SQL according to the final user-defined variable
if (paramsMap == null) { if (paramsMap == null) {
@ -276,7 +269,6 @@ public class SqlTask extends AbstractTask {
} }
} }
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) { public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null; String result = null;
for (Property info :properties) { for (Property info :properties) {

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.sqoop; package org.apache.dolphinscheduler.server.worker.task.sqoop;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@ -73,12 +72,8 @@ public class SqoopTask extends AbstractYarnTask {
SqoopJobGenerator generator = new SqoopJobGenerator(); SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), // combining local and global parameters
sqoopTaskExecutionContext.getDefinedParams(), Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
sqoopParameters.getLocalParametersMap(),
sqoopParameters.getVarPoolMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
if (paramsMap != null) { if (paramsMap != null) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java

@ -14,27 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* user define param * user define param
*/ */
@ -42,7 +36,6 @@ public class ParamsTest {
private static final Logger logger = LoggerFactory.getLogger(ParamsTest.class); private static final Logger logger = LoggerFactory.getLogger(ParamsTest.class);
@Test @Test
public void systemParamsTest() throws Exception { public void systemParamsTest() throws Exception {
String command = "${system.biz.date}"; String command = "${system.biz.date}";
@ -56,12 +49,10 @@ public class ParamsTest {
logger.info("start process : {}",command); logger.info("start process : {}",command);
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date()); calendar.setTime(new Date());
calendar.add(Calendar.DAY_OF_MONTH, -5); calendar.add(Calendar.DAY_OF_MONTH, -5);
command = "${system.biz.date}"; command = "${system.biz.date}";
// complement data // complement data
timeParams = BusinessTimeUtils timeParams = BusinessTimeUtils
@ -71,40 +62,4 @@ public class ParamsTest {
logger.info("complement data : {}",command); logger.info("complement data : {}",command);
} }
@Test
public void convertTest() throws Exception {
Map<String, Property> globalParams = new HashMap<>();
Property property = new Property();
property.setProp("global_param");
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue("${system.biz.date}");
globalParams.put("global_param", property);
Map<String, String> globalParamsMap = new HashMap<>();
globalParamsMap.put("global_param", "${system.biz.date}");
Map<String, Property> localParams = new HashMap<>();
Property localProperty = new Property();
localProperty.setProp("local_param");
localProperty.setDirect(Direct.IN);
localProperty.setType(DataType.VARCHAR);
localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty);
Map<String, Property> varPoolParams = new HashMap<>();
Property varProperty = new Property();
varProperty.setProp("local_param");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap,
localParams,varPoolParams, CommandType.START_PROCESS, new Date());
logger.info(JSONUtils.toJsonString(paramsMap));
}
} }

61
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
@ -85,7 +84,7 @@ public class ParamUtilsTest {
localParams.put("local_param", localProperty); localParams.put("local_param", localProperty);
Property varProperty = new Property(); Property varProperty = new Property();
varProperty.setProp("local_param"); varProperty.setProp("varPool");
varProperty.setDirect(Direct.IN); varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR); varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}"); varProperty.setValue("${global_param}");
@ -93,42 +92,72 @@ public class ParamUtilsTest {
} }
/** /**
* Test convert * This is basic test case for ParamUtils.convert.
* Warning:
* As you can see,this case invokes the function of convert in different situations. When you first invoke the function of convert,
* the variables of localParams and varPool in the ShellParameters will be modified. But in the whole system the variables of localParams
* and varPool have been used in other functions. I'm not sure if this current situation is wrong. So I cannot modify the original logic.
*/ */
@Test @Test
public void testConvert() { public void testConvert() {
//The expected value //The expected value
String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," String expected = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//The expected value when globalParams is null but localParams is not null //The expected value when globalParams is null but localParams is not null
String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," String expected1 = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base //Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
calendar.set(2019, 11, 30); calendar.set(2019, 11, 30);
Date date = calendar.getTime(); Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
List<Property> localParamList = localParams.values().stream().collect(Collectors.toList());
List<Property> varPoolParamList = varPoolParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
shellParameters.setLocalParams(localParamList);
String varPoolParamsJson = JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
shellParameters.setVarPool(taskExecutionContext.getVarPool());
shellParameters.dealOutParam(varPoolParamsJson);
//Invoke convert //Invoke convert
Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date); Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap); String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result); assertEquals(expected, result);
for (Map.Entry<String, Property> entry : paramsMap.entrySet()) {
String key = entry.getKey();
Property prop = entry.getValue();
logger.info(key + " : " + prop.getValue());
}
//Invoke convert with null globalParams //Invoke convert with null globalParams
Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date); taskExecutionContext.setDefinedParams(null);
Map<String, Property> paramsMap1 = ParamUtils.convert(taskExecutionContext, shellParameters);
String result1 = JSONUtils.toJsonString(paramsMap1); String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1); assertEquals(expected1, result1);
// Null check, invoke convert with null globalParams and null localParams // Null check, invoke convert with null globalParams and null localParams
Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date); shellParameters.setLocalParams(null);
Map<String, Property> paramsMap2 = ParamUtils.convert(taskExecutionContext, shellParameters);
assertNull(paramsMap2); assertNull(paramsMap2);
} }

6
dolphinscheduler-service/pom.xml

@ -94,5 +94,11 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -25,8 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* A singleton of a task queue implemented with zookeeper * A singleton of a task queue implemented using PriorityBlockingQueue
* tasks queue implementation
*/ */
@Service @Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> { public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {

6
dolphinscheduler-spi/pom.xml

@ -57,6 +57,12 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>

5
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js

@ -110,6 +110,7 @@ Dag.prototype.toolbarEvent = function ({ item, code, is }) {
* Echo data display * Echo data display
*/ */
Dag.prototype.backfill = function (arg) { Dag.prototype.backfill = function (arg) {
const that = this
if (arg) { if (arg) {
const marginX = 100 const marginX = 100
const g = new dagre.graphlib.Graph() const g = new dagre.graphlib.Graph()
@ -144,7 +145,7 @@ Dag.prototype.backfill = function (arg) {
instance: this.instance, instance: this.instance,
options: { options: {
onRemoveNodes ($id) { onRemoveNodes ($id) {
this.dag.removeEventModelById($id) that.dag.removeEventModelById($id)
} }
} }
}) })
@ -167,7 +168,7 @@ Dag.prototype.backfill = function (arg) {
instance: this.instance, instance: this.instance,
options: { options: {
onRemoveNodes ($id) { onRemoveNodes ($id) {
this.dag.removeEventModelById($id) that.dag.removeEventModelById($id)
} }
} }
}) })

8
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -893,4 +893,12 @@
.operBtn { .operBtn {
padding: 8px 6px; padding: 8px 6px;
} }
.el-drawer__body {
::selection {
background: #409EFF;
color: white;
}
}
</style> </style>

44
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/resourceTree.js

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export function diGuiTree (items) { // Recursive convenience tree structure
items.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? operationTree(item) : diGuiTree(item.children)
})
}
export function operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
}
export function searchTree (element, id) {
// 根据id查找节点
if (element.id === id) {
return element
} else if (element.children) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = searchTree(element.children[i], id)
}
return result
}
return null
}

39
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue

@ -204,6 +204,7 @@
import '@riophae/vue-treeselect/dist/vue-treeselect.css' import '@riophae/vue-treeselect/dist/vue-treeselect.css'
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import Clipboard from 'clipboard' import Clipboard from 'clipboard'
import { diGuiTree, searchTree } from './_source/resourceTree'
export default { export default {
name: 'flink', name: 'flink',
@ -398,40 +399,14 @@
}) })
return true return true
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -503,8 +478,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -538,8 +513,8 @@
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
let items = this.store.state.dag.resourcesListJar let items = this.store.state.dag.resourcesListJar
this.diGuiTree(item) diGuiTree(item)
this.diGuiTree(items) diGuiTree(items)
this.mainJarList = item this.mainJarList = item
this.mainJarLists = items this.mainJarLists = items
let o = this.backfillItem let o = this.backfillItem

39
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/mr.vue

@ -117,6 +117,7 @@
import '@riophae/vue-treeselect/dist/vue-treeselect.css' import '@riophae/vue-treeselect/dist/vue-treeselect.css'
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import Clipboard from 'clipboard' import Clipboard from 'clipboard'
import { diGuiTree, searchTree } from './_source/resourceTree'
export default { export default {
name: 'mr', name: 'mr',
@ -210,40 +211,14 @@
_onCacheResourcesData (a) { _onCacheResourcesData (a) {
this.cacheResourceList = a this.cacheResourceList = a
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -359,8 +334,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -388,8 +363,8 @@
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
let items = this.store.state.dag.resourcesListJar let items = this.store.state.dag.resourcesListJar
this.diGuiTree(item) diGuiTree(item)
this.diGuiTree(items) diGuiTree(items)
this.mainJarList = item this.mainJarList = item
this.mainJarLists = items this.mainJarLists = items
let o = this.backfillItem let o = this.backfillItem

38
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/python.vue

@ -66,6 +66,8 @@
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror' import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
import Clipboard from 'clipboard' import Clipboard from 'clipboard'
import { diGuiTree, searchTree } from './_source/resourceTree'
let editor let editor
export default { export default {
@ -198,40 +200,14 @@
return editor return editor
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.resourceOptions.forEach(v1 => { this.resourceOptions.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -297,8 +273,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.resourceOptions.forEach(v1 => { this.resourceOptions.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -317,7 +293,7 @@
}, },
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
this.diGuiTree(item) diGuiTree(item)
this.resourceOptions = item this.resourceOptions = item
let o = this.backfillItem let o = this.backfillItem

38
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/shell.vue

@ -69,6 +69,8 @@
import '@riophae/vue-treeselect/dist/vue-treeselect.css' import '@riophae/vue-treeselect/dist/vue-treeselect.css'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror' import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
import Clipboard from 'clipboard' import Clipboard from 'clipboard'
import { diGuiTree, searchTree } from './_source/resourceTree'
let editor let editor
export default { export default {
@ -208,40 +210,14 @@
return editor return editor
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.options.forEach(v1 => { this.options.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -307,8 +283,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.options.forEach(v1 => { this.options.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -327,7 +303,7 @@
}, },
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
this.diGuiTree(item) diGuiTree(item)
this.options = item this.options = item
let o = this.backfillItem let o = this.backfillItem

40
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue

@ -205,6 +205,8 @@
import '@riophae/vue-treeselect/dist/vue-treeselect.css' import '@riophae/vue-treeselect/dist/vue-treeselect.css'
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import Clipboard from 'clipboard' import Clipboard from 'clipboard'
import { diGuiTree, searchTree } from './_source/resourceTree'
export default { export default {
name: 'spark', name: 'spark',
data () { data () {
@ -313,40 +315,14 @@
_onCacheResourcesData (a) { _onCacheResourcesData (a) {
this.cacheResourceList = a this.cacheResourceList = a
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -521,8 +497,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.mainJarList.forEach(v1 => { this.mainJarList.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -557,8 +533,8 @@
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
let items = this.store.state.dag.resourcesListJar let items = this.store.state.dag.resourcesListJar
this.diGuiTree(item) diGuiTree(item)
this.diGuiTree(items) diGuiTree(items)
this.mainJarList = item this.mainJarList = item
this.mainJarLists = items this.mainJarLists = items
let o = this.backfillItem let o = this.backfillItem

37
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/waterdrop.vue

@ -99,6 +99,7 @@
import disabledState from '@/module/mixin/disabledState' import disabledState from '@/module/mixin/disabledState'
import Treeselect from '@riophae/vue-treeselect' import Treeselect from '@riophae/vue-treeselect'
import '@riophae/vue-treeselect/dist/vue-treeselect.css' import '@riophae/vue-treeselect/dist/vue-treeselect.css'
import { diGuiTree, searchTree } from './_source/resourceTree'
export default { export default {
name: 'waterdrop', name: 'waterdrop',
@ -228,40 +229,14 @@
return true return true
}, },
diGuiTree (item) { // Recursive convenience tree structure
item.forEach(item => {
item.children === '' || item.children === undefined || item.children === null || item.children.length === 0
? this.operationTree(item) : this.diGuiTree(item.children)
})
},
operationTree (item) {
if (item.dirctory) {
item.isDisabled = true
}
delete item.children
},
searchTree (element, id) {
// id
if (element.id === id) {
return element
} else if (element.children !== null) {
let i
let result = null
for (i = 0; result === null && i < element.children.length; i++) {
result = this.searchTree(element.children[i], id)
}
return result
}
return null
},
dataProcess (backResource) { dataProcess (backResource) {
let isResourceId = [] let isResourceId = []
let resourceIdArr = [] let resourceIdArr = []
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.options.forEach(v1 => { this.options.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -340,8 +315,8 @@
if (this.resourceList.length > 0) { if (this.resourceList.length > 0) {
this.resourceList.forEach(v => { this.resourceList.forEach(v => {
this.options.forEach(v1 => { this.options.forEach(v1 => {
if (this.searchTree(v1, v)) { if (searchTree(v1, v)) {
isResourceId.push(this.searchTree(v1, v)) isResourceId.push(searchTree(v1, v))
} }
}) })
}) })
@ -364,7 +339,7 @@
}, },
created () { created () {
let item = this.store.state.dag.resourcesListS let item = this.store.state.dag.resourcesListS
this.diGuiTree(item) diGuiTree(item)
this.options = item this.options = item
let o = this.backfillItem let o = this.backfillItem

5
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue

@ -281,6 +281,11 @@
max-height: 600px; max-height: 600px;
overflow-y: scroll; overflow-y: scroll;
padding:0 20px; padding:0 20px;
::selection {
background: #409EFF ;
color: white;
}
} }
.title { .title {
line-height: 36px; line-height: 36px;

27
pom.xml

@ -409,6 +409,14 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<version>${jacoco.version}</version>
<classifier>runtime</classifier>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
@ -826,6 +834,9 @@
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version> <version>${maven-surefire-plugin.version}</version>
<configuration> <configuration>
<systemPropertyVariables>
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
</systemPropertyVariables>
<includes> <includes>
<!--registry plugin --> <!--registry plugin -->
<include>**/plugin/registry/zookeeper/ZookeeperRegistryTest.java</include> <include>**/plugin/registry/zookeeper/ZookeeperRegistryTest.java</include>
@ -1093,19 +1104,23 @@
<artifactId>jacoco-maven-plugin</artifactId> <artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version> <version>${jacoco.version}</version>
<configuration> <configuration>
<destFile>target/jacoco.exec</destFile> <dataFile>${project.build.directory}/jacoco.exec</dataFile>
<dataFile>target/jacoco.exec</dataFile>
</configuration> </configuration>
<executions> <executions>
<execution> <execution>
<id>jacoco-initialize</id> <id>default-instrument</id>
<goals>
<goal>instrument</goal>
</goals>
</execution>
<execution>
<id>default-restore-instrumented-classes</id>
<goals> <goals>
<goal>prepare-agent</goal> <goal>restore-instrumented-classes</goal>
</goals> </goals>
</execution> </execution>
<execution> <execution>
<id>jacoco-site</id> <id>default-report</id>
<phase>test</phase>
<goals> <goals>
<goal>report</goal> <goal>report</goal>
</goals> </goals>

4
sql/dolphinscheduler_mysql.sql

@ -305,7 +305,7 @@ CREATE TABLE `t_ds_alertgroup`(
`create_time` datetime DEFAULT NULL COMMENT 'create time', `create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_alertgroup_name_UN` (`group_name`) UNIQUE KEY `t_ds_alertgroup_name_un` (`group_name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------
@ -352,7 +352,7 @@ CREATE TABLE `t_ds_datasource` (
`create_time` datetime NOT NULL COMMENT 'create time', `create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_datasource_name_UN` (`name`, `type`) UNIQUE KEY `t_ds_datasource_name_un` (`name`, `type`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------

4
sql/dolphinscheduler_postgre.sql

@ -209,7 +209,7 @@ CREATE TABLE t_ds_alertgroup(
create_time timestamp DEFAULT NULL, create_time timestamp DEFAULT NULL,
update_time timestamp DEFAULT NULL, update_time timestamp DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
CONSTRAINT t_ds_alertgroup_name_UN UNIQUE (group_name) CONSTRAINT t_ds_alertgroup_name_un UNIQUE (group_name)
) ; ) ;
-- --
@ -250,7 +250,7 @@ CREATE TABLE t_ds_datasource (
create_time timestamp NOT NULL , create_time timestamp NOT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
PRIMARY KEY (id), PRIMARY KEY (id),
CONSTRAINT t_ds_datasource_name_UN UNIQUE (name, type) CONSTRAINT t_ds_datasource_name_un UNIQUE (name, type)
) ; ) ;
-- --

10
sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql

@ -285,9 +285,9 @@ BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_NAME='t_ds_alertgroup' WHERE TABLE_NAME='t_ds_alertgroup'
AND TABLE_SCHEMA=(SELECT DATABASE()) AND TABLE_SCHEMA=(SELECT DATABASE())
AND INDEX_NAME ='t_ds_alertgroup_name_UN') AND INDEX_NAME ='t_ds_alertgroup_name_un')
THEN THEN
ALTER TABLE t_ds_alertgroup ADD UNIQUE KEY `t_ds_alertgroup_name_UN` (`group_name`); ALTER TABLE t_ds_alertgroup ADD UNIQUE KEY `t_ds_alertgroup_name_un` (`group_name`);
END IF; END IF;
END; END;
@ -302,12 +302,12 @@ drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_datasource_A_add_UN_datasourceName;
delimiter d// delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_datasource_A_add_UN_datasourceName() CREATE PROCEDURE uc_dolphin_T_t_ds_datasource_A_add_UN_datasourceName()
BEGIN BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_NAME='t_ds_datasource' WHERE TABLE_NAME='t_ds_datasource'
AND TABLE_SCHEMA=(SELECT DATABASE()) AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='t_ds_datasource_name_UN') AND INDEX_NAME ='t_ds_datasource_name_un')
THEN THEN
ALTER TABLE t_ds_datasource ADD UNIQUE KEY `t_ds_datasource_name_UN` (`name`, `type`); ALTER TABLE t_ds_datasource ADD UNIQUE KEY `t_ds_datasource_name_un` (`name`, `type`);
END IF; END IF;
END; END;

8
sql/upgrade/1.4.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -274,9 +274,9 @@ CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_alertgroup_A_add_UN_groupName() RET
BEGIN BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_stat_all_indexes IF NOT EXISTS (SELECT 1 FROM pg_stat_all_indexes
WHERE relname='t_ds_alertgroup' WHERE relname='t_ds_alertgroup'
AND indexrelname ='t_ds_alertgroup_name_UN') AND indexrelname ='t_ds_alertgroup_name_un')
THEN THEN
ALTER TABLE t_ds_alertgroup ADD CONSTRAINT t_ds_alertgroup_name_UN UNIQUE (group_name); ALTER TABLE t_ds_alertgroup ADD CONSTRAINT t_ds_alertgroup_name_un UNIQUE (group_name);
END IF; END IF;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
@ -292,9 +292,9 @@ CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_datasource_A_add_UN_datasourceName(
BEGIN BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_stat_all_indexes IF NOT EXISTS (SELECT 1 FROM pg_stat_all_indexes
WHERE relname='t_ds_datasource' WHERE relname='t_ds_datasource'
AND indexrelname ='t_ds_datasource_name_UN') AND indexrelname ='t_ds_datasource_name_un')
THEN THEN
ALTER TABLE t_ds_datasource ADD CONSTRAINT t_ds_datasource_name_UN UNIQUE (name, type); ALTER TABLE t_ds_datasource ADD CONSTRAINT t_ds_datasource_name_un UNIQUE (name, type);
END IF; END IF;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;

Loading…
Cancel
Save