支持接口限流,优化代码,引入异步, 提高性能
This commit is contained in:
parent
220ec74d5a
commit
7fbb842597
@ -65,6 +65,10 @@
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-pool2</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-core</artifactId>
|
||||
</dependency>
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
|
@ -13,11 +13,11 @@ import io.vertx.ext.web.RoutingContext;
|
||||
@VertxGen
|
||||
public interface RateLimitHandler extends Handler<RoutingContext> {
|
||||
|
||||
static RateLimitHandler create(String instance) {
|
||||
static RateLimitHandler create(String instance, int pattern) {
|
||||
switch (instance) {
|
||||
case "redis":
|
||||
RedisRateLimiter redisRateLimiter = new RedisRateLimiter();
|
||||
return new RateLimitHandlerRedisImpl(redisRateLimiter);
|
||||
return new RateLimitHandlerRedisImpl(redisRateLimiter, pattern);
|
||||
default:
|
||||
// 本地缓存
|
||||
return null;
|
||||
|
@ -17,9 +17,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
public class RateLimitHandlerRedisImpl implements RateLimitHandler {
|
||||
private RedisRateLimiter rateLimiter;
|
||||
private int pattern;// 1:app,2:接口默认,3:服务接口配置限流策略
|
||||
|
||||
public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter) {
|
||||
public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter, int pattern) {
|
||||
this.rateLimiter = rateLimiter;
|
||||
this.pattern = pattern;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -30,7 +32,7 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
|
||||
// rc.next();
|
||||
// return;
|
||||
// 获取模式
|
||||
int pattern = 1; // 1:app,2:接口默认,3:服务接口配置限流策略
|
||||
// TODO redis连不上, 切换到内存模式
|
||||
if (rateLimiter.acquire(rc, pattern)) {
|
||||
log.info("rateLimiter.acquire true");
|
||||
rc.next();
|
||||
@ -38,14 +40,15 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
|
||||
} else {
|
||||
switch (pattern) {
|
||||
case 1:
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl
|
||||
.getAppCurrentLimitingConfig(sacAppHeaderKey);
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl.getAppCurrentLimitingConfig(sacAppHeaderKey);
|
||||
rc.fail(new HttpException(429, appCurrentLimitingConfig.getDefaultResponse()));
|
||||
return;
|
||||
// JsonObject dataJson = new JsonObject(appCurrentLimitingConfig.getDefaultResponse());
|
||||
// rc.response().setChunked(true).setStatusCode(429).putHeader("Content-Type", "application/json").end(dataJson.toBuffer());
|
||||
// return;
|
||||
default:
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig1 = AppConfigServiceImpl.getAppCurrentLimitingConfig(sacAppHeaderKey);
|
||||
rc.fail(new HttpException(430, appCurrentLimitingConfig1.getDefaultResponse()));
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4,12 +4,14 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig;
|
||||
import com.sf.vertx.api.pojo.AppCurrentLimitingConfig;
|
||||
import com.sf.vertx.constans.RedisKeyConfig;
|
||||
import com.sf.vertx.init.DynamicBuildServer;
|
||||
import com.sf.vertx.service.impl.AppConfigServiceImpl;
|
||||
import com.sf.vertx.utils.SpringUtils;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@ -17,37 +19,54 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class RedisRateLimiter {
|
||||
public Boolean acquire(RoutingContext rc, int pattern) {
|
||||
String appCode = rc.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY);
|
||||
//TODO 先测试app模式,后面通过app缓存获取模式
|
||||
switch (pattern) {
|
||||
case 1:
|
||||
String key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY +":"+ appCode;
|
||||
return rateLimiter(key, appCode);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
// TODO 先测试app模式,后面通过app缓存获取模式
|
||||
String key = null;
|
||||
switch (pattern) {
|
||||
case 1:
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl.getAppCurrentLimitingConfig(appCode);
|
||||
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode;
|
||||
return rateLimiter(key, appCode, appCurrentLimitingConfig.getThreshold(), appCurrentLimitingConfig.getTimeWindow());
|
||||
case 2:
|
||||
ApiCurrentLimitingConfig apiCurrentLimitingConfig = AppConfigServiceImpl.getApiCurrentLimitingConfig(appCode);
|
||||
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + rc.request().uri() + ":"
|
||||
+ rc.request().method();
|
||||
return rateLimiter(key, appCode, apiCurrentLimitingConfig.getThreshold(), apiCurrentLimitingConfig.getTimeWindow());
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Boolean rateLimiter(String key, String appCode) {
|
||||
RedisTemplate<String, Integer> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl.getAppCurrentLimitingConfig(appCode);
|
||||
Object count = redisTemplate.opsForValue().get(key);
|
||||
//redisTemplate.delete(key);
|
||||
log.info("count:{}", count);
|
||||
if(count == null) {
|
||||
private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) {
|
||||
RedisTemplate<String, Integer> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
|
||||
Object count = redisTemplate.opsForValue().get(key);
|
||||
// redisTemplate.delete(key);
|
||||
log.info("count:{}", count);
|
||||
if (count == null) {
|
||||
// 初始化,设置过期时间
|
||||
redisTemplate.opsForValue().increment(key);
|
||||
log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
|
||||
redisTemplate.expire(key, appCurrentLimitingConfig.getTimeWindow(), TimeUnit.SECONDS);
|
||||
} else if(Integer.valueOf(count.toString()) < appCurrentLimitingConfig.getThreshold()) {
|
||||
redisTemplate.opsForValue().increment(key);
|
||||
ThreadUtil.execAsync(() -> {
|
||||
add(timeWindow, redisTemplate, key);
|
||||
});
|
||||
} else if (Integer.valueOf(count.toString()) < threshold) {
|
||||
ThreadUtil.execAsync(() -> {
|
||||
increment(redisTemplate, key);
|
||||
});
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void add(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate,
|
||||
String key) {
|
||||
redisTemplate.opsForValue().increment(key);
|
||||
log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
|
||||
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void increment(RedisTemplate<String, Integer> redisTemplate, String key) {
|
||||
redisTemplate.opsForValue().increment(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -128,9 +128,11 @@ public class DynamicBuildServer implements ApplicationRunner {
|
||||
|
||||
// TODO 实例化方式 从 VertxConfig 读取
|
||||
String instance = "redis";
|
||||
mainHttpRouter.route().handler(RateLimitHandler.create(instance)).handler(BodyHandler.create())
|
||||
// 需要从配置拿,先测功能
|
||||
int pattern = 2;// 1:app,2:接口默认,3:服务接口配置限流策略
|
||||
mainHttpRouter.route().handler(RateLimitHandler.create(instance, pattern)).handler(BodyHandler.create())
|
||||
.handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create());
|
||||
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
|
||||
//mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
|
||||
}
|
||||
|
||||
public SocketAddress resolveOriginAddress(ConcurrentHashMap<String, AppConfig> cacheAppConfig,
|
||||
|
@ -12,6 +12,7 @@ import org.springframework.stereotype.Service;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.alibaba.fastjson2.TypeReference;
|
||||
import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig;
|
||||
import com.sf.vertx.api.pojo.AppConfig;
|
||||
import com.sf.vertx.api.pojo.AppCurrentLimitingConfig;
|
||||
import com.sf.vertx.api.pojo.VertxConfig;
|
||||
@ -29,6 +30,7 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<String, AppCurrentLimitingConfig> CACHE_APP_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
|
||||
private static final ConcurrentHashMap<String, ApiCurrentLimitingConfig> CACHE_API_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
|
||||
|
||||
public static boolean appDataSecurity(String appCode) {
|
||||
return CACHE_APP_CONFIG.get(appCode) != null && CACHE_APP_CONFIG.get(appCode).getDataSecurity() != null ? true
|
||||
@ -38,6 +40,10 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
public static AppCurrentLimitingConfig getAppCurrentLimitingConfig(String appCode) {
|
||||
return CACHE_APP_CURRENT_LIMITING_CONFIG.get(appCode);
|
||||
}
|
||||
|
||||
public static ApiCurrentLimitingConfig getApiCurrentLimitingConfig(String appCode) {
|
||||
return CACHE_API_CURRENT_LIMITING_CONFIG.get(appCode);
|
||||
}
|
||||
|
||||
/***
|
||||
* 从redis加载数据
|
||||
@ -56,6 +62,14 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
}
|
||||
|
||||
// TODO 限流
|
||||
ApiCurrentLimitingConfig apiCurrentLimitingConfig = new ApiCurrentLimitingConfig();
|
||||
apiCurrentLimitingConfig.setThreshold(1);
|
||||
apiCurrentLimitingConfig.setTimeWindow(10);
|
||||
apiCurrentLimitingConfig.setDefaultResponse(
|
||||
"{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}");
|
||||
CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", apiCurrentLimitingConfig);
|
||||
CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", apiCurrentLimitingConfig);
|
||||
|
||||
AppCurrentLimitingConfig appCurrentLimitingConfig = new AppCurrentLimitingConfig();
|
||||
appCurrentLimitingConfig.setThreshold(1);
|
||||
appCurrentLimitingConfig.setTimeWindow(10);
|
||||
@ -63,6 +77,7 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
"{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}");
|
||||
CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", appCurrentLimitingConfig);
|
||||
CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", appCurrentLimitingConfig);
|
||||
|
||||
log.info("cacheAppConfig:{}", JSON.toJSONString(CACHE_APP_CONFIG));
|
||||
|
||||
return CACHE_APP_CONFIG;
|
||||
|
Loading…
x
Reference in New Issue
Block a user