vertx改为动态加载配置

This commit is contained in:
ztzh_xieyun 2024-04-28 19:37:42 +08:00
parent 0a5b0e8d78
commit cbdc0a8279
26 changed files with 627 additions and 335 deletions

View File

@ -0,0 +1,13 @@
package com.sf.vertx.api.pojo;
import java.io.Serializable;
import lombok.Data;
@Data
public class AddressRetryStrategy implements Serializable {
private static final long serialVersionUID = -336914981251646244L;
private Integer threshold = 3; // 2, // 失败次数
private Integer timeWindow = 20; //1, //时间窗口单位s
private Integer periodicTime = 3; // vertx定时任务循环执行时间间隔
}

View File

@ -10,9 +10,11 @@ public class AppConfig implements Serializable {
private static final long serialVersionUID = 1518165296680157119L;
private String appCode; // 应用唯一码, app访问uri添加前缀,用于网关区分多应用
private boolean exclusiveService; // 预留字段, 独立端口
private Integer exclusiveGatewayConfigId; // 独享网关配置编号
private Integer exclusiveGatewayConfigId; // 预留字段, 独享网关配置编号
private EnvironmentConfig environmentConfig; // 环境配置
private List<SacService> service; // 服务
private AdvancedConfig advancedConfig; // 高级配置
private DataSecurity dataSecurity; // 数据加解密
private ApiCurrentLimitingConfig apiCurrentLimitingConfig; // 接口限流配置
private AppCurrentLimitingConfig appCurrentLimitingConfig; // APP限流配置
private AdvancedConfig advancedConfig; // 高级配置
}

View File

@ -7,8 +7,7 @@ import lombok.Data;
@Data
public class DataSecurity implements Serializable {
private static final long serialVersionUID = 5034274428665340830L;
private String key; //加密串
private String algorithm; // 国密加解密
private String algorithm; // 加密算法ECCRSA 和国密SM2
private String publicKey; // 公钥
private String privateKey; // 私钥
private String privateKey; // 私钥 当加密算法为 ECC 或国密SM2时填写私钥内容当加密算法为 RSA 分别填写公私钥内容)
}

View File

@ -9,9 +9,5 @@ import lombok.Data;
@Data
public class EnvironmentConfig implements Serializable {
private static final long serialVersionUID = -3952046909425019869L;
private Integer defaultId; // 默认环境配置编号
private Map<Integer, List<Node>> environmentGroup; // 环境节点
}

View File

@ -10,4 +10,6 @@ public class GatewayInterface implements Serializable {
private String uri;
private boolean uriRegular; // uri 正则
private String method; // 大写
private Strategy strategy; // 策略
}

View File

@ -7,8 +7,9 @@ import lombok.Data;
@Data
public class HttpClientOptionsConfig implements Serializable {
private static final long serialVersionUID = -6302301564759941097L;
private int maxPoolSize;
private int connectTimeout;
private int http2KeepAliveTimeout;
private int idleTimeout;
private Integer maxPoolSize;
private Integer connectTimeout;
private Integer http2KeepAliveTimeout;
private Integer idleTimeout;
private Integer timeout;
}

View File

