diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java index badc0a905f..5fc291d083 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java @@ -19,17 +19,24 @@ package org.apache.dolphinscheduler.api.configuration; import org.apache.dolphinscheduler.api.interceptor.LocaleChangeInterceptor; import org.apache.dolphinscheduler.api.interceptor.LoginHandlerInterceptor; +import org.apache.dolphinscheduler.api.interceptor.RateLimitInterceptor; +import java.util.Locale; + +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import org.springframework.web.filter.CorsFilter; import org.springframework.web.servlet.LocaleResolver; -import org.springframework.web.servlet.config.annotation.*; +import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer; +import org.springframework.web.servlet.config.annotation.InterceptorRegistry; +import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; +import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import org.springframework.web.servlet.i18n.CookieLocaleResolver; -import java.util.Locale; /** * application configuration @@ -43,6 +50,9 @@ public class AppConfiguration implements WebMvcConfigurer { public static final String PATH_PATTERN = "/**"; public static final String LOCALE_LANGUAGE_COOKIE = "language"; + @Autowired + private TrafficConfiguration trafficConfiguration; + @Bean public CorsFilter corsFilter() { CorsConfiguration config = new CorsConfiguration(); @@ -79,10 +89,18 @@ public class AppConfiguration implements WebMvcConfigurer { return new LocaleChangeInterceptor(); } + @Bean + public RateLimitInterceptor createRateLimitInterceptor() { + return new RateLimitInterceptor(trafficConfiguration); + } + @Override public void addInterceptors(InterceptorRegistry registry) { // i18n registry.addInterceptor(localeChangeInterceptor()); + if (trafficConfiguration.isTrafficGlobalControlSwitch() || trafficConfiguration.isTrafficTenantControlSwitch()) { + registry.addInterceptor(createRateLimitInterceptor()); + } registry.addInterceptor(loginInterceptor()) .addPathPatterns(LOGIN_INTERCEPTOR_PATH_PATTERN) .excludePathPatterns(LOGIN_PATH_PATTERN, REGISTER_PATH_PATTERN, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java new file mode 100644 index 0000000000..75a9c25410 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.configuration; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class TrafficConfiguration { + + @Value("${traffic.control.global.switch:false}") + private boolean trafficGlobalControlSwitch; + @Value("${traffic.control.max.global.qps.rate:300}") + private Integer maxGlobalQpsRate; + @Value("${traffic.control.tenant.switch:false}") + private boolean trafficTenantControlSwitch; + @Value("${traffic.control.default.tenant.qps.rate:10}") + private Integer defaultTenantQpsRate; + @Value("#{'${traffic.control.customize.tenant.qps.rate:}'.empty?null:'${traffic.control.customize.tenant.qps.rate:}'}") + private Map customizeTenantQpsRate; + + public boolean isTrafficGlobalControlSwitch() { + return trafficGlobalControlSwitch; + } + + public void setTrafficGlobalControlSwitch(boolean trafficGlobalControlSwitch) { + this.trafficGlobalControlSwitch = trafficGlobalControlSwitch; + } + + public Integer getMaxGlobalQpsRate() { + return maxGlobalQpsRate; + } + + public void setMaxGlobalQpsRate(Integer maxGlobalQpsRate) { + this.maxGlobalQpsRate = maxGlobalQpsRate; + } + + public boolean isTrafficTenantControlSwitch() { + return trafficTenantControlSwitch; + } + + public void setTrafficTenantControlSwitch(boolean trafficTenantControlSwitch) { + this.trafficTenantControlSwitch = trafficTenantControlSwitch; + } + + public Integer getDefaultTenantQpsRate() { + return defaultTenantQpsRate; + } + + public void setDefaultTenantQpsRate(Integer defaultTenantQpsRate) { + this.defaultTenantQpsRate = defaultTenantQpsRate; + } + + public Map getCustomizeTenantQpsRate() { + return customizeTenantQpsRate; + } + + public void setCustomizeTenantQpsRate(Map customizeTenantQpsRate) { + this.customizeTenantQpsRate = customizeTenantQpsRate; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java new file mode 100644 index 0000000000..f8839a3560 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.interceptor; + +import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import org.apache.commons.collections.MapUtils; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.servlet.HandlerInterceptor; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.RateLimiter; + +/** + * This interceptor is used to control the traffic, consists with global traffic control and tenant-leve traffic control. + * If the current coming tenant reaches his tenant-level request quota, his request will be reject fast. + * If the current system request number reaches the global request quota, all coming request will be reject fast. + */ +public class RateLimitInterceptor implements HandlerInterceptor { + + private static final Logger logger = LoggerFactory.getLogger(RateLimitInterceptor.class); + + private TrafficConfiguration trafficConfiguration; + + private RateLimiter globalRateLimiter; + + private LoadingCache tenantRateLimiterCache = CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterAccess(10, TimeUnit.MINUTES) + .build(new CacheLoader() { + @Override + public RateLimiter load(String token) { + // use tenant customize rate limit + Map customizeTenantQpsRate = trafficConfiguration.getCustomizeTenantQpsRate(); + int tenantQuota = trafficConfiguration.getDefaultTenantQpsRate(); + if (MapUtils.isNotEmpty(customizeTenantQpsRate)) { + tenantQuota = customizeTenantQpsRate.getOrDefault(token, trafficConfiguration.getDefaultTenantQpsRate()); + } + // use tenant default rate limit + return RateLimiter.create(tenantQuota, 1, TimeUnit.SECONDS); + } + }); + + @Override + public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws ExecutionException { + // tenant-level rate limit + if (trafficConfiguration.isTrafficTenantControlSwitch()) { + String token = request.getHeader("token"); + if (StringUtils.isNotEmpty(token)) { + RateLimiter tenantRateLimiter = tenantRateLimiterCache.get(token); + if (!tenantRateLimiter.tryAcquire()) { + response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value()); + logger.warn("Too many request, reach tenant rate limit, current tenant:{} qps is {}", token, tenantRateLimiter.getRate()); + return false; + } + } + } + // global rate limit + if (trafficConfiguration.isTrafficGlobalControlSwitch()) { + if (!globalRateLimiter.tryAcquire()) { + response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value()); + logger.warn("Too many request, reach global rate limit, current qps is {}", globalRateLimiter.getRate()); + return false; + } + } + return true; + } + + public RateLimitInterceptor(TrafficConfiguration trafficConfiguration) { + this.trafficConfiguration = trafficConfiguration; + if (trafficConfiguration.isTrafficGlobalControlSwitch()) { + this.globalRateLimiter = RateLimiter.create(trafficConfiguration.getMaxGlobalQpsRate(), 1, TimeUnit.SECONDS); + } + } + +} diff --git a/dolphinscheduler-api/src/main/resources/application-api.properties b/dolphinscheduler-api/src/main/resources/application-api.properties index f42588112b..619fb89f9d 100644 --- a/dolphinscheduler-api/src/main/resources/application-api.properties +++ b/dolphinscheduler-api/src/main/resources/application-api.properties @@ -47,6 +47,15 @@ spring.messages.basename=i18n/messages # Authentication types (supported types: PASSWORD) security.authentication.type=PASSWORD +# Traffic control, if you turn on this config, the maximum number of request/s will be limited. +# global max request number per second +# default tenant-level max request number +#traffic.control.global.switch=true +#traffic.control.max.global.qps.rate=500 +#traffic.control.tenant.switch=true +#traffic.control.default.tenant.qps.rate=10 +#traffic.control.customize.tenant.qps.rate={'tenant1':11,'tenant2':20} + #============================================================================ # LDAP Config # mock ldap server from https://www.forumsys.com/tutorials/integration-how-to/ldap/online-ldap-test-server/ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java new file mode 100644 index 0000000000..b5e9244186 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.configuration; + +import org.apache.commons.collections.MapUtils; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest +public class TrafficConfigurationTest { + + @Autowired + private TrafficConfiguration trafficConfiguration; + + @Test + public void isTrafficGlobalControlSwitch() { + Assert.assertFalse(trafficConfiguration.isTrafficGlobalControlSwitch()); + } + + @Test + public void getMaxGlobalQpsLimit() { + Assert.assertEquals(300, (int) trafficConfiguration.getMaxGlobalQpsRate()); + } + + @Test + public void isTrafficTenantControlSwitch() { + Assert.assertFalse(trafficConfiguration.isTrafficTenantControlSwitch()); + } + + @Test + public void getDefaultTenantQpsLimit() { + Assert.assertEquals(10, (int) trafficConfiguration.getDefaultTenantQpsRate()); + } + + @Test + public void getCustomizeTenantQpsRate() { + Assert.assertTrue(MapUtils.isEmpty(trafficConfiguration.getCustomizeTenantQpsRate())); + } +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java new file mode 100644 index 0000000000..95ae3e3779 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.interceptor; + +import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +public class RateLimitInterceptorTest { + + @Test + public void testPreHandleWithoutControl() throws ExecutionException { + HttpServletRequest request = Mockito.mock(HttpServletRequest.class); + HttpServletResponse response = Mockito.mock(HttpServletResponse.class); + RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new TrafficConfiguration()); + Assert.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); + Assert.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); + } + + @Test + public void testPreHandleWithTenantLevenControl() throws ExecutionException { + TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); + trafficConfiguration.setTrafficTenantControlSwitch(true); + Map map = new HashMap<>(); + map.put("tenant1", 2); + map.put("tenant2", 2); + trafficConfiguration.setCustomizeTenantQpsRate(map); + trafficConfiguration.setDefaultTenantQpsRate(4); + RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(trafficConfiguration); + + HttpServletRequest tenant1Request = Mockito.mock(HttpServletRequest.class); + HttpServletRequest tenant2Request = Mockito.mock(HttpServletRequest.class); + PowerMockito.when(tenant1Request.getHeader(Mockito.any())).thenReturn("tenant1"); + PowerMockito.when(tenant2Request.getHeader(Mockito.any())).thenReturn("tenant2"); + HttpServletResponse response = Mockito.mock(HttpServletResponse.class); + + for (int i = 0; i < 2; i++) { + rateLimitInterceptor.preHandle(tenant1Request, response, null); + } + Assert.assertFalse(rateLimitInterceptor.preHandle(tenant1Request, response, null)); + Assert.assertTrue(rateLimitInterceptor.preHandle(tenant2Request, response, null)); + } + + @Test + public void testPreHandleWithGlobalControl() throws ExecutionException { + TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); + trafficConfiguration.setTrafficTenantControlSwitch(true); + trafficConfiguration.setTrafficGlobalControlSwitch(true); + trafficConfiguration.setMaxGlobalQpsRate(3); + + RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(trafficConfiguration); + HttpServletRequest request = Mockito.mock(HttpServletRequest.class); + HttpServletResponse response = Mockito.mock(HttpServletResponse.class); + + for (int i = 0; i < 2; i++) { + rateLimitInterceptor.preHandle(request, response, null); + } + Assert.assertFalse(rateLimitInterceptor.preHandle(request, response, null)); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5842dcec4b..dbd33dfeb7 100644 --- a/pom.xml +++ b/pom.xml @@ -795,6 +795,7 @@ ${maven-surefire-plugin.version} + **/api/configuration/TrafficConfigurationTest.java **/api/controller/ProcessDefinitionControllerTest.java **/api/controller/TenantControllerTest.java **/api/dto/resources/filter/ResourceFilterTest.java @@ -805,6 +806,7 @@ **/api/exceptions/ServiceExceptionTest.java **/api/interceptor/LocaleChangeInterceptorTest.java **/api/interceptor/LoginHandlerInterceptorTest.java + **/api/interceptor/RateLimitInterceptorTest.java **/api/security/impl/pwd/PasswordAuthenticatorTest.java **/api/security/impl/ldap/LdapAuthenticatorTest.java **/api/security/SecurityConfigLDAPTest.java