app、api限流改为resilience4j

This commit is contained in:
ztzh_xieyun 2024-05-09 18:01:15 +08:00
parent 5548ee6c44
commit 135f4a0b8f
12 changed files with 434 additions and 205 deletions

View File

@ -15,10 +15,18 @@
<properties> <properties>
<vertx.version>4.5.7</vertx.version> <vertx.version>4.5.7</vertx.version>
<hystrix.version>1.5.2</hystrix.version> <hystrix.version>1.5.2</hystrix.version>
<resilience4j.version>2.2.0</resilience4j.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bom</artifactId>
<version>${resilience4j.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency> <dependency>
<groupId>io.vertx</groupId> <groupId>io.vertx</groupId>
<artifactId>vertx-stack-depchain</artifactId> <artifactId>vertx-stack-depchain</artifactId>
@ -206,8 +214,15 @@
<dependency> <dependency>
<groupId>com.github.ben-manes.caffeine</groupId> <groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId> <artifactId>caffeine</artifactId>
<version>2.6.2</version>
</dependency> </dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-ratelimiter</artifactId>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -2,7 +2,6 @@ package com.sf.vertx.handle;
import io.vertx.codegen.annotations.VertxGen; import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
/*** /***
@ -11,16 +10,16 @@ import io.vertx.ext.web.RoutingContext;
* *
*/ */
@VertxGen @VertxGen
public interface RateLimitHandler extends Handler<RoutingContext> { public interface ApiRateLimitHandler extends Handler<RoutingContext> {
static RateLimitHandler create(String instance) { static ApiRateLimitHandler create(String instance) {
switch (instance) { switch (instance) {
case "redis": case "redis":
RedisRateLimiter redisRateLimiter = new RedisRateLimiter(); //RedisRateLimiter redisRateLimiter = new RedisRateLimiter();
return new RateLimitHandlerRedisImpl(redisRateLimiter); //return new RateLimitHandlerRedisImpl(redisRateLimiter);
default: default:
// 本地缓存 // 本地缓存
return null; return new ApiRateLimitHandlerImpl();
} }
} }

View File

@ -0,0 +1,47 @@
package com.sf.vertx.handle;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.pojo.SacCurrentLimiting;
import io.github.resilience4j.core.functions.CheckedRunnable;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
/***
* 内存存储
*
* @author xy
*
*/
public class ApiRateLimitHandlerImpl implements ApiRateLimitHandler {
@Override
public void handle(RoutingContext rc) {
String appCode = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCode = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey());
SacCurrentLimiting currentLimiting = AppConfigHandle.getApiCurrentLimiting(appCode, apiCode);
if(currentLimiting != null) {
String key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + apiCode + ":" + rc.request().uri()
+ ":" + rc.request().method();
RateLimiter rateLimiter = currentLimiting.getRegistry().rateLimiter(key);
CheckedRunnable restrictedCall = RateLimiter.decorateCheckedRunnable(rateLimiter,
() -> {
rc.next();
return;
});
try {
restrictedCall.run();
} catch (Throwable t) {
t.printStackTrace();
rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse()));
return;
}
} else {
rc.next();
return;
}
}
}

View File

