package com.sf.vertx.handle; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.Node; import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.Strategy; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.pojo.SacCurrentLimiting; import com.sf.vertx.utils.ProxyTool; import cn.hutool.core.collection.ConcurrentHashSet; import io.github.resilience4j.ratelimiter.RateLimiterConfig; import io.github.resilience4j.ratelimiter.RateLimiterRegistry; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpHeaders; import lombok.extern.slf4j.Slf4j; /*** * vertx配置维护 * * @author xy * */ @Slf4j public class AppConfigHandle { private static VertxConfig VERTX_CONFIG = new VertxConfig(); public static Vertx VERTX; public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; // global cache app config private static final ConcurrentHashMap CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); // global api config appCode - RateLimiterRegistry private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // global app config appCode - Strategy private static final ConcurrentHashMap GLOBAL_APP_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode:SacLoadBalancing private static ConcurrentHashMap LOADBALANCING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode - ApiConfig private static ConcurrentHashMap APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); // apiCode限流配置 appCode:apiCode - RateLimiterRegistry private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); public static void addDisabledAppcode(String appCode) { DISABLED_APPCODE.add(appCode); } public static boolean isDisabledAppcode(String appCode) { return DISABLED_APPCODE.contains(appCode); } public static SacCurrentLimiting getGlobalAppCurrentLimitingConfig(String appCode) { return GLOBAL_APP_CURRENT_LIMITING_MAP.get(appCode); } public static AppConfig getAppConfig(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode); } public static boolean isDataSecurity(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false; } public static boolean isApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false; } public static CircuitBreaker getApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key); } /*** * 是否解析, 走独立请求 * @return */ public static boolean isAnalysisBody(String appCode, String apiCode, String contentType) { String keyCircuitBreaker = appCode + ":" + apiCode + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = AppConfigHandle.getApiCodeCircuitBreaker(keyCircuitBreaker); boolean isDataSecurity = AppConfigHandle.isDataSecurity(appCode); // 文件上传不走加解密 return (isDataSecurity || circuitBreaker != null) && StringUtils.startsWith(contentType, "multipart") == false; } /*** * 优先apicode配置限流、无法找到匹配全局限流 * * @param appCode * @param apiCode * @return */ public static SacCurrentLimiting getApiCurrentLimiting(String appCode, String apiCode) { String key = appCode + ":" + apiCode; SacCurrentLimiting sacCurrentLimiting = APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) != null ? APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) : null; sacCurrentLimiting = sacCurrentLimiting != null ? sacCurrentLimiting : (GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) != null ? GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) : null); return sacCurrentLimiting; } public static VertxConfig getVertxConfig() { return VERTX_CONFIG; } public static String getAppCodeHeaderKey() { return VERTX_CONFIG.getAppCodeHeaderKey(); } public static String getApiCodeHeaderKey() { return VERTX_CONFIG.getApiCodeHeaderKey(); } public static SacLoadBalancing getLoadBalancing(String key) { return LOADBALANCING_MAP.get(key); } public static ApiConfig getApicodeConfigMap(String key) { return APICODE_CONFIG_MAP.get(key); } public static boolean isApicodeUri(String key, String uri) { return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri); } private static RateLimiterRegistry createRateLimiter(Strategy strategy) { RateLimiterConfig config = RateLimiterConfig.custom() .limitRefreshPeriod(Duration.ofSeconds(strategy.getTimeWindow())) .limitForPeriod(strategy.getThreshold()).timeoutDuration(Duration.ofMillis(0)).build(); RateLimiterRegistry registry = RateLimiterRegistry.of(config); return registry; } private static void initRateLimiter(String appCode, Strategy strategy, ConcurrentHashMap map) { RateLimiterRegistry registry = createRateLimiter(strategy); SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); sacCurrentLimiting.setStrategy(strategy); sacCurrentLimiting.setRegistry(registry); map.put(appCode, sacCurrentLimiting); } /*** * 从redis加载数据 * * @throws Exception */ public static void initAllAppConfig(RedisTemplate redisTemplate) throws Exception { Set set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1); for (String appCode : set) { AppConfigHandle.initAppConfig(redisTemplate, appCode, false); } } /*** * 加载vertx配置 */ public static void initVertxConfig(RedisTemplate redisTemplate) { String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY; String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey); if (StringUtils.isNotBlank(vertxConfigValue)) { VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class); } } private static void delAppConfigCache(String appCode) { AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode); if (appConfig != null) { // app、api默认限流 GLOBAL_API_CURRENT_LIMITING_MAP.remove(appCode); GLOBAL_APP_CURRENT_LIMITING_MAP.remove(appCode); for (SacService sacService : appConfig.getService()) { if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.remove(key); LOADBALANCING_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); if (circuitBreaker != null) { circuitBreaker.close(); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker); } } } } // 应用配置 CACHE_APP_CONFIG_MAP.remove(appCode); } } public static void initAppConfig(RedisTemplate redisTemplate, String appCode, boolean isDelLocalCache) { // 是否需要先删除 if (isDelLocalCache) { delAppConfigCache(appCode); } String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); if (StringUtils.isNotBlank(appCodeValue)) { AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference() { }); CACHE_APP_CONFIG_MAP.put(appCode, appConfig); // app、api默认限流 if (appConfig.getApiCurrentLimitingConfig() != null) { initRateLimiter(appCode, appConfig.getApiCurrentLimitingConfig(), GLOBAL_API_CURRENT_LIMITING_MAP); } if (appConfig.getAppCurrentLimitingConfig() != null) { initRateLimiter(appCode, appConfig.getAppCurrentLimitingConfig(), GLOBAL_APP_CURRENT_LIMITING_MAP); } // app router负载均衡 for (SacService sacService : appConfig.getService()) { List nodeList = new ArrayList<>(); // 获取service模式 if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) { Node node = new Node(); node.setIp(sacService.getServerAddress().getHost()); node.setPort(sacService.getServerAddress().getPort()); node.setWeight(0); node.setProtocol(sacService.getServerAddress().getProtocol()); nodeList.add(node); } else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { if (sacService.getRouteConfig() != null && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) { for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { Node node = new Node(); node.setIp(routeContent.getServerAddress().getHost()); node.setPort(routeContent.getServerAddress().getPort()); node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 ? routeContent.getWeight() : 0); node.setProtocol(routeContent.getServerAddress().getProtocol()); nodeList.add(node); } } } // 初始化apiConfig if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.put(key, apiConfig); if (nodeList.size() > 0) { // 初始化负载均衡算法 SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); LOADBALANCING_MAP.put(key, sacLoadBalancing); } if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { for (Strategy strategy : apiConfig.getStrategy()) { if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { RateLimiterRegistry registry = createRateLimiter(strategy); SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); sacCurrentLimiting.setStrategy(strategy); sacCurrentLimiting.setRegistry(registry); APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, sacCurrentLimiting); } else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) { String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; // interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX, // new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000) // .setFallbackOnFailure(true) // ).openHandler(v -> { // log.info("Circuit opened"); // }).closeHandler(v -> { // log.info("Circuit closed"); // });//.retryPolicy(retryCount -> retryCount * 100L); // apiCode熔断 CircuitBreaker circuitBreaker = CircuitBreaker .create(keyCircuitBreaker + "-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 // .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info(keyCircuitBreaker + " Circuit open"); }).halfOpenHandler(v -> { log.info(keyCircuitBreaker + "Circuit halfOpen"); }).closeHandler(v -> { log.info(keyCircuitBreaker + "Circuit close"); }); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker); } } } } } } } } public static Vertx createVertx() { // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); VERTX = Vertx.vertx(vertxOptions); initConnectionCircuitBreaker(); return VERTX; } /*** * 初始化connection Breaker */ private static void initConnectionCircuitBreaker() { CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数 .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info("connectionCircuitBreaker Circuit open"); }).halfOpenHandler(v -> { log.info("connectionCircuitBreaker Circuit halfOpen"); }).closeHandler(v -> { log.info("connectionCircuitBreaker Circuit close"); }); } private static void loadVertxOptions(VertxOptions vertxOptions) { long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getBlockedThreadCheckInterval(); int workerPoolSize = VERTX_CONFIG == null || VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getWorkerPoolSize(); if (workerPoolSize != -1) { vertxOptions.setWorkerPoolSize(workerPoolSize); } // TODO blockedThreadCheckInterval = 1000000L; if (blockedThreadCheckInterval != -1) { vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 } } }