From 8e93939121043b828f73ea2898c6102aa858524b Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 30 Nov 2021 08:29:25 +0800 Subject: [PATCH] [DS-6963][MasterServer]add cache manager for tenant and user (#6974) * cache manager for tenant and user * add test * fix checkstyle * add dependencies * fix sonar check * use spring boot cache configuration * add license * check style * check style * add license * change proxy name to processor * add license header * check style * add license file * add license file * add license Co-authored-by: caishunfeng <534328519@qq.com> --- .../api/service/impl/QueueServiceImpl.java | 14 +- .../api/service/impl/TenantServiceImpl.java | 29 ++- .../api/service/impl/UsersServiceImpl.java | 24 ++- .../common/enums/CacheType.java | 24 +++ dolphinscheduler-dist/release-docs/LICENSE | 3 + .../licenses/LICENSE-caffeine.txt | 202 ++++++++++++++++++ .../licenses/LICENSE-checker-qual.txt | 22 ++ .../LICENSE-spring-boot-starter-cache.txt | 202 ++++++++++++++++++ .../remote/command/CacheExpireCommand.java | 73 +++++++ .../remote/command/CommandType.java | 7 +- .../command/cache/CacheExpireCommandTest.java | 39 ++++ dolphinscheduler-server/pom.xml | 14 ++ .../server/master/MasterServer.java | 4 + .../master/processor/CacheProcessor.java | 58 +++++ .../main/resources/application-master.yaml | 8 + .../master/processor/CacheProcessorTest.java | 77 +++++++ dolphinscheduler-service/pom.xml | 5 + .../cache/processor/BaseCacheProcessor.java | 22 ++ .../cache/processor/QueueCacheProcessor.java | 22 ++ .../cache/processor/TenantCacheProcessor.java | 26 +++ .../cache/processor/UserCacheProcessor.java | 26 +++ .../processor/impl/CacheProcessorFactory.java | 58 +++++ .../impl/QueueCacheProcessorImpl.java | 50 +++++ .../impl/TenantCacheProcessorImpl.java | 64 ++++++ .../impl/UserCacheProcessorImpl.java | 59 +++++ .../cache/service/CacheNotifyService.java | 24 +++ .../service/impl/CacheNotifyServiceImpl.java | 132 ++++++++++++ .../service/process/ProcessService.java | 25 +-- .../service/cache/CacheNotifyServiceTest.java | 86 ++++++++ .../processor/QueueCacheProcessorTest.java | 60 ++++++ .../processor/TenantCacheProcessorTest.java | 78 +++++++ .../cache/processor/UserCacheProxyTest.java | 76 +++++++ .../service/process/ProcessServiceTest.java | 157 +++++++------- tools/dependencies/known-dependencies.txt | 3 + 34 files changed, 1666 insertions(+), 107 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java create mode 100644 dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java index e29883307e..0ad0cf7cd1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java @@ -22,10 +22,13 @@ import org.apache.dolphinscheduler.api.service.QueueService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.QueueMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.lang.StringUtils; @@ -56,6 +59,9 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { @Autowired private UserMapper userMapper; + @Autowired + private CacheNotifyService cacheNotifyService; + /** * query queue list * @@ -89,7 +95,7 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { public Result queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { Result result = new Result(); if (!isAdmin(loginUser)) { - putMsg(result,Status.USER_NO_OPERATION_PERM); + putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -222,6 +228,8 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { queueMapper.updateById(queueObj); + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.QUEUE, queueObj).convert2Command()); + putMsg(result, Status.SUCCESS); return result; @@ -230,7 +238,7 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { /** * verify queue and queueName * - * @param queue queue + * @param queue queue * @param queueName queue name * @return true if the queue name not exists, otherwise return false */ @@ -319,7 +327,7 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { * @param newQueue new queue name * @return true if need to update user */ - private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) { + private boolean checkIfQueueIsInUsing(String oldQueue, String newQueue) { return !oldQueue.equals(newQueue) && userMapper.existUser(oldQueue) == Boolean.TRUE; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index d1320e4221..bda7170d6a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -33,6 +34,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -67,13 +70,16 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService @Autowired private UserMapper userMapper; + @Autowired + private CacheNotifyService cacheNotifyService; + /** * create tenant * - * @param loginUser login user + * @param loginUser login user * @param tenantCode tenant code - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return create result code * @throws Exception exception */ @@ -135,7 +141,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService Result result = new Result(); if (!isAdmin(loginUser)) { - putMsg(result,Status.USER_NO_OPERATION_PERM); + putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -154,11 +160,11 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService /** * updateProcessInstance tenant * - * @param loginUser login user - * @param id tenant id + * @param loginUser login user + * @param id tenant id * @param tenantCode tenant code - * @param queueId queue id - * @param desc description + * @param queueId queue id + * @param desc description * @return update result code * @throws Exception exception */ @@ -213,6 +219,9 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService tenant.setUpdateTime(now); tenantMapper.updateById(tenant); + // notify master to expire cache + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command()); + result.put(Constants.STATUS, Status.SUCCESS); result.put(Constants.MSG, Status.SUCCESS.getMsg()); return result; @@ -271,6 +280,10 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService tenantMapper.deleteById(id); processInstanceMapper.updateProcessInstanceByTenantId(id, -1); + + // notify master to expire cache + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command()); + putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index a98d45f4ae..a3c8e8b5a5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -26,8 +26,8 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; @@ -53,6 +53,9 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -118,6 +121,9 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { @Autowired private ProjectMapper projectMapper; + @Autowired + private CacheNotifyService cacheNotifyService; + /** * create user, only system admin have permission @@ -322,7 +328,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { public Result queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { Result result = new Result(); if (!isAdmin(loginUser)) { - putMsg(result,Status.USER_NO_OPERATION_PERM); + putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -342,8 +348,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { /** * updateProcessInstance user * - * - * @param loginUser * @param userId user id * @param userName user name * @param userPassword user password @@ -474,6 +478,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { // updateProcessInstance user userMapper.updateById(user); + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); + putMsg(result, Status.SUCCESS); return result; } @@ -521,8 +527,13 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { } accessTokenMapper.deleteAccessTokenByUserId(id); - + userMapper.deleteById(id); + + if (user != null) { + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); + } + putMsg(result, Status.SUCCESS); return result; @@ -1066,6 +1077,9 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { Date now = new Date(); user.setUpdateTime(now); userMapper.updateById(user); + + cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); + User responseUser = userMapper.queryByUserNameAccurately(userName); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, responseUser); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java new file mode 100644 index 0000000000..db31eeac77 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum CacheType { + TENANT, + USER, + QUEUE; +} diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index a9abb80840..5be5dde8d1 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -225,6 +225,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. aws-sdk-java 1.7.4: https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.7.4, Apache 2.0 bonecp 0.8.0.RELEASE: https://github.com/wwadge/bonecp, Apache 2.0 byte-buddy 1.9.16: https://mvnrepository.com/artifact/net.bytebuddy/byte-buddy/1.9.16, Apache 2.0 + caffeine 2.9.2: https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine/2.9.2, Apache 2.0 classmate 1.5.1: https://mvnrepository.com/artifact/com.fasterxml/classmate/1.5.1, Apache 2.0 clickhouse-jdbc 0.1.52: https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc/0.1.52, Apache 2.0 commons-beanutils 1.9.4 https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils/1.9.4, Apache 2.0 @@ -369,6 +370,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. spring-boot-starter-logging 2.5.6: https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-logging/2.5.6, Apache 2.0 spring-boot-starter-quartz 2.5.6: https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-quartz/2.5.6, Apache 2.0 spring-boot-starter-web 2.5.6: https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web/2.5.6, Apache 2.0 + spring-boot-starter-cache 2.5.6: https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-cache/2.5.6, Apache 2.0 spring-context 5.3.12: https://mvnrepository.com/artifact/org.springframework/spring-context/5.3.12, Apache 2.0 spring-context-support 5.3.12: https://mvnrepository.com/artifact/org.springframework/spring-context-support/5.3.12, Apache 2.0 spring-core 5.3.12: https://mvnrepository.com/artifact/org.springframework/spring-core, Apache 2.0 @@ -472,6 +474,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. slf4j-api 1.7.5: https://mvnrepository.com/artifact/org.slf4j/slf4j-api/1.7.5, MIT animal-sniffer-annotations 1.14 https://mvnrepository.com/artifact/org.codehaus.mojo/animal-sniffer-annotations/1.14, MIT checker-compat-qual 2.0.0 https://mvnrepository.com/artifact/org.checkerframework/checker-compat-qual/2.0.0, MIT + GPLv2 + checker-qual 3.10.0 https://mvnrepository.com/artifact/org.checkerframework/checker-qual/3.10.0, MIT + GPLv2 Java-WebSocket 1.5.1: https://github.com/TooTallNate/Java-WebSocket MIT ======================================================================== diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt new file mode 100644 index 0000000000..0b42f8f262 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-caffeine.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt new file mode 100644 index 0000000000..7b59b5c982 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-checker-qual.txt @@ -0,0 +1,22 @@ +Checker Framework qualifiers +Copyright 2004-present by the Checker Framework developers + +MIT License: + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt new file mode 100644 index 0000000000..82714d7648 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-spring-boot-starter-cache.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java new file mode 100644 index 0000000000..26170af13b --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +/** + * db task ack request command + */ +public class CacheExpireCommand implements Serializable { + + private CacheType cacheType; + private Class updateObjClass; + private String updateObjJson; + + public CacheExpireCommand() { + super(); + } + + public CacheExpireCommand(CacheType cacheType, Object updateObj) { + this.cacheType = cacheType; + this.updateObjClass = updateObj.getClass(); + this.updateObjJson = JSONUtils.toJsonString(updateObj); + } + + public CacheType getCacheType() { + return cacheType; + } + + public Class getUpdateObjClass() { + return updateObjClass; + } + + public String getUpdateObjJson() { + return updateObjJson; + } + + /** + * package command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.CACHE_EXPIRE); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return String.format("CacheExpireCommand{CacheType=%s, updateObjClass=%s, updateObjJson=%s}", cacheType, updateObjClass, updateObjJson); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 786d10c209..cf2cfe51bb 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -136,5 +136,10 @@ public enum CommandType { /** * state event request */ - STATE_EVENT_REQUEST; + STATE_EVENT_REQUEST, + + /** + * cache expire + */ + CACHE_EXPIRE; } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java new file mode 100644 index 0000000000..1c91d15fb0 --- /dev/null +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.cache; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; + +import org.junit.Assert; +import org.junit.Test; + +public class CacheExpireCommandTest { + + @Test + public void testConvert2Command() { + CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, 1); + Assert.assertEquals(Integer.class, cacheExpireCommand.getUpdateObjClass()); + Assert.assertEquals("1", cacheExpireCommand.getUpdateObjJson()); + + Command command = cacheExpireCommand.convert2Command(); + Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType()); + } +} diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml index a8c20d3c0e..0ca83d5af5 100644 --- a/dolphinscheduler-server/pom.xml +++ b/dolphinscheduler-server/pom.xml @@ -130,6 +130,20 @@ spring-boot-test test + + org.springframework.boot + spring-boot-starter-cache + + + log4j-api + org.apache.logging.log4j + + + log4j-to-slf4j + org.apache.logging.log4j + + + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index cbc0d6af1e..c1e22f8d07 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.processor.CacheProcessor; import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; @@ -39,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -52,6 +54,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; }) }) @EnableTransactionManagement +@EnableCaching public class MasterServer implements IStoppable { private static final Logger logger = LoggerFactory.getLogger(MasterServer.class); @@ -92,6 +95,7 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor()); this.nettyRemotingServer.start(); // self tolerant diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java new file mode 100644 index 0000000000..64571b8fca --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +/** + * cache process from master/api + */ +public class CacheProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class); + + private CacheProcessorFactory cacheProcessorFactory; + + public CacheProcessor() { + this.cacheProcessorFactory = SpringApplicationContext.getBean(CacheProcessorFactory.class); + } + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.CACHE_EXPIRE == command.getType(), String.format("invalid command type: %s", command.getType())); + + CacheExpireCommand cacheExpireCommand = JSONUtils.parseObject(command.getBody(), CacheExpireCommand.class); + + logger.info("received command : {}", cacheExpireCommand); + + cacheProcessorFactory.getCacheProcessor(cacheExpireCommand.getCacheType()).cacheExpire(cacheExpireCommand.getUpdateObjClass(), cacheExpireCommand.getUpdateObjJson()); + } +} diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml b/dolphinscheduler-server/src/main/resources/application-master.yaml index c1d3a8e187..86796c1195 100644 --- a/dolphinscheduler-server/src/main/resources/application-master.yaml +++ b/dolphinscheduler-server/src/main/resources/application-master.yaml @@ -17,6 +17,14 @@ spring: application: name: master-server + cache: + # default enable cache, you can disable by `type: none` + type: caffeine + cache-names: + - tenant + - user + caffeine: + spec: maximumSize=100,expireAfterWrite=60s,recordStats master: listen-port: 5678 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java new file mode 100644 index 0000000000..02f32160c4 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; +import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory; +import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import io.netty.channel.Channel; + +/** + * task ack processor test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class CacheProcessorTest { + + private CacheProcessor cacheProcessor; + + @InjectMocks + private TenantCacheProcessorImpl tenantCacheProcessor; + + @Mock + private Channel channel; + + @Mock + private CacheProcessorFactory cacheProcessorFactory; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor); + PowerMockito.when(SpringApplicationContext.getBean(CacheProcessorFactory.class)).thenReturn(cacheProcessorFactory); + Mockito.when(cacheProcessorFactory.getCacheProcessor(CacheType.TENANT)).thenReturn(tenantCacheProcessor); + cacheProcessor = new CacheProcessor(); + } + + @Test + public void testProcess() { + Tenant tenant = new Tenant(); + tenant.setId(1); + CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, tenant); + Command command = cacheExpireCommand.convert2Command(); + + cacheProcessor.process(channel, command); + } +} diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index 50b8736198..c4f54c3d35 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -90,6 +90,11 @@ micrometer-core provided + + + com.github.ben-manes.caffeine + caffeine + diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java new file mode 100644 index 0000000000..2b0dc8be8d --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +public interface BaseCacheProcessor { + void cacheExpire(Class updateObjClass, String updateObjJson); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java new file mode 100644 index 0000000000..4b438eb0ac --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +public interface QueueCacheProcessor extends BaseCacheProcessor { + public void expireAllUserCache(); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java new file mode 100644 index 0000000000..c6b8000547 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +import org.apache.dolphinscheduler.dao.entity.Tenant; + +public interface TenantCacheProcessor extends BaseCacheProcessor { + void update(int tenantId); + + Tenant queryById(int tenantId); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java new file mode 100644 index 0000000000..1d93b42ea8 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +import org.apache.dolphinscheduler.dao.entity.User; + +public interface UserCacheProcessor extends BaseCacheProcessor { + void update(int userId); + + User selectById(int userId); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java new file mode 100644 index 0000000000..16a6ccd9ad --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor.impl; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.service.cache.processor.BaseCacheProcessor; +import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor; +import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; +import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class CacheProcessorFactory { + + @Autowired + private TenantCacheProcessor tenantCacheProcessor; + + @Autowired + private UserCacheProcessor userCacheProcessor; + + @Autowired + private QueueCacheProcessor queueCacheProcessor; + + Map cacheProcessorMap = new ConcurrentHashMap<>(); + + @PostConstruct + private void init() { + cacheProcessorMap.put(CacheType.TENANT, tenantCacheProcessor); + cacheProcessorMap.put(CacheType.USER, userCacheProcessor); + cacheProcessorMap.put(CacheType.QUEUE, queueCacheProcessor); + } + + public BaseCacheProcessor getCacheProcessor(CacheType cacheType) { + return cacheProcessorMap.get(cacheType); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java new file mode 100644 index 0000000000..174d59fba9 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor.impl; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.stereotype.Component; + +@Component +public class QueueCacheProcessorImpl implements QueueCacheProcessor { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + @CacheEvict(cacheNames = "user", allEntries = true) + public void expireAllUserCache() { + // just evict cache + logger.debug("expire all user cache"); + } + + @Override + public void cacheExpire(Class updateObjClass, String updateObjJson) { + Queue updateQueue = (Queue) JSONUtils.parseObject(updateObjJson, updateObjClass); + if (updateQueue == null) { + return; + } + SpringApplicationContext.getBean(QueueCacheProcessor.class).expireAllUserCache(); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java new file mode 100644 index 0000000000..3ca801400b --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor.impl; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Component; + +@Component +@CacheConfig(cacheNames = "tenant") +public class TenantCacheProcessorImpl implements TenantCacheProcessor { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Autowired + private TenantMapper tenantMapper; + + @Override + @CacheEvict + public void update(int tenantId) { + // just evict cache + } + + @Override + @Cacheable(sync = true) + public Tenant queryById(int tenantId) { + logger.debug("tenant cache proxy, tenantId:{}", tenantId); + return tenantMapper.queryById(tenantId); + } + + @Override + public void cacheExpire(Class updateObjClass, String updateObjJson) { + Tenant updateTenant = (Tenant) JSONUtils.parseObject(updateObjJson, updateObjClass); + if (updateTenant == null) { + return; + } + SpringApplicationContext.getBean(TenantCacheProcessor.class).update(updateTenant.getId()); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java new file mode 100644 index 0000000000..fb25fb57be --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor.impl; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Component; + +@Component +@CacheConfig(cacheNames = "user") +public class UserCacheProcessorImpl implements UserCacheProcessor { + + @Autowired + private UserMapper userMapper; + + @Override + @CacheEvict + public void update(int userId) { + // just evict cache + } + + @Override + @Cacheable(sync = true) + public User selectById(int userId) { + return userMapper.selectById(userId); + } + + @Override + public void cacheExpire(Class updateObjClass, String updateObjJson) { + User user = (User) JSONUtils.parseObject(updateObjJson, updateObjClass); + if (user == null) { + return; + } + SpringApplicationContext.getBean(UserCacheProcessor.class).update(user.getId()); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java new file mode 100644 index 0000000000..3b051b6c81 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.service; + +import org.apache.dolphinscheduler.remote.command.Command; + +public interface CacheNotifyService { + void notifyMaster(Command command); +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java new file mode 100644 index 0000000000..1aa15c2788 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.service.impl; + +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import io.netty.channel.Channel; + +/** + * cache notify service + */ +@Service +public class CacheNotifyServiceImpl implements CacheNotifyService { + + private final Logger logger = LoggerFactory.getLogger(CacheNotifyServiceImpl.class); + + @Autowired + private RegistryClient registryClient; + + /** + * remote channels + */ + private static final ConcurrentHashMap REMOTE_CHANNELS = new ConcurrentHashMap<>(); + + /** + * netty remoting client + */ + private final NettyRemotingClient nettyRemotingClient; + + public CacheNotifyServiceImpl() { + final NettyClientConfig clientConfig = new NettyClientConfig(); + this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + } + + /** + * add channel + * + * @param channel channel + */ + private void cache(Host host, NettyRemoteChannel channel) { + REMOTE_CHANNELS.put(host, channel); + } + + /** + * remove channel + */ + private void remove(Host host) { + REMOTE_CHANNELS.remove(host); + } + + /** + * get remote channel + * + * @return netty remote channel + */ + private NettyRemoteChannel getRemoteChannel(Host host) { + NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host); + if (nettyRemoteChannel != null) { + if (nettyRemoteChannel.isActive()) { + return nettyRemoteChannel; + } else { + this.remove(host); + } + } + + Channel channel = nettyRemotingClient.getChannel(host); + if (channel == null) { + return null; + } + + NettyRemoteChannel remoteChannel = new NettyRemoteChannel(channel); + this.cache(host, remoteChannel); + return remoteChannel; + } + + /** + * send result to master + * + * @param command command + */ + @Override + public void notifyMaster(Command command) { + logger.info("send result, command:{}", command.toString()); + + List serverList = registryClient.getServerList(NodeType.MASTER); + if (CollectionUtils.isEmpty(serverList)) { + return; + } + + for (Server server : serverList) { + Host host = new Host(server.getHost(), server.getPort()); + NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host); + if (nettyRemoteChannel == null) { + continue; + } + nettyRemoteChannel.writeAndFlush(command); + } + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index fbf29b3bfe..e7093417a2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.process; -import static java.util.stream.Collectors.toSet; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; @@ -28,6 +27,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static java.util.stream.Collectors.toSet; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -93,14 +94,14 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; +import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -147,7 +148,7 @@ public class ProcessService { ExecutionStatus.READY_STOP.ordinal()}; @Autowired - private UserMapper userMapper; + private UserCacheProcessor userCacheProcessor; @Autowired private ProcessDefinitionMapper processDefineMapper; @@ -186,7 +187,7 @@ public class ProcessService { private ErrorCommandMapper errorCommandMapper; @Autowired - private TenantMapper tenantMapper; + private TenantCacheProcessor tenantCacheProcessor; @Autowired private ProjectMapper projectMapper; @@ -725,7 +726,7 @@ public class ProcessService { public Tenant getTenantForProcess(int tenantId, int userId) { Tenant tenant = null; if (tenantId >= 0) { - tenant = tenantMapper.queryById(tenantId); + tenant = tenantCacheProcessor.queryById(tenantId); } if (userId == 0) { @@ -733,8 +734,8 @@ public class ProcessService { } if (tenant == null) { - User user = userMapper.selectById(userId); - tenant = tenantMapper.queryById(user.getTenantId()); + User user = userCacheProcessor.selectById(userId); + tenant = tenantCacheProcessor.queryById(user.getTenantId()); } return tenant; } @@ -1950,11 +1951,11 @@ public class ProcessService { return StringUtils.EMPTY; } int userId = resourceList.get(0).getUserId(); - User user = userMapper.selectById(userId); + User user = userCacheProcessor.selectById(userId); if (Objects.isNull(user)) { return StringUtils.EMPTY; } - Tenant tenant = tenantMapper.selectById(user.getTenantId()); + Tenant tenant = tenantCacheProcessor.queryById(user.getTenantId()); if (Objects.isNull(tenant)) { return StringUtils.EMPTY; } @@ -2024,7 +2025,7 @@ public class ProcessService { if (processInstance == null) { return queue; } - User executor = userMapper.selectById(processInstance.getExecutorId()); + User executor = userCacheProcessor.selectById(processInstance.getExecutorId()); if (executor != null) { queue = executor.getQueue(); } @@ -2135,7 +2136,7 @@ public class ProcessService { * @return User */ public User getUserById(int userId) { - return userMapper.selectById(userId); + return userCacheProcessor.selectById(userId); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java new file mode 100644 index 0000000000..83f1f00cef --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.service.cache.service.impl.CacheNotifyServiceImpl; +import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * tenant cache proxy test + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class CacheNotifyServiceTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @InjectMocks + private CacheNotifyServiceImpl cacheNotifyService; + + @Mock + private RegistryClient registryClient; + + @Test + public void testNotifyMaster() { + User user1 = new User(); + user1.setId(100); + Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, user1).convert2Command(); + + NettyServerConfig serverConfig = new NettyServerConfig(); + + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, (channel, command) -> { + Assert.assertEquals(cacheExpireCommand, command); + }); + nettyRemotingServer.start(); + + List serverList = new ArrayList<>(); + Server server = new Server(); + server.setHost("127.0.0.1"); + server.setPort(serverConfig.getListenPort()); + serverList.add(server); + + Mockito.when(registryClient.getServerList(NodeType.MASTER)).thenReturn(serverList); + + cacheNotifyService.notifyMaster(cacheExpireCommand); + + nettyRemotingServer.close(); + } +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java new file mode 100644 index 0000000000..ee61564512 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.impl.QueueCacheProcessorImpl; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * tenant cache proxy test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class QueueCacheProcessorTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @InjectMocks + private QueueCacheProcessorImpl queueCacheProcessor; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(QueueCacheProcessor.class)).thenReturn(queueCacheProcessor); + } + + @Test + public void testCacheExpire() { + Queue queue = new Queue(); + queue.setId(100); + queueCacheProcessor.cacheExpire(Queue.class, JSONUtils.toJsonString(queue)); + } +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java new file mode 100644 index 0000000000..4ecc970ccd --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * tenant cache proxy test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class TenantCacheProcessorTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @InjectMocks + private TenantCacheProcessorImpl tenantCacheProcessor; + + @Mock + private TenantMapper tenantMapper; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor); + } + + @Test + public void testQueryById() { + Tenant tenant1 = new Tenant(); + tenant1.setId(100); + tenant1.setDescription("test1"); + + Mockito.when(tenantMapper.queryById(100)).thenReturn(tenant1); + Assert.assertEquals(tenant1, tenantCacheProcessor.queryById(100)); + } + + @Test + public void testCacheExpire() { + Tenant tenant1 = new Tenant(); + tenant1.setId(100); + tenant1.setDescription("test1"); + tenantCacheProcessor.cacheExpire(Tenant.class, JSONUtils.toJsonString(tenant1)); + } +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java new file mode 100644 index 0000000000..f786f43c97 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.cache.processor; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +/** + * tenant cache proxy test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class UserCacheProxyTest { + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @InjectMocks + private UserCacheProcessorImpl userCacheProcessor; + + @Mock + private UserMapper userMapper; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + PowerMockito.when(SpringApplicationContext.getBean(UserCacheProcessor.class)).thenReturn(userCacheProcessor); + } + + @Test + public void testQueryById() { + User user1 = new User(); + user1.setId(100); + + Mockito.when(userMapper.selectById(100)).thenReturn(user1); + Assert.assertEquals(user1, userCacheProcessor.selectById(100)); + } + + @Test + public void testCacheExpire() { + User user = new User(); + user.setId(100); + userCacheProcessor.cacheExpire(User.class, JSONUtils.toJsonString(user)); + } +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 06eae1efeb..346f120e5c 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; + import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; @@ -59,7 +60,7 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; @@ -111,7 +112,7 @@ public class ProcessServiceTest { @Mock private ProcessInstanceMapper processInstanceMapper; @Mock - private UserMapper userMapper; + private UserCacheProcessorImpl userCacheProcessor; @Mock private TaskInstanceMapper taskInstanceMapper; @Mock @@ -253,7 +254,7 @@ public class ProcessServiceTest { command.setProcessDefinitionCode(222); command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" - + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); + + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps)); int definitionVersion = 1; @@ -287,7 +288,7 @@ public class ProcessServiceTest { Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode())).thenReturn(processDefinition); Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); + processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); @@ -366,7 +367,7 @@ public class ProcessServiceTest { command6.setCommandParam("{\"ProcessInstanceId\":223}"); command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT); command6.setProcessDefinitionVersion(1); - Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,223)).thenReturn(lists); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists); Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true); Mockito.when(commandMapper.deleteById(6)).thenReturn(1); ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps); @@ -387,7 +388,7 @@ public class ProcessServiceTest { command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT); command7.setProcessDefinitionVersion(1); Mockito.when(commandMapper.deleteById(7)).thenReturn(1); - Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L,Constants.RUNNING_PROCESS_STATE,224)).thenReturn(null); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null); ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps); Assert.assertTrue(processInstance8 == null); @@ -409,7 +410,7 @@ public class ProcessServiceTest { command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT); command9.setProcessDefinitionVersion(1); Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9); - Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L,Constants.RUNNING_PROCESS_STATE,0)).thenReturn(lists); + Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists); Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1); Mockito.when(commandMapper.deleteById(9)).thenReturn(1); ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps); @@ -460,7 +461,7 @@ public class ProcessServiceTest { public void testGetUserById() { User user = new User(); user.setId(123); - Mockito.when(userMapper.selectById(123)).thenReturn(user); + Mockito.when(userCacheProcessor.selectById(123)).thenReturn(user); Assert.assertEquals(user, processService.getUserById(123)); } @@ -502,7 +503,7 @@ public class ProcessServiceTest { processTaskRelationLog.setPostTaskVersion(postTaskVersion); relationLogList.add(processTaskRelationLog); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode - , parentProcessDefineVersion)).thenReturn(relationLogList); + , parentProcessDefineVersion)).thenReturn(relationLogList); List taskDefinitionLogs = new ArrayList<>(); TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog(); @@ -538,14 +539,14 @@ public class ProcessServiceTest { operator.setUserType(UserType.GENERAL_USER); long projectCode = 751485690568704L; String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0," - + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\"," - + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}}," - + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1," - + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920}," - + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," - + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," - + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\"," - + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]"; + + "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\"," + + "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}}," + + "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1," + + "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920}," + + "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[]," + + "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]}," + + "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\"," + + "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]"; List taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class); TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); taskDefinition.setCode(751500437479424L); @@ -636,10 +637,10 @@ public class ProcessServiceTest { processInstance.setId(62); taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\"," - + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\"," - + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}]," - + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," - + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}"); + + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\"," + + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}]," + + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"]," + + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}"); processService.changeOutParam(taskInstance); } @@ -647,65 +648,65 @@ public class ProcessServiceTest { public void testUpdateTaskDefinitionResources() throws Exception { TaskDefinition taskDefinition = new TaskDefinition(); String taskParameters = "{\n" - + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n" - + " \"mainJar\": {\n" - + " \"id\": 1\n" - + " },\n" - + " \"deployMode\": \"cluster\",\n" - + " \"resourceList\": [\n" - + " {\n" - + " \"id\": 3\n" - + " },\n" - + " {\n" - + " \"id\": 4\n" - + " }\n" - + " ],\n" - + " \"localParams\": [],\n" - + " \"driverCores\": 1,\n" - + " \"driverMemory\": \"512M\",\n" - + " \"numExecutors\": 2,\n" - + " \"executorMemory\": \"2G\",\n" - + " \"executorCores\": 2,\n" - + " \"appName\": \"\",\n" - + " \"mainArgs\": \"\",\n" - + " \"others\": \"\",\n" - + " \"programType\": \"JAVA\",\n" - + " \"sparkVersion\": \"SPARK2\",\n" - + " \"dependence\": {},\n" - + " \"conditionResult\": {\n" - + " \"successNode\": [\n" - + " \"\"\n" - + " ],\n" - + " \"failedNode\": [\n" - + " \"\"\n" - + " ]\n" - + " },\n" - + " \"waitStartTimeout\": {}\n" - + "}"; + + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n" + + " \"mainJar\": {\n" + + " \"id\": 1\n" + + " },\n" + + " \"deployMode\": \"cluster\",\n" + + " \"resourceList\": [\n" + + " {\n" + + " \"id\": 3\n" + + " },\n" + + " {\n" + + " \"id\": 4\n" + + " }\n" + + " ],\n" + + " \"localParams\": [],\n" + + " \"driverCores\": 1,\n" + + " \"driverMemory\": \"512M\",\n" + + " \"numExecutors\": 2,\n" + + " \"executorMemory\": \"2G\",\n" + + " \"executorCores\": 2,\n" + + " \"appName\": \"\",\n" + + " \"mainArgs\": \"\",\n" + + " \"others\": \"\",\n" + + " \"programType\": \"JAVA\",\n" + + " \"sparkVersion\": \"SPARK2\",\n" + + " \"dependence\": {},\n" + + " \"conditionResult\": {\n" + + " \"successNode\": [\n" + + " \"\"\n" + + " ],\n" + + " \"failedNode\": [\n" + + " \"\"\n" + + " ]\n" + + " },\n" + + " \"waitStartTimeout\": {}\n" + + "}"; taskDefinition.setTaskParams(taskParameters); Map resourceMap = - Stream.of(1, 3, 4) - .map(i -> { - Resource resource = new Resource(); - resource.setId(i); - resource.setFileName("file" + i); - resource.setFullName("/file" + i); - return resource; - }) - .collect( - Collectors.toMap( - Resource::getId, - resource -> resource) - ); + Stream.of(1, 3, 4) + .map(i -> { + Resource resource = new Resource(); + resource.setId(i); + resource.setFileName("file" + i); + resource.setFullName("/file" + i); + return resource; + }) + .collect( + Collectors.toMap( + Resource::getId, + resource -> resource) + ); for (Integer integer : Arrays.asList(1, 3, 4)) { Mockito.when(resourceMapper.selectById(integer)) - .thenReturn(resourceMap.get(integer)); + .thenReturn(resourceMap.get(integer)); } Whitebox.invokeMethod(processService, - "updateTaskDefinitionResources", - taskDefinition); + "updateTaskDefinitionResources", + taskDefinition); String taskParams = taskDefinition.getTaskParams(); SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class); @@ -731,15 +732,15 @@ public class ProcessServiceTest { // test if input is null ResourceInfo resourceInfoNull = null; ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoNull); + "updateResourceInfo", + resourceInfoNull); Assert.assertNull(updatedResourceInfo1); // test if resource id less than 1 ResourceInfo resourceInfoVoid = new ResourceInfo(); ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoVoid); + "updateResourceInfo", + resourceInfoVoid); Assert.assertNull(updatedResourceInfo2); // test normal situation @@ -751,8 +752,8 @@ public class ProcessServiceTest { resource.setFullName("/test.txt"); Mockito.when(resourceMapper.selectById(1)).thenReturn(resource); ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService, - "updateResourceInfo", - resourceInfoNormal); + "updateResourceInfo", + resourceInfoNormal); Assert.assertEquals(1, updatedResourceInfo3.getId()); Assert.assertEquals("test.txt", updatedResourceInfo3.getRes()); diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index c5a3b736a8..7de10ade22 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -16,7 +16,9 @@ avro-1.7.4.jar aws-java-sdk-1.7.4.jar bonecp-0.8.0.RELEASE.jar byte-buddy-1.9.16.jar +caffeine-2.9.2.jar checker-compat-qual-2.0.0.jar +checker-qual-3.10.0.jar classmate-1.5.1.jar clickhouse-jdbc-0.1.52.jar commons-email-1.5.jar @@ -202,6 +204,7 @@ spring-boot-starter-json-2.5.6.jar spring-boot-starter-logging-2.5.6.jar spring-boot-starter-quartz-2.5.6.jar spring-boot-starter-web-2.5.6.jar +spring-boot-starter-cache-2.5.6.jar spring-context-5.3.12.jar spring-context-support-5.3.12.jar spring-core-5.3.12.jar