@ -1,5 +1,6 @@
package com.sf.vertx.handle; package com.sf.vertx.handle;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -20,9 +21,12 @@ import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.pojo.SacCurrentLimiting;
import com.sf.vertx.utils.ProxyTool; import com.sf.vertx.utils.ProxyTool;
import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.collection.ConcurrentHashSet;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
@ -42,59 +46,71 @@ public class AppConfigHandle {
public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER;
// global cache app config // global cache app config
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>();
// global api config appCode - RateLimiterRegistry
private static final ConcurrentHashMap<String, SacCurrentLimiting> GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
// global app config appCode - Strategy // global app config appCode - Strategy
private static final ConcurrentHashMap<String, Strategy> GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, SacCurrentLimiting> GLOBAL_APP_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
// global api config appCode - Strategy
private static final ConcurrentHashMap<String, Strategy> GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>();
// appCode:apiCode:SacLoadBalancing // appCode:apiCode:SacLoadBalancing
private static ConcurrentHashMap<String, SacLoadBalancing> LOADBALANCING_MAP = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, SacLoadBalancing> LOADBALANCING_MAP = new ConcurrentHashMap<>();
// appCode:apiCode - ApiConfig // appCode:apiCode - ApiConfig
private static ConcurrentHashMap<String, ApiConfig> APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, ApiConfig> APICODE_CONFIG_MAP = new ConcurrentHashMap<>();
// apiCode限流配置 appCode:apiCode - Strategy // apiCode限流配置 appCode:apiCode - RateLimiterRegistry
private static ConcurrentHashMap<String, Strategy> APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, SacCurrentLimiting> APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
// apiCode熔断配置 appCode:apiCode - CircuitBreaker // apiCode熔断配置 appCode:apiCode - CircuitBreaker
private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
// 禁用appCode // 禁用appCode
private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>(); private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>();
public static void addDisabledAppcode(String appCode) { public static void addDisabledAppcode(String appCode) {
DISABLED_APPCODE.add(appCode); DISABLED_APPCODE.add(appCode);
}
public static boolean isDisabledAppcode(String appCode) {
return DISABLED_APPCODE.contains(appCode);
}
public static Strategy getGlobalAppCurrentLimitingConfig(String appCode) {
return GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.get(appCode);
} }
public static Strategy getGlobalApiCurrentLimitingConfig(String appCode) { public static boolean isDisabledAppcode(String appCode) {
return GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.get(appCode); return DISABLED_APPCODE.contains(appCode);
}
public static SacCurrentLimiting getGlobalAppCurrentLimitingConfig(String appCode) {
return GLOBAL_APP_CURRENT_LIMITING_MAP.get(appCode);
} }
public static AppConfig getAppConfig(String appCode) { public static AppConfig getAppConfig(String appCode) {
return CACHE_APP_CONFIG_MAP.get(appCode); return CACHE_APP_CONFIG_MAP.get(appCode);
} }
public static boolean isDataSecurity(String appCode) { public static boolean isDataSecurity(String appCode) {
return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false; return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null
? true
: false;
} }
public static boolean isApiCodeCircuitBreaker(String key) { public static boolean isApiCodeCircuitBreaker(String key) {
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false; return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false;
} }
public static CircuitBreaker getApiCodeCircuitBreaker(String key) { public static CircuitBreaker getApiCodeCircuitBreaker(String key) {
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key); return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key);
} }
public static Strategy getApiCurrentLimiting(String key) { /***
return APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key); * 优先apicode配置限流无法找到匹配全局限流
*
* @param appCode
* @param apiCode
* @return
*/
public static SacCurrentLimiting getApiCurrentLimiting(String appCode, String apiCode) {
String key = appCode + ":" + apiCode;
SacCurrentLimiting sacCurrentLimiting = APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) != null
? APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key)
: null;
sacCurrentLimiting = sacCurrentLimiting != null ? sacCurrentLimiting
: (GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) != null ? GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode)
: null);
return sacCurrentLimiting;
} }
public static VertxConfig getVertxConfig() { public static VertxConfig getVertxConfig() {
return VERTX_CONFIG; return VERTX_CONFIG;
} }
@ -106,19 +122,36 @@ public class AppConfigHandle {
public static String getApiCodeHeaderKey() { public static String getApiCodeHeaderKey() {
return VERTX_CONFIG.getApiCodeHeaderKey(); return VERTX_CONFIG.getApiCodeHeaderKey();
} }
public static SacLoadBalancing getLoadBalancing(String key) { public static SacLoadBalancing getLoadBalancing(String key) {
return LOADBALANCING_MAP.get(key); return LOADBALANCING_MAP.get(key);
} }
public static ApiConfig getApicodeConfigMap(String key) { public static ApiConfig getApicodeConfigMap(String key) {
return APICODE_CONFIG_MAP.get(key); return APICODE_CONFIG_MAP.get(key);
} }
public static boolean isApicodeUri(String key, String uri) { public static boolean isApicodeUri(String key, String uri) {
return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri); return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri);
} }
private static RateLimiterRegistry createRateLimiter(Strategy strategy) {
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(strategy.getTimeWindow()))
.limitForPeriod(strategy.getThreshold()).timeoutDuration(Duration.ofMillis(0)).build();
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
return registry;
}
private static void initRateLimiter(String appCode, Strategy strategy,
ConcurrentHashMap<String, SacCurrentLimiting> map) {
RateLimiterRegistry registry = createRateLimiter(strategy);
SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting();
sacCurrentLimiting.setStrategy(strategy);
sacCurrentLimiting.setRegistry(registry);
map.put(appCode, sacCurrentLimiting);
}
/*** /***
* 从redis加载数据 * 从redis加载数据
* *
@ -144,10 +177,10 @@ public class AppConfigHandle {
private static void delAppConfigCache(String appCode) { private static void delAppConfigCache(String appCode) {
AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode); AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode);
if(appConfig != null) { if (appConfig != null) {
// appapi默认限流 // appapi默认限流
GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); GLOBAL_API_CURRENT_LIMITING_MAP.remove(appCode);
GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); GLOBAL_APP_CURRENT_LIMITING_MAP.remove(appCode);
for (SacService sacService : appConfig.getService()) { for (SacService sacService : appConfig.getService()) {
if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) {
for (ApiConfig apiConfig : sacService.getApiConfig()) { for (ApiConfig apiConfig : sacService.getApiConfig()) {
@ -157,7 +190,7 @@ public class AppConfigHandle {
APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key);
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker);
if(circuitBreaker != null) { if (circuitBreaker != null) {
circuitBreaker.close(); circuitBreaker.close();
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker);
} }
@ -168,13 +201,14 @@ public class AppConfigHandle {
CACHE_APP_CONFIG_MAP.remove(appCode); CACHE_APP_CONFIG_MAP.remove(appCode);
} }
} }
public static void initAppConfig(RedisTemplate<String, String> redisTemplate, String appCode, boolean isDelLocalCache) { public static void initAppConfig(RedisTemplate<String, String> redisTemplate, String appCode,
boolean isDelLocalCache) {
// 是否需要先删除 // 是否需要先删除
if(isDelLocalCache) { if (isDelLocalCache) {
delAppConfigCache(appCode); delAppConfigCache(appCode);
} }
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) { if (StringUtils.isNotBlank(appCodeValue)) {
@ -184,11 +218,11 @@ public class AppConfigHandle {
// appapi默认限流 // appapi默认限流
if (appConfig.getApiCurrentLimitingConfig() != null) { if (appConfig.getApiCurrentLimitingConfig() != null) {
GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getApiCurrentLimitingConfig()); initRateLimiter(appCode, appConfig.getApiCurrentLimitingConfig(), GLOBAL_API_CURRENT_LIMITING_MAP);
} }
if (appConfig.getAppCurrentLimitingConfig() != null) { if (appConfig.getAppCurrentLimitingConfig() != null) {
GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getAppCurrentLimitingConfig()); initRateLimiter(appCode, appConfig.getAppCurrentLimitingConfig(), GLOBAL_APP_CURRENT_LIMITING_MAP);
} }
// app router负载均衡 // app router负载均衡
@ -231,8 +265,13 @@ public class AppConfigHandle {
if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) {
for (Strategy strategy : apiConfig.getStrategy()) { for (Strategy strategy : apiConfig.getStrategy()) {
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, strategy); RateLimiterRegistry registry = createRateLimiter(strategy);
SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting();
sacCurrentLimiting.setStrategy(strategy);
sacCurrentLimiting.setRegistry(registry);
APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, sacCurrentLimiting);
} else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) { } else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) {
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
// interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX, // interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX,
@ -245,19 +284,20 @@ public class AppConfigHandle {
// });//.retryPolicy(retryCount -> retryCount * 100L); // });//.retryPolicy(retryCount -> retryCount * 100L);
// apiCode熔断 // apiCode熔断
CircuitBreaker circuitBreaker = CircuitBreaker.create(keyCircuitBreaker + "-circuit-breaker", CircuitBreaker circuitBreaker = CircuitBreaker
VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 .create(keyCircuitBreaker + "-circuit-breaker", VERTX,
.setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数
// .setTimeout(2000) // 超时时间 .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback // .setTimeout(2000) // 超时时间
.setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下尝试重试之前所需时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
).openHandler(v -> { .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下尝试重试之前所需时间
log.info(keyCircuitBreaker + " Circuit open"); ).openHandler(v -> {
}).halfOpenHandler(v -> { log.info(keyCircuitBreaker + " Circuit open");
log.info(keyCircuitBreaker + "Circuit halfOpen"); }).halfOpenHandler(v -> {
}).closeHandler(v -> { log.info(keyCircuitBreaker + "Circuit halfOpen");
log.info(keyCircuitBreaker + "Circuit close"); }).closeHandler(v -> {
}); log.info(keyCircuitBreaker + "Circuit close");
});
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker);
} }
} }
@ -269,17 +309,17 @@ public class AppConfigHandle {
} }
} }
} }
public static Vertx createVertx() { public static Vertx createVertx() {
// TODO 编解码线程池,后面优化协程等方式 // TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions(); VertxOptions vertxOptions = new VertxOptions();
loadVertxOptions(vertxOptions); loadVertxOptions(vertxOptions);
VERTX = Vertx.vertx(vertxOptions); VERTX = Vertx.vertx(vertxOptions);
initConnectionCircuitBreaker(); initConnectionCircuitBreaker();
return VERTX; return VERTX;
} }
/*** /***
* 初始化connection Breaker * 初始化connection Breaker
*/ */
@ -297,7 +337,6 @@ public class AppConfigHandle {
log.info("connectionCircuitBreaker Circuit close"); log.info("connectionCircuitBreaker Circuit close");
}); });
} }
private static void loadVertxOptions(VertxOptions vertxOptions) { private static void loadVertxOptions(VertxOptions vertxOptions) {
long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1
@ -314,7 +353,5 @@ public class AppConfigHandle {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
} }
} }
} }

