package com.sf.vertx.handle; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.hazelcast.config.Config; import com.hazelcast.config.JoinConfig; import com.hazelcast.config.ManagementCenterConfig; import com.hazelcast.config.NetworkConfig; import com.hazelcast.config.TcpIpConfig; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.Node; import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.Strategy; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.init.SacVertxConfig; import com.sf.vertx.pojo.ClusterEventMsg; import com.sf.vertx.pojo.SacCurrentLimiting; import com.sf.vertx.utils.ProxyTool; import cn.hutool.core.collection.ConcurrentHashSet; import io.github.resilience4j.ratelimiter.RateLimiterConfig; import io.github.resilience4j.ratelimiter.RateLimiterRegistry; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.HttpProxy; import io.vertx.httpproxy.ProxyContext; import io.vertx.httpproxy.ProxyInterceptor; import io.vertx.httpproxy.ProxyResponse; import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; import lombok.extern.slf4j.Slf4j; /*** * vertx配置维护 * * @author xy * */ @Slf4j public class AppConfigHandler { private static VertxConfig VERTX_CONFIG = new VertxConfig(); public static Vertx VERTX; private static SacVertxConfig sacVertxConfig; private static RedisTemplate redisTemplate; public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; // global cache app config private static final ConcurrentHashMap CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); // global api config appCode - RateLimiterRegistry private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // global app config appCode - Strategy private static final ConcurrentHashMap GLOBAL_APP_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode:SacLoadBalancing private static ConcurrentHashMap LOADBALANCING_MAP = new ConcurrentHashMap<>(); // appCode:apiCode - ApiConfig private static ConcurrentHashMap APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); // apiCode限流配置 appCode:apiCode - RateLimiterRegistry private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); public static Integer requestModel() { return sacVertxConfig.getRequestModel(); } public static String rpcUri() { return sacVertxConfig.getRpcUri(); } public static void addDisabledAppcode(String appCode) { DISABLED_APPCODE.add(appCode); } public static boolean isDisabledAppcode(String appCode) { return DISABLED_APPCODE.contains(appCode); } public static SacCurrentLimiting getGlobalAppCurrentLimitingConfig(String appCode) { return GLOBAL_APP_CURRENT_LIMITING_MAP.get(appCode); } public static AppConfig getAppConfig(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode); } public static void init(RedisTemplate _redisTemplate, SacVertxConfig _sacVertxConfig) { redisTemplate = _redisTemplate; sacVertxConfig = _sacVertxConfig; } public static boolean isDataSecurity(String appCode) { return CACHE_APP_CONFIG_MAP.get(appCode) != null && CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false; } public static boolean isApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false; } public static CircuitBreaker getApiCodeCircuitBreaker(String key) { return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key); } /*** * 是否解析, 走独立请求 * * @return */ public static boolean isAnalysisBody(String appCode, String apiCode, String contentType) { String keyCircuitBreaker = appCode + ":" + apiCode + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker); boolean isDataSecurity = AppConfigHandler.isDataSecurity(appCode); // 文件上传不走加解密 return (isDataSecurity || circuitBreaker != null) && StringUtils.startsWith(contentType, "multipart") == false; } /*** * 优先apicode配置限流、无法找到匹配全局限流 * * @param appCode * @param apiCode * @return */ public static SacCurrentLimiting getApiCurrentLimiting(String appCode, String apiCode) { String key = appCode + ":" + apiCode; SacCurrentLimiting sacCurrentLimiting = APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) != null ? APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key) : null; sacCurrentLimiting = sacCurrentLimiting != null ? sacCurrentLimiting : (GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) != null ? GLOBAL_API_CURRENT_LIMITING_MAP.get(appCode) : null); return sacCurrentLimiting; } public static VertxConfig getVertxConfig() { return VERTX_CONFIG; } public static String getAppCodeHeaderKey() { return VERTX_CONFIG.getAppCodeHeaderKey(); } public static String getApiCodeHeaderKey() { return VERTX_CONFIG.getApiCodeHeaderKey(); } public static SacLoadBalancing getLoadBalancing(String key) { return LOADBALANCING_MAP.get(key); } public static ApiConfig getApicodeConfigMap(String key) { return APICODE_CONFIG_MAP.get(key); } public static boolean isApicodeUri(String key, String uri, String httpMethod) { return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri) && StringUtils.equals(httpMethod, APICODE_CONFIG_MAP.get(key).getMethod()); } public static String mock(String key) { return APICODE_CONFIG_MAP.get(key).getMockResponse(); } private static RateLimiterRegistry createRateLimiter(Strategy strategy) { RateLimiterConfig config = RateLimiterConfig.custom() .limitRefreshPeriod(Duration.ofSeconds(strategy.getTimeWindow())) .limitForPeriod(strategy.getThreshold()).timeoutDuration(Duration.ofMillis(0)).build(); RateLimiterRegistry registry = RateLimiterRegistry.of(config); return registry; } private static void initRateLimiter(String appCode, Strategy strategy, ConcurrentHashMap map) { RateLimiterRegistry registry = createRateLimiter(strategy); SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); sacCurrentLimiting.setStrategy(strategy); sacCurrentLimiting.setRegistry(registry); map.put(appCode, sacCurrentLimiting); } /*** * 从redis加载数据 * * @throws Exception */ public static void initAllAppConfig() { Set set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1); for (String appCode : set) { AppConfigHandler.initAppConfig(appCode, false); } } /*** * 加载vertx配置 */ public static void initVertxConfig() { String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY; String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey); if (StringUtils.isNotBlank(vertxConfigValue)) { VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class); } } private static void delAppConfigCache(String appCode) { AppConfig appConfig = CACHE_APP_CONFIG_MAP.get(appCode); if (appConfig != null) { // app、api默认限流 GLOBAL_API_CURRENT_LIMITING_MAP.remove(appCode); GLOBAL_APP_CURRENT_LIMITING_MAP.remove(appCode); for (SacService sacService : appConfig.getService()) { if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.remove(key); LOADBALANCING_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); if (circuitBreaker != null) { circuitBreaker.close(); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker); } } } } // 应用配置 CACHE_APP_CONFIG_MAP.remove(appCode); } } public static void initAppConfig(String appCode, boolean isDelLocalCache) { // 是否需要先删除 if (isDelLocalCache) { delAppConfigCache(appCode); } String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); if (StringUtils.isNotBlank(appCodeValue)) { AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference() { }); CACHE_APP_CONFIG_MAP.put(appCode, appConfig); // app、api默认限流 if (appConfig.getApiCurrentLimitingConfig() != null) { initRateLimiter(appCode, appConfig.getApiCurrentLimitingConfig(), GLOBAL_API_CURRENT_LIMITING_MAP); } if (appConfig.getAppCurrentLimitingConfig() != null) { initRateLimiter(appCode, appConfig.getAppCurrentLimitingConfig(), GLOBAL_APP_CURRENT_LIMITING_MAP); } // app router负载均衡 for (SacService sacService : appConfig.getService()) { List nodeList = new ArrayList<>(); // 获取service模式 if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) { Node node = new Node(); node.setIp(sacService.getServerAddress().getHost()); node.setPort(sacService.getServerAddress().getPort()); node.setWeight(0); node.setProtocol(sacService.getServerAddress().getProtocol()); nodeList.add(node); } else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { if (sacService.getRouteConfig() != null && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) { for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { Node node = new Node(); node.setIp(routeContent.getServerAddress().getHost()); node.setPort(routeContent.getServerAddress().getPort()); node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 ? routeContent.getWeight() : 0); node.setProtocol(routeContent.getServerAddress().getProtocol()); nodeList.add(node); } } } // 初始化apiConfig if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.put(key, apiConfig); if (nodeList.size() > 0) { // 初始化负载均衡算法 SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); LOADBALANCING_MAP.put(key, sacLoadBalancing); } if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { for (Strategy strategy : apiConfig.getStrategy()) { if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { RateLimiterRegistry registry = createRateLimiter(strategy); SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); sacCurrentLimiting.setStrategy(strategy); sacCurrentLimiting.setRegistry(registry); APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, sacCurrentLimiting); } else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) { String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; // interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX, // new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000) // .setFallbackOnFailure(true) // ).openHandler(v -> { // log.info("Circuit opened"); // }).closeHandler(v -> { // log.info("Circuit closed"); // });//.retryPolicy(retryCount -> retryCount * 100L); // apiCode熔断 CircuitBreaker circuitBreaker = CircuitBreaker .create(keyCircuitBreaker + "-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 // .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info(keyCircuitBreaker + " Circuit open"); }).halfOpenHandler(v -> { log.info(keyCircuitBreaker + "Circuit halfOpen"); }).closeHandler(v -> { log.info(keyCircuitBreaker + "Circuit close"); }); APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker); } } } } } } } } public static void createVertx() { // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); VERTX = Vertx.vertx(vertxOptions); initConnectionCircuitBreaker(); createVertxRouter(); } private static Config hazelcastConfig(SacVertxConfig sacVertxConfig) { // 集群 Config hazelcastConfig = new Config(); hazelcastConfig.setClusterName(sacVertxConfig.getClusterName()); // 集群名字 NetworkConfig networkConfig = new NetworkConfig(); networkConfig.setPort(5701); networkConfig.setPortAutoIncrement(true); JoinConfig join = new JoinConfig(); TcpIpConfig tcpIpConfig = new TcpIpConfig(); tcpIpConfig.setEnabled(true); String[] clusterIps = sacVertxConfig.getClusterIp().split(","); List members = Arrays.asList(clusterIps); tcpIpConfig.setMembers(members); join.setTcpIpConfig(tcpIpConfig); networkConfig.setJoin(join); hazelcastConfig.setNetworkConfig(networkConfig); // TODO 还有问题,不会使用 ManagementCenterConfig managementCenterConfig = new ManagementCenterConfig(); Set interfaces = new HashSet<>(); interfaces.add("http://192.168.1.68:8080/mancenter"); managementCenterConfig.setTrustedInterfaces(interfaces); hazelcastConfig.setManagementCenterConfig(managementCenterConfig); return hazelcastConfig; } public static Vertx createHazelcastClusterVertx() { Config hazelcastConfig = hazelcastConfig(sacVertxConfig); ClusterManager hazelcastClusterManager = new HazelcastClusterManager(hazelcastConfig); // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); vertxOptions.setClusterManager(hazelcastClusterManager); Vertx.clusteredVertx(vertxOptions, res -> { if (res.succeeded()) { VERTX = res.result(); log.info("hazelcastClusterManager create success"); initConnectionCircuitBreaker(); createVertxRouter(); // 订阅消息 VERTX.eventBus().consumer("sac_cluster_event", message -> { if (message.body() != null) { ClusterEventMsg msg = JSONObject.parseObject(message.body().toString(), ClusterEventMsg.class); log.info("Received message: {}", msg); // message.reply("我是返回数据===" + message.body()); if (msg.getType() == 1) { if (msg.getOperation() == 1) { // 初始化AppConfig本地缓存 AppConfigHandler.initAppConfig(msg.getAppCode(), true); } else if (msg.getOperation() == 3) { // 禁用本地缓存 AppConfigHandler.addDisabledAppcode(msg.getAppCode()); } } } }); } else { res.cause().printStackTrace(); log.info("hazelcastClusterManager create failure"); } }); return VERTX; } /*** * 发布消息,订阅消息 * * @param msg */ public static void publishClusterEventMsg(ClusterEventMsg msg) { VERTX.eventBus().publish("sac_cluster_event", JSONObject.toJSONString(msg)); } private static void createVertxRouter() { // consul初始化 // ConsulHandler.init(vertx); // ConsulHandler.init1(vertx); // 从redis同步app配置 initAllAppConfig(); VertxConfig vertxConfig = AppConfigHandler.getVertxConfig(); // 创建HTTP监听 // 所有ip都能访问 HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0"); HttpServer server = VERTX.createHttpServer(httpServerOptions); Router mainHttpRouter = Router.router(VERTX); Integer serverPort = vertxConfig.getPort() == null ? sacVertxConfig.getPort() : vertxConfig.getPort(); log.info("serverPort:{}", serverPort); server.requestHandler(mainHttpRouter).listen(serverPort, h -> { if (h.succeeded()) { log.info("HTTP端口监听成功:{}", serverPort); } else { log.error("HTTP端口监听失败:{}", serverPort); } }); // HttpClientOptions clientOptions = new HttpClientOptions(); // clientOptions.setMaxPoolSize(20); // 最大连接池大小 // clientOptions.setConnectTimeout(2000); // 连接超时 毫秒 // clientOptions.setHttp2KeepAliveTimeout(1); // clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒 // HttpClient proxyClient = VERTX.createHttpClient(clientOptions); HttpClient proxyClient = VERTX.createHttpClient(); HttpProxy proxy = HttpProxy.reverseProxy(proxyClient); proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request))); proxy.addInterceptor(new ProxyInterceptor() { @Override public Future handleProxyRequest(ProxyContext context) { // if(StringUtils.equals(sacAppHeaderKey, "dsafdsfadafhappC")) { // // 会跳转到 RestfulFailureHandlerImpl // throw new HttpException(10003); // } if (AppConfigHandler.requestModel() == 2) { String appCode = context.request().headers().get(getAppCodeHeaderKey()); String apiCode = context.request().headers().get(getApiCodeHeaderKey()); String key = appCode + ":" + apiCode; String uri = APICODE_CONFIG_MAP.get(key).getUri(); String method = APICODE_CONFIG_MAP.get(key).getMethod(); context.request().setURI(uri).setMethod(HttpMethod.valueOf(method)); } return context.sendRequest(); } @Override public Future handleProxyResponse(ProxyContext context) { // 调试代码,获取reponse body // Filter filter = new Filter(); // ProxyResponse proxyResponse = context.response(); // Body body = proxyResponse.getBody(); // proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false))); // 继续拦截链 return context.sendResponse(); } }); WebClient mainWebClient = WebClient.create(VERTX); String rateLimitModel = vertxConfig.getRateLimitModel(); rateLimitModel = "local"; Route route = null; if (requestModel() == 2) { route = mainHttpRouter.route(rpcUri()); } else { route = mainHttpRouter.route(); } route.handler(ParameterCheckHandler.create()) .handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel)) .handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient, proxy)) .failureHandler(RestfulFailureHandler.create()); // mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy)); } /*** * 初始化connection Breaker */ private static void initConnectionCircuitBreaker() { CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数 .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info("connectionCircuitBreaker Circuit open"); }).halfOpenHandler(v -> { log.info("connectionCircuitBreaker Circuit halfOpen"); }).closeHandler(v -> { log.info("connectionCircuitBreaker Circuit close"); }); } private static void loadVertxOptions(VertxOptions vertxOptions) { long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getBlockedThreadCheckInterval(); int workerPoolSize = VERTX_CONFIG == null || VERTX_CONFIG.getVertxOptionsConfig() == null ? -1 : VERTX_CONFIG.getVertxOptionsConfig().getWorkerPoolSize(); if (workerPoolSize != -1) { vertxOptions.setWorkerPoolSize(workerPoolSize); } // TODO blockedThreadCheckInterval = 1000000L; if (blockedThreadCheckInterval != -1) { vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 } } }