294 lines
11 KiB
Java
294 lines
11 KiB
Java
![]() |
package com.sf.vertx.handle;
|
|||
|
|
|||
|
import java.util.ArrayList;
|
|||
|
import java.util.List;
|
|||
|
import java.util.Set;
|
|||
|
import java.util.concurrent.ConcurrentHashMap;
|
|||
|
|
|||
|
import org.apache.commons.lang3.StringUtils;
|
|||
|
import org.springframework.data.redis.core.RedisTemplate;
|
|||
|
|
|||
|
import com.alibaba.fastjson2.JSON;
|
|||
|
import com.alibaba.fastjson2.JSONObject;
|
|||
|
import com.alibaba.fastjson2.TypeReference;
|
|||
|
import com.sf.vertx.api.pojo.ApiConfig;
|
|||
|
import com.sf.vertx.api.pojo.AppConfig;
|
|||
|
import com.sf.vertx.api.pojo.Node;
|
|||
|
import com.sf.vertx.api.pojo.RouteContent;
|
|||
|
import com.sf.vertx.api.pojo.SacService;
|
|||
|
import com.sf.vertx.api.pojo.Strategy;
|
|||
|
import com.sf.vertx.api.pojo.VertxConfig;
|
|||
|
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
|
|||
|
import com.sf.vertx.constans.RedisKeyConfig;
|
|||
|
import com.sf.vertx.utils.ProxyTool;
|
|||
|
|
|||
|
import cn.hutool.core.collection.ConcurrentHashSet;
|
|||
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
|||
|
import io.vertx.circuitbreaker.CircuitBreakerOptions;
|
|||
|
import io.vertx.core.Vertx;
|
|||
|
import io.vertx.core.VertxOptions;
|
|||
|
import lombok.extern.slf4j.Slf4j;
|
|||
|
|
|||
|
/***
|
|||
|
* vertx配置维护
|
|||
|
*
|
|||
|
* @author xy
|
|||
|
*
|
|||
|
*/
|
|||
|
@Slf4j
|
|||
|
public class AppConfigHandle {
|
|||
|
private static VertxConfig VERTX_CONFIG = new VertxConfig();
|
|||
|
public static Vertx VERTX;
|
|||
|
public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER;
|
|||
|
// global cache app config
|
|||
|
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>();
|
|||
|
// global app config appCode - Strategy
|
|||
|
private static final ConcurrentHashMap<String, Strategy> GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>();
|
|||
|
// global api config appCode - Strategy
|
|||
|
private static final ConcurrentHashMap<String, Strategy> GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>();
|
|||
|
// appCode:apiCode:SacLoadBalancing
|
|||
|
private static ConcurrentHashMap<String, SacLoadBalancing> LOADBALANCING_MAP = new ConcurrentHashMap<>();
|
|||
|
// appCode:apiCode - ApiConfig
|
|||
|
private static ConcurrentHashMap<String, ApiConfig> APICODE_CONFIG_MAP = new ConcurrentHashMap<>();
|
|||
|
// apiCode限流配置 appCode:apiCode - Strategy
|
|||
|
private static ConcurrentHashMap<String, Strategy> APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
|
|||
|
|
|||
|
// apiCode熔断配置 appCode:apiCode - CircuitBreaker
|
|||
|
private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
|
|||
|
|
|||
|
// 禁用appCode
|
|||
|
private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>();
|
|||
|
|
|||
|
public static void addDisabledAppcode(String appCode) {
|
|||
|
DISABLED_APPCODE.add(appCode);
|
|||
|
}
|
|||
|
|
|||
|
public static boolean isDisabledAppcode(String appCode) {
|
|||
|
return DISABLED_APPCODE.contains(appCode);
|
|||
|
}
|
|||
|
|
|||
|
public static boolean appDataSecurity(String appCode) {
|
|||
|
return CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false;
|
|||
|
}
|
|||
|
|
|||
|
public static Strategy getGlobalAppCurrentLimitingConfig(String appCode) {
|
|||
|
return GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.get(appCode);
|
|||
|
}
|
|||
|
|
|||
|
public static Strategy getGlobalApiCurrentLimitingConfig(String appCode) {
|
|||
|
return GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.get(appCode);
|
|||
|
}
|
|||
|
|
|||
|
public static AppConfig getAppConfig(String appCode) {
|
|||
|
return CACHE_APP_CONFIG_MAP.get(appCode);
|
|||
|
}
|
|||
|
|
|||
|
public static boolean isDataSecurity(String appCode) {
|
|||
|
return CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false;
|
|||
|
}
|
|||
|
|
|||
|
public static boolean isApiCodeCircuitBreaker(String key) {
|
|||
|
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false;
|
|||
|
}
|
|||
|
|
|||
|
public static CircuitBreaker getApiCodeCircuitBreaker(String key) {
|
|||
|
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key);
|
|||
|
}
|
|||
|
|
|||
|
public static Strategy getApiCurrentLimiting(String key) {
|
|||
|
return APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key);
|
|||
|
}
|
|||
|
|
|||
|
public static VertxConfig getVertxConfig() {
|
|||
|
return VERTX_CONFIG;
|
|||
|
}
|
|||
|
|
|||
|
public static String getAppCodeHeaderKey() {
|
|||
|
return VERTX_CONFIG.getAppCodeHeaderKey();
|
|||
|
}
|
|||
|
|
|||
|
public static String getApiCodeHeaderKey() {
|
|||
|
return VERTX_CONFIG.getApiCodeHeaderKey();
|
|||
|
}
|
|||
|
|
|||
|
public static SacLoadBalancing getLoadBalancing(String key) {
|
|||
|
return LOADBALANCING_MAP.get(key);
|
|||
|
}
|
|||
|
|
|||
|
public static ApiConfig getApicodeConfigMap(String key) {
|
|||
|
return APICODE_CONFIG_MAP.get(key);
|
|||
|
}
|
|||
|
|
|||
|
public static boolean isApicodeUri(String key, String uri) {
|
|||
|
return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri);
|
|||
|
}
|
|||
|
|
|||
|
/***
|
|||
|
* 从redis加载数据
|
|||
|
*
|
|||
|
* @throws Exception
|
|||
|
*/
|
|||
|
public static void initAllAppConfig(RedisTemplate<String, String> redisTemplate) throws Exception {
|
|||
|
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
|
|||
|
for (String appCode : set) {
|
|||
|
AppConfigHandle.initAppConfig(redisTemplate, appCode);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/***
|
|||
|
* 加载vertx配置
|
|||
|
*/
|
|||
|
public static void initVertxConfig(RedisTemplate<String, String> redisTemplate) {
|
|||
|
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
|
|||
|
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
|
|||
|
if (StringUtils.isNotBlank(vertxConfigValue)) {
|
|||
|
VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public static void initAppConfig(RedisTemplate<String, String> redisTemplate, String appCode) {
|
|||
|
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
|
|||
|
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
|
|||
|
if (StringUtils.isNotBlank(appCodeValue)) {
|
|||
|
AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference<AppConfig>() {
|
|||
|
});
|
|||
|
CACHE_APP_CONFIG_MAP.put(appCode, appConfig);
|
|||
|
|
|||
|
// app、api默认限流
|
|||
|
if (appConfig.getApiCurrentLimitingConfig() != null) {
|
|||
|
GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getApiCurrentLimitingConfig());
|
|||
|
|
|||
|
}
|
|||
|
|
|||
|
if (appConfig.getAppCurrentLimitingConfig() != null) {
|
|||
|
GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getAppCurrentLimitingConfig());
|
|||
|
}
|
|||
|
|
|||
|
// app router负载均衡
|
|||
|
for (SacService sacService : appConfig.getService()) {
|
|||
|
List<Node> nodeList = new ArrayList<>();
|
|||
|
// 获取service模式
|
|||
|
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
|
|||
|
Node node = new Node();
|
|||
|
node.setIp(sacService.getServerAddress().getHost());
|
|||
|
node.setPort(sacService.getServerAddress().getPort());
|
|||
|
node.setWeight(0);
|
|||
|
node.setProtocol(sacService.getServerAddress().getProtocol());
|
|||
|
nodeList.add(node);
|
|||
|
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
|
|||
|
if (sacService.getRouteConfig() != null
|
|||
|
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
|
|||
|
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
|
|||
|
Node node = new Node();
|
|||
|
node.setIp(routeContent.getServerAddress().getHost());
|
|||
|
node.setPort(routeContent.getServerAddress().getPort());
|
|||
|
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
|
|||
|
? routeContent.getWeight()
|
|||
|
: 0);
|
|||
|
node.setProtocol(routeContent.getServerAddress().getProtocol());
|
|||
|
nodeList.add(node);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// 初始化apiConfig
|
|||
|
if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) {
|
|||
|
for (ApiConfig apiConfig : sacService.getApiConfig()) {
|
|||
|
String key = appCode + ":" + apiConfig.getApiCode();
|
|||
|
APICODE_CONFIG_MAP.put(key, apiConfig);
|
|||
|
if (nodeList.size() > 0) {
|
|||
|
// 初始化负载均衡算法
|
|||
|
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
|
|||
|
LOADBALANCING_MAP.put(key, sacLoadBalancing);
|
|||
|
}
|
|||
|
|
|||
|
if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) {
|
|||
|
for (Strategy strategy : apiConfig.getStrategy()) {
|
|||
|
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
|
|||
|
APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, strategy);
|
|||
|
} else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) {
|
|||
|
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
|
|||
|
// interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX,
|
|||
|
// new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000)
|
|||
|
// .setFallbackOnFailure(true)
|
|||
|
// ).openHandler(v -> {
|
|||
|
// log.info("Circuit opened");
|
|||
|
// }).closeHandler(v -> {
|
|||
|
// log.info("Circuit closed");
|
|||
|
// });//.retryPolicy(retryCount -> retryCount * 100L);
|
|||
|
|
|||
|
// apiCode熔断
|
|||
|
CircuitBreaker circuitBreaker = CircuitBreaker.create(keyCircuitBreaker + "-circuit-breaker",
|
|||
|
VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数
|
|||
|
.setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒
|
|||
|
// .setTimeout(2000) // 超时时间
|
|||
|
.setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
|
|||
|
.setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间
|
|||
|
).openHandler(v -> {
|
|||
|
log.info(keyCircuitBreaker + " Circuit open");
|
|||
|
}).halfOpenHandler(v -> {
|
|||
|
log.info(keyCircuitBreaker + "Circuit halfOpen");
|
|||
|
}).closeHandler(v -> {
|
|||
|
log.info(keyCircuitBreaker + "Circuit close");
|
|||
|
});
|
|||
|
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
public static Vertx createVertx() {
|
|||
|
// TODO 编解码线程池,后面优化协程等方式
|
|||
|
VertxOptions vertxOptions = new VertxOptions();
|
|||
|
loadVertxOptions(vertxOptions);
|
|||
|
VERTX = Vertx.vertx(vertxOptions);
|
|||
|
|
|||
|
initConnectionCircuitBreaker();
|
|||
|
return VERTX;
|
|||
|
}
|
|||
|
|
|||
|
/***
|
|||
|
* 初始化connection Breaker
|
|||
|
*/
|
|||
|
private static void initConnectionCircuitBreaker() {
|
|||
|
CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX,
|
|||
|
new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数
|
|||
|
.setTimeout(2000) // 超时时间
|
|||
|
.setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback)
|
|||
|
.setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间
|
|||
|
).openHandler(v -> {
|
|||
|
log.info("connectionCircuitBreaker Circuit open");
|
|||
|
}).halfOpenHandler(v -> {
|
|||
|
log.info("connectionCircuitBreaker Circuit halfOpen");
|
|||
|
}).closeHandler(v -> {
|
|||
|
log.info("connectionCircuitBreaker Circuit close");
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
private static void loadVertxOptions(VertxOptions vertxOptions) {
|
|||
|
long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1
|
|||
|
: VERTX_CONFIG.getVertxOptionsConfig().getBlockedThreadCheckInterval();
|
|||
|
int workerPoolSize = VERTX_CONFIG == null || VERTX_CONFIG.getVertxOptionsConfig() == null ? -1
|
|||
|
: VERTX_CONFIG.getVertxOptionsConfig().getWorkerPoolSize();
|
|||
|
if (workerPoolSize != -1) {
|
|||
|
vertxOptions.setWorkerPoolSize(workerPoolSize);
|
|||
|
}
|
|||
|
|
|||
|
// TODO
|
|||
|
blockedThreadCheckInterval = 1000000L;
|
|||
|
if (blockedThreadCheckInterval != -1) {
|
|||
|
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
|
|||
|
|
|||
|
}
|