View File

@ -0,0 +1,24 @@
package com.sf.vertx.handle;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
/***
* 限流熔断, redis存储
* @author xy
*
*/
@VertxGen
public interface AppRateLimitHandler extends Handler<RoutingContext> {
static AppRateLimitHandler create(String instance) {
switch (instance) {
case "redis":
default:
// 本地缓存
return new AppRateLimitHandlerImpl();
}
}
}

View File

@ -0,0 +1,43 @@
package com.sf.vertx.handle;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.pojo.SacCurrentLimiting;
import io.github.resilience4j.core.functions.CheckedRunnable;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
/***
* 内存存储
*
* @author xy
*
*/
public class AppRateLimitHandlerImpl implements AppRateLimitHandler {
@Override
public void handle(RoutingContext rc) {
String appCode = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
SacCurrentLimiting currentLimiting = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCode);
if (currentLimiting != null) {
String key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode;
RateLimiter rateLimiter = currentLimiting.getRegistry().rateLimiter(key);
CheckedRunnable restrictedCall = RateLimiter.decorateCheckedRunnable(rateLimiter, () -> {
rc.next();
return;
});
try {
restrictedCall.run();
} catch (Throwable t) {
t.printStackTrace();
rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse()));
return;
}
} else {
rc.next();
return;
}
}
}

