package com.sf.vertx.handle; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.Node; import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.Strategy; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.utils.ProxyTool; import cn.hutool.core.collection.ConcurrentHashSet; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import lombok.extern.slf4j.Slf4j; /*** * vertx配置维护 * * @author xy * */ @Slf4j public class AppConfigHandle { private static VertxConfig VERTX_CONFIG = new VertxConfig(); public static Vertx VERTX; public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; // global cache app config private static final ConcurrentHashMap CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); // global app config appCode - Strategy private static final ConcurrentHashMap GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>(); // global api config appCode - Strategy private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>(); // appCode:apiCode:SacLoadBalancing private static ConcurrentHashMap LOADBALANCING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode - ApiConfig private static ConcurrentHashMap APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); // apiCode限流配置 appCode:apiCode - Strategy private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); public static void addDisabledAppcode(String appCode) { DISABLED_APPCODE.add(appCode); } public static boolean isDisabledAppcode(String appCode) { return DISABLED_APPCODE.contains(appCode); } public static Strategy getGlobalAppCurrentLimitingConfig(String appCode) { return GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.get(appCode); } public static Strategy getGlobalApiCurrentLimitingConfig(String appCode) { return GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.get(appCode); } public static AppConfig getAppConfig(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode); } public static boolean isDataSecurity(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false; } public static boolean isApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false; } public static CircuitBreaker getApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key); } public static Strategy getApiCurrentLimiting(String key) { return APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key); } public static VertxConfig getVertxConfig() { return VERTX_CONFIG; } public static String getAppCodeHeaderKey() { return VERTX_CONFIG.getAppCodeHeaderKey(); } public static String getApiCodeHeaderKey() { return VERTX_CONFIG.getApiCodeHeaderKey(); } public static SacLoadBalancing getLoadBalancing(String key) { return LOADBALANCING_MAP.get(key); } public static ApiConfig getApicodeConfigMap(String key) { return APICODE_CONFIG_MAP.get(key); } public static boolean isApicodeUri(String key, String uri) { return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri); } /*** * 从redis加载数据 * * @throws Exception */ public static void initAllAppConfig(RedisTemplate redisTemplate) throws Exception { Set set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1); for (String appCode : set) { AppConfigHandle.initAppConfig(redisTemplate, appCode, false); } } /*** * 加载vertx配置 */ public static void initVertxConfig(RedisTemplate redisTemplate) { String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY; String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey); if (StringUtils.isNotBlank(vertxConfigValue)) { VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class); } } private static void delAppConfigCache(String appCode) { AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode); if(appConfig != null) { // app、api默认限流 GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.remove(appCode); for (SacService sacService : appConfig.getService()) { if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.remove(key); LOADBALANCING_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); if(circuitBreaker != null) { circuitBreaker.close(); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker); } } } } // 应用配置 CACHE_APP_CONFIG_MAP.remove(appCode); } } public static void initAppConfig(RedisTemplate redisTemplate, String appCode, boolean isDelLocalCache) { // 是否需要先删除 if(isDelLocalCache) { delAppConfigCache(appCode); } String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); if (StringUtils.isNotBlank(appCodeValue)) { AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference() { }); CACHE_APP_CONFIG_MAP.put(appCode, appConfig); // app、api默认限流 if (appConfig.getApiCurrentLimitingConfig() != null) { GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getApiCurrentLimitingConfig()); } if (appConfig.getAppCurrentLimitingConfig() != null) { GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getAppCurrentLimitingConfig()); } // app router负载均衡 for (SacService sacService : appConfig.getService()) { List nodeList = new ArrayList<>(); // 获取service模式 if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) { Node node = new Node(); node.setIp(sacService.getServerAddress().getHost()); node.setPort(sacService.getServerAddress().getPort()); node.setWeight(0); node.setProtocol(sacService.getServerAddress().getProtocol()); nodeList.add(node); } else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { if (sacService.getRouteConfig() != null && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) { for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { Node node = new Node(); node.setIp(routeContent.getServerAddress().getHost()); node.setPort(routeContent.getServerAddress().getPort()); node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 ? routeContent.getWeight() : 0); node.setProtocol(routeContent.getServerAddress().getProtocol()); nodeList.add(node); } } } // 初始化apiConfig if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.put(key, apiConfig); if (nodeList.size() > 0) { // 初始化负载均衡算法 SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); LOADBALANCING_MAP.put(key, sacLoadBalancing); } if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { for (Strategy strategy : apiConfig.getStrategy()) { if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, strategy); } else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) { String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; // interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX, // new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000) // .setFallbackOnFailure(true) // ).openHandler(v -> { // log.info("Circuit opened"); // }).closeHandler(v -> { // log.info("Circuit closed"); // });//.retryPolicy(retryCount -> retryCount * 100L); // apiCode熔断 CircuitBreaker circuitBreaker = CircuitBreaker.create(keyCircuitBreaker + "-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 // .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info(keyCircuitBreaker + " Circuit open"); }).halfOpenHandler(v -> { log.info(keyCircuitBreaker + "Circuit halfOpen"); }).closeHandler(v -> { log.info(keyCircuitBreaker + "Circuit close"); }); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker); } } } } } } } } public static Vertx createVertx() { // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); VERTX = Vertx.vertx(vertxOptions); initConnectionCircuitBreaker(); return VERTX; } /*** * 初始化connection Breaker */ private static void initConnectionCircuitBreaker() { CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数 .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info("connectionCircuitBreaker Circuit open"); }).halfOpenHandler(v -> { log.info("connectionCircuitBreaker Circuit halfOpen"); }).closeHandler(v -> { log.info("connectionCircuitBreaker Circuit close"); }); } private static void loadVertxOptions(VertxOptions vertxOptions) { long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getBlockedThreadCheckInterval(); int workerPoolSize = VERTX_CONFIG == null || VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getWorkerPoolSize(); if (workerPoolSize != -1) { vertxOptions.setWorkerPoolSize(workerPoolSize); } // TODO blockedThreadCheckInterval = 1000000L; if (blockedThreadCheckInterval != -1) { vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 } } }