@ -21,8 +21,7 @@ public class Node implements Comparable<Node>, Serializable {
private Integer weight;
private Integer effectiveWeight;
private Integer currentWeight;
private boolean createHttp; // 协议
private boolean createHttps;
private String protocol; // 协议
public Node() {
}
@ -40,21 +39,13 @@ public class Node implements Comparable<Node>, Serializable {
this.effectiveWeight = effectiveWeight;
this.currentWeight = currentWeight;
}
public boolean isCreateHttp() {
return createHttp;
public String getProtocol() {
return protocol;
}
public void setCreateHttp(boolean createHttp) {
this.createHttp = createHttp;
}
public boolean isCreateHttps() {
return createHttps;
}
public void setCreateHttps(boolean createHttps) {
this.createHttps = createHttps;
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getIp() {

View File

@ -0,0 +1,16 @@
package com.sf.vertx.api.pojo;
import java.io.Serializable;
import java.util.List;
import lombok.Data;
@Data
public class RouteContent implements Serializable {
private static final long serialVersionUID = -4405058529530680618L;
private ServerAddress serverAddress; // 服务地址
private Integer weight; //权重值
private String headerKey; //请求头
private List<String> headerValues; // ["v1","v2"],
private String matchType; // 匹配类型EQ,IN
}

View File

@ -1,16 +1,14 @@
package com.sf.vertx.api.pojo;
import java.io.Serializable;
import java.util.Map;
import java.util.List;
import lombok.Data;
@Data
public class Router implements Serializable {
private static final long serialVersionUID = -4471811880134025210L;
private String domain; // 域名
private Map<String, String> headers; // 头字段
private String headerVal; // 头字段
private Integer environmentId; // 环境配置编号
private String routeType; // 路由类型 WEIGHT_ROUTE HEADER_ROUTE
private List<RouteContent> routeContent; // 路由的配置信息
}

View File

@ -8,8 +8,9 @@ import lombok.Data;
@Data
public class SacService implements Serializable {
private static final long serialVersionUID = -5171112142954536813L;
private Strategy strategy; // 策略
private String serviceName; // 服务名
private String serviceModel; // 模式, NORMAL, ROUTE
private ServerAddress serverAddress; // NORMAL模式的服务地址
private List<GatewayInterface> uriList; // uri列表
private Router router; // 路由
private Router routeConfig; // 路由
}

View File

@ -1,14 +1,16 @@
//package com.sf.vertx.api.pojo;
//
//import java.io.Serializable;
//
//import lombok.Data;
//
//@Data
//public class ServerAddress implements Serializable {
// private static final long serialVersionUID = 2821255113510132943L;
// private boolean createHttp; // 协议
// private boolean createHttps;
// private String ip;
// private int port;
//}
package com.sf.vertx.api.pojo;
import java.io.Serializable;
import lombok.Data;
@Data
public class ServerAddress implements Serializable {
private static final long serialVersionUID = 7446602403871080553L;
private String address;
private String protocol; // "http"
private String host;
private int port;
private String path; // 前缀
}

View File

@ -13,4 +13,15 @@ import lombok.Data;
public class Strategy implements Serializable {
private static final long serialVersionUID = -8831406773224882471L;
// 限流熔断
private String type;// CURRENT_LIMITING 限流策略, 熔断策略, CIRCUIT_BREAKER
private Integer threshold; // 2, // 限流阈值(APP总和)
private Integer timeWindow; //1, //时间窗口单位s
private Integer recovery_interval; //熔断后的恢复时间间隔单位s
private String defaultResponse;
// ": "{
// \"msg\": \"接口繁忙请重试\",
// \"code\": 501,
// \"data\": \"到达限流阈值\",
// }" // 默认限流响应JSON字符串。
}

View File

@ -8,7 +8,10 @@ import lombok.Data;
public class VertxConfig implements Serializable {
private static final long serialVersionUID = -1706421732809219829L;
private Integer port; // 启动端口
private String appHeaderKey;
private String appHeaderServiceName;
private VertxOptionsConfig vertxOptionsConfig;
private HttpClientOptionsConfig httpClientOptionsConfig; // 配置Vert端口连接池
private AddressRetryStrategy addressRetryStrategy;
}

View File

@ -9,7 +9,7 @@ public class VertxOptionsConfig {
private int eventLoopPoolSize;
private int workerPoolSize ;
private int internalBlockingPoolSize;
private long blockedThreadCheckInterval;
private long blockedThreadCheckInterval; // 3000000000 打印thread wait日志
private long maxEventLoopExecuteTime;
private long maxWorkerExecuteTime;
//private ClusterManager clusterManager;

View File

@ -5,7 +5,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.ApplicationContext;
import io.vertx.core.http.impl.Http1xServerRequest;
import lombok.extern.slf4j.Slf4j;
/**

View File

@ -14,11 +14,15 @@ public class RedisKeyConfig {
public static String APP_CURRENT_LIMITING_CONFIG_KEY = null;
public static String APP_CONFIG_PREFIX_KEY = null;
public static String VERTX_CONFIG_STRING_KEY = null;
public static String VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = null;
public static String VERTX_ADDRESS_RETRY_STRATEGY_KEY = null;
public void init() {
APP_CONFIG_PREFIX_KEY = BASE_REDIS_KEY + vertxEnvironment;
APP_CONFIG_SET_KEY = APP_CONFIG_PREFIX_KEY + ":set";
APP_CURRENT_LIMITING_CONFIG_KEY = APP_CONFIG_PREFIX_KEY + ":app";
APP_CURRENT_LIMITING_CONFIG_KEY = APP_CONFIG_PREFIX_KEY + ":app:limiting";
VERTX_CONFIG_STRING_KEY = BASE_REDIS_KEY + vertxEnvironment + ":vertx";
VERTX_ADDRESS_RETRY_STRATEGY_KEY = APP_CONFIG_PREFIX_KEY + ":addAddressRetryStrategy";
VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":set";
}
}

View File

@ -1,5 +1,15 @@
package com.sf.vertx.handle;
import java.io.File;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
/*
* Copyright 2014 Red Hat, Inc.
*
@ -33,16 +43,6 @@ import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.impl.FileUploadImpl;
import io.vertx.ext.web.impl.RoutingContextInternal;
import java.io.File;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.init.DynamicBuildServer;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@ -80,7 +80,7 @@ public class BodyHandlerImpl implements BodyHandler {
// TODO 改造了这个地方
final HttpServerRequest request = context.request();
final HttpServerResponse response = context.response();
String sacAppHeaderKey = request.getHeader(DynamicBuildServer.SAC_APP_HEADER_KEY);
String sacAppHeaderKey = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
// 加解密在proxy拦截器解析跳转
// =======源码流程

View File

@ -1,8 +1,8 @@
package com.sf.vertx.handle;
import com.sf.vertx.api.pojo.AppCurrentLimitingConfig;
import com.sf.vertx.init.DynamicBuildServer;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
@ -26,7 +26,7 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
@Override
public void handle(RoutingContext rc) {
String sacAppHeaderKey = rc.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY);
String sacAppHeaderKey = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// TODO 判断是否开启限流配置
log.info("RateLimitHandlerRedisImpl request:{}", sacAppHeaderKey);
// rc.next();

View File

@ -7,7 +7,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig;
import com.sf.vertx.api.pojo.AppCurrentLimitingConfig;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.init.DynamicBuildServer;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.SpringUtils;
@ -18,19 +17,23 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RedisRateLimiter {
public Boolean acquire(RoutingContext rc, int pattern) {
String appCode = rc.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY);
String appCode = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// TODO 先测试app模式,后面通过app缓存获取模式
String key = null;
switch (pattern) {
case 1:
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl.getAppCurrentLimitingConfig(appCode);
AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl
.getAppCurrentLimitingConfig(appCode);
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode;
return rateLimiter(key, appCode, appCurrentLimitingConfig.getThreshold(), appCurrentLimitingConfig.getTimeWindow());
return rateLimiter(key, appCode, appCurrentLimitingConfig.getThreshold(),
appCurrentLimitingConfig.getTimeWindow());
case 2:
ApiCurrentLimitingConfig apiCurrentLimitingConfig = AppConfigServiceImpl.getApiCurrentLimitingConfig(appCode);
ApiCurrentLimitingConfig apiCurrentLimitingConfig = AppConfigServiceImpl
.getApiCurrentLimitingConfig(appCode);
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + rc.request().uri() + ":"
+ rc.request().method();
return rateLimiter(key, appCode, apiCurrentLimitingConfig.getThreshold(), apiCurrentLimitingConfig.getTimeWindow());
return rateLimiter(key, appCode, apiCurrentLimitingConfig.getThreshold(),
apiCurrentLimitingConfig.getTimeWindow());
default:
break;
}
@ -44,29 +47,18 @@ public class RedisRateLimiter {
Object count = redisTemplate.opsForValue().get(key);
// redisTemplate.delete(key);
log.info("count:{}", count);
if (count == null) {
// 初始化,设置过期时间
ThreadUtil.execAsync(() -> {
add(timeWindow, redisTemplate, key);
});
} else if (Integer.valueOf(count.toString()) < threshold) {
ThreadUtil.execAsync(() -> {
increment(redisTemplate, key);
});
} else {
return false;
// 初始化,设置过期时间
ThreadUtil.execAsync(() -> {
add(timeWindow, redisTemplate, key);
});
return count != null && Integer.valueOf(count.toString()) <= threshold ? true : false;
}
private void add(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate, String key) {
// log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
Long incr = redisTemplate.opsForValue().increment(key);
if (incr == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}
return true;
}
private void add(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate,
String key) {
redisTemplate.opsForValue().increment(key);
log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}
private void increment(RedisTemplate<String, Integer> redisTemplate, String key) {
redisTemplate.opsForValue().increment(key);
}
}

View File

@ -3,12 +3,14 @@ package com.sf.vertx.handle;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
@Override
public void handle(RoutingContext frc) {
Throwable failure = frc.failure();
log.info("Throwable error:{}", failure);
if (failure instanceof HttpException) {
HttpException httpException = (HttpException) failure;
// frc.response().setStatusCode(404).end();
@ -17,7 +19,9 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
.putHeader("Content-Type", "application/json").end(dataJson.toBuffer());
return;
}
frc.response().setStatusCode(500).setStatusMessage("Server internal error:" + failure.getMessage()).end();
frc.response().setChunked(true).setStatusCode(400)
.putHeader("Content-Type", "application/json").end("{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}");
return;
}
}

View File

@ -1,8 +1,6 @@
package com.sf.vertx.init;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -10,36 +8,31 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.GatewayInterface;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.handle.BodyHandler;
import com.sf.vertx.handle.ProxyHandler;
import com.sf.vertx.handle.RateLimitHandler;
import com.sf.vertx.handle.RestfulFailureHandler;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.Filter;
import com.sf.vertx.utils.ProxyTool;
import com.sf.vertx.utils.SpringUtils;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
@ -56,11 +49,12 @@ import lombok.extern.slf4j.Slf4j;
@Order(value = 10)
@Component
public class DynamicBuildServer implements ApplicationRunner {
public static final String SAC_APP_HEADER_KEY = "sacAppCode";
private static ConcurrentHashMap<Integer, SacLoadBalancing> SAC_LOADBALANCING_MAP = new ConcurrentHashMap<Integer, SacLoadBalancing>();
@Value("${server.vertx.server.default.port}")
private Integer serverDefaultPort;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private AppConfigService appConfigService;
@ -71,6 +65,10 @@ public class DynamicBuildServer implements ApplicationRunner {
public void run(ApplicationArguments args) throws Exception {
// 初始化redis key
redisKeyConfig.init();
// 从redis同步app配置
appConfigService.loadAppConfig();
// 从redis同步vertx配置
appConfigService.loadVertxConfig();
// 加载vertx应用配置
appStartLoadData();
}
@ -79,14 +77,26 @@ public class DynamicBuildServer implements ApplicationRunner {
* 应用启动, 从redis读取配置,初始化vertx服务
*/
private void appStartLoadData() {
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
// TODO 编解码线程池,后面优化协程等方式
Vertx VERTX = Vertx.vertx(new VertxOptions().setWorkerPoolSize(20));
VertxOptions vertxOptions = new VertxOptions();
long blockedThreadCheckInterval = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getBlockedThreadCheckInterval();
int workerPoolSize = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getWorkerPoolSize();
if (workerPoolSize != -1) {
vertxOptions.setWorkerPoolSize(workerPoolSize);
}
if (blockedThreadCheckInterval != -1) {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
}
Vertx VERTX = Vertx.vertx(vertxOptions);
// 创建HTTP监听
// 所有ip都能访问
HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0");
HttpServer server = VERTX.createHttpServer(httpServerOptions);
Router mainHttpRouter = Router.router(VERTX);
VertxConfig vertxConfig = appConfigService.loadVertxConfig();
Integer serverPort = (vertxConfig == null || vertxConfig.getPort() == null) ? serverDefaultPort
: vertxConfig.getPort();
log.info("serverPort:{}", serverPort);
@ -98,30 +108,38 @@ public class DynamicBuildServer implements ApplicationRunner {
}
});
ConcurrentHashMap<String, AppConfig> cacheAppConfig = appConfigService.loadAllConfig();
// HttpClientOptions clientOptions = new HttpClientOptions();
// clientOptions.setMaxPoolSize(20); // 最大连接池大小
// clientOptions.setConnectTimeout(5000); // 连接超时 毫秒
// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒
// clientOptions.setHttp2KeepAliveTimeout(1);
// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒
// HttpClient proxyClient = VERTX.createHttpClient(clientOptions);
HttpClient proxyClient = VERTX.createHttpClient();
HttpProxy proxy = HttpProxy.reverseProxy(proxyClient);
proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(cacheAppConfig, request)));
proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request)));
proxy.addInterceptor(new ProxyInterceptor() {
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
// 修改uri context.request().setURI();
String sacAppHeaderKey = context.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY);
String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
log.info("addInterceptor uri appCode:{}", sacAppHeaderKey);
// 判断是否需要加解析
if (AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
// String data = decode(null, sacAppHeaderKey);
}
return context.sendRequest();
}
@Override
public Future<Void> handleProxyResponse(ProxyContext context) {
// 调试代码,获取reponse body
Filter filter = new Filter();
ProxyResponse proxyResponse = context.response();
Body body = proxyResponse.getBody();
proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false)));
// 继续拦截链
return context.sendResponse();
}
});
WebClient mainWebClient = WebClient.create(VERTX);
// mainHttpRouter.route().handler(ProxyHandler.create(proxy));
@ -132,146 +150,28 @@ public class DynamicBuildServer implements ApplicationRunner {
int pattern = 2;// 1:app,2:接口默认,3:服务接口配置限流策略
mainHttpRouter.route().handler(RateLimitHandler.create(instance, pattern)).handler(BodyHandler.create())
.handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create());
//mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
}
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
public SocketAddress resolveOriginAddress(ConcurrentHashMap<String, AppConfig> cacheAppConfig,
HttpServerRequest request) {
String appCode = request.getHeader(SAC_APP_HEADER_KEY);
log.info("uri appCode:{}", appCode);
// TODO 不存在, 抛异常给前端
// 服务健康检测重试
Integer periodicTime = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
&& AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() > 0
? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime()
: 3;
AppConfig appConfig = cacheAppConfig.get(appCode);
if (appConfig != null) {
Integer environmentId = null;
SacLoadBalancing sacLoadBalancing = null;
// 2是否存在server服务配置
if (appConfig.getService() != null && appConfig.getService().size() > 0) {
// header传递服务名, 这样就不需要遍历,提高性能
for (SacService service : appConfig.getService()) {
// uri是否匹配
boolean match = false;
for (GatewayInterface gatewayInterface : service.getUriList()) {
String domain = request.authority().port() == -1 ? request.authority().host()
: request.authority().host() + ":" + request.authority().port();
// uri匹配(正则或全量匹配)
match = gatewayInterface.isUriRegular() ? regexMatch(gatewayInterface.getUri(), request.uri())
: StringUtils.equals(gatewayInterface.getUri(), request.uri());
match = match ? StringUtils.equals(gatewayInterface.getMethod(), request.method().name())
: false;
// domain匹配
if (match) {
boolean domainVertify = service.getRouter() != null
&& StringUtils.isNotBlank(service.getRouter().getDomain())
&& StringUtils.equals(service.getRouter().getDomain(), domain) ? true : false;
if (domainVertify) {
environmentId = service.getRouter().getEnvironmentId();
} else {
environmentId = appConfig.getEnvironmentConfig().getDefaultId();
}
break;
}
}
}
} else {
environmentId = appConfig.getEnvironmentConfig().getDefaultId();
}
// 初始化负载均衡
if (SAC_LOADBALANCING_MAP.get(environmentId) != null) {
sacLoadBalancing = SAC_LOADBALANCING_MAP.get(environmentId);
} else {
// 并发影响不大, 只要初始化成功一次即可
List<Node> nodeList = appConfig.getEnvironmentConfig().getEnvironmentGroup().get(environmentId);
if (nodeList != null && nodeList.size() > 0) {
// 初始化负载均衡算法
sacLoadBalancing = roundRobin(nodeList);
SAC_LOADBALANCING_MAP.put(environmentId, sacLoadBalancing);
} else {
// TODO 抛出异常
// TODO 是否开启健康检测
long timerID = VERTX.setPeriodic(periodicTime, id -> {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
Set<String> setAddressRetryStrategy = redisTemplate.opsForZSet()
.range(RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode, 0, -1);
for (String address : setAddressRetryStrategy) {
// 发起请求,测试服务是否可用
// TODO 调用后端配置的健康检测地址
}
}
});
// TODO 区分httpshttp
Node node = sacLoadBalancing.selectNode();
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
log.info("负载均衡跳转地址:{}", socketAddress.host() + ";" + socketAddress.port());
return socketAddress;
}
// TODO 如果没找到, 如何处理
return null;
}
// private String getUriAppCode(String uri) {
// // 1判断appCode
// int count = 0;
// int appCodeLen = 16;
// StringBuffer appCode = new StringBuffer();
// for (int i = 0; i < uri.length(); i++) {
// // 限制长度
// if (appCode.length() == appCodeLen) {
// break;
// }
// char ch = uri.charAt(i);
// if (ch == '/') {
// if (++count == 2) {
// break;
// }
// } else {
// appCode.append(ch);
// }
// }
// return appCode.toString();
// }
private static boolean regexMatch(String pattern, String target) {
return Pattern.matches(pattern, target);
}
/***
* 解密
*/
private static String decode(AppConfig appConfig, String bodyJson) {
String algorithm = appConfig.getDataSecurity().getAlgorithm();
String data = null;
switch (algorithm) {
case "aes":
// 解密
String key = appConfig.getDataSecurity().getKey();
data = MainSecurity.aesDecrypt(bodyJson, key);
break;
default:
break;
}
return data;
}
/***
* 加密
*/
private static String encryption(AppConfig appConfig, String responseData) {
String algorithm = appConfig.getDataSecurity().getAlgorithm();
String data = null;
switch (algorithm) {
case "aes":
String key = appConfig.getDataSecurity().getKey();
data = MainSecurity.aesEncrypt(responseData.toString(), key);
break;
default:
break;
}
return data;
}
private static SacLoadBalancing roundRobin(List<Node> nodeList) {
WeightedRoundRobin weightedRoundRobin = new WeightedRoundRobin();
for (Node node : nodeList) {
int weight = node.getWeight() != null ? node.getWeight() : 1;
node.setWeight(weight);
node.setCurrentWeight(weight);
node.setEffectiveWeight(weight);
WeightedRoundRobin.totalWeight += node.getEffectiveWeight();
}
weightedRoundRobin.init(nodeList);
return weightedRoundRobin;
}
}