View File

@ -1,10 +0,0 @@
package com.sf.vertx.handle;
/***
* 内存存储
* @author xy
*
*/
public class RateLimitHandlerImpl {
}

View File

@ -1,63 +1,63 @@
package com.sf.vertx.handle; //package com.sf.vertx.handle;
//
import java.util.Map; //import java.util.Map;
//
import com.sf.vertx.api.pojo.Strategy; //import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.constans.SacErrorCode; //import com.sf.vertx.constans.SacErrorCode;
//
import io.vertx.ext.web.RoutingContext; //import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException; //import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
//
/*** ///***
* redis存储 // * redis存储
* // * 解决不了时间窗口滑动, 使用resilience4j替代
* @author xy // *
* // * @author xy
*/ // *
@Slf4j // */
public class RateLimitHandlerRedisImpl implements RateLimitHandler { //@Slf4j
private RedisRateLimiter rateLimiter; //public class RateLimitHandlerRedisImpl implements RateLimitHandler {
// private int pattern;// 1:app,2:接口默认,3:服务接口配置限流策略 // private RedisRateLimiter rateLimiter;
// // private int pattern;// 1:app,2:接口默认,3:服务接口配置限流策略
public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter) { //
this.rateLimiter = rateLimiter; // public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter) {
} // this.rateLimiter = rateLimiter;
// }
@Override //
public void handle(RoutingContext rc) { // @Override
try { // public void handle(RoutingContext rc) {
// TODO 测试异常 // try {
String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); // String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey()); // String apiCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey());
Strategy apiCodeStrategy = AppConfigHandle.getApiCurrentLimiting(appCodeHeaderKey + ":" + apiCodeHeaderKey); // Strategy apiCodeStrategy = AppConfigHandle.getApiCurrentLimiting(appCodeHeaderKey + ":" + apiCodeHeaderKey);
//
Strategy globalApiStrategy = AppConfigHandle.getGlobalApiCurrentLimitingConfig(appCodeHeaderKey); // Strategy globalApiStrategy = AppConfigHandle.getGlobalApiCurrentLimitingConfig(appCodeHeaderKey);
Strategy globalAppStrategy = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCodeHeaderKey); // Strategy globalAppStrategy = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCodeHeaderKey);
//
Strategy apiStrategy = apiCodeStrategy != null ? apiCodeStrategy // Strategy apiStrategy = apiCodeStrategy != null ? apiCodeStrategy
: globalApiStrategy != null ? globalApiStrategy : null; // : globalApiStrategy != null ? globalApiStrategy : null;
Strategy appStrategy = globalAppStrategy != null ? globalAppStrategy : null; // Strategy appStrategy = globalAppStrategy != null ? globalAppStrategy : null;
//
if (apiStrategy != null || appStrategy != null) { // if (apiStrategy != null || appStrategy != null) {
Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, apiStrategy, appStrategy); // Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, apiStrategy, appStrategy);
//
if (apiStrategy != null && retMap.get(1) == false) { // if (apiStrategy != null && retMap.get(1) == false) {
rc.fail(new HttpException(10015, apiStrategy.getDefaultResponse())); // rc.fail(new HttpException(10015, apiStrategy.getDefaultResponse()));
return; // return;
} // }
//
if (appStrategy != null && retMap.get(2) == false) { // if (appStrategy != null && retMap.get(2) == false) {
rc.fail(new HttpException(10017, appStrategy.getDefaultResponse())); // rc.fail(new HttpException(10017, appStrategy.getDefaultResponse()));
return; // return;
} // }
} // }
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
rc.fail(new HttpException(SacErrorCode.DEFAULT_ERROR_CODE)); // rc.fail(new HttpException(SacErrorCode.DEFAULT_ERROR_CODE));
return; // return;
} // }
rc.next(); // rc.next();
return; // return;
} // }
} //}

