调整代码,支持断路器

This commit is contained in:
ztzh_xieyun 2024-04-29 11:38:24 +08:00
parent fc8e4789b4
commit 6c68c85b8e
8 changed files with 189 additions and 149 deletions

View File

@ -66,9 +66,9 @@
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
</dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
</dependency>
<!--
<dependency>
<groupId>io.vertx</groupId>
@ -112,6 +112,10 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
@ -167,8 +171,8 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>

View File

@ -7,6 +7,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.service.AppConfigService;
import lombok.extern.slf4j.Slf4j;
@ -18,21 +19,22 @@ import lombok.extern.slf4j.Slf4j;
*
*/
@RestController
@RequestMapping("/vertx/test/redis")
@RequestMapping("/vertx/config")
@Slf4j
public class AppConfigController {
@Autowired
private AppConfigService appConfigService;
@PostMapping("/addAppConfig")
@PostMapping("/saveAppConfig")
public String addAppConfig(@RequestBody String appConfig) {
appConfigService.addAppConfig(appConfig);
return null;
appConfigService.saveAppConfig(appConfig);
return "success";
}
@PostMapping("/addVertxConfig")
public String addVertxConfig() {
return null;
@PostMapping("/saveVertxConfig")
public String saveVertxConfig(@RequestBody VertxConfig vertxConfig) {
appConfigService.saveVertxConfig(vertxConfig);
return "success";
}

View File

@ -32,27 +32,37 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
public void handle(RoutingContext rc) {
String sacAppHeaderKey = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
String appHeaderServiceName = rc.request().getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
if (StringUtils.isBlank(sacAppHeaderKey) || StringUtils.isBlank(appHeaderServiceName)) { // 参数传递错误
rc.fail(new HttpException(10003));
return;
}
// 查询模式
SacService sacService = AppConfigServiceImpl.getSacService(sacAppHeaderKey, appHeaderServiceName);
if (sacService == null) { // 参数传递错误
rc.fail(new HttpException(10003));
return;
}
boolean matchUri = false;
Strategy strategyInterface = null; // 接口
for (GatewayInterface uri : sacService.getUriList()) {
if (uri.isUriRegular()) {
if (ProxyTool.regexMatch(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
} else {
if (StringUtils.equals(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
}
if (matchUri) {
for (Strategy strategy : uri.getStrategy()) {
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
strategyInterface = strategy;
if (sacService.getUriList() != null) {
for (GatewayInterface uri : sacService.getUriList()) {
if (uri.isUriRegular()) {
if (ProxyTool.regexMatch(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
} else {
if (StringUtils.equals(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
}
break;
if (matchUri) {
for (Strategy strategy : uri.getStrategy()) {
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
strategyInterface = strategy;
}
}
break;
}
}
}
@ -61,22 +71,25 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
&& AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getApiCurrentLimitingConfig() != null
? AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getApiCurrentLimitingConfig()
: null);
Strategy strategyApp = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey) != null
&& AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig() != null ?
AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig() : null;
Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, strategyInterface, strategyApp);
if(strategyInterface != null && retMap.get(1) == false) {
rc.fail(new HttpException(10101, strategyInterface.getDefaultResponse()));
return;
&& AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig() != null
? AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig()
: null;
if (strategyInterface != null || strategyApp != null) {
Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, strategyInterface, strategyApp);
if (strategyInterface != null && retMap.get(1) == false) {
rc.fail(new HttpException(10101, strategyInterface.getDefaultResponse()));
return;
}
if (strategyApp != null && retMap.get(2) == false) {
rc.fail(new HttpException(10102, strategyApp.getDefaultResponse()));
return;
}
}
if(strategyApp != null && retMap.get(2) == false) {
rc.fail(new HttpException(10102, strategyApp.getDefaultResponse()));
return;
}
log.info("rateLimiter.acquire true");
rc.next();
return;

View File

@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
public class RedisRateLimiter {
public Map<Integer, Boolean> acquire(RoutingContext rc, Strategy strategyInterface, Strategy strategyApp) {
String appCode = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// TODO 先测试app模式,后面通过app缓存获取模式
String key = null;
Map<Integer, Boolean> retMap = new HashMap<>();
if (strategyInterface != null) {

View File

@ -1,7 +1,5 @@
package com.sf.vertx.init;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -21,13 +19,13 @@ import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.Filter;
import com.sf.vertx.utils.ProxyTool;
import com.sf.vertx.utils.SpringUtils;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
@ -66,9 +64,9 @@ public class DynamicBuildServer implements ApplicationRunner {
// 初始化redis key
redisKeyConfig.init();
// 从redis同步app配置
appConfigService.loadAppConfig();
appConfigService.initAllAppConfig();
// 从redis同步vertx配置
appConfigService.loadVertxConfig();
appConfigService.initVertxConfig();
// 加载vertx应用配置
appStartLoadData();
}
@ -80,18 +78,19 @@ public class DynamicBuildServer implements ApplicationRunner {
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
// TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions();
long blockedThreadCheckInterval = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getBlockedThreadCheckInterval();
int workerPoolSize = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getWorkerPoolSize();
if (workerPoolSize != -1) {
vertxOptions.setWorkerPoolSize(workerPoolSize);
}
loadVertxOptions(vertxConfig, vertxOptions);
if (blockedThreadCheckInterval != -1) {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
}
Vertx VERTX = Vertx.vertx(vertxOptions);
CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(5).setTimeout(5000)
.setFallbackOnFailure(true);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
});
// 创建HTTP监听
// 所有ip都能访问
HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0");
@ -123,10 +122,6 @@ public class DynamicBuildServer implements ApplicationRunner {
// 修改uri context.request().setURI();
String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
log.info("addInterceptor uri appCode:{}", sacAppHeaderKey);
// 判断是否需要加解析
if (AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
// String data = decode(null, sacAppHeaderKey);
}
return context.sendRequest();
}
@ -145,31 +140,45 @@ public class DynamicBuildServer implements ApplicationRunner {
// mainHttpRouter.route().handler(ProxyHandler.create(proxy));
// TODO 实例化方式 VertxConfig 读取
String rateLimitModel = vertxConfig.getRateLimitModel() != null && StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local";
String rateLimitModel = vertxConfig.getRateLimitModel() != null
&& StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local";
mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create())
.handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create());
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
// 服务健康检测重试
Integer periodicTime = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
&& AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() > 0
? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime()
: 3;
// TODO 是否开启健康检测
long timerID = VERTX.setPeriodic(periodicTime, id -> {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
Set<String> setAddressRetryStrategy = redisTemplate.opsForZSet()
.range(RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode, 0, -1);
for (String address : setAddressRetryStrategy) {
// 发起请求,测试服务是否可用
// TODO 调用后端配置的健康检测地址
}
}
});
// Integer periodicTime = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
// && AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() > 0
// ? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime()
// : 3;
//
// // TODO 是否开启健康检测
// long timerID = VERTX.setPeriodic(periodicTime, id -> {
// Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
// for (String appCode : set) {
// Set<String> setAddressRetryStrategy = redisTemplate.opsForZSet()
// .range(RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode, 0, -1);
// for (String address : setAddressRetryStrategy) {
// // 发起请求,测试服务是否可用
// // TODO 调用后端配置的健康检测地址
// }
//
// }
// });
}
private void loadVertxOptions(VertxConfig vertxConfig, VertxOptions vertxOptions) {
long blockedThreadCheckInterval = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getBlockedThreadCheckInterval();
int workerPoolSize = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getWorkerPoolSize();
if (workerPoolSize != -1) {
vertxOptions.setWorkerPoolSize(workerPoolSize);
}
if (blockedThreadCheckInterval != -1) {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
}
}
}

View File

@ -6,13 +6,13 @@ import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
public interface AppConfigService {
void loadAppConfig() throws Exception;
void initAllAppConfig() throws Exception;
void addAppConfig(String appConfig);
void saveAppConfig(String appConfig);
void deleteAppConfig(AppConfig appConfig);
void loadVertxConfig();
void initVertxConfig();
void addVertxConfig(VertxConfig vertxConfig);
void saveVertxConfig(VertxConfig vertxConfig);
}

View File

@ -27,6 +27,7 @@ import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.utils.ProxyTool;
import com.sf.vertx.utils.SpringUtils;
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -42,6 +43,7 @@ public class AppConfigServiceImpl implements AppConfigService {
private static final ConcurrentHashMap<String, Strategy> CACHE_API_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, SacService> CACHE_APP_SERVICE = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SacLoadBalancing> SAC_LOADBALANCING_MAP = new ConcurrentHashMap<String, SacLoadBalancing>();
private static ConcurrentHashSet<String> CACHE_DISABLED_APP = new ConcurrentHashSet<String>();
@SuppressWarnings("unchecked")
public static void addAddressRetryStrategy(String address, String appCode) {
@ -54,14 +56,14 @@ public class AppConfigServiceImpl implements AppConfigService {
&& VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow() > 0
? VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow()
: 20;
if(thresholdCount == 1) { // 创建,才设置时间窗口
if (thresholdCount == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}
Integer threshold = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
&& AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold() > 0
? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold()
: 3;
if(thresholdCount > threshold) {
if (thresholdCount > threshold) {
// 设置服务不可用
redisTemplate.opsForZSet().add(setKey, appCode + ":" + address, 0);
}
@ -100,7 +102,7 @@ public class AppConfigServiceImpl implements AppConfigService {
/***
* 加载vertx配置
*/
public void loadVertxConfig() {
public void initVertxConfig() {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
if (StringUtils.isNotBlank(vertxConfigValue)) {
@ -108,62 +110,72 @@ public class AppConfigServiceImpl implements AppConfigService {
}
}
private void initAppConfig(String appCode) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) {
AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference<AppConfig>() {
});
CACHE_APP_CONFIG.put(appCode, appConfig);
// appapi默认限流
if(appConfig.getApiCurrentLimitingConfig() != null) {
CACHE_API_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getApiCurrentLimitingConfig());
}
if(appConfig.getAppCurrentLimitingConfig() != null) {
CACHE_APP_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getAppCurrentLimitingConfig());
}
// app router负载均衡
for (SacService sacService : appConfig.getService()) {
CACHE_APP_SERVICE.put(appCode + ";" + sacService.getServiceName(), sacService);
List<Node> nodeList = new ArrayList<>();
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
Node node = new Node();
node.setIp(sacService.getServerAddress().getHost());
node.setPort(sacService.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
}
}
}
if (nodeList.size() > 0) {
// 初始化负载均衡算法
String key = appCode + ";" + sacService.getServiceName();
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
SAC_LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
}
}
}
/***
* 从redis加载数据
*
* @throws Exception
*/
public void loadAppConfig() throws Exception {
public void initAllAppConfig() throws Exception {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) {
AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference<AppConfig>() {
});
CACHE_APP_CONFIG.put(appCode, appConfig);
// appapi默认限流
CACHE_API_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getApiCurrentLimitingConfig());
CACHE_APP_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getAppCurrentLimitingConfig());
// app router负载均衡
for (SacService sacService : appConfig.getService()) {
CACHE_APP_SERVICE.put(appCode + ";" + sacService.getServiceName(), sacService);
List<Node> nodeList = new ArrayList<>();
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
Node node = new Node();
node.setIp(sacService.getServerAddress().getHost());
node.setPort(sacService.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
}
}
}
if (nodeList.size() > 0) {
// 初始化负载均衡算法
String key = appCode + ";" + sacService.getServiceName();
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
SAC_LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
}
}
initAppConfig(appCode);
}
}
@ -180,15 +192,17 @@ public class AppConfigServiceImpl implements AppConfigService {
*
* @param appConfig
*/
public void addAppConfig(String appConfigStr) {
public void saveAppConfig(String appConfigStr) {
AppConfig appConfig = JSON.parseObject(appConfigStr, AppConfig.class);
redisTemplate.opsForZSet().add(RedisKeyConfig.APP_CONFIG_SET_KEY, appConfig.getAppCode(), 0);
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
redisTemplate.opsForValue().set(appCodeKey, appConfigStr);
// 发送redis队列,vertx处理
// String queue = RedisKeyConfig.VETX_ENVIRONMENT_KEY+vertxEnvironment+":list";
// 初始化AppConfig本地缓存
initAppConfig(appConfig.getAppCode());
// 删除禁用app列表
CACHE_DISABLED_APP.remove(appConfig.getAppCode());
}
/***
@ -200,9 +214,9 @@ public class AppConfigServiceImpl implements AppConfigService {
redisTemplate.opsForZSet().remove(RedisKeyConfig.APP_CONFIG_SET_KEY, appConfig.getAppCode());
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
redisTemplate.delete(appCodeKey);
// 发送redis队列,vertx处理
// String queue = RedisKeyConfig.VETX_ENVIRONMENT_KEY+vertxEnvironment+":list";
// 不动本地缓存, 入口控制app被禁用, 项目启动会加载最新配置
CACHE_DISABLED_APP.add(appConfig.getAppCode());
}
/***
@ -210,11 +224,9 @@ public class AppConfigServiceImpl implements AppConfigService {
*
* @param appConfig
*/
public void addVertxConfig(VertxConfig vertxConfig) {
public void saveVertxConfig(VertxConfig vertxConfig) {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
redisTemplate.opsForValue().set(vertxConfigKey, JSONObject.toJSONString(vertxConfig));
// 发送redis队列,vertx处理
// String queue = RedisKeyConfig.VETX_ENVIRONMENT_KEY+vertxEnvironment+":list";
initVertxConfig();
}
}

View File

@ -46,6 +46,7 @@ public class ProxyTool {
_ERROR.put(10000, "无法找到路由地址");
_ERROR.put(10001, "加解密算法传递错误");
_ERROR.put(10003, "参数传递错误");
_ERROR.put(10101, "接口限流错误");
_ERROR.put(10102, "应用限流错误");