View File

@ -1,18 +1,18 @@
package com.sf.vertx.service;
import java.util.concurrent.ConcurrentHashMap;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
public interface AppConfigService {
ConcurrentHashMap<String, AppConfig> loadAllConfig();
void loadAppConfig() throws Exception;
void addAppConfig(String appConfig);
void deleteAppConfig(AppConfig appConfig);
VertxConfig loadVertxConfig();
void loadVertxConfig();
void addVertxConfig(VertxConfig vertxConfig);
}

View File

@ -1,7 +1,10 @@
package com.sf.vertx.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -15,9 +18,15 @@ import com.alibaba.fastjson2.TypeReference;
import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.AppCurrentLimitingConfig;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.utils.ProxyTool;
import com.sf.vertx.utils.SpringUtils;
import lombok.extern.slf4j.Slf4j;
@ -28,9 +37,36 @@ public class AppConfigServiceImpl implements AppConfigService {
private String vertxEnvironment;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static VertxConfig VERTX_CONFIG = new VertxConfig();
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, AppCurrentLimitingConfig> CACHE_APP_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, ApiCurrentLimitingConfig> CACHE_API_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, SacService> CACHE_APP_SERVICE = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SacLoadBalancing> SAC_LOADBALANCING_MAP = new ConcurrentHashMap<String, SacLoadBalancing>();
@SuppressWarnings("unchecked")
public static void addAddressRetryStrategy(String address, String appCode) {
String setKey = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode;
String key = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":" + appCode + ":" + address;
RedisTemplate<String, String> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
Long thresholdCount = redisTemplate.opsForValue().increment(key);
// log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
Integer timeWindow = VERTX_CONFIG.getAddressRetryStrategy() != null
&& VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow() > 0
? VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow()
: 20;
if(thresholdCount == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}
Integer threshold = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
&& AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold() > 0
? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold()
: 3;
if(thresholdCount > threshold) {
// 设置服务不可用
redisTemplate.opsForZSet().add(setKey, appCode + ":" + address, 0);
}
}
public static boolean appDataSecurity(String appCode) {
return CACHE_APP_CONFIG.get(appCode) != null && CACHE_APP_CONFIG.get(appCode).getDataSecurity() != null ? true
@ -40,15 +76,45 @@ public class AppConfigServiceImpl implements AppConfigService {
public static AppCurrentLimitingConfig getAppCurrentLimitingConfig(String appCode) {
return CACHE_APP_CURRENT_LIMITING_CONFIG.get(appCode);
}
public static ApiCurrentLimitingConfig getApiCurrentLimitingConfig(String appCode) {
return CACHE_API_CURRENT_LIMITING_CONFIG.get(appCode);
}
public static AppConfig getAppConfig(String appCode) {
return CACHE_APP_CONFIG.get(appCode);
}
public static VertxConfig getVertxConfig() {
return VERTX_CONFIG;
}
public static String getSacAppHeaderKey() {
return VERTX_CONFIG.getAppHeaderKey() != null ? VERTX_CONFIG.getAppHeaderKey() : "sacAppCode";
}
public static String getAppHeaderServiceName() {
return VERTX_CONFIG.getAppHeaderServiceName() != null ? VERTX_CONFIG.getAppHeaderServiceName()
: "sacAppServiceName";
}
/***
* 加载vertx配置
*/
public void loadVertxConfig() {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
if (StringUtils.isNotBlank(vertxConfigValue)) {
VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class);
}
}
/***
* 从redis加载数据
*
* @throws Exception
*/
public ConcurrentHashMap<String, AppConfig> loadAllConfig() {
public void loadAppConfig() throws Exception {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
@ -58,39 +124,56 @@ public class AppConfigServiceImpl implements AppConfigService {
});
CACHE_APP_CONFIG.put(appCode, appConfig);
// appapi默认限流
CACHE_API_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getApiCurrentLimitingConfig());
CACHE_APP_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getAppCurrentLimitingConfig());
// app router负载均衡
for (SacService sacService : appConfig.getService()) {
CACHE_APP_SERVICE.put(appCode + ";" + sacService.getServiceName(), sacService);
List<Node> nodeList = new ArrayList<>();
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
Node node = new Node();
node.setIp(sacService.getServerAddress().getHost());
node.setPort(sacService.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
}
}
}
if (nodeList.size() > 0) {
// 初始化负载均衡算法
String key = appCode + ";" + sacService.getServiceName();
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
SAC_LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
}
}
}
// TODO 限流
ApiCurrentLimitingConfig apiCurrentLimitingConfig = new ApiCurrentLimitingConfig();
apiCurrentLimitingConfig.setThreshold(1);
apiCurrentLimitingConfig.setTimeWindow(10);
apiCurrentLimitingConfig.setDefaultResponse(
"{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}");
CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", apiCurrentLimitingConfig);
CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", apiCurrentLimitingConfig);
AppCurrentLimitingConfig appCurrentLimitingConfig = new AppCurrentLimitingConfig();
appCurrentLimitingConfig.setThreshold(1);
appCurrentLimitingConfig.setTimeWindow(10);
appCurrentLimitingConfig.setDefaultResponse(
"{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}");
CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", appCurrentLimitingConfig);
CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", appCurrentLimitingConfig);
log.info("cacheAppConfig:{}", JSON.toJSONString(CACHE_APP_CONFIG));
return CACHE_APP_CONFIG;
}
public AppConfig getAppConfig(String appCode) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) {
AppConfig appConfig = JSONObject.parseObject(appCodeValue, AppConfig.class);
return appConfig;
}
return null;
public static SacLoadBalancing getSacLoadBalancing(String appCode, String serviceName) {
return SAC_LOADBALANCING_MAP.get(appCode + ";" + serviceName);
}
public static SacService getSacService(String appCode, String serviceName) {
return CACHE_APP_SERVICE.get(appCode + ";" + serviceName);
}
/***
@ -123,19 +206,6 @@ public class AppConfigServiceImpl implements AppConfigService {
// String queue = RedisKeyConfig.VETX_ENVIRONMENT_KEY+vertxEnvironment+":list";
}
/***
* 加载vertx配置
*/
public VertxConfig loadVertxConfig() {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
if (StringUtils.isNotBlank(vertxConfigValue)) {
VertxConfig vertxConfig = JSONObject.parseObject(vertxConfigValue, VertxConfig.class);
return vertxConfig;
}
return null;
}
/***
* 新增修改
*

View File

@ -0,0 +1,91 @@
package com.sf.vertx.utils;
import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Filter implements ReadStream<Buffer> {
private final AtomicBoolean paused = new AtomicBoolean();
private ReadStream<Buffer> stream;
private Buffer expected = Buffer.buffer();
private Handler<Buffer> dataHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
/***
*
* @param s
* @param encryption true: 请求参数解密, false: 返回数据 加密
* @return
*/
public ReadStream<Buffer> init(String uri, ReadStream<Buffer> s, Boolean encryption) {
stream = s;
stream.handler(buff -> {
if (dataHandler != null) {
byte[] bytes = new byte[buff.length()];
for (int i = 0; i < bytes.length; i++) {
// bytes[i] = (byte) (('a' - 'A') + buff.getByte(i));
bytes[i] = (byte) (buff.getByte(i));
}
String res = new String(bytes);
log.info("request uri:{}, return data:{}", uri, res);
expected.appendBytes(bytes);
dataHandler.handle(Buffer.buffer(bytes));
}
});
stream.exceptionHandler(err -> {
if (exceptionHandler != null) {
exceptionHandler.handle(err);
}
});
stream.endHandler(v -> {
if (endHandler != null) {
endHandler.handle(v);
}
});
return this;
}
@Override
public ReadStream<Buffer> pause() {
paused.set(true);
stream.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
stream.resume();
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
stream.fetch(amount);
return this;
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
dataHandler = handler;
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
}