View File

@ -1,57 +1,57 @@
package com.sf.vertx.handle; //package com.sf.vertx.handle;
//
import java.util.HashMap; //import java.util.HashMap;
import java.util.Map; //import java.util.Map;
import java.util.concurrent.TimeUnit; //import java.util.concurrent.TimeUnit;
//
import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.data.redis.core.RedisTemplate;
//
import com.sf.vertx.api.pojo.Strategy; //import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.constans.RedisKeyConfig; //import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.utils.SpringUtils; //import com.sf.vertx.utils.SpringUtils;
//
import cn.hutool.core.thread.ThreadUtil; //import cn.hutool.core.thread.ThreadUtil;
import io.vertx.ext.web.RoutingContext; //import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
//
@Slf4j //@Slf4j
public class RedisRateLimiter { //public class RedisRateLimiter {
public Map<Integer, Boolean> acquire(RoutingContext rc, Strategy apiStrategy, Strategy appStrategy) { // public Map<Integer, Boolean> acquire(RoutingContext rc, Strategy apiStrategy, Strategy appStrategy) {
String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); // String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String key = null; // String key = null;
Map<Integer, Boolean> retMap = new HashMap<>(); // Map<Integer, Boolean> retMap = new HashMap<>();
if (apiStrategy != null) { // if (apiStrategy != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey + ":" + rc.request().uri() // key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey + ":" + rc.request().uri()
+ ":" + rc.request().method(); // + ":" + rc.request().method();
Boolean ret = rateLimiter(key, appCodeHeaderKey, apiStrategy.getThreshold(), apiStrategy.getTimeWindow()); // Boolean ret = rateLimiter(key, appCodeHeaderKey, apiStrategy.getThreshold(), apiStrategy.getTimeWindow());
retMap.put(1, ret); // retMap.put(1, ret);
} // }
//
if (appStrategy != null) { // if (appStrategy != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey; // key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey;
Boolean ret = rateLimiter(key, appCodeHeaderKey, appStrategy.getThreshold(), appStrategy.getTimeWindow()); // Boolean ret = rateLimiter(key, appCodeHeaderKey, appStrategy.getThreshold(), appStrategy.getTimeWindow());
retMap.put(2, ret); // retMap.put(2, ret);
} // }
return retMap; // return retMap;
} // }
//
@SuppressWarnings("unchecked") // @SuppressWarnings("unchecked")
private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) { // private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) {
RedisTemplate<String, Integer> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class); // RedisTemplate<String, Integer> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
Integer count = redisTemplate.opsForValue().get(key); // Integer count = redisTemplate.opsForValue().get(key);
// redisTemplate.delete(key); // // redisTemplate.delete(key);
log.info("redis limiter. key:{}, count:{}", key, count); // log.info("redis limiter. key:{}, count:{}", key, count);
// 初始化,设置过期时间 // // 初始化,设置过期时间
ThreadUtil.execAsync(() -> { // ThreadUtil.execAsync(() -> {
increment(timeWindow, redisTemplate, key); // increment(timeWindow, redisTemplate, key);
}); // });
return (count == null || count <= threshold) ? true : false; // return (count == null || count <= threshold) ? true : false;
} // }
//
private void increment(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate, String key) { // private void increment(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate, String key) {
Long incr = redisTemplate.opsForValue().increment(key); // Long incr = redisTemplate.opsForValue().increment(key);
if (incr == 1) { // 创建,才设置时间窗口 // if (incr == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); // redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
} // }
} // }
} //}

