diff --git a/sf-vertx/pom.xml b/sf-vertx/pom.xml index d0f36ef..0365cfe 100644 --- a/sf-vertx/pom.xml +++ b/sf-vertx/pom.xml @@ -15,10 +15,18 @@ 4.5.7 1.5.2 + 2.2.0 + + io.github.resilience4j + resilience4j-bom + ${resilience4j.version} + pom + import + io.vertx vertx-stack-depchain @@ -206,8 +214,15 @@ com.github.ben-manes.caffeine caffeine - 2.6.2 + + io.github.resilience4j + resilience4j-ratelimiter + + + io.github.resilience4j + resilience4j-bulkhead + diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandler.java similarity index 50% rename from sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandler.java rename to sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandler.java index 141db87..4f17776 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandler.java @@ -2,7 +2,6 @@ package com.sf.vertx.handle; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Handler; -import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; /*** @@ -11,16 +10,16 @@ import io.vertx.ext.web.RoutingContext; * */ @VertxGen -public interface RateLimitHandler extends Handler { +public interface ApiRateLimitHandler extends Handler { - static RateLimitHandler create(String instance) { + static ApiRateLimitHandler create(String instance) { switch (instance) { case "redis": - RedisRateLimiter redisRateLimiter = new RedisRateLimiter(); - return new RateLimitHandlerRedisImpl(redisRateLimiter); + //RedisRateLimiter redisRateLimiter = new RedisRateLimiter(); + //return new RateLimitHandlerRedisImpl(redisRateLimiter); default: // 本地缓存 - return null; + return new ApiRateLimitHandlerImpl(); } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandlerImpl.java new file mode 100644 index 0000000..fc394c0 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ApiRateLimitHandlerImpl.java @@ -0,0 +1,47 @@ +package com.sf.vertx.handle; + +import com.sf.vertx.constans.RedisKeyConfig; +import com.sf.vertx.pojo.SacCurrentLimiting; + +import io.github.resilience4j.core.functions.CheckedRunnable; +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.HttpException; + +/*** + * 内存存储 + * + * @author xy + * + */ +public class ApiRateLimitHandlerImpl implements ApiRateLimitHandler { + + @Override + public void handle(RoutingContext rc) { + String appCode = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); + String apiCode = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey()); + + SacCurrentLimiting currentLimiting = AppConfigHandle.getApiCurrentLimiting(appCode, apiCode); + + if(currentLimiting != null) { + String key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + apiCode + ":" + rc.request().uri() + + ":" + rc.request().method(); + RateLimiter rateLimiter = currentLimiting.getRegistry().rateLimiter(key); + CheckedRunnable restrictedCall = RateLimiter.decorateCheckedRunnable(rateLimiter, + () -> { + rc.next(); + return; + }); + try { + restrictedCall.run(); + } catch (Throwable t) { + t.printStackTrace(); + rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse())); + return; + } + } else { + rc.next(); + return; + } + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandle.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandle.java index 6e3ba63..9416f81 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandle.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandle.java @@ -1,5 +1,6 @@ package com.sf.vertx.handle; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -20,9 +21,12 @@ import com.sf.vertx.api.pojo.Strategy; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; +import com.sf.vertx.pojo.SacCurrentLimiting; import com.sf.vertx.utils.ProxyTool; import cn.hutool.core.collection.ConcurrentHashSet; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.Vertx; @@ -42,59 +46,71 @@ public class AppConfigHandle { public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; // global cache app config private static final ConcurrentHashMap CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); + // global api config appCode - RateLimiterRegistry + private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // global app config appCode - Strategy - private static final ConcurrentHashMap GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>(); - // global api config appCode - Strategy - private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap GLOBAL_APP_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode:SacLoadBalancing private static ConcurrentHashMap LOADBALANCING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode - ApiConfig private static ConcurrentHashMap APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); - // apiCode限流配置 appCode:apiCode - Strategy - private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); - + // apiCode限流配置 appCode:apiCode - RateLimiterRegistry + private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); + // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); - + public static void addDisabledAppcode(String appCode) { - DISABLED_APPCODE.add(appCode); - } - - public static boolean isDisabledAppcode(String appCode) { - return DISABLED_APPCODE.contains(appCode); - } - - public static Strategy getGlobalAppCurrentLimitingConfig(String appCode) { - return GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.get(appCode); + DISABLED_APPCODE.add(appCode); } - public static Strategy getGlobalApiCurrentLimitingConfig(String appCode) { - return GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.get(appCode); + public static boolean isDisabledAppcode(String appCode) { + return DISABLED_APPCODE.contains(appCode); + } + + public static SacCurrentLimiting getGlobalAppCurrentLimitingConfig(String appCode) { + return GLOBAL_APP_CURRENT_LIMITING_MAP.get(appCode); } public static AppConfig getAppConfig(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode); } - + public static boolean isDataSecurity(String appCode) { - return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false; + return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null + ? true + : false; } - + public static boolean isApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false; } - + public static CircuitBreaker getApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key); } - - public static Strategy getApiCurrentLimiting(String key) { - return APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key); + + /*** + * 优先apicode配置限流、无法找到匹配全局限流 + * + * @param appCode + * @param apiCode + * @return + */ + public static SacCurrentLimiting getApiCurrentLimiting(String appCode, String apiCode) { + String key = appCode + ":" + apiCode; + SacCurrentLimiting sacCurrentLimiting = APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) != null + ? APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) + : null; + sacCurrentLimiting = sacCurrentLimiting != null ? sacCurrentLimiting + : (GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) != null ? GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) + : null); + return sacCurrentLimiting; } - + public static VertxConfig getVertxConfig() { return VERTX_CONFIG; } @@ -106,19 +122,36 @@ public class AppConfigHandle { public static String getApiCodeHeaderKey() { return VERTX_CONFIG.getApiCodeHeaderKey(); } - + public static SacLoadBalancing getLoadBalancing(String key) { return LOADBALANCING_MAP.get(key); } - + public static ApiConfig getApicodeConfigMap(String key) { return APICODE_CONFIG_MAP.get(key); } - + public static boolean isApicodeUri(String key, String uri) { return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri); } - + + private static RateLimiterRegistry createRateLimiter(Strategy strategy) { + RateLimiterConfig config = RateLimiterConfig.custom() + .limitRefreshPeriod(Duration.ofSeconds(strategy.getTimeWindow())) + .limitForPeriod(strategy.getThreshold()).timeoutDuration(Duration.ofMillis(0)).build(); + RateLimiterRegistry registry = RateLimiterRegistry.of(config); + return registry; + } + + private static void initRateLimiter(String appCode, Strategy strategy, + ConcurrentHashMap map) { + RateLimiterRegistry registry = createRateLimiter(strategy); + SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); + sacCurrentLimiting.setStrategy(strategy); + sacCurrentLimiting.setRegistry(registry); + map.put(appCode, sacCurrentLimiting); + } + /*** * 从redis加载数据 * @@ -144,10 +177,10 @@ public class AppConfigHandle { private static void delAppConfigCache(String appCode) { AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode); - if(appConfig != null) { + if (appConfig != null) { // app、api默认限流 - GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); - GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); + GLOBAL_API_CURRENT_LIMITING_MAP.remove(appCode); + GLOBAL_APP_CURRENT_LIMITING_MAP.remove(appCode); for (SacService sacService : appConfig.getService()) { if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { @@ -157,7 +190,7 @@ public class AppConfigHandle { APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); - if(circuitBreaker != null) { + if (circuitBreaker != null) { circuitBreaker.close(); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker); } @@ -168,13 +201,14 @@ public class AppConfigHandle { CACHE_APP_CONFIG_MAP.remove(appCode); } } - - public static void initAppConfig(RedisTemplate redisTemplate, String appCode, boolean isDelLocalCache) { + + public static void initAppConfig(RedisTemplate redisTemplate, String appCode, + boolean isDelLocalCache) { // 是否需要先删除 - if(isDelLocalCache) { + if (isDelLocalCache) { delAppConfigCache(appCode); } - + String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); if (StringUtils.isNotBlank(appCodeValue)) { @@ -184,11 +218,11 @@ public class AppConfigHandle { // app、api默认限流 if (appConfig.getApiCurrentLimitingConfig() != null) { - GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getApiCurrentLimitingConfig()); + initRateLimiter(appCode, appConfig.getApiCurrentLimitingConfig(), GLOBAL_API_CURRENT_LIMITING_MAP); } if (appConfig.getAppCurrentLimitingConfig() != null) { - GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getAppCurrentLimitingConfig()); + initRateLimiter(appCode, appConfig.getAppCurrentLimitingConfig(), GLOBAL_APP_CURRENT_LIMITING_MAP); } // app router负载均衡 @@ -231,8 +265,13 @@ public class AppConfigHandle { if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { for (Strategy strategy : apiConfig.getStrategy()) { + if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { - APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, strategy); + RateLimiterRegistry registry = createRateLimiter(strategy); + SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); + sacCurrentLimiting.setStrategy(strategy); + sacCurrentLimiting.setRegistry(registry); + APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, sacCurrentLimiting); } else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) { String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; // interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX, @@ -245,19 +284,20 @@ public class AppConfigHandle { // });//.retryPolicy(retryCount -> retryCount * 100L); // apiCode熔断 - CircuitBreaker circuitBreaker = CircuitBreaker.create(keyCircuitBreaker + "-circuit-breaker", - VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 - .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 - // .setTimeout(2000) // 超时时间 - .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) - .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 - ).openHandler(v -> { - log.info(keyCircuitBreaker + " Circuit open"); - }).halfOpenHandler(v -> { - log.info(keyCircuitBreaker + "Circuit halfOpen"); - }).closeHandler(v -> { - log.info(keyCircuitBreaker + "Circuit close"); - }); + CircuitBreaker circuitBreaker = CircuitBreaker + .create(keyCircuitBreaker + "-circuit-breaker", VERTX, + new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 + .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 + // .setTimeout(2000) // 超时时间 + .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) + .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 + ).openHandler(v -> { + log.info(keyCircuitBreaker + " Circuit open"); + }).halfOpenHandler(v -> { + log.info(keyCircuitBreaker + "Circuit halfOpen"); + }).closeHandler(v -> { + log.info(keyCircuitBreaker + "Circuit close"); + }); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker); } } @@ -269,17 +309,17 @@ public class AppConfigHandle { } } } - + public static Vertx createVertx() { // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); VERTX = Vertx.vertx(vertxOptions); - + initConnectionCircuitBreaker(); return VERTX; } - + /*** * 初始化connection Breaker */ @@ -297,7 +337,6 @@ public class AppConfigHandle { log.info("connectionCircuitBreaker Circuit close"); }); } - private static void loadVertxOptions(VertxOptions vertxOptions) { long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 @@ -314,7 +353,5 @@ public class AppConfigHandle { vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 } } - - } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandler.java new file mode 100644 index 0000000..0ad7c3e --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandler.java @@ -0,0 +1,24 @@ +package com.sf.vertx.handle; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; + +/*** + * 限流熔断, redis存储 + * @author xy + * + */ +@VertxGen +public interface AppRateLimitHandler extends Handler { + + static AppRateLimitHandler create(String instance) { + switch (instance) { + case "redis": + default: + // 本地缓存 + return new AppRateLimitHandlerImpl(); + } + + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandlerImpl.java new file mode 100644 index 0000000..bf9f2d3 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppRateLimitHandlerImpl.java @@ -0,0 +1,43 @@ +package com.sf.vertx.handle; + +import com.sf.vertx.constans.RedisKeyConfig; +import com.sf.vertx.pojo.SacCurrentLimiting; + +import io.github.resilience4j.core.functions.CheckedRunnable; +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.HttpException; + +/*** + * 内存存储 + * + * @author xy + * + */ +public class AppRateLimitHandlerImpl implements AppRateLimitHandler { + + @Override + public void handle(RoutingContext rc) { + String appCode = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); + SacCurrentLimiting currentLimiting = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCode); + + if (currentLimiting != null) { + String key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode; + RateLimiter rateLimiter = currentLimiting.getRegistry().rateLimiter(key); + CheckedRunnable restrictedCall = RateLimiter.decorateCheckedRunnable(rateLimiter, () -> { + rc.next(); + return; + }); + try { + restrictedCall.run(); + } catch (Throwable t) { + t.printStackTrace(); + rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse())); + return; + } + } else { + rc.next(); + return; + } + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerImpl.java deleted file mode 100644 index bc42b07..0000000 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerImpl.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.sf.vertx.handle; - -/*** - * 内存存储 - * @author xy - * - */ -public class RateLimitHandlerImpl { - -} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java index 47dd930..13e7998 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java @@ -1,63 +1,63 @@ -package com.sf.vertx.handle; - -import java.util.Map; - -import com.sf.vertx.api.pojo.Strategy; -import com.sf.vertx.constans.SacErrorCode; - -import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.handler.HttpException; -import lombok.extern.slf4j.Slf4j; - -/*** - * redis存储 - * - * @author xy - * - */ -@Slf4j -public class RateLimitHandlerRedisImpl implements RateLimitHandler { - private RedisRateLimiter rateLimiter; - // private int pattern;// 1:app,2:接口默认,3:服务接口配置限流策略 - - public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter) { - this.rateLimiter = rateLimiter; - } - - @Override - public void handle(RoutingContext rc) { - try { - // TODO 测试异常 - String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); - String apiCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey()); - Strategy apiCodeStrategy = AppConfigHandle.getApiCurrentLimiting(appCodeHeaderKey + ":" + apiCodeHeaderKey); - - Strategy globalApiStrategy = AppConfigHandle.getGlobalApiCurrentLimitingConfig(appCodeHeaderKey); - Strategy globalAppStrategy = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCodeHeaderKey); - - Strategy apiStrategy = apiCodeStrategy != null ? apiCodeStrategy - : globalApiStrategy != null ? globalApiStrategy : null; - Strategy appStrategy = globalAppStrategy != null ? globalAppStrategy : null; - - if (apiStrategy != null || appStrategy != null) { - Map retMap = rateLimiter.acquire(rc, apiStrategy, appStrategy); - - if (apiStrategy != null && retMap.get(1) == false) { - rc.fail(new HttpException(10015, apiStrategy.getDefaultResponse())); - return; - } - - if (appStrategy != null && retMap.get(2) == false) { - rc.fail(new HttpException(10017, appStrategy.getDefaultResponse())); - return; - } - } - } catch (Exception e) { - e.printStackTrace(); - rc.fail(new HttpException(SacErrorCode.DEFAULT_ERROR_CODE)); - return; - } - rc.next(); - return; - } -} +//package com.sf.vertx.handle; +// +//import java.util.Map; +// +//import com.sf.vertx.api.pojo.Strategy; +//import com.sf.vertx.constans.SacErrorCode; +// +//import io.vertx.ext.web.RoutingContext; +//import io.vertx.ext.web.handler.HttpException; +//import lombok.extern.slf4j.Slf4j; +// +///*** +// * redis存储 +// * 解决不了时间窗口滑动, 使用resilience4j替代 +// * +// * @author xy +// * +// */ +//@Slf4j +//public class RateLimitHandlerRedisImpl implements RateLimitHandler { +// private RedisRateLimiter rateLimiter; +// // private int pattern;// 1:app,2:接口默认,3:服务接口配置限流策略 +// +// public RateLimitHandlerRedisImpl(RedisRateLimiter rateLimiter) { +// this.rateLimiter = rateLimiter; +// } +// +// @Override +// public void handle(RoutingContext rc) { +// try { +// String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); +// String apiCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey()); +// Strategy apiCodeStrategy = AppConfigHandle.getApiCurrentLimiting(appCodeHeaderKey + ":" + apiCodeHeaderKey); +// +// Strategy globalApiStrategy = AppConfigHandle.getGlobalApiCurrentLimitingConfig(appCodeHeaderKey); +// Strategy globalAppStrategy = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCodeHeaderKey); +// +// Strategy apiStrategy = apiCodeStrategy != null ? apiCodeStrategy +// : globalApiStrategy != null ? globalApiStrategy : null; +// Strategy appStrategy = globalAppStrategy != null ? globalAppStrategy : null; +// +// if (apiStrategy != null || appStrategy != null) { +// Map retMap = rateLimiter.acquire(rc, apiStrategy, appStrategy); +// +// if (apiStrategy != null && retMap.get(1) == false) { +// rc.fail(new HttpException(10015, apiStrategy.getDefaultResponse())); +// return; +// } +// +// if (appStrategy != null && retMap.get(2) == false) { +// rc.fail(new HttpException(10017, appStrategy.getDefaultResponse())); +// return; +// } +// } +// } catch (Exception e) { +// e.printStackTrace(); +// rc.fail(new HttpException(SacErrorCode.DEFAULT_ERROR_CODE)); +// return; +// } +// rc.next(); +// return; +// } +//} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java index e8c17db..520d253 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java @@ -1,57 +1,57 @@ -package com.sf.vertx.handle; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.springframework.data.redis.core.RedisTemplate; - -import com.sf.vertx.api.pojo.Strategy; -import com.sf.vertx.constans.RedisKeyConfig; -import com.sf.vertx.utils.SpringUtils; - -import cn.hutool.core.thread.ThreadUtil; -import io.vertx.ext.web.RoutingContext; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class RedisRateLimiter { - public Map acquire(RoutingContext rc, Strategy apiStrategy, Strategy appStrategy) { - String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); - String key = null; - Map retMap = new HashMap<>(); - if (apiStrategy != null) { - key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey + ":" + rc.request().uri() - + ":" + rc.request().method(); - Boolean ret = rateLimiter(key, appCodeHeaderKey, apiStrategy.getThreshold(), apiStrategy.getTimeWindow()); - retMap.put(1, ret); - } - - if (appStrategy != null) { - key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey; - Boolean ret = rateLimiter(key, appCodeHeaderKey, appStrategy.getThreshold(), appStrategy.getTimeWindow()); - retMap.put(2, ret); - } - return retMap; - } - - @SuppressWarnings("unchecked") - private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) { - RedisTemplate redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class); - Integer count = redisTemplate.opsForValue().get(key); - // redisTemplate.delete(key); - log.info("redis limiter. key:{}, count:{}", key, count); - // 初始化,设置过期时间 - ThreadUtil.execAsync(() -> { - increment(timeWindow, redisTemplate, key); - }); - return (count == null || count <= threshold) ? true : false; - } - - private void increment(Integer timeWindow, RedisTemplate redisTemplate, String key) { - Long incr = redisTemplate.opsForValue().increment(key); - if (incr == 1) { // 创建,才设置时间窗口 - redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); - } - } -} +//package com.sf.vertx.handle; +// +//import java.util.HashMap; +//import java.util.Map; +//import java.util.concurrent.TimeUnit; +// +//import org.springframework.data.redis.core.RedisTemplate; +// +//import com.sf.vertx.api.pojo.Strategy; +//import com.sf.vertx.constans.RedisKeyConfig; +//import com.sf.vertx.utils.SpringUtils; +// +//import cn.hutool.core.thread.ThreadUtil; +//import io.vertx.ext.web.RoutingContext; +//import lombok.extern.slf4j.Slf4j; +// +//@Slf4j +//public class RedisRateLimiter { +// public Map acquire(RoutingContext rc, Strategy apiStrategy, Strategy appStrategy) { +// String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey()); +// String key = null; +// Map retMap = new HashMap<>(); +// if (apiStrategy != null) { +// key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey + ":" + rc.request().uri() +// + ":" + rc.request().method(); +// Boolean ret = rateLimiter(key, appCodeHeaderKey, apiStrategy.getThreshold(), apiStrategy.getTimeWindow()); +// retMap.put(1, ret); +// } +// +// if (appStrategy != null) { +// key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey; +// Boolean ret = rateLimiter(key, appCodeHeaderKey, appStrategy.getThreshold(), appStrategy.getTimeWindow()); +// retMap.put(2, ret); +// } +// return retMap; +// } +// +// @SuppressWarnings("unchecked") +// private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) { +// RedisTemplate redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class); +// Integer count = redisTemplate.opsForValue().get(key); +// // redisTemplate.delete(key); +// log.info("redis limiter. key:{}, count:{}", key, count); +// // 初始化,设置过期时间 +// ThreadUtil.execAsync(() -> { +// increment(timeWindow, redisTemplate, key); +// }); +// return (count == null || count <= threshold) ? true : false; +// } +// +// private void increment(Integer timeWindow, RedisTemplate redisTemplate, String key) { +// Long incr = redisTemplate.opsForValue().increment(key); +// if (incr == 1) { // 创建,才设置时间窗口 +// redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); +// } +// } +//} diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java index a90bf63..ed03ada 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java @@ -10,11 +10,12 @@ import org.springframework.stereotype.Component; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.constans.RedisKeyConfig; +import com.sf.vertx.handle.ApiRateLimitHandler; import com.sf.vertx.handle.AppConfigHandle; +import com.sf.vertx.handle.AppRateLimitHandler; import com.sf.vertx.handle.BodyHandler; import com.sf.vertx.handle.ParameterCheckHandler; import com.sf.vertx.handle.ProxyHandler; -import com.sf.vertx.handle.RateLimitHandler; import com.sf.vertx.handle.RestfulFailureHandler; import com.sf.vertx.utils.ProxyTool; @@ -56,21 +57,21 @@ public class DynamicBuildServer implements ApplicationRunner { redisKeyConfig.init(); // 从redis同步vertx配置 AppConfigHandle.initVertxConfig(redisTemplate); + // 加载vertx、应用配置 appStartLoadData(); } - - /*** * 应用启动, 从redis读取配置,初始化vertx服务 - * @throws Exception + * + * @throws Exception */ private void appStartLoadData() throws Exception { Vertx vertx = AppConfigHandle.createVertx(); // 从redis同步app配置 AppConfigHandle.initAllAppConfig(redisTemplate); - + VertxConfig vertxConfig = AppConfigHandle.getVertxConfig(); // 创建HTTP监听 // 所有ip都能访问 @@ -120,8 +121,12 @@ public class DynamicBuildServer implements ApplicationRunner { WebClient mainWebClient = WebClient.create(vertx); String rateLimitModel = vertxConfig.getRateLimitModel(); - mainHttpRouter.route().handler(ParameterCheckHandler.create()).handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) - .handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create()); + rateLimitModel = "local"; + mainHttpRouter.route().handler(ParameterCheckHandler.create()) + .handler(AppRateLimitHandler.create(rateLimitModel)) + .handler(ApiRateLimitHandler.create(rateLimitModel)) + .handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient, proxy)) + .failureHandler(RestfulFailureHandler.create()); // mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy)); } diff --git a/sf-vertx/src/main/java/com/sf/vertx/pojo/SacCurrentLimiting.java b/sf-vertx/src/main/java/com/sf/vertx/pojo/SacCurrentLimiting.java new file mode 100644 index 0000000..c5c13b8 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/pojo/SacCurrentLimiting.java @@ -0,0 +1,13 @@ +package com.sf.vertx.pojo; + +import com.sf.vertx.api.pojo.Strategy; + +import io.github.resilience4j.ratelimiter.RateLimiterRegistry; +import lombok.Data; + +@Data +public class SacCurrentLimiting { + + private RateLimiterRegistry registry; + private Strategy strategy; +} diff --git a/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java b/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java index 140d55d..1fe1ba0 100644 --- a/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java +++ b/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java @@ -1,20 +1,76 @@ package com.sf.vertx; +import org.junit.Test; + import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.handle.AppConfigHandle; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.circuitbreaker.HalfOpenCircuitException; +import io.vertx.circuitbreaker.OpenCircuitException; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; +import io.vertx.core.impl.NoStackTraceThrowable; import lombok.extern.slf4j.Slf4j; @Slf4j public class TestCircuitBreaker { private static int port; + CircuitBreaker rateLimiter; + + @Test + public void rateLimiter() { + VertxOptions vertxOptions = new VertxOptions(); + Vertx VERTX = Vertx.vertx(vertxOptions); + // apiCode熔断 + rateLimiter = CircuitBreaker.create("rateLimiter-circuit-breaker", VERTX, + new CircuitBreakerOptions().setMaxFailures(2) // 最大失败数 + .setFailuresRollingWindow(5 * 1000) // 毫秒 + .setTimeout(-1) // 超时时间 + .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) + .setResetTimeout(-1) // 在开启状态下,尝试重试之前所需时间 + ).openHandler(v -> { + log.info(" Circuit open"); + }).halfOpenHandler(v -> { + log.info("Circuit halfOpen"); + }).closeHandler(v -> { + log.info("Circuit close"); + }); + + for (int i = 0; i < 20; i++) { + try { + Thread.sleep(2000L); + } catch (InterruptedException e) { + } + rateLimiter.executeWithFallback(promise -> { + promise.fail("1"); + }, v -> { + // 需要传递当前状态half-open , close, 还是统计失败次数 + //log.info(" executed when the circuit is opened:{}", v.getMessage()); + if (v instanceof HalfOpenCircuitException) { + log.info(" half open circuit"); + } else if (v instanceof OpenCircuitException) { + log.info(" open circuit"); + } else if (v instanceof NoStackTraceThrowable) { + log.info(" close circuit"); + } + return "3"; + }, ar -> { + // Do something with the result + log.info(" interface failed result.{} ", ar); + }); + } + + try { + Thread.sleep(5000L); + } catch (InterruptedException e) { + } + } + public static void main(String[] args) { VertxConfig vertxConfig = AppConfigHandle.getVertxConfig(); @@ -33,7 +89,7 @@ public class TestCircuitBreaker { }); for (int i = 0; i < 20; i++) { port = 9199; - if(i % 2 == 0) { + if (i % 2 == 0) { port = 9198; log.info("i:{},port:{}", i, port); }