Browse Source

Add IntegretionTest for registry module (#15981)

3.2.2-release-bak
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
7c8fa9b48c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      .github/workflows/unit-test.yml
  2. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-aliyunVoice/src/test/resources/logback.xml
  3. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-dingtalk/src/test/resources/logback.xml
  4. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-email/src/test/resources/logback.xml
  5. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-feishu/src/test/resources/logback.xml
  6. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-http/src/test/resources/logback.xml
  7. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-pagerduty/src/test/resources/logback.xml
  8. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-prometheus/src/test/resources/logback.xml
  9. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/test/resources/logback.xml
  10. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-slack/src/test/resources/logback.xml
  11. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-telegram/src/test/resources/logback.xml
  12. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/src/test/resources/logback.xml
  13. 21
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/src/test/resources/logback.xml
  14. 21
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/resources/logback.xml
  15. 11
      dolphinscheduler-api/src/test/resources/application.yaml
  16. 0
      dolphinscheduler-api/src/test/resources/logback.xml
  17. 8
      dolphinscheduler-bom/pom.xml
  18. 21
      dolphinscheduler-common/src/test/resources/logback.xml
  19. 21
      dolphinscheduler-data-quality/src/test/resources/logback.xml
  20. 45
      dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java
  21. 25
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml
  22. 55
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java
  23. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java
  24. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java
  25. 143
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTest.java
  26. 70
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTestCase.java
  27. 20
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/application.yaml
  28. 21
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/logback.xml
  29. 60
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/pom.xml
  30. 290
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java
  31. 25
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml
  32. 10
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/EphemeralDateManager.java
  33. 54
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java
  34. 38
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java
  35. 12
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java
  36. 6
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java
  37. 34
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/LockUtils.java
  38. 57
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/RegistryLockManager.java
  39. 18
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/SubscribeDataManager.java
  40. 3
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java
  41. 2
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java
  42. 4
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql
  43. 41
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryTestCase.java
  44. 103
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/MysqlJdbcRegistryTestCase.java
  45. 98
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/PostgresqlJdbcRegistryTestCase.java
  46. 31
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/application-mysql.yaml
  47. 28
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/application-postgresql.yaml
  48. 21
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/logback.xml
  49. 37
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml
  50. 8
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java
  51. 96
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java
  52. 4
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java
  53. 67
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java
  54. 131
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java
  55. 71
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTestCase.java
  56. 30
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/resources/application.yaml
  57. 21
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/resources/logback.xml
  58. 1
      dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
  59. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/test/resources/logback.xml
  60. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/resources/logback.xml
  61. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/test/resources/logback.xml
  62. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/resources/logback.xml
  63. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/resources/logback.xml
  64. 21
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/resources/logback.xml
  65. 24
      pom.xml

16
.github/workflows/unit-test.yml

@ -76,7 +76,7 @@ jobs:
restore-keys: ${{ runner.os }}-maven-
- name: Run Unit tests
run: ./mvnw clean verify -B -Dmaven.test.skip=false -Dspotless.skip=true -DskipUT=false -DskipIT=false
run: ./mvnw clean verify -B -Dmaven.test.skip=false -Dspotless.skip=true -DskipUT=false
- name: Upload coverage report to codecov
run: CODECOV_TOKEN="09c2663f-b091-4258-8a47-c981827eb29a" bash <(curl -s https://codecov.io/bash)
@ -99,23 +99,11 @@ jobs:
-Dsonar.login=e4058004bc6be89decf558ac819aa1ecbee57682
-Dsonar.exclusions=,dolphinscheduler-ui/src/**/i18n/locale/*.js,dolphinscheduler-microbench/src/**/*
-Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-DskipUT=true -DskipIT=true
-DskipUT=true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
- name: Collect logs
continue-on-error: true
run: |
mkdir -p ${LOG_DIR}
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
- name: Upload logs
uses: actions/upload-artifact@v2
continue-on-error: true
with:
name: unit-test-logs
path: ${LOG_DIR}
result:
name: Unit Test
runs-on: ubuntu-latest

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-aliyunVoice/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-dingtalk/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-email/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-feishu/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-http/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-pagerduty/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-prometheus/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-slack/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-telegram/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-webexteams/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-wechat/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

11
dolphinscheduler-api/src/test/resources/application.yaml

@ -44,6 +44,17 @@ mybatis-plus:
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
connect-string: localhost:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
api:
audit-enable: true

0
dolphinscheduler-api/src/test/resources/logback-spring.xml → dolphinscheduler-api/src/test/resources/logback.xml

8
dolphinscheduler-bom/pom.xml

@ -37,6 +37,7 @@
<druid.version>1.2.20</druid.version>
<curator-test.version>2.12.0</curator-test.version>
<jetcd.version>0.5.11</jetcd.version>
<!-- todo: Should upgrade the jetcd.test, 0.7.1 doesn't support arm -->
<jetcd.test.version>0.7.1</jetcd.test.version>
<io.grpc.version>1.41.0</io.grpc.version>
<commons-codec.version>1.11</commons-codec.version>
@ -943,6 +944,13 @@
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>

21
dolphinscheduler-common/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-data-quality/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration scan="true" scanPeriod="120 seconds">
<logger name="*" level="ERROR"/>
</configuration>

45
dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Registry.java

@ -26,10 +26,20 @@ import java.util.Collection;
import lombok.NonNull;
/**
* Registry
* The SPI interface for registry center, each registry plugin should implement this interface.
*/
public interface Registry extends Closeable {
/**
* Start the registry, once started, the registry will connect to the registry center.
*/
void start();
/**
* Whether the registry is connected
*
* @return true if connected, false otherwise.
*/
boolean isConnected();
/**
@ -40,7 +50,13 @@ public interface Registry extends Closeable {
*/
void connectUntilTimeout(@NonNull Duration timeout) throws RegistryException;
boolean subscribe(String path, SubscribeListener listener);
/**
* Subscribe the path, when the path has expose {@link Event}, the listener will be triggered.
*
* @param path the path to subscribe
* @param listener the listener to be triggered
*/
void subscribe(String path, SubscribeListener listener);
/**
* Remove the path from the subscribe list.
@ -53,35 +69,34 @@ public interface Registry extends Closeable {
void addConnectionStateListener(ConnectionListener listener);
/**
* @return the value
* Get the value of the key, if key not exist will throw {@link RegistryException}
*/
String get(String key);
String get(String key) throws RegistryException;
/**
* @param key
* @param value
* Put the key-value pair into the registry
*
* @param key the key, cannot be null
* @param value the value, cannot be null
* @param deleteOnDisconnect if true, when the connection state is disconnected, the key will be deleted
*/
void put(String key, String value, boolean deleteOnDisconnect);
/**
* This function will delete the keys whose prefix is {@param key}
*
* @param key the prefix of deleted key
* @throws if the key not exists, there is a registryException
* Delete the key from the registry
*/
void delete(String key);
/**
* @return {@code true} if key exists.
* E.g: registry contains the following keys:[/test/test1/test2,]
* if the key: /test
* Return: test1
* Return the children of the key
*/
Collection<String> children(String key);
/**
* @return if key exists,return true
* Check if the key exists
*
* @param key the key to check
* @return true if the key exists
*/
boolean exists(String key);

25
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/pom.xml

@ -31,6 +31,15 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-it</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
@ -49,18 +58,22 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

55
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

@ -71,6 +71,7 @@ import io.netty.handler.ssl.SslContext;
@Slf4j
public class EtcdRegistry implements Registry {
private final EtcdRegistryProperties etcdRegistryProperties;
private final Client client;
private EtcdConnectionStateListener etcdConnectionStateListener;
@ -83,9 +84,8 @@ public class EtcdRegistry implements Registry {
private final Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
private static final long TIME_TO_LIVE_SECONDS = 30L;
public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLException {
this.etcdRegistryProperties = registryProperties;
ClientBuilder clientBuilder = Client.builder()
.endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(registryProperties.getEndpoints())))
.namespace(byteSequence(registryProperties.getNamespace()))
@ -129,6 +129,11 @@ public class EtcdRegistry implements Registry {
}
@Override
public void start() {
// The start has been set in the constructor
}
@Override
public boolean isConnected() {
return client.getKVClient().get(byteSequence("/")).join() != null;
@ -145,7 +150,7 @@ public class EtcdRegistry implements Registry {
* @return if subcribe Returns true if no exception was thrown
*/
@Override
public boolean subscribe(String path, SubscribeListener listener) {
public void subscribe(String path, SubscribeListener listener) {
try {
ByteSequence watchKey = byteSequence(path);
WatchOption watchOption =
@ -159,7 +164,6 @@ public class EtcdRegistry implements Registry {
} catch (Exception e) {
throw new RegistryException("Failed to subscribe listener for key: " + path, e);
}
return true;
}
/**
@ -193,7 +197,7 @@ public class EtcdRegistry implements Registry {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get data error", e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RegistryException("etcd get data error, key = " + key, e);
}
}
@ -206,7 +210,8 @@ public class EtcdRegistry implements Registry {
try {
if (deleteOnDisconnect) {
// keep the key by lease, if disconnected, the lease will expire and the key will delete
long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key,
etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS));
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
client.getKVClient().put(byteSequence(key), byteSequence(value), putOption).get();
} else {
@ -289,47 +294,59 @@ public class EtcdRegistry implements Registry {
*/
@Override
public boolean acquireLock(String key) {
Map<String, Long> leaseIdMap = threadLocalLockMap.get();
if (null == leaseIdMap) {
leaseIdMap = new HashMap<>();
threadLocalLockMap.set(leaseIdMap);
}
if (leaseIdMap.containsKey(key)) {
return true;
}
Lock lockClient = client.getLockClient();
Lease leaseClient = client.getLeaseClient();
// get the lock with a lease
try {
long leaseId = leaseClient.grant(TIME_TO_LIVE_SECONDS).get().getID();
long leaseId = leaseClient.grant(etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS)).get().getID();
// keep the lease
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
lockClient.lock(byteSequence(key), leaseId).get();
// save the leaseId for release Lock
if (null == threadLocalLockMap.get()) {
threadLocalLockMap.set(new HashMap<>());
}
threadLocalLockMap.get().put(key, leaseId);
leaseIdMap.put(key, leaseId);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
}
}
@Override
public boolean acquireLock(String key, long timeout) {
Map<String, Long> leaseIdMap = threadLocalLockMap.get();
if (null == leaseIdMap) {
leaseIdMap = new HashMap<>();
threadLocalLockMap.set(leaseIdMap);
}
if (leaseIdMap.containsKey(key)) {
return true;
}
Lock lockClient = client.getLockClient();
Lease leaseClient = client.getLeaseClient();
// get the lock with a lease
try {
long leaseId = leaseClient.grant(TIME_TO_LIVE_SECONDS).get().getID();
long leaseId = leaseClient.grant(etcdRegistryProperties.getTtl().get(ChronoUnit.SECONDS)).get().getID();
// keep the lease
lockClient.lock(byteSequence(key), leaseId).get(timeout, TimeUnit.MICROSECONDS);
lockClient.lock(byteSequence(key), leaseId).get(timeout, TimeUnit.MILLISECONDS);
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
// save the leaseId for release Lock
if (null == threadLocalLockMap.get()) {
threadLocalLockMap.set(new HashMap<>());
}
threadLocalLockMap.get().put(key, leaseId);
leaseIdMap.put(key, leaseId);
return true;
} catch (TimeoutException timeoutException) {
log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
@ -337,7 +354,7 @@ public class EtcdRegistry implements Registry {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (ExecutionException e) {
} catch (Exception e) {
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
}
}

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryProperties.java

@ -33,6 +33,8 @@ public class EtcdRegistryProperties {
private String namespace = "dolphinscheduler";
private Duration connectionTimeout = Duration.ofSeconds(9);
private Duration ttl = Duration.ofSeconds(30);
// auth
private String user;
private String password;

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdKeepAliveLeaseManagerTest.java

@ -43,7 +43,7 @@ class EtcdKeepAliveLeaseManagerTest {
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build();
server.restart();
server.cluster().start();
client = Client.builder().endpoints(server.clientEndpoints()).build();

143
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTest.java

@ -1,143 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.etcd;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.etcd.jetcd.test.EtcdClusterExtension;
public class EtcdRegistryTest {
private static final Logger logger = LoggerFactory.getLogger(EtcdRegistryTest.class);
public static EtcdRegistry registry;
@BeforeAll
public static void before() throws Exception {
EtcdClusterExtension server = EtcdClusterExtension.builder()
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build();
EtcdRegistryProperties properties = new EtcdRegistryProperties();
server.restart();
properties.setEndpoints(String.valueOf(server.clientEndpoints().get(0)));
registry = new EtcdRegistry(properties);
registry.put("/sub", "sub", false);
}
@Test
public void persistTest() {
registry.put("/nodes/m1", "", false);
registry.put("/nodes/m2", "", false);
Assertions.assertEquals(Arrays.asList("m1", "m2"), registry.children("/nodes"));
Assertions.assertTrue(registry.exists("/nodes/m1"));
registry.delete("/nodes/m2");
Assertions.assertFalse(registry.exists("/nodes/m2"));
registry.delete("/nodes");
Assertions.assertFalse(registry.exists("/nodes/m1"));
}
@Test
public void lockTest() {
CountDownLatch preCountDownLatch = new CountDownLatch(1);
CountDownLatch allCountDownLatch = new CountDownLatch(2);
List<String> testData = new ArrayList<>();
new Thread(() -> {
registry.acquireLock("/lock");
preCountDownLatch.countDown();
logger.info(Thread.currentThread().getName()
+ " :I got the lock, but I don't want to work. I want to rest for a while");
try {
Thread.sleep(1000);
logger.info(Thread.currentThread().getName() + " :I'm going to start working");
testData.add("thread1");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
logger.info(Thread.currentThread().getName() + " :I have finished my work, now I release the lock");
registry.releaseLock("/lock");
allCountDownLatch.countDown();
}
}).start();
try {
preCountDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
try {
logger.info(Thread.currentThread().getName() + " :I am trying to acquire the lock");
registry.acquireLock("/lock");
logger.info(Thread.currentThread().getName() + " :I got the lock and I started working");
testData.add("thread2");
} finally {
registry.releaseLock("/lock");
allCountDownLatch.countDown();
}
}).start();
try {
allCountDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assertions.assertEquals(testData, Arrays.asList("thread1", "thread2"));
}
@Test
public void subscribeTest() {
boolean status = registry.subscribe("/sub", new TestListener());
// The following add and delete operations are used for debugging
registry.put("/sub/m1", "tt", false);
registry.put("/sub/m2", "tt", false);
registry.delete("/sub/m2");
registry.delete("/sub");
Assertions.assertTrue(status);
}
static class TestListener implements SubscribeListener {
@Override
public void notify(Event event) {
logger.info("I'm test listener");
}
}
@AfterAll
public static void after() throws IOException {
registry.close();
}
}

70
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistryTestCase.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.etcd;
import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;
import java.net.URI;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.test.EtcdClusterExtension;
@SpringBootTest(classes = EtcdRegistryProperties.class)
@SpringBootApplication(scanBasePackageClasses = EtcdRegistryProperties.class)
public class EtcdRegistryTestCase extends RegistryTestCase<EtcdRegistry> {
@Autowired
private EtcdRegistryProperties etcdRegistryProperties;
private static EtcdCluster etcdCluster;
@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
etcdCluster = EtcdClusterExtension.builder()
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build()
.cluster();
etcdCluster.start();
System.setProperty("registry.endpoints",
etcdCluster.clientEndpoints().stream().map(URI::toString).collect(Collectors.joining(",")));
}
@SneakyThrows
@Override
public EtcdRegistry createRegistry() {
return new EtcdRegistry(etcdRegistryProperties);
}
@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
try (EtcdCluster cluster = etcdCluster) {
}
}
}

20
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/application.yaml

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
registry:
type: etcd
ttl: 2s

21
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

60
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/pom.xml

@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-plugins</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-registry-it</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<skip>false</skip>
</configuration>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

290
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java

@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.SneakyThrows;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.google.common.truth.Truth;
public abstract class RegistryTestCase<R extends Registry> {
protected R registry;
@BeforeEach
public void setupRegistry() {
registry = createRegistry();
}
@SneakyThrows
@AfterEach
public void tearDownRegistry() {
try (R registry = this.registry) {
}
}
@Test
public void testIsConnected() {
registry.start();
Truth.assertThat(registry.isConnected()).isTrue();
}
@Test
public void testConnectUntilTimeout() {
registry.start();
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> registry.connectUntilTimeout(Duration.ofSeconds(3)));
}
@SneakyThrows
@Test
public void testSubscribe() {
registry.start();
final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
SubscribeListener subscribeListener = event -> {
System.out.println("Receive event: " + event);
if (event.type() == Event.Type.ADD) {
subscribeAdded.compareAndSet(false, true);
}
if (event.type() == Event.Type.REMOVE) {
subscribeRemoved.compareAndSet(false, true);
}
if (event.type() == Event.Type.UPDATE) {
subscribeUpdated.compareAndSet(false, true);
}
};
String key = "/nodes/master" + System.nanoTime();
registry.subscribe(key, subscribeListener);
registry.put(key, String.valueOf(System.nanoTime()), true);
// Sleep 3 seconds here since in mysql jdbc registry
// If multiple event occurs in a refresh time, only the last event will be triggered
Thread.sleep(3000);
registry.put(key, String.valueOf(System.nanoTime()), true);
Thread.sleep(3000);
registry.delete(key);
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Assertions.assertTrue(subscribeAdded.get());
Assertions.assertTrue(subscribeUpdated.get());
Assertions.assertTrue(subscribeRemoved.get());
});
}
@SneakyThrows
@Test
public void testUnsubscribe() {
registry.start();
final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
SubscribeListener subscribeListener = event -> {
if (event.type() == Event.Type.ADD) {
subscribeAdded.compareAndSet(false, true);
}
if (event.type() == Event.Type.REMOVE) {
subscribeRemoved.compareAndSet(false, true);
}
if (event.type() == Event.Type.UPDATE) {
subscribeUpdated.compareAndSet(false, true);
}
};
String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080";
registry.subscribe(key, subscribeListener);
registry.unsubscribe(key);
registry.put(key, value, true);
registry.put(key, value, true);
registry.delete(key);
Thread.sleep(2000);
Assertions.assertFalse(subscribeAdded.get());
Assertions.assertFalse(subscribeRemoved.get());
Assertions.assertFalse(subscribeUpdated.get());
}
@SneakyThrows
@Test
public void testAddConnectionStateListener() {
AtomicReference<ConnectionState> connectionState = new AtomicReference<>();
registry.addConnectionStateListener(connectionState::set);
Truth.assertThat(connectionState.get()).isNull();
registry.start();
await().atMost(Duration.ofSeconds(2))
.until(() -> ConnectionState.CONNECTED == connectionState.get());
}
@Test
public void testGet() {
registry.start();
String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080";
assertThrows(RegistryException.class, () -> registry.get(key));
registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value);
}
@Test
public void testPut() {
registry.start();
String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080";
registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value);
// Update the value
registry.put(key, "123", true);
Truth.assertThat(registry.get(key)).isEqualTo("123");
}
@Test
public void testDelete() {
registry.start();
String key = "/nodes/master" + System.nanoTime();
String value = "127.0.0.1:8080";
// Delete a non-existent key
registry.delete(key);
registry.put(key, value, true);
Truth.assertThat(registry.get(key)).isEqualTo(value);
registry.delete(key);
Truth.assertThat(registry.exists(key)).isFalse();
}
@Test
public void testChildren() {
registry.start();
String master1 = "/nodes/children/127.0.0.1:8080";
String master2 = "/nodes/children/127.0.0.2:8080";
String value = "123";
registry.put(master1, value, true);
registry.put(master2, value, true);
Truth.assertThat(registry.children("/nodes/children"))
.containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080", "127.0.0.2:8080"));
}
@Test
public void testExists() {
registry.start();
String key = "/nodes/master" + System.nanoTime();
String value = "123";
Truth.assertThat(registry.exists(key)).isFalse();
registry.put(key, value, true);
Truth.assertThat(registry.exists(key)).isTrue();
}
@SneakyThrows
@Test
public void testAcquireLock() {
registry.start();
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock at the main thread
Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at the main thread again
// It should acquire success
Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
CompletableFuture<Boolean> acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey));
assertThrows(TimeoutException.class, () -> acquireResult.get(3000, TimeUnit.MILLISECONDS));
}
@SneakyThrows
@Test
public void testAcquireLock_withTimeout() {
registry.start();
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock in the main thread
// It should acquire success
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isFalse();
}
@SneakyThrows
@Test
public void testReleaseLock() {
registry.start();
String lockKey = "/lock" + System.nanoTime();
// 1. Acquire the lock in the main thread
Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
// Acquire the lock at another thread
// It should acquire failed
CompletableFuture<Boolean> acquireResult =
CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isFalse();
// 2. Release the lock in the main thread
Truth.assertThat(registry.releaseLock(lockKey)).isTrue();
// Acquire the lock at another thread
// It should acquire success
acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
Truth.assertThat(acquireResult.get()).isTrue();
}
public abstract R createRegistry();
}

25
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/pom.xml

@ -72,6 +72,31 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-it</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

10
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/EphemeralDateManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/EphemeralDateManager.java

@ -15,12 +15,10 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
@ -42,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* This thread is used to check the connect state to jdbc.
*/
@Slf4j
public class EphemeralDateManager implements AutoCloseable {
class EphemeralDateManager implements AutoCloseable {
private ConnectionState connectionState;
private final JdbcOperator jdbcOperator;
@ -51,7 +49,7 @@ public class EphemeralDateManager implements AutoCloseable {
private final Set<Long> ephemeralDateIds = Collections.synchronizedSet(new HashSet<>());
private final ScheduledExecutorService scheduledExecutorService;
public EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
EphemeralDateManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = checkNotNull(jdbcOperator);
this.scheduledExecutorService = Executors.newScheduledThreadPool(
@ -151,7 +149,7 @@ public class EphemeralDateManager implements AutoCloseable {
}
}
private void updateEphemeralDateTerm() throws SQLException {
private void updateEphemeralDateTerm() {
if (!jdbcOperator.updateEphemeralDataTerm(ephemeralDateIds)) {
log.warn("Update jdbc registry ephemeral data: {} term error", ephemeralDateIds);
}

54
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcOperator.java

@ -29,26 +29,25 @@ import org.apache.commons.lang3.StringUtils;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.dao.DuplicateKeyException;
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
public class JdbcOperator {
public final class JdbcOperator {
@Autowired
private JdbcRegistryDataMapper jdbcRegistryDataMapper;
@Autowired
private JdbcRegistryLockMapper jdbcRegistryLockMapper;
private final JdbcRegistryDataMapper jdbcRegistryDataMapper;
private final JdbcRegistryLockMapper jdbcRegistryLockMapper;
private final long expireTimeWindow;
public JdbcOperator(JdbcRegistryProperties registryProperties) {
JdbcOperator(JdbcRegistryProperties registryProperties,
JdbcRegistryDataMapper jdbcRegistryDataMapper,
JdbcRegistryLockMapper jdbcRegistryLockMapper) {
this.expireTimeWindow =
registryProperties.getTermExpireTimes() * registryProperties.getTermRefreshInterval().toMillis();
this.jdbcRegistryDataMapper = jdbcRegistryDataMapper;
this.jdbcRegistryLockMapper = jdbcRegistryLockMapper;
}
public void healthCheck() {
@ -62,17 +61,21 @@ public class JdbcOperator {
public Long insertOrUpdateEphemeralData(String key, String value) throws SQLException {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
long id = jdbcRegistryData.getId();
if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
jdbcRegistryData.setDataValue(value);
jdbcRegistryData.setLastUpdateTime(new Date());
jdbcRegistryData.setLastTerm(System.currentTimeMillis());
if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
return jdbcRegistryData.getId();
}
jdbcRegistryData = JdbcRegistryData.builder()
.dataKey(key)
.dataValue(value)
.dataType(DataType.EPHEMERAL.getTypeValue())
.lastTerm(System.currentTimeMillis())
.lastUpdateTime(new Date())
.createTime(new Date())
.build();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
@ -81,17 +84,21 @@ public class JdbcOperator {
public long insertOrUpdatePersistentData(String key, String value) throws SQLException {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
if (jdbcRegistryData != null) {
long id = jdbcRegistryData.getId();
if (jdbcRegistryDataMapper.updateDataAndTermById(id, value, System.currentTimeMillis()) <= 0) {
jdbcRegistryData.setDataValue(value);
jdbcRegistryData.setLastUpdateTime(new Date());
jdbcRegistryData.setLastTerm(System.currentTimeMillis());
if (jdbcRegistryDataMapper.updateById(jdbcRegistryData) <= 0) {
throw new SQLException(String.format("update registry value failed, key: %s, value: %s", key, value));
}
return id;
return jdbcRegistryData.getId();
}
jdbcRegistryData = JdbcRegistryData.builder()
.dataKey(key)
.dataValue(value)
.dataType(DataType.PERSISTENT.getTypeValue())
.lastTerm(System.currentTimeMillis())
.lastUpdateTime(new Date())
.createTime(new Date())
.build();
jdbcRegistryDataMapper.insert(jdbcRegistryData);
return jdbcRegistryData.getId();
@ -127,7 +134,7 @@ public class JdbcOperator {
.collect(Collectors.toList());
}
public boolean existKey(String key) throws SQLException {
public boolean existKey(String key) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMapper.selectByKey(key);
return jdbcRegistryData != null;
}
@ -136,24 +143,25 @@ public class JdbcOperator {
* Try to acquire the target Lock, if cannot acquire, return null.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
public JdbcRegistryLock tryToAcquireLock(String key) throws SQLException {
public JdbcRegistryLock tryToAcquireLock(String key) {
JdbcRegistryLock jdbcRegistryLock = JdbcRegistryLock.builder()
.lockKey(key)
.lockOwner(JdbcRegistryConstant.LOCK_OWNER)
.lockOwner(LockUtils.getLockOwner())
.lastTerm(System.currentTimeMillis())
.lastUpdateTime(new Date())
.build();
try {
jdbcRegistryLockMapper.insert(jdbcRegistryLock);
return jdbcRegistryLock;
} catch (Exception e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
if (e instanceof SQLIntegrityConstraintViolationException || e instanceof DuplicateKeyException) {
return null;
}
throw e;
}
}
public JdbcRegistryLock getLockById(long lockId) throws SQLException {
public JdbcRegistryLock getLockById(long lockId) {
return jdbcRegistryLockMapper.selectById(lockId);
}
@ -161,7 +169,7 @@ public class JdbcOperator {
return jdbcRegistryLockMapper.deleteById(lockId) > 0;
}
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) throws SQLException {
public boolean updateEphemeralDataTerm(Collection<Long> ephemeralDateIds) {
if (CollectionUtils.isEmpty(ephemeralDateIds)) {
return true;
}

38
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java

@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.EphemeralDateManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.RegistryLockManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.task.SubscribeDataManager;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.Registry;
@ -30,31 +28,24 @@ import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* This is one of the implementation of {@link Registry}, with this implementation, you need to rely on mysql database to
* store the DolphinScheduler master/worker's metadata and do the server registry/unRegistry.
*/
@Component
@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "jdbc")
@Slf4j
public class JdbcRegistry implements Registry {
public final class JdbcRegistry implements Registry {
private final JdbcRegistryProperties jdbcRegistryProperties;
private final EphemeralDateManager ephemeralDateManager;
private final SubscribeDataManager subscribeDataManager;
private final RegistryLockManager registryLockManager;
private JdbcOperator jdbcOperator;
private final JdbcOperator jdbcOperator;
public JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
JdbcOperator jdbcOperator) {
JdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties,
JdbcOperator jdbcOperator) {
this.jdbcOperator = jdbcOperator;
jdbcOperator.clearExpireLock();
jdbcOperator.clearExpireEphemeralDate();
@ -65,7 +56,7 @@ public class JdbcRegistry implements Registry {
log.info("Initialize Jdbc Registry...");
}
@PostConstruct
@Override
public void start() {
log.info("Starting Jdbc Registry...");
// start a jdbc connect check
@ -103,10 +94,9 @@ public class JdbcRegistry implements Registry {
}
@Override
public boolean subscribe(String path, SubscribeListener listener) {
public void subscribe(String path, SubscribeListener listener) {
// new a schedule thread to query the path, if the path
subscribeDataManager.addListener(path, listener);
return true;
}
@Override
@ -122,8 +112,18 @@ public class JdbcRegistry implements Registry {
@Override
public String get(String key) {
// get the key value
return subscribeDataManager.getData(key);
try {
// get the key value
JdbcRegistryData data = jdbcOperator.getData(key);
if (data == null) {
throw new RegistryException("key: " + key + " not exist");
}
return data.getDataValue();
} catch (RegistryException registryException) {
throw registryException;
} catch (Exception e) {
throw new RegistryException(String.format("Get key: %s error", key), e);
}
}
@Override

12
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryAutoConfiguration.java

@ -49,6 +49,18 @@ public class JdbcRegistryAutoConfiguration {
log.info("Load JdbcRegistryAutoConfiguration");
}
@Bean
public JdbcOperator jdbcOperator(JdbcRegistryProperties jdbcRegistryProperties,
JdbcRegistryDataMapper jdbcRegistryDataMapper,
JdbcRegistryLockMapper jdbcRegistryLockMapper) {
return new JdbcOperator(jdbcRegistryProperties, jdbcRegistryDataMapper, jdbcRegistryLockMapper);
}
@Bean
public JdbcRegistry jdbcRegistry(JdbcRegistryProperties jdbcRegistryProperties, JdbcOperator jdbcOperator) {
return new JdbcRegistry(jdbcRegistryProperties, jdbcOperator);
}
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(JdbcRegistryProperties jdbcRegistryProperties) throws Exception {

6
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryConstant.java

@ -17,15 +17,11 @@
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import lombok.experimental.UtilityClass;
@UtilityClass
public final class JdbcRegistryConstant {
final class JdbcRegistryConstant {
public static final long LOCK_ACQUIRE_INTERVAL = 1_000;
public static final String LOCK_OWNER = NetUtils.getHost() + "_" + OSUtils.getProcessID();
}

34
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/LockUtils.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import lombok.experimental.UtilityClass;
@UtilityClass
public class LockUtils {
private static final String LOCK_OWNER_PREFIX = NetUtils.getHost() + "_" + OSUtils.getProcessID() + "_";
public static String getLockOwner() {
return LOCK_OWNER_PREFIX + Thread.currentThread().getName();
}
}

57
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/RegistryLockManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/RegistryLockManager.java

@ -15,12 +15,9 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryConstant;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryLock;
import org.apache.dolphinscheduler.registry.api.RegistryException;
@ -40,14 +37,15 @@ import lombok.extern.slf4j.Slf4j;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
public class RegistryLockManager implements AutoCloseable {
class RegistryLockManager implements AutoCloseable {
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
// lock owner -> lock
private final Map<String, JdbcRegistryLock> lockHoldMap;
private final ScheduledExecutorService lockTermUpdateThreadPool;
public RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
RegistryLockManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = jdbcOperator;
this.lockHoldMap = new ConcurrentHashMap<>();
@ -67,20 +65,24 @@ public class RegistryLockManager implements AutoCloseable {
* Acquire the lock, if cannot get the lock will await.
*/
public void acquireLock(String lockKey) throws RegistryException {
// maybe we can use the computeIf absent
lockHoldMap.computeIfAbsent(lockKey, key -> {
JdbcRegistryLock jdbcRegistryLock;
try {
while ((jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey)) == null) {
log.debug("Acquire the lock {} failed try again", key);
// acquire failed, wait and try again
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
try {
while (true) {
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) {
return;
}
} catch (SQLException e) {
throw new RegistryException("Acquire the lock error", e);
jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
if (jdbcRegistryLock != null) {
lockHoldMap.put(lockKey, jdbcRegistryLock);
return;
}
log.debug("Acquire the lock {} failed try again", lockKey);
// acquire failed, wait and try again
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
return jdbcRegistryLock;
});
} catch (Exception ex) {
throw new RegistryException("Acquire the lock: " + lockKey + " error", ex);
}
}
/**
@ -88,21 +90,22 @@ public class RegistryLockManager implements AutoCloseable {
*/
public boolean acquireLock(String lockKey, long timeout) throws RegistryException {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
try {
if (lockHoldMap.containsKey(lockKey)) {
try {
while (System.currentTimeMillis() - startTime < timeout) {
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
if (jdbcRegistryLock != null && LockUtils.getLockOwner().equals(jdbcRegistryLock.getLockOwner())) {
return true;
}
JdbcRegistryLock jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
if (jdbcRegistryLock != null) {
lockHoldMap.put(lockKey, jdbcRegistryLock);
return true;
}
} catch (SQLException e) {
throw new RegistryException("Acquire the lock: " + lockKey + " error", e);
log.debug("Acquire the lock {} failed try again", lockKey);
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
log.debug("Acquire the lock {} failed try again", lockKey);
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
} catch (Exception e) {
throw new RegistryException("Acquire the lock: " + lockKey + " error", e);
}
return false;
}
@ -115,6 +118,7 @@ public class RegistryLockManager implements AutoCloseable {
jdbcOperator.releaseLock(jdbcRegistryLock.getId());
lockHoldMap.remove(lockKey);
} catch (SQLException e) {
lockHoldMap.remove(lockKey);
throw new RegistryException(String.format("Release lock: %s error", lockKey), e);
}
}
@ -149,7 +153,6 @@ public class RegistryLockManager implements AutoCloseable {
if (!jdbcOperator.updateLockTerm(lockIds)) {
log.warn("Update the lock: {} term failed.", lockIds);
}
jdbcOperator.clearExpireLock();
} catch (Exception e) {
log.error("Update lock term error", e);
}

18
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/task/SubscribeDataManager.java → dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/SubscribeDataManager.java

@ -15,10 +15,8 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc.task;
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcOperator;
import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties;
import org.apache.dolphinscheduler.plugin.registry.jdbc.model.JdbcRegistryData;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
@ -42,7 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* Used to refresh if the subscribe path has been changed.
*/
@Slf4j
public class SubscribeDataManager implements AutoCloseable {
class SubscribeDataManager implements AutoCloseable {
private final JdbcOperator jdbcOperator;
private final JdbcRegistryProperties registryProperties;
@ -50,7 +48,7 @@ public class SubscribeDataManager implements AutoCloseable {
private final ScheduledExecutorService dataSubscribeCheckThreadPool;
private final Map<String, JdbcRegistryData> jdbcRegistryDataMap = new ConcurrentHashMap<>();
public SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
SubscribeDataManager(JdbcRegistryProperties registryProperties, JdbcOperator jdbcOperator) {
this.registryProperties = registryProperties;
this.jdbcOperator = jdbcOperator;
this.dataSubscribeCheckThreadPool = Executors.newScheduledThreadPool(
@ -75,12 +73,8 @@ public class SubscribeDataManager implements AutoCloseable {
dataSubScribeMap.remove(path);
}
public String getData(String path) {
JdbcRegistryData jdbcRegistryData = jdbcRegistryDataMap.get(path);
if (jdbcRegistryData == null) {
return null;
}
return jdbcRegistryData.getDataValue();
public JdbcRegistryData getData(String path) {
return jdbcRegistryDataMap.get(path);
}
@Override
@ -107,6 +101,7 @@ public class SubscribeDataManager implements AutoCloseable {
List<JdbcRegistryData> addedData = new ArrayList<>();
List<JdbcRegistryData> deletedData = new ArrayList<>();
List<JdbcRegistryData> updatedData = new ArrayList<>();
for (Map.Entry<String, JdbcRegistryData> entry : currentJdbcDataMap.entrySet()) {
JdbcRegistryData newData = entry.getValue();
JdbcRegistryData oldData = jdbcRegistryDataMap.get(entry.getKey());
@ -118,6 +113,7 @@ public class SubscribeDataManager implements AutoCloseable {
}
}
}
for (Map.Entry<String, JdbcRegistryData> entry : jdbcRegistryDataMap.entrySet()) {
if (!currentJdbcDataMap.containsKey(entry.getKey())) {
deletedData.add(entry.getValue());

3
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryDataMapper.java

@ -40,9 +40,6 @@ public interface JdbcRegistryDataMapper extends BaseMapper<JdbcRegistryData> {
@Select("select * from t_ds_jdbc_registry_data where data_key like CONCAT (#{key}, '%')")
List<JdbcRegistryData> fuzzyQueryByKey(@Param("key") String key);
@Update("update t_ds_jdbc_registry_data set data_value = #{data}, last_term = #{term} where id = #{id}")
int updateDataAndTermById(@Param("id") long id, @Param("data") String data, @Param("term") long term);
@Delete("delete from t_ds_jdbc_registry_data where data_key = #{key}")
void deleteByKey(@Param("key") String key);

2
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/mapper/JdbcRegistryLockMapper.java

@ -38,7 +38,7 @@ public interface JdbcRegistryLockMapper extends BaseMapper<JdbcRegistryLock> {
@Update({"<script>",
"update t_ds_jdbc_registry_lock",
"set last_term = #{term}",
"set last_term = #{term}, last_update_time = #{lastUpdateTime}",
"where id IN ",
"<foreach item='id' index='index' collection='ids' open='(' separator=',' close=')'>",
" #{id}",

4
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/resources/mysql_registry_init.sql

@ -24,7 +24,7 @@ CREATE TABLE `t_ds_jdbc_registry_data`
`data_value` text NOT NULL COMMENT 'data, like zookeeper node value',
`data_type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',
`last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
unique (`data_key`)
@ -39,7 +39,7 @@ CREATE TABLE `t_ds_jdbc_registry_lock`
`lock_key` varchar(256) NOT NULL COMMENT 'lock path',
`lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',
`last_term` bigint NOT NULL COMMENT 'last term time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'last update time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
unique (`lock_key`)

41
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryTestCase.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = {JdbcRegistryProperties.class})
@SpringBootApplication(scanBasePackageClasses = JdbcRegistryProperties.class)
public abstract class JdbcRegistryTestCase extends RegistryTestCase<JdbcRegistry> {
@Autowired
private JdbcRegistryProperties jdbcRegistryProperties;
@Autowired
private JdbcOperator jdbcOperator;
@Override
public JdbcRegistry createRegistry() {
return new JdbcRegistry(jdbcRegistryProperties, jdbcOperator);
}
}

103
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/MysqlJdbcRegistryTestCase.java

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import com.google.common.collect.Lists;
@ActiveProfiles("mysql")
class MysqlJdbcRegistryTestCase extends JdbcRegistryTestCase {
private static GenericContainer<?> mysqlContainer;
@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
mysqlContainer = new MySQLContainer(DockerImageName.parse("mysql:8.0"))
.withUsername("root")
.withPassword("root")
.withDatabaseName("dolphinscheduler")
.withNetwork(Network.newNetwork())
.withExposedPorts(3306)
.waitingFor(Wait.forHealthcheck());
mysqlContainer.setPortBindings(Lists.newArrayList("3306:3306"));
Startables.deepStart(Stream.of(mysqlContainer)).join();
try (
Connection connection = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/dolphinscheduler?useSSL=false&serverTimezone=UTC", "root", "root");
Statement statement = connection.createStatement();) {
statement.execute(
"CREATE TABLE `t_ds_jdbc_registry_data`\n" +
"(\n" +
" `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',\n" +
" `data_key` varchar(256) NOT NULL COMMENT 'key, like zookeeper node path',\n" +
" `data_value` text NOT NULL COMMENT 'data, like zookeeper node value',\n"
+
" `data_type` tinyint(4) NOT NULL COMMENT '1: ephemeral node, 2: persistent node',\n"
+
" `last_term` bigint NOT NULL COMMENT 'last term time',\n" +
" `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',\n"
+
" `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',\n"
+
" PRIMARY KEY (`id`),\n" +
" unique (`data_key`)\n" +
") ENGINE = InnoDB\n" +
" DEFAULT CHARSET = utf8;");
statement.execute(
"CREATE TABLE `t_ds_jdbc_registry_lock`\n" +
"(\n" +
" `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',\n" +
" `lock_key` varchar(256) NOT NULL COMMENT 'lock path',\n" +
" `lock_owner` varchar(256) NOT NULL COMMENT 'the lock owner, ip_processId',\n" +
" `last_term` bigint NOT NULL COMMENT 'last term time',\n" +
" `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',\n"
+
" `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',\n"
+
" PRIMARY KEY (`id`),\n" +
" unique (`lock_key`)\n" +
") ENGINE = InnoDB\n" +
" DEFAULT CHARSET = utf8;");
}
}
@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
mysqlContainer.close();
}
}

98
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/PostgresqlJdbcRegistryTestCase.java

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import com.google.common.collect.Lists;
@ActiveProfiles("postgresql")
@SpringBootTest(classes = {JdbcRegistryProperties.class})
@SpringBootApplication(scanBasePackageClasses = JdbcRegistryProperties.class)
public class PostgresqlJdbcRegistryTestCase extends JdbcRegistryTestCase {
private static GenericContainer<?> postgresqlContainer;
@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
postgresqlContainer = new PostgreSQLContainer(DockerImageName.parse("postgres:16.0"))
.withUsername("root")
.withPassword("root")
.withDatabaseName("dolphinscheduler")
.withNetwork(Network.newNetwork())
.withExposedPorts(5432);
postgresqlContainer.setPortBindings(Lists.newArrayList("5432:5432"));
Startables.deepStart(Stream.of(postgresqlContainer)).join();
try (
Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/dolphinscheduler",
"root", "root");
Statement statement = connection.createStatement();) {
statement.execute(
"create table t_ds_jdbc_registry_data\n" +
"(\n" +
" id serial\n" +
" constraint t_ds_jdbc_registry_data_pk primary key,\n" +
" data_key varchar not null,\n" +
" data_value text not null,\n" +
" data_type int4 not null,\n" +
" last_term bigint not null,\n" +
" last_update_time timestamp default current_timestamp not null,\n" +
" create_time timestamp default current_timestamp not null\n" +
");");
statement.execute(
"create unique index t_ds_jdbc_registry_data_key_uindex on t_ds_jdbc_registry_data (data_key);");
statement.execute(
"create table t_ds_jdbc_registry_lock\n" +
"(\n" +
" id serial\n" +
" constraint t_ds_jdbc_registry_lock_pk primary key,\n" +
" lock_key varchar not null,\n" +
" lock_owner varchar not null,\n" +
" last_term bigint not null,\n" +
" last_update_time timestamp default current_timestamp not null,\n" +
" create_time timestamp default current_timestamp not null\n" +
");");
statement.execute(
"create unique index t_ds_jdbc_registry_lock_key_uindex on t_ds_jdbc_registry_lock (lock_key);");
}
}
@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
postgresqlContainer.close();
}
}

31
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/application-mysql.yaml

@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring:
sql:
init:
schema-locations: classpath:mysql_registry_init.sql
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
registry:
type: jdbc
term-refresh-interval: 1s
term-expire-times: 1

28
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/application-postgresql.yaml

@ -0,0 +1,28 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring:
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://localhost:5432/dolphinscheduler
username: root
password: root
registry:
type: jdbc
term-refresh-interval: 1s
term-expire-times: 1

21
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

37
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/pom.xml

@ -26,12 +26,37 @@
<artifactId>dolphinscheduler-registry-zookeeper</artifactId>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-registry-it</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@ -41,6 +66,7 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
@ -51,11 +77,6 @@
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
@ -70,5 +91,11 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

8
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperConnectionStateListener.java

@ -26,11 +26,11 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class ZookeeperConnectionStateListener implements ConnectionStateListener {
final class ZookeeperConnectionStateListener implements ConnectionStateListener {
private final ConnectionListener listener;
public ZookeeperConnectionStateListener(ConnectionListener listener) {
ZookeeperConnectionStateListener(ConnectionListener listener) {
this.listener = listener;
}
@ -38,6 +38,10 @@ public final class ZookeeperConnectionStateListener implements ConnectionStateLi
public void stateChanged(CuratorFramework client,
org.apache.curator.framework.state.ConnectionState newState) {
switch (newState) {
case CONNECTED:
log.info("Registry connected");
listener.onUpdate(ConnectionState.CONNECTED);
break;
case LOST:
log.warn("Registry disconnected");
listener.onUpdate(ConnectionState.DISCONNECTED);

96
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

@ -45,17 +45,16 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Strings;
@Slf4j
public final class ZookeeperRegistry implements Registry {
final class ZookeeperRegistry implements Registry {
private final ZookeeperRegistryProperties.ZookeeperProperties properties;
private final CuratorFramework client;
@ -64,7 +63,7 @@ public final class ZookeeperRegistry implements Registry {
private static final ThreadLocal<Map<String, InterProcessMutex>> threadLocalLockMap = new ThreadLocal<>();
public ZookeeperRegistry(ZookeeperRegistryProperties registryProperties) {
ZookeeperRegistry(ZookeeperRegistryProperties registryProperties) {
properties = registryProperties.getZookeeper();
final ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(
@ -82,28 +81,24 @@ public final class ZookeeperRegistry implements Registry {
final String digest = properties.getDigest();
if (!Strings.isNullOrEmpty(digest)) {
buildDigest(builder, digest);
builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
}
private void buildDigest(CuratorFrameworkFactory.Builder builder, String digest) {
builder.authorization("digest", digest.getBytes(StandardCharsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
@PostConstruct
@Override
public void start() {
client.start();
try {
@ -127,19 +122,19 @@ public final class ZookeeperRegistry implements Registry {
try {
if (!client.blockUntilConnected((int) timeout.toMillis(), MILLISECONDS)) {
throw new RegistryException(
String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()));
String.format("Cannot connect to registry in %s s", timeout.getSeconds()));
}
} catch (RegistryException e) {
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException(
String.format("Cannot connect to the Zookeeper registry in %s s", timeout.getSeconds()), e);
String.format("Cannot connect to registry in %s s", timeout.getSeconds()), e);
}
}
@Override
public boolean subscribe(String path, SubscribeListener listener) {
public void subscribe(String path, SubscribeListener listener) {
final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path));
treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path)));
try {
@ -148,7 +143,6 @@ public final class ZookeeperRegistry implements Registry {
treeCacheMap.remove(path);
throw new RegistryException("Failed to subscribe listener for key: " + path, e);
}
return true;
}
@Override
@ -215,22 +209,29 @@ public final class ZookeeperRegistry implements Registry {
@Override
public boolean acquireLock(String key) {
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
}
InterProcessMutex interProcessMutex = null;
try {
interProcessMutex =
Optional.ofNullable(processMutexMap.get(key)).orElse(new InterProcessMutex(client, key));
if (interProcessMutex.isAcquiredInThisProcess()) {
// Since etcd/jdbc cannot implement a reentrant lock, we need to check if the lock is already acquired
// If it is already acquired, return true directly
// This means you only need to release once when you acquire multiple times
return true;
}
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
}
interProcessMutex.acquire();
processMutexMap.put(key, interProcessMutex);
return true;
} catch (Exception e) {
try {
interProcessMutex.release();
if (interProcessMutex != null) {
interProcessMutex.release();
}
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
} catch (Exception exception) {
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
@ -240,22 +241,28 @@ public final class ZookeeperRegistry implements Registry {
@Override
public boolean acquireLock(String key, long timeout) {
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
}
InterProcessMutex interProcessMutex = null;
try {
interProcessMutex =
Optional.ofNullable(processMutexMap.get(key)).orElse(new InterProcessMutex(client, key));
if (interProcessMutex.isAcquiredInThisProcess()) {
return true;
}
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
if (interProcessMutex.acquire(timeout, MILLISECONDS)) {
processMutexMap.put(key, interProcessMutex);
return true;
}
interProcessMutex.acquire(timeout, MILLISECONDS);
processMutexMap.put(key, interProcessMutex);
return true;
return false;
} catch (Exception e) {
try {
interProcessMutex.release();
if (interProcessMutex != null) {
interProcessMutex.release();
}
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
} catch (Exception exception) {
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
@ -269,11 +276,12 @@ public final class ZookeeperRegistry implements Registry {
if (processMutexMap == null) {
return true;
}
if (null == processMutexMap.get(key)) {
InterProcessMutex interProcessMutex = processMutexMap.get(key);
if (null == interProcessMutex) {
return false;
}
try {
processMutexMap.get(key).release();
interProcessMutex.release();
processMutexMap.remove(key);
if (processMutexMap.isEmpty()) {
threadLocalLockMap.remove();

4
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryAutoConfiguration.java

@ -40,7 +40,9 @@ public class ZookeeperRegistryAutoConfiguration {
@Bean
@ConditionalOnMissingBean(value = Registry.class)
public ZookeeperRegistry zookeeperRegistry(ZookeeperRegistryProperties zookeeperRegistryProperties) {
return new ZookeeperRegistry(zookeeperRegistryProperties);
ZookeeperRegistry zookeeperRegistry = new ZookeeperRegistry(zookeeperRegistryProperties);
zookeeperRegistry.start();
return zookeeperRegistry;
}
}

67
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryProperties.java

@ -17,24 +17,87 @@
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.commons.lang3.StringUtils;
import java.time.Duration;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
@Slf4j
@Data
@NoArgsConstructor
@AllArgsConstructor
@Configuration
@ConfigurationProperties(prefix = "registry")
public class ZookeeperRegistryProperties {
class ZookeeperRegistryProperties implements Validator {
private ZookeeperProperties zookeeper = new ZookeeperProperties();
private String type;
@Override
public boolean supports(Class<?> clazz) {
return ZookeeperRegistryProperties.class.isAssignableFrom(clazz);
}
@Override
public void validate(Object target, Errors errors) {
ZookeeperRegistryProperties zookeeperRegistryProperties = (ZookeeperRegistryProperties) target;
if (zookeeperRegistryProperties.getZookeeper() == null) {
errors.rejectValue("zookeeper", "zookeeper", "zookeeper properties is required");
}
ZookeeperProperties zookeeper = zookeeperRegistryProperties.getZookeeper();
if (StringUtils.isEmpty(zookeeper.getNamespace())) {
errors.rejectValue("zookeeper.namespace", "", "zookeeper.namespace cannot be null");
}
if (StringUtils.isEmpty(zookeeper.getConnectString())) {
errors.rejectValue("zookeeper.connectString", "", "zookeeper.connectString cannot be null");
}
if (zookeeper.getRetryPolicy() == null) {
errors.rejectValue("zookeeper.retryPolicy", "", "zookeeper.retryPolicy cannot be null");
}
if (zookeeper.getSessionTimeout() == null || zookeeper.getSessionTimeout().isZero()
|| zookeeper.getSessionTimeout().isNegative()) {
errors.rejectValue("zookeeper.sessionTimeout", "", "zookeeper.sessionTimeout should be positive");
}
if (zookeeper.getConnectionTimeout() == null || zookeeper.getConnectionTimeout().isZero()
|| zookeeper.getConnectionTimeout().isNegative()) {
errors.rejectValue("zookeeper.connectionTimeout", "", "zookeeper.connectionTimeout should be positive");
}
if (zookeeper.getBlockUntilConnected() == null || zookeeper.getBlockUntilConnected().isZero()
|| zookeeper.getBlockUntilConnected().isNegative()) {
errors.rejectValue("zookeeper.blockUntilConnected", "", "zookeeper.blockUntilConnected should be positive");
}
printConfig();
}
private void printConfig() {
String config =
"\n****************************ZookeeperRegistryProperties**************************************" +
"\n namespace -> " + zookeeper.getNamespace() +
"\n connectString -> " + zookeeper.getConnectString() +
"\n retryPolicy -> " + zookeeper.getRetryPolicy() +
"\n digest -> " + zookeeper.getDigest() +
"\n sessionTimeout -> " + zookeeper.getSessionTimeout() +
"\n connectionTimeout -> " + zookeeper.getConnectionTimeout() +
"\n blockUntilConnected -> " + zookeeper.getBlockUntilConnected() +
"\n****************************ZookeeperRegistryProperties**************************************";
log.info(config);
}
@Data
public static final class ZookeeperProperties {
private String namespace;
private String namespace = "dolphinscheduler";
private String connectString;
private RetryPolicy retryPolicy = new RetryPolicy();
private String digest;

131
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTest.java

@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.curator.test.TestingServer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZookeeperRegistryTest {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperRegistryTest.class);
TestingServer server;
ZookeeperRegistry registry;
@BeforeEach
public void before() throws Exception {
server = new TestingServer(true);
ZookeeperRegistryProperties p = new ZookeeperRegistryProperties();
p.getZookeeper().setConnectString(server.getConnectString());
registry = new ZookeeperRegistry(p);
registry.start();
registry.put("/sub", "", false);
}
@Test
public void persistTest() {
registry.put("/nodes/m1", "", false);
registry.put("/nodes/m2", "", false);
Assertions.assertEquals(Arrays.asList("m2", "m1"), registry.children("/nodes"));
Assertions.assertTrue(registry.exists("/nodes/m1"));
registry.delete("/nodes/m2");
Assertions.assertFalse(registry.exists("/nodes/m2"));
}
@Test
public void lockTest() throws InterruptedException {
CountDownLatch preCountDownLatch = new CountDownLatch(1);
CountDownLatch allCountDownLatch = new CountDownLatch(2);
List<String> testData = new ArrayList<>();
new Thread(() -> {
registry.acquireLock("/lock");
preCountDownLatch.countDown();
logger.info(Thread.currentThread().getName()
+ " :I got the lock, but I don't want to work. I want to rest for a while");
try {
Thread.sleep(1000);
logger.info(Thread.currentThread().getName() + " :I'm going to start working");
testData.add("thread1");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
logger.info(Thread.currentThread().getName() + " :I have finished my work, now I release the lock");
registry.releaseLock("/lock");
allCountDownLatch.countDown();
}
}).start();
preCountDownLatch.await(5, TimeUnit.SECONDS);
new Thread(() -> {
try {
logger.info(Thread.currentThread().getName() + " :I am trying to acquire the lock");
registry.acquireLock("/lock");
logger.info(Thread.currentThread().getName() + " :I got the lock and I started working");
testData.add("thread2");
} finally {
registry.releaseLock("/lock");
allCountDownLatch.countDown();
}
}).start();
allCountDownLatch.await(5, TimeUnit.SECONDS);
Assertions.assertEquals(testData, Arrays.asList("thread1", "thread2"));
}
@Test
public void subscribeTest() {
boolean status = registry.subscribe("/sub", new TestListener());
Assertions.assertTrue(status);
}
static class TestListener implements SubscribeListener {
@Override
public void notify(Event event) {
logger.info("I'm test listener");
}
}
@AfterEach
public void after() throws IOException {
registry.close();
server.close();
}
}

71
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistryTestCase.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.registry.zookeeper;
import org.apache.dolphinscheduler.plugin.registry.RegistryTestCase;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import com.google.common.collect.Lists;
@SpringBootTest(classes = ZookeeperRegistryProperties.class)
@SpringBootApplication(scanBasePackageClasses = ZookeeperRegistryProperties.class)
class ZookeeperRegistryTestCase extends RegistryTestCase<ZookeeperRegistry> {
@Autowired
private ZookeeperRegistryProperties zookeeperRegistryProperties;
private static GenericContainer<?> zookeeperContainer;
private static final Network NETWORK = Network.newNetwork();
@SneakyThrows
@BeforeAll
public static void setUpTestingServer() {
zookeeperContainer = new GenericContainer<>(DockerImageName.parse("zookeeper:3.8"))
.withNetwork(NETWORK);
zookeeperContainer.setPortBindings(Lists.newArrayList("2181:2181"));
Startables.deepStart(Stream.of(zookeeperContainer)).join();
System.setProperty("registry.zookeeper.connect-string", "localhost:2181");
}
@SneakyThrows
@Override
public ZookeeperRegistry createRegistry() {
return new ZookeeperRegistry(zookeeperRegistryProperties);
}
@SneakyThrows
@AfterAll
public static void tearDownTestingServer() {
zookeeperContainer.close();
}
}

30
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/resources/application.yaml

@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
connect-string: 127.0.0.1:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~

21
dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

1
dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml

@ -32,5 +32,6 @@
<module>dolphinscheduler-registry-zookeeper</module>
<module>dolphinscheduler-registry-jdbc</module>
<module>dolphinscheduler-registry-etcd</module>
<module>dolphinscheduler-registry-it</module>
</modules>
</project>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-abs/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-obs/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

21
dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/resources/logback.xml

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<logger name="*" level="ERROR"/>
</configuration>

24
pom.xml

@ -73,7 +73,6 @@
<maven-javadoc-plugin.version>2.10.3</maven-javadoc-plugin.version>
<maven-source-plugin.version>2.4</maven-source-plugin.version>
<maven-surefire-plugin.version>3.0.0-M6</maven-surefire-plugin.version>
<maven-failsafe-plugin.version>3.0.0-M6</maven-failsafe-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-shade-plugin.version>3.2.1</maven-shade-plugin.version>
<rpm-maven-plugion.version>2.2.0</rpm-maven-plugion.version>
@ -100,7 +99,6 @@
<spotless.skip>false</spotless.skip>
<skipUT>false</skipUT>
<skipIT>true</skipIT>
</properties>
<dependencyManagement>
@ -446,6 +444,7 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<quiet>true</quiet>
<source>8</source>
<failOnError>false</failOnError>
</configuration>
@ -609,33 +608,12 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<skip>${skipUT}</skip>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
<systemPropertyVariables>
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${maven-failsafe-plugin.version}</version>
<configuration>
<skip>${skipIT}</skip>
<additionalClasspathElements>dolphinscheduler-dao/src/main/resources</additionalClasspathElements>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- jenkins plugin jacoco report-->
<plugin>
<groupId>org.jacoco</groupId>

Loading…
Cancel
Save