View File

@ -0,0 +1,123 @@
package com.sf.vertx.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin;
import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j;
/***
* 反向代理工具类
*
* @author xy
*
*/
@Slf4j
public class ProxyTool {
public final static Map<Integer, String> _ERROR = new HashMap<>();
static {
_ERROR.put(400, "Bad Request");
_ERROR.put(401, "Unauthorized");
_ERROR.put(403, "Forbidden");
_ERROR.put(404, "Not Found");
_ERROR.put(413, "Request Entity Too Large");
_ERROR.put(415, "Unsupported Media Type");
_ERROR.put(500, "Internal Server Error");
_ERROR.put(502, "Bad Gateway");
_ERROR.put(503, "Service Unavailable");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(10000, "无法找到路由地址");
_ERROR.put(10001, "加解密算法传递错误");
};
public static SocketAddress resolveOriginAddress(HttpServerRequest request) {
String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName);
AppConfig appConfig = AppConfigServiceImpl.getAppConfig(appCode);
if (appConfig != null) {
SacService sacService = AppConfigServiceImpl.getSacService(appCode, appHeaderServiceName);
if (sacService != null) {
SacLoadBalancing sacLoadBalancing = null;
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")
|| StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName);
} else if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) {
List<Node> nodeList = new ArrayList<>();
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
// 判断是否uri匹配
String headerRouteKey = request.getHeader(routeContent.getHeaderKey());
if (routeContent.getHeaderValues() != null
&& routeContent.getHeaderValues().contains(headerRouteKey)) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
break;
}
}
if (nodeList.size() > 0) {
sacLoadBalancing = ProxyTool.roundRobin(nodeList);
}
}
if (sacLoadBalancing == null) {
log.error("app config error. appCode:{},serviceName:{},RouteType:{}, not find config.", appCode,
appHeaderServiceName, sacService.getRouteConfig().getRouteType());
throw new HttpException(10000, _ERROR.get(10000));
}
// TODO 区分httpshttp
Node node = sacLoadBalancing.selectNode();
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
return socketAddress;
} else {
log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode,
appHeaderServiceName);
throw new HttpException(10000, _ERROR.get(10000));
}
}
log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode,
appHeaderServiceName);
throw new HttpException(10000, _ERROR.get(10000));
}
private static boolean regexMatch(String pattern, String target) {
return Pattern.matches(pattern, target);
}
public static SacLoadBalancing roundRobin(List<Node> nodeList) {
WeightedRoundRobin weightedRoundRobin = new WeightedRoundRobin();
for (Node node : nodeList) {
int weight = node.getWeight() != null ? node.getWeight() : 1;
node.setWeight(weight);
node.setCurrentWeight(weight);
node.setEffectiveWeight(weight);
WeightedRoundRobin.totalWeight += node.getEffectiveWeight();
}
weightedRoundRobin.init(nodeList);
return weightedRoundRobin;
}
}

