wenjun
4 years ago
committed by
GitHub
7 changed files with 362 additions and 2 deletions
@ -0,0 +1,78 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.api.configuration; |
||||||
|
|
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Value; |
||||||
|
import org.springframework.context.annotation.Configuration; |
||||||
|
|
||||||
|
@Configuration |
||||||
|
public class TrafficConfiguration { |
||||||
|
|
||||||
|
@Value("${traffic.control.global.switch:false}") |
||||||
|
private boolean trafficGlobalControlSwitch; |
||||||
|
@Value("${traffic.control.max.global.qps.rate:300}") |
||||||
|
private Integer maxGlobalQpsRate; |
||||||
|
@Value("${traffic.control.tenant.switch:false}") |
||||||
|
private boolean trafficTenantControlSwitch; |
||||||
|
@Value("${traffic.control.default.tenant.qps.rate:10}") |
||||||
|
private Integer defaultTenantQpsRate; |
||||||
|
@Value("#{'${traffic.control.customize.tenant.qps.rate:}'.empty?null:'${traffic.control.customize.tenant.qps.rate:}'}") |
||||||
|
private Map<String, Integer> customizeTenantQpsRate; |
||||||
|
|
||||||
|
public boolean isTrafficGlobalControlSwitch() { |
||||||
|
return trafficGlobalControlSwitch; |
||||||
|
} |
||||||
|
|
||||||
|
public void setTrafficGlobalControlSwitch(boolean trafficGlobalControlSwitch) { |
||||||
|
this.trafficGlobalControlSwitch = trafficGlobalControlSwitch; |
||||||
|
} |
||||||
|
|
||||||
|
public Integer getMaxGlobalQpsRate() { |
||||||
|
return maxGlobalQpsRate; |
||||||
|
} |
||||||
|
|
||||||
|
public void setMaxGlobalQpsRate(Integer maxGlobalQpsRate) { |
||||||
|
this.maxGlobalQpsRate = maxGlobalQpsRate; |
||||||
|
} |
||||||
|
|
||||||
|
public boolean isTrafficTenantControlSwitch() { |
||||||
|
return trafficTenantControlSwitch; |
||||||
|
} |
||||||
|
|
||||||
|
public void setTrafficTenantControlSwitch(boolean trafficTenantControlSwitch) { |
||||||
|
this.trafficTenantControlSwitch = trafficTenantControlSwitch; |
||||||
|
} |
||||||
|
|
||||||
|
public Integer getDefaultTenantQpsRate() { |
||||||
|
return defaultTenantQpsRate; |
||||||
|
} |
||||||
|
|
||||||
|
public void setDefaultTenantQpsRate(Integer defaultTenantQpsRate) { |
||||||
|
this.defaultTenantQpsRate = defaultTenantQpsRate; |
||||||
|
} |
||||||
|
|
||||||
|
public Map<String, Integer> getCustomizeTenantQpsRate() { |
||||||
|
return customizeTenantQpsRate; |
||||||
|
} |
||||||
|
|
||||||
|
public void setCustomizeTenantQpsRate(Map<String, Integer> customizeTenantQpsRate) { |
||||||
|
this.customizeTenantQpsRate = customizeTenantQpsRate; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,104 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.api.interceptor; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; |
||||||
|
import org.apache.dolphinscheduler.common.utils.StringUtils; |
||||||
|
|
||||||
|
import org.apache.commons.collections.MapUtils; |
||||||
|
|
||||||
|
import java.util.Map; |
||||||
|
import java.util.concurrent.ExecutionException; |
||||||
|
import java.util.concurrent.TimeUnit; |
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest; |
||||||
|
import javax.servlet.http.HttpServletResponse; |
||||||
|
|
||||||
|
import org.slf4j.Logger; |
||||||
|
import org.slf4j.LoggerFactory; |
||||||
|
import org.springframework.http.HttpStatus; |
||||||
|
import org.springframework.web.servlet.HandlerInterceptor; |
||||||
|
|
||||||
|
import com.google.common.cache.CacheBuilder; |
||||||
|
import com.google.common.cache.CacheLoader; |
||||||
|
import com.google.common.cache.LoadingCache; |
||||||
|
import com.google.common.util.concurrent.RateLimiter; |
||||||
|
|
||||||
|
/** |
||||||
|
* This interceptor is used to control the traffic, consists with global traffic control and tenant-leve traffic control. |
||||||
|
* If the current coming tenant reaches his tenant-level request quota, his request will be reject fast. |
||||||
|
* If the current system request number reaches the global request quota, all coming request will be reject fast. |
||||||
|
*/ |
||||||
|
public class RateLimitInterceptor implements HandlerInterceptor { |
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(RateLimitInterceptor.class); |
||||||
|
|
||||||
|
private TrafficConfiguration trafficConfiguration; |
||||||
|
|
||||||
|
private RateLimiter globalRateLimiter; |
||||||
|
|
||||||
|
private LoadingCache<String, RateLimiter> tenantRateLimiterCache = CacheBuilder.newBuilder() |
||||||
|
.maximumSize(100) |
||||||
|
.expireAfterAccess(10, TimeUnit.MINUTES) |
||||||
|
.build(new CacheLoader<String, RateLimiter>() { |
||||||
|
@Override |
||||||
|
public RateLimiter load(String token) { |
||||||
|
// use tenant customize rate limit
|
||||||
|
Map<String, Integer> customizeTenantQpsRate = trafficConfiguration.getCustomizeTenantQpsRate(); |
||||||
|
int tenantQuota = trafficConfiguration.getDefaultTenantQpsRate(); |
||||||
|
if (MapUtils.isNotEmpty(customizeTenantQpsRate)) { |
||||||
|
tenantQuota = customizeTenantQpsRate.getOrDefault(token, trafficConfiguration.getDefaultTenantQpsRate()); |
||||||
|
} |
||||||
|
// use tenant default rate limit
|
||||||
|
return RateLimiter.create(tenantQuota, 1, TimeUnit.SECONDS); |
||||||
|
} |
||||||
|
}); |
||||||
|
|
||||||
|
@Override |
||||||
|
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws ExecutionException { |
||||||
|
// tenant-level rate limit
|
||||||
|
if (trafficConfiguration.isTrafficTenantControlSwitch()) { |
||||||
|
String token = request.getHeader("token"); |
||||||
|
if (StringUtils.isNotEmpty(token)) { |
||||||
|
RateLimiter tenantRateLimiter = tenantRateLimiterCache.get(token); |
||||||
|
if (!tenantRateLimiter.tryAcquire()) { |
||||||
|
response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value()); |
||||||
|
logger.warn("Too many request, reach tenant rate limit, current tenant:{} qps is {}", token, tenantRateLimiter.getRate()); |
||||||
|
return false; |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
// global rate limit
|
||||||
|
if (trafficConfiguration.isTrafficGlobalControlSwitch()) { |
||||||
|
if (!globalRateLimiter.tryAcquire()) { |
||||||
|
response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value()); |
||||||
|
logger.warn("Too many request, reach global rate limit, current qps is {}", globalRateLimiter.getRate()); |
||||||
|
return false; |
||||||
|
} |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
public RateLimitInterceptor(TrafficConfiguration trafficConfiguration) { |
||||||
|
this.trafficConfiguration = trafficConfiguration; |
||||||
|
if (trafficConfiguration.isTrafficGlobalControlSwitch()) { |
||||||
|
this.globalRateLimiter = RateLimiter.create(trafficConfiguration.getMaxGlobalQpsRate(), 1, TimeUnit.SECONDS); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,60 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.api.configuration; |
||||||
|
|
||||||
|
import org.apache.commons.collections.MapUtils; |
||||||
|
|
||||||
|
import org.junit.Assert; |
||||||
|
import org.junit.Test; |
||||||
|
import org.junit.runner.RunWith; |
||||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||||
|
import org.springframework.boot.test.context.SpringBootTest; |
||||||
|
import org.springframework.test.context.junit4.SpringRunner; |
||||||
|
|
||||||
|
@RunWith(SpringRunner.class) |
||||||
|
@SpringBootTest |
||||||
|
public class TrafficConfigurationTest { |
||||||
|
|
||||||
|
@Autowired |
||||||
|
private TrafficConfiguration trafficConfiguration; |
||||||
|
|
||||||
|
@Test |
||||||
|
public void isTrafficGlobalControlSwitch() { |
||||||
|
Assert.assertFalse(trafficConfiguration.isTrafficGlobalControlSwitch()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void getMaxGlobalQpsLimit() { |
||||||
|
Assert.assertEquals(300, (int) trafficConfiguration.getMaxGlobalQpsRate()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void isTrafficTenantControlSwitch() { |
||||||
|
Assert.assertFalse(trafficConfiguration.isTrafficTenantControlSwitch()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void getDefaultTenantQpsLimit() { |
||||||
|
Assert.assertEquals(10, (int) trafficConfiguration.getDefaultTenantQpsRate()); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void getCustomizeTenantQpsRate() { |
||||||
|
Assert.assertTrue(MapUtils.isEmpty(trafficConfiguration.getCustomizeTenantQpsRate())); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,89 @@ |
|||||||
|
/* |
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||||
|
* contributor license agreements. See the NOTICE file distributed with |
||||||
|
* this work for additional information regarding copyright ownership. |
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||||
|
* (the "License"); you may not use this file except in compliance with |
||||||
|
* the License. You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
*/ |
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.api.interceptor; |
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; |
||||||
|
|
||||||
|
import java.util.HashMap; |
||||||
|
import java.util.Map; |
||||||
|
import java.util.concurrent.ExecutionException; |
||||||
|
|
||||||
|
import javax.servlet.http.HttpServletRequest; |
||||||
|
import javax.servlet.http.HttpServletResponse; |
||||||
|
|
||||||
|
import org.junit.Assert; |
||||||
|
import org.junit.Test; |
||||||
|
import org.junit.runner.RunWith; |
||||||
|
import org.mockito.Mockito; |
||||||
|
import org.powermock.api.mockito.PowerMockito; |
||||||
|
import org.powermock.modules.junit4.PowerMockRunner; |
||||||
|
|
||||||
|
@RunWith(PowerMockRunner.class) |
||||||
|
public class RateLimitInterceptorTest { |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testPreHandleWithoutControl() throws ExecutionException { |
||||||
|
HttpServletRequest request = Mockito.mock(HttpServletRequest.class); |
||||||
|
HttpServletResponse response = Mockito.mock(HttpServletResponse.class); |
||||||
|
RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new TrafficConfiguration()); |
||||||
|
Assert.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); |
||||||
|
Assert.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testPreHandleWithTenantLevenControl() throws ExecutionException { |
||||||
|
TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); |
||||||
|
trafficConfiguration.setTrafficTenantControlSwitch(true); |
||||||
|
Map<String, Integer> map = new HashMap<>(); |
||||||
|
map.put("tenant1", 2); |
||||||
|
map.put("tenant2", 2); |
||||||
|
trafficConfiguration.setCustomizeTenantQpsRate(map); |
||||||
|
trafficConfiguration.setDefaultTenantQpsRate(4); |
||||||
|
RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(trafficConfiguration); |
||||||
|
|
||||||
|
HttpServletRequest tenant1Request = Mockito.mock(HttpServletRequest.class); |
||||||
|
HttpServletRequest tenant2Request = Mockito.mock(HttpServletRequest.class); |
||||||
|
PowerMockito.when(tenant1Request.getHeader(Mockito.any())).thenReturn("tenant1"); |
||||||
|
PowerMockito.when(tenant2Request.getHeader(Mockito.any())).thenReturn("tenant2"); |
||||||
|
HttpServletResponse response = Mockito.mock(HttpServletResponse.class); |
||||||
|
|
||||||
|
for (int i = 0; i < 2; i++) { |
||||||
|
rateLimitInterceptor.preHandle(tenant1Request, response, null); |
||||||
|
} |
||||||
|
Assert.assertFalse(rateLimitInterceptor.preHandle(tenant1Request, response, null)); |
||||||
|
Assert.assertTrue(rateLimitInterceptor.preHandle(tenant2Request, response, null)); |
||||||
|
} |
||||||
|
|
||||||
|
@Test |
||||||
|
public void testPreHandleWithGlobalControl() throws ExecutionException { |
||||||
|
TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); |
||||||
|
trafficConfiguration.setTrafficTenantControlSwitch(true); |
||||||
|
trafficConfiguration.setTrafficGlobalControlSwitch(true); |
||||||
|
trafficConfiguration.setMaxGlobalQpsRate(3); |
||||||
|
|
||||||
|
RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(trafficConfiguration); |
||||||
|
HttpServletRequest request = Mockito.mock(HttpServletRequest.class); |
||||||
|
HttpServletResponse response = Mockito.mock(HttpServletResponse.class); |
||||||
|
|
||||||
|
for (int i = 0; i < 2; i++) { |
||||||
|
rateLimitInterceptor.preHandle(request, response, null); |
||||||
|
} |
||||||
|
Assert.assertFalse(rateLimitInterceptor.preHandle(request, response, null)); |
||||||
|
} |
||||||
|
|
||||||
|
} |
Loading…
Reference in new issue