View File

@ -10,11 +10,12 @@ import org.springframework.stereotype.Component;
import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.handle.ApiRateLimitHandler;
import com.sf.vertx.handle.AppConfigHandle; import com.sf.vertx.handle.AppConfigHandle;
import com.sf.vertx.handle.AppRateLimitHandler;
import com.sf.vertx.handle.BodyHandler; import com.sf.vertx.handle.BodyHandler;
import com.sf.vertx.handle.ParameterCheckHandler; import com.sf.vertx.handle.ParameterCheckHandler;
import com.sf.vertx.handle.ProxyHandler; import com.sf.vertx.handle.ProxyHandler;
import com.sf.vertx.handle.RateLimitHandler;
import com.sf.vertx.handle.RestfulFailureHandler; import com.sf.vertx.handle.RestfulFailureHandler;
import com.sf.vertx.utils.ProxyTool; import com.sf.vertx.utils.ProxyTool;
@ -56,21 +57,21 @@ public class DynamicBuildServer implements ApplicationRunner {
redisKeyConfig.init(); redisKeyConfig.init();
// 从redis同步vertx配置 // 从redis同步vertx配置
AppConfigHandle.initVertxConfig(redisTemplate); AppConfigHandle.initVertxConfig(redisTemplate);
// 加载vertx应用配置 // 加载vertx应用配置
appStartLoadData(); appStartLoadData();
} }
/*** /***
* 应用启动, 从redis读取配置,初始化vertx服务 * 应用启动, 从redis读取配置,初始化vertx服务
* @throws Exception *
* @throws Exception
*/ */
private void appStartLoadData() throws Exception { private void appStartLoadData() throws Exception {
Vertx vertx = AppConfigHandle.createVertx(); Vertx vertx = AppConfigHandle.createVertx();
// 从redis同步app配置 // 从redis同步app配置
AppConfigHandle.initAllAppConfig(redisTemplate); AppConfigHandle.initAllAppConfig(redisTemplate);
VertxConfig vertxConfig = AppConfigHandle.getVertxConfig(); VertxConfig vertxConfig = AppConfigHandle.getVertxConfig();
// 创建HTTP监听 // 创建HTTP监听
// 所有ip都能访问 // 所有ip都能访问
@ -120,8 +121,12 @@ public class DynamicBuildServer implements ApplicationRunner {
WebClient mainWebClient = WebClient.create(vertx); WebClient mainWebClient = WebClient.create(vertx);
String rateLimitModel = vertxConfig.getRateLimitModel(); String rateLimitModel = vertxConfig.getRateLimitModel();
mainHttpRouter.route().handler(ParameterCheckHandler.create()).handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) rateLimitModel = "local";
.handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create()); mainHttpRouter.route().handler(ParameterCheckHandler.create())
.handler(AppRateLimitHandler.create(rateLimitModel))
.handler(ApiRateLimitHandler.create(rateLimitModel))
.handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient, proxy))
.failureHandler(RestfulFailureHandler.create());
// mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy)); // mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy));
} }