View File

@ -10,6 +10,7 @@
*/
package io.vertx.httpproxy.impl;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -19,10 +20,13 @@ import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.init.DynamicBuildServer;
import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
import cn.hutool.core.thread.ThreadUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
@ -35,8 +39,10 @@ import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext;
@ -163,9 +169,30 @@ public class ReverseProxy implements HttpProxy {
});
}
/***
* 返回错误
*
* @param proxyRequest
* @param sc
*/
private void end(ProxyRequest proxyRequest, int sc) {
proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null)
.send();
// TODO 处理反向代理返回结果
if (sc == 502) {
JsonObject dataJson = new JsonObject(
"{\n" + " \"msg\": \"服务连接失败\",\n" + " \"code\": 502,\n" + " \"data\": \"服务连接失败\"\n" + "}");
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(dataJson.size()))
.setBody(Body.body(dataJson.toBuffer())).send();
} else {
// proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null)
// .send();
String commonError = "{\n" + " \"msg\": \"网关执行失败,默认错误信息\",\n" + " \"code\": " + sc + ",\n"
+ " \"data\": \"网关执行失败,默认错误信息\"\n" + "}";
Buffer buffer = Buffer.buffer(commonError);
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(commonError.length()))
.setBody(Body.body(buffer)).send();
}
}
private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
@ -225,39 +252,62 @@ public class ReverseProxy implements HttpProxy {
}
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
// TODO 改造了这个地方
// 创建一个响应并设置参数
// ctx.request().response().setStatusCode(200)
// .putHeader("content-type", "text/plain").end(body);
// TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略
// 发起一个请求
String sacAppHeaderKey = proxyRequest.headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY);
String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
String body = ctx.getBodyAsString();
String bodyData = MainSecurity.aesDecrypt(body, "dadddsdfadfadsfa33323223");
String bodyData = bodyDecrypt(body, sacAppHeaderKey);
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
long timeout = vertxConfig.getHttpClientOptionsConfig() != null
&& vertxConfig.getHttpClientOptionsConfig().getTimeout() > 0
? vertxConfig.getHttpClientOptionsConfig().getTimeout()
: 1000;
long idleTimeout = vertxConfig.getHttpClientOptionsConfig() != null
&& vertxConfig.getHttpClientOptionsConfig().getIdleTimeout() > 0
? vertxConfig.getHttpClientOptionsConfig().getTimeout()
: 1000;
return Future.future(p -> {
mainWebClient.post(9198, "127.0.0.1", "/vertx/body").sendJson(bodyData, h -> {
if (h.succeeded()) {
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = MainSecurity.aesEncrypt(responseData.toString(), "dadddsdfadfadsfa33323223");
log.info("aesEncrypt dataStr:{}", dataStr);
ctx.request().response().setStatusCode(200).putHeader("content-type", "application/json")
.end(dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
p.fail(h.cause());
}
});
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
.putHeaders(proxyRequest.headers()).timeout(timeout).idleTimeout(idleTimeout)
.sendJson(bodyData, h -> {
if (h.succeeded()) {
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = bodyEncrypt(bodyData, sacAppHeaderKey);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
log.info("error: {}", h.cause());
if (h.cause() instanceof ConnectException) {
log.info("connection url is error:{}", h.getClass());
// TODO 是否开启健康检测, 需要做重试,熔断
ThreadUtil.execAsync(() -> {
String appCode = proxyRequest.proxiedRequest().getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
AppConfigServiceImpl.addAddressRetryStrategy(socketAddress.hostAddress(), appCode);
});
}
end(proxyRequest, 502);
}
});
});
} else {
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
f.onFailure(err -> {
log.info("error:{}", err);
if (err instanceof ConnectException) {
// TODO 配置重试策略
// Connection refused: /127.0.0.1:9199
log.info("connection url is error:{}", err.getMessage());
}
// Should this be done here ? I don't think so
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
proxiedRequest.resume();
@ -295,5 +345,29 @@ public class ReverseProxy implements HttpProxy {
return sendResponse();
}
private String bodyEncrypt(String body, String sacAppHeaderKey) {
DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
switch (dataSecurity.getAlgorithm()) {
case "AES":
return MainSecurity.aesEncrypt(body, dataSecurity.getPrivateKey());
default:
break;
}
log.info(" appcode:{}, encrypt key config is error.", sacAppHeaderKey);
throw new HttpException(10001);
}
private String bodyDecrypt(String body, String sacAppHeaderKey) {
DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
switch (dataSecurity.getAlgorithm()) {
case "AES":
return MainSecurity.aesDecrypt(body, dataSecurity.getPrivateKey());
default:
break;
}
log.info(" appcode:{}, decrypt key config is error.", sacAppHeaderKey);
throw new HttpException(10001);
}
}
}