View File

@ -0,0 +1,13 @@
package com.sf.vertx.pojo;
import com.sf.vertx.api.pojo.Strategy;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
import lombok.Data;
@Data
public class SacCurrentLimiting {
private RateLimiterRegistry registry;
private Strategy strategy;
}

View File

@ -1,20 +1,76 @@
package com.sf.vertx; package com.sf.vertx;
import org.junit.Test;
import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.handle.AppConfigHandle; import com.sf.vertx.handle.AppConfigHandle;
import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.circuitbreaker.HalfOpenCircuitException;
import io.vertx.circuitbreaker.OpenCircuitException;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions; import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.impl.NoStackTraceThrowable;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class TestCircuitBreaker { public class TestCircuitBreaker {
private static int port; private static int port;
CircuitBreaker rateLimiter;
@Test
public void rateLimiter() {
VertxOptions vertxOptions = new VertxOptions();
Vertx VERTX = Vertx.vertx(vertxOptions);
// apiCode熔断
rateLimiter = CircuitBreaker.create("rateLimiter-circuit-breaker", VERTX,
new CircuitBreakerOptions().setMaxFailures(2) // 最大失败数
.setFailuresRollingWindow(5 * 1000) // 毫秒
.setTimeout(-1) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(-1) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info(" Circuit open");
}).halfOpenHandler(v -> {
log.info("Circuit halfOpen");
}).closeHandler(v -> {
log.info("Circuit close");
});
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
}
rateLimiter.executeWithFallback(promise -> {
promise.fail("1");
}, v -> {
// 需要传递当前状态half-open , close, 还是统计失败次数
//log.info(" executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(" half open circuit");
} else if (v instanceof OpenCircuitException) {
log.info(" open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(" close circuit");
}
return "3";
}, ar -> {
// Do something with the result
log.info(" interface failed result.{} ", ar);
});
}
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
}
}
public static void main(String[] args) { public static void main(String[] args) {
VertxConfig vertxConfig = AppConfigHandle.getVertxConfig(); VertxConfig vertxConfig = AppConfigHandle.getVertxConfig();
@ -33,7 +89,7 @@ public class TestCircuitBreaker {
}); });
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
port = 9199; port = 9199;
if(i % 2 == 0) { if (i % 2 == 0) {
port = 9198; port = 9198;
log.info("i:{},port:{}", i, port); log.info("i:{},port:{}", i, port);
} }