From cbdc0a8279bbe3f42ae3631ecf64bf07d4d26bc6 Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Sun, 28 Apr 2024 19:37:42 +0800 Subject: [PATCH] =?UTF-8?q?vertx=E6=94=B9=E4=B8=BA=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E5=8A=A0=E8=BD=BD=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vertx/api/pojo/AddressRetryStrategy.java | 13 ++ .../java/com/sf/vertx/api/pojo/AppConfig.java | 6 +- .../com/sf/vertx/api/pojo/DataSecurity.java | 5 +- .../sf/vertx/api/pojo/EnvironmentConfig.java | 4 - .../sf/vertx/api/pojo/GatewayInterface.java | 2 + .../api/pojo/HttpClientOptionsConfig.java | 9 +- .../main/java/com/sf/vertx/api/pojo/Node.java | 21 +- .../com/sf/vertx/api/pojo/RouteContent.java | 16 ++ .../java/com/sf/vertx/api/pojo/Router.java | 8 +- .../com/sf/vertx/api/pojo/SacService.java | 7 +- .../com/sf/vertx/api/pojo/ServerAddress.java | 30 +-- .../java/com/sf/vertx/api/pojo/Strategy.java | 11 + .../com/sf/vertx/api/pojo/VertxConfig.java | 3 + .../sf/vertx/api/pojo/VertxOptionsConfig.java | 2 +- .../main/java/com/sf/AdminApplication.java | 1 - .../com/sf/vertx/constans/RedisKeyConfig.java | 6 +- .../com/sf/vertx/handle/BodyHandlerImpl.java | 22 +- .../handle/RateLimitHandlerRedisImpl.java | 4 +- .../com/sf/vertx/handle/RedisRateLimiter.java | 50 ++-- .../handle/RestfulFailureHandlerImpl.java | 8 +- .../com/sf/vertx/init/DynamicBuildServer.java | 218 +++++------------- .../sf/vertx/service/AppConfigService.java | 14 +- .../service/impl/AppConfigServiceImpl.java | 158 +++++++++---- .../main/java/com/sf/vertx/utils/Filter.java | 91 ++++++++ .../java/com/sf/vertx/utils/ProxyTool.java | 123 ++++++++++ .../io/vertx/httpproxy/impl/ReverseProxy.java | 130 ++++++++--- 26 files changed, 627 insertions(+), 335 deletions(-) create mode 100644 sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AddressRetryStrategy.java create mode 100644 sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/RouteContent.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/utils/Filter.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AddressRetryStrategy.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AddressRetryStrategy.java new file mode 100644 index 0000000..30e3a53 --- /dev/null +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AddressRetryStrategy.java @@ -0,0 +1,13 @@ +package com.sf.vertx.api.pojo; + +import java.io.Serializable; + +import lombok.Data; + +@Data +public class AddressRetryStrategy implements Serializable { + private static final long serialVersionUID = -336914981251646244L; + private Integer threshold = 3; // 2, // 失败次数 + private Integer timeWindow = 20; //1, //时间窗口,单位s + private Integer periodicTime = 3; // vertx定时任务循环执行时间间隔 +} diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AppConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AppConfig.java index 9eb08f5..9380168 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AppConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/AppConfig.java @@ -10,9 +10,11 @@ public class AppConfig implements Serializable { private static final long serialVersionUID = 1518165296680157119L; private String appCode; // 应用唯一码, app访问uri添加前缀,用于网关区分多应用 private boolean exclusiveService; // 预留字段, 独立端口 - private Integer exclusiveGatewayConfigId; // 独享网关配置编号 + private Integer exclusiveGatewayConfigId; // 预留字段, 独享网关配置编号 private EnvironmentConfig environmentConfig; // 环境配置 private List service; // 服务 - private AdvancedConfig advancedConfig; // 高级配置 private DataSecurity dataSecurity; // 数据加解密 + private ApiCurrentLimitingConfig apiCurrentLimitingConfig; // 接口限流配置 + private AppCurrentLimitingConfig appCurrentLimitingConfig; // APP限流配置 + private AdvancedConfig advancedConfig; // 高级配置 } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/DataSecurity.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/DataSecurity.java index ec515a1..7cef0ad 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/DataSecurity.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/DataSecurity.java @@ -7,8 +7,7 @@ import lombok.Data; @Data public class DataSecurity implements Serializable { private static final long serialVersionUID = 5034274428665340830L; - private String key; //加密串 - private String algorithm; // 国密加解密 + private String algorithm; // 加密算法(ECC、RSA 和国密(SM2)) private String publicKey; // 公钥 - private String privateKey; // 私钥 + private String privateKey; // 私钥 (当加密算法为 ECC 或国密SM2时,填写私钥内容。当加密算法为 RSA 时,分别填写公私钥内容。) } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/EnvironmentConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/EnvironmentConfig.java index 0c6431f..c0e5e0b 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/EnvironmentConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/EnvironmentConfig.java @@ -9,9 +9,5 @@ import lombok.Data; @Data public class EnvironmentConfig implements Serializable { private static final long serialVersionUID = -3952046909425019869L; - private Integer defaultId; // 默认环境配置编号 - private Map> environmentGroup; // 环境节点 - - } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/GatewayInterface.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/GatewayInterface.java index dcb921f..22f35a6 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/GatewayInterface.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/GatewayInterface.java @@ -10,4 +10,6 @@ public class GatewayInterface implements Serializable { private String uri; private boolean uriRegular; // uri 正则 private String method; // 大写 + private Strategy strategy; // 策略 + } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/HttpClientOptionsConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/HttpClientOptionsConfig.java index 20aa4ae..e16fac5 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/HttpClientOptionsConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/HttpClientOptionsConfig.java @@ -7,8 +7,9 @@ import lombok.Data; @Data public class HttpClientOptionsConfig implements Serializable { private static final long serialVersionUID = -6302301564759941097L; - private int maxPoolSize; - private int connectTimeout; - private int http2KeepAliveTimeout; - private int idleTimeout; + private Integer maxPoolSize; + private Integer connectTimeout; + private Integer http2KeepAliveTimeout; + private Integer idleTimeout; + private Integer timeout; } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Node.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Node.java index a06af7e..664c7d9 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Node.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Node.java @@ -21,8 +21,7 @@ public class Node implements Comparable, Serializable { private Integer weight; private Integer effectiveWeight; private Integer currentWeight; - private boolean createHttp; // 协议 - private boolean createHttps; + private String protocol; // 协议 public Node() { } @@ -40,21 +39,13 @@ public class Node implements Comparable, Serializable { this.effectiveWeight = effectiveWeight; this.currentWeight = currentWeight; } - - public boolean isCreateHttp() { - return createHttp; + + public String getProtocol() { + return protocol; } - public void setCreateHttp(boolean createHttp) { - this.createHttp = createHttp; - } - - public boolean isCreateHttps() { - return createHttps; - } - - public void setCreateHttps(boolean createHttps) { - this.createHttps = createHttps; + public void setProtocol(String protocol) { + this.protocol = protocol; } public String getIp() { diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/RouteContent.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/RouteContent.java new file mode 100644 index 0000000..6af99d8 --- /dev/null +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/RouteContent.java @@ -0,0 +1,16 @@ +package com.sf.vertx.api.pojo; + +import java.io.Serializable; +import java.util.List; + +import lombok.Data; + +@Data +public class RouteContent implements Serializable { + private static final long serialVersionUID = -4405058529530680618L; + private ServerAddress serverAddress; // 服务地址 + private Integer weight; //权重值 + private String headerKey; //请求头 + private List headerValues; // ["v1","v2"], + private String matchType; // 匹配类型,EQ,IN +} diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Router.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Router.java index 2464ce1..3c67134 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Router.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Router.java @@ -1,16 +1,14 @@ package com.sf.vertx.api.pojo; import java.io.Serializable; -import java.util.Map; +import java.util.List; import lombok.Data; @Data public class Router implements Serializable { private static final long serialVersionUID = -4471811880134025210L; - private String domain; // 域名 - private Map headers; // 头字段 - private String headerVal; // 头字段 - private Integer environmentId; // 环境配置编号 + private String routeType; // 路由类型 WEIGHT_ROUTE ,HEADER_ROUTE + private List routeContent; // 路由的配置信息 } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java index a30617b..86384a0 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java @@ -8,8 +8,9 @@ import lombok.Data; @Data public class SacService implements Serializable { private static final long serialVersionUID = -5171112142954536813L; - private Strategy strategy; // 策略 + private String serviceName; // 服务名 + private String serviceModel; // 模式, NORMAL, ROUTE + private ServerAddress serverAddress; // NORMAL模式的服务地址 private List uriList; // uri列表 - private Router router; // 路由 - + private Router routeConfig; // 路由 } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ServerAddress.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ServerAddress.java index ce814a2..caddaec 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ServerAddress.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ServerAddress.java @@ -1,14 +1,16 @@ -//package com.sf.vertx.api.pojo; -// -//import java.io.Serializable; -// -//import lombok.Data; -// -//@Data -//public class ServerAddress implements Serializable { -// private static final long serialVersionUID = 2821255113510132943L; -// private boolean createHttp; // 协议 -// private boolean createHttps; -// private String ip; -// private int port; -//} +package com.sf.vertx.api.pojo; + +import java.io.Serializable; + +import lombok.Data; + +@Data +public class ServerAddress implements Serializable { + private static final long serialVersionUID = 7446602403871080553L; + private String address; + private String protocol; // "http" + private String host; + private int port; + private String path; // 前缀 + +} diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Strategy.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Strategy.java index d11ed38..adeaee5 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Strategy.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/Strategy.java @@ -13,4 +13,15 @@ import lombok.Data; public class Strategy implements Serializable { private static final long serialVersionUID = -8831406773224882471L; // 限流熔断 + private String type;// CURRENT_LIMITING 限流策略, 熔断策略, CIRCUIT_BREAKER + private Integer threshold; // 2, // 限流阈值(APP总和) + private Integer timeWindow; //1, //时间窗口,单位s + private Integer recovery_interval; //熔断后的恢复时间间隔,单位s + private String defaultResponse; +// ": "{ +// \"msg\": \"接口繁忙请重试\", +// \"code\": 501, +// \"data\": \"到达限流阈值\", +// }" // 默认限流响应,JSON字符串。 + } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java index b71e6a7..9477b66 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java @@ -8,7 +8,10 @@ import lombok.Data; public class VertxConfig implements Serializable { private static final long serialVersionUID = -1706421732809219829L; private Integer port; // 启动端口 + private String appHeaderKey; + private String appHeaderServiceName; private VertxOptionsConfig vertxOptionsConfig; private HttpClientOptionsConfig httpClientOptionsConfig; // 配置Vert端口连接池 + private AddressRetryStrategy addressRetryStrategy; } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxOptionsConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxOptionsConfig.java index efe4dc5..dc9f92d 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxOptionsConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxOptionsConfig.java @@ -9,7 +9,7 @@ public class VertxOptionsConfig { private int eventLoopPoolSize; private int workerPoolSize ; private int internalBlockingPoolSize; - private long blockedThreadCheckInterval; + private long blockedThreadCheckInterval; // 3000000000 打印thread wait日志 private long maxEventLoopExecuteTime; private long maxWorkerExecuteTime; //private ClusterManager clusterManager; diff --git a/sf-vertx/src/main/java/com/sf/AdminApplication.java b/sf-vertx/src/main/java/com/sf/AdminApplication.java index d0a5333..dc7d51c 100644 --- a/sf-vertx/src/main/java/com/sf/AdminApplication.java +++ b/sf-vertx/src/main/java/com/sf/AdminApplication.java @@ -5,7 +5,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.ApplicationContext; -import io.vertx.core.http.impl.Http1xServerRequest; import lombok.extern.slf4j.Slf4j; /** diff --git a/sf-vertx/src/main/java/com/sf/vertx/constans/RedisKeyConfig.java b/sf-vertx/src/main/java/com/sf/vertx/constans/RedisKeyConfig.java index 1b486a0..8a5985f 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/constans/RedisKeyConfig.java +++ b/sf-vertx/src/main/java/com/sf/vertx/constans/RedisKeyConfig.java @@ -14,11 +14,15 @@ public class RedisKeyConfig { public static String APP_CURRENT_LIMITING_CONFIG_KEY = null; public static String APP_CONFIG_PREFIX_KEY = null; public static String VERTX_CONFIG_STRING_KEY = null; + public static String VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = null; + public static String VERTX_ADDRESS_RETRY_STRATEGY_KEY = null; public void init() { APP_CONFIG_PREFIX_KEY = BASE_REDIS_KEY + vertxEnvironment; APP_CONFIG_SET_KEY = APP_CONFIG_PREFIX_KEY + ":set"; - APP_CURRENT_LIMITING_CONFIG_KEY = APP_CONFIG_PREFIX_KEY + ":app"; + APP_CURRENT_LIMITING_CONFIG_KEY = APP_CONFIG_PREFIX_KEY + ":app:limiting"; VERTX_CONFIG_STRING_KEY = BASE_REDIS_KEY + vertxEnvironment + ":vertx"; + VERTX_ADDRESS_RETRY_STRATEGY_KEY = APP_CONFIG_PREFIX_KEY + ":addAddressRetryStrategy"; + VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":set"; } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyHandlerImpl.java index e670969..9133f41 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyHandlerImpl.java @@ -1,5 +1,15 @@ package com.sf.vertx.handle; +import java.io.File; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang3.StringUtils; + +import com.sf.vertx.service.impl.AppConfigServiceImpl; +import com.sf.vertx.utils.ProxyTool; + /* * Copyright 2014 Red Hat, Inc. * @@ -33,16 +43,6 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.impl.FileUploadImpl; import io.vertx.ext.web.impl.RoutingContextInternal; -import java.io.File; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.lang3.StringUtils; - -import com.sf.vertx.init.DynamicBuildServer; -import com.sf.vertx.service.impl.AppConfigServiceImpl; - /** * @author Tim Fox */ @@ -80,7 +80,7 @@ public class BodyHandlerImpl implements BodyHandler { // TODO 改造了这个地方 final HttpServerRequest request = context.request(); final HttpServerResponse response = context.response(); - String sacAppHeaderKey = request.getHeader(DynamicBuildServer.SAC_APP_HEADER_KEY); + String sacAppHeaderKey = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey()); if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) { // 加解密在proxy拦截器解析跳转 // =======源码流程 diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java index f68827e..fd3fc13 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RateLimitHandlerRedisImpl.java @@ -1,8 +1,8 @@ package com.sf.vertx.handle; import com.sf.vertx.api.pojo.AppCurrentLimitingConfig; -import com.sf.vertx.init.DynamicBuildServer; import com.sf.vertx.service.impl.AppConfigServiceImpl; +import com.sf.vertx.utils.ProxyTool; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; @@ -26,7 +26,7 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler { @Override public void handle(RoutingContext rc) { - String sacAppHeaderKey = rc.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY); + String sacAppHeaderKey = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey()); // TODO 判断是否开启限流配置 log.info("RateLimitHandlerRedisImpl request:{}", sacAppHeaderKey); // rc.next(); diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java index 65998c4..b581a88 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RedisRateLimiter.java @@ -7,7 +7,6 @@ import org.springframework.data.redis.core.RedisTemplate; import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig; import com.sf.vertx.api.pojo.AppCurrentLimitingConfig; import com.sf.vertx.constans.RedisKeyConfig; -import com.sf.vertx.init.DynamicBuildServer; import com.sf.vertx.service.impl.AppConfigServiceImpl; import com.sf.vertx.utils.SpringUtils; @@ -18,19 +17,23 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class RedisRateLimiter { public Boolean acquire(RoutingContext rc, int pattern) { - String appCode = rc.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY); + String appCode = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey()); // TODO 先测试app模式,后面通过app缓存获取模式 String key = null; switch (pattern) { case 1: - AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl.getAppCurrentLimitingConfig(appCode); + AppCurrentLimitingConfig appCurrentLimitingConfig = AppConfigServiceImpl + .getAppCurrentLimitingConfig(appCode); key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode; - return rateLimiter(key, appCode, appCurrentLimitingConfig.getThreshold(), appCurrentLimitingConfig.getTimeWindow()); + return rateLimiter(key, appCode, appCurrentLimitingConfig.getThreshold(), + appCurrentLimitingConfig.getTimeWindow()); case 2: - ApiCurrentLimitingConfig apiCurrentLimitingConfig = AppConfigServiceImpl.getApiCurrentLimitingConfig(appCode); + ApiCurrentLimitingConfig apiCurrentLimitingConfig = AppConfigServiceImpl + .getApiCurrentLimitingConfig(appCode); key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + rc.request().uri() + ":" + rc.request().method(); - return rateLimiter(key, appCode, apiCurrentLimitingConfig.getThreshold(), apiCurrentLimitingConfig.getTimeWindow()); + return rateLimiter(key, appCode, apiCurrentLimitingConfig.getThreshold(), + apiCurrentLimitingConfig.getTimeWindow()); default: break; } @@ -44,29 +47,18 @@ public class RedisRateLimiter { Object count = redisTemplate.opsForValue().get(key); // redisTemplate.delete(key); log.info("count:{}", count); - if (count == null) { - // 初始化,设置过期时间 - ThreadUtil.execAsync(() -> { - add(timeWindow, redisTemplate, key); - }); - } else if (Integer.valueOf(count.toString()) < threshold) { - ThreadUtil.execAsync(() -> { - increment(redisTemplate, key); - }); - } else { - return false; + // 初始化,设置过期时间 + ThreadUtil.execAsync(() -> { + add(timeWindow, redisTemplate, key); + }); + return count != null && Integer.valueOf(count.toString()) <= threshold ? true : false; + } + + private void add(Integer timeWindow, RedisTemplate redisTemplate, String key) { + // log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key)); + Long incr = redisTemplate.opsForValue().increment(key); + if (incr == 1) { // 创建,才设置时间窗口 + redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); } - return true; - } - - private void add(Integer timeWindow, RedisTemplate redisTemplate, - String key) { - redisTemplate.opsForValue().increment(key); - log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key)); - redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); - } - - private void increment(RedisTemplate redisTemplate, String key) { - redisTemplate.opsForValue().increment(key); } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java index 25f76b5..f029e99 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java @@ -3,12 +3,14 @@ package com.sf.vertx.handle; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.HttpException; - +import lombok.extern.slf4j.Slf4j; +@Slf4j public class RestfulFailureHandlerImpl implements RestfulFailureHandler { @Override public void handle(RoutingContext frc) { Throwable failure = frc.failure(); + log.info("Throwable error:{}", failure); if (failure instanceof HttpException) { HttpException httpException = (HttpException) failure; // frc.response().setStatusCode(404).end(); @@ -17,7 +19,9 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler { .putHeader("Content-Type", "application/json").end(dataJson.toBuffer()); return; } - frc.response().setStatusCode(500).setStatusMessage("Server internal error:" + failure.getMessage()).end(); + frc.response().setChunked(true).setStatusCode(400) + .putHeader("Content-Type", "application/json").end("{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}"); + return; } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java index b2dedd6..a0a5a00 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java @@ -1,8 +1,6 @@ package com.sf.vertx.init; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Pattern; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -10,36 +8,31 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; -import com.sf.vertx.api.pojo.AppConfig; -import com.sf.vertx.api.pojo.GatewayInterface; -import com.sf.vertx.api.pojo.Node; -import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.VertxConfig; -import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; -import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.handle.BodyHandler; import com.sf.vertx.handle.ProxyHandler; import com.sf.vertx.handle.RateLimitHandler; import com.sf.vertx.handle.RestfulFailureHandler; -import com.sf.vertx.security.MainSecurity; import com.sf.vertx.service.AppConfigService; import com.sf.vertx.service.impl.AppConfigServiceImpl; +import com.sf.vertx.utils.Filter; +import com.sf.vertx.utils.ProxyTool; +import com.sf.vertx.utils.SpringUtils; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.HttpServerRequest; -import io.vertx.core.json.JsonObject; -import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.Router; import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.handler.HttpException; +import io.vertx.httpproxy.Body; import io.vertx.httpproxy.HttpProxy; import io.vertx.httpproxy.ProxyContext; import io.vertx.httpproxy.ProxyInterceptor; @@ -56,11 +49,12 @@ import lombok.extern.slf4j.Slf4j; @Order(value = 10) @Component public class DynamicBuildServer implements ApplicationRunner { - public static final String SAC_APP_HEADER_KEY = "sacAppCode"; - private static ConcurrentHashMap SAC_LOADBALANCING_MAP = new ConcurrentHashMap(); @Value("${server.vertx.server.default.port}") private Integer serverDefaultPort; + @Autowired + private RedisTemplate redisTemplate; + @Autowired private AppConfigService appConfigService; @@ -71,6 +65,10 @@ public class DynamicBuildServer implements ApplicationRunner { public void run(ApplicationArguments args) throws Exception { // 初始化redis key redisKeyConfig.init(); + // 从redis同步app配置 + appConfigService.loadAppConfig(); + // 从redis同步vertx配置 + appConfigService.loadVertxConfig(); // 加载vertx、应用配置 appStartLoadData(); } @@ -79,14 +77,26 @@ public class DynamicBuildServer implements ApplicationRunner { * 应用启动, 从redis读取配置,初始化vertx服务 */ private void appStartLoadData() { + VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig(); // TODO 编解码线程池,后面优化协程等方式 - Vertx VERTX = Vertx.vertx(new VertxOptions().setWorkerPoolSize(20)); + VertxOptions vertxOptions = new VertxOptions(); + long blockedThreadCheckInterval = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1 + : vertxConfig.getVertxOptionsConfig().getBlockedThreadCheckInterval(); + int workerPoolSize = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1 + : vertxConfig.getVertxOptionsConfig().getWorkerPoolSize(); + if (workerPoolSize != -1) { + vertxOptions.setWorkerPoolSize(workerPoolSize); + } + + if (blockedThreadCheckInterval != -1) { + vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志 + } + Vertx VERTX = Vertx.vertx(vertxOptions); // 创建HTTP监听 // 所有ip都能访问 HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0"); HttpServer server = VERTX.createHttpServer(httpServerOptions); Router mainHttpRouter = Router.router(VERTX); - VertxConfig vertxConfig = appConfigService.loadVertxConfig(); Integer serverPort = (vertxConfig == null || vertxConfig.getPort() == null) ? serverDefaultPort : vertxConfig.getPort(); log.info("serverPort:{}", serverPort); @@ -98,30 +108,38 @@ public class DynamicBuildServer implements ApplicationRunner { } }); - ConcurrentHashMap cacheAppConfig = appConfigService.loadAllConfig(); // HttpClientOptions clientOptions = new HttpClientOptions(); // clientOptions.setMaxPoolSize(20); // 最大连接池大小 -// clientOptions.setConnectTimeout(5000); // 连接超时 毫秒 +// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒 // clientOptions.setHttp2KeepAliveTimeout(1); // clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒 // HttpClient proxyClient = VERTX.createHttpClient(clientOptions); HttpClient proxyClient = VERTX.createHttpClient(); HttpProxy proxy = HttpProxy.reverseProxy(proxyClient); - proxy.originSelector(request -> Future.succeededFuture(resolveOriginAddress(cacheAppConfig, request))); + proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request))); proxy.addInterceptor(new ProxyInterceptor() { @Override public Future handleProxyRequest(ProxyContext context) { // 修改uri context.request().setURI(); - String sacAppHeaderKey = context.request().headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY); + String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey()); log.info("addInterceptor uri appCode:{}", sacAppHeaderKey); // 判断是否需要加解析 if (AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) { - // String data = decode(null, sacAppHeaderKey); - } return context.sendRequest(); } + + @Override + public Future handleProxyResponse(ProxyContext context) { + // 调试代码,获取reponse body + Filter filter = new Filter(); + ProxyResponse proxyResponse = context.response(); + Body body = proxyResponse.getBody(); + proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false))); + // 继续拦截链 + return context.sendResponse(); + } }); WebClient mainWebClient = WebClient.create(VERTX); // mainHttpRouter.route().handler(ProxyHandler.create(proxy)); @@ -132,146 +150,28 @@ public class DynamicBuildServer implements ApplicationRunner { int pattern = 2;// 1:app,2:接口默认,3:服务接口配置限流策略 mainHttpRouter.route().handler(RateLimitHandler.create(instance, pattern)).handler(BodyHandler.create()) .handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create()); - //mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy)); - } + // mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy)); - public SocketAddress resolveOriginAddress(ConcurrentHashMap cacheAppConfig, - HttpServerRequest request) { - String appCode = request.getHeader(SAC_APP_HEADER_KEY); - log.info("uri appCode:{}", appCode); - // TODO 不存在, 抛异常给前端 + // 服务健康检测重试 + Integer periodicTime = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null + && AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() > 0 + ? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() + : 3; - AppConfig appConfig = cacheAppConfig.get(appCode); - if (appConfig != null) { - Integer environmentId = null; - SacLoadBalancing sacLoadBalancing = null; - // 2、是否存在server服务配置 - if (appConfig.getService() != null && appConfig.getService().size() > 0) { - // header传递服务名, 这样就不需要遍历,提高性能 - for (SacService service : appConfig.getService()) { - // uri是否匹配 - boolean match = false; - for (GatewayInterface gatewayInterface : service.getUriList()) { - String domain = request.authority().port() == -1 ? request.authority().host() - : request.authority().host() + ":" + request.authority().port(); - // uri匹配(正则或全量匹配) - match = gatewayInterface.isUriRegular() ? regexMatch(gatewayInterface.getUri(), request.uri()) - : StringUtils.equals(gatewayInterface.getUri(), request.uri()); - match = match ? StringUtils.equals(gatewayInterface.getMethod(), request.method().name()) - : false; - // domain匹配 - if (match) { - boolean domainVertify = service.getRouter() != null - && StringUtils.isNotBlank(service.getRouter().getDomain()) - && StringUtils.equals(service.getRouter().getDomain(), domain) ? true : false; - if (domainVertify) { - environmentId = service.getRouter().getEnvironmentId(); - } else { - environmentId = appConfig.getEnvironmentConfig().getDefaultId(); - } - break; - } - } - } - } else { - environmentId = appConfig.getEnvironmentConfig().getDefaultId(); - } - // 初始化负载均衡 - if (SAC_LOADBALANCING_MAP.get(environmentId) != null) { - sacLoadBalancing = SAC_LOADBALANCING_MAP.get(environmentId); - } else { - // 并发影响不大, 只要初始化成功一次即可 - List nodeList = appConfig.getEnvironmentConfig().getEnvironmentGroup().get(environmentId); - if (nodeList != null && nodeList.size() > 0) { - // 初始化负载均衡算法 - sacLoadBalancing = roundRobin(nodeList); - SAC_LOADBALANCING_MAP.put(environmentId, sacLoadBalancing); - } else { - // TODO 抛出异常 + // TODO 是否开启健康检测 + long timerID = VERTX.setPeriodic(periodicTime, id -> { + Set set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1); + for (String appCode : set) { + Set setAddressRetryStrategy = redisTemplate.opsForZSet() + .range(RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode, 0, -1); + for (String address : setAddressRetryStrategy) { + // 发起请求,测试服务是否可用 + // TODO 调用后端配置的健康检测地址 } + } + }); - // TODO 区分https、http - Node node = sacLoadBalancing.selectNode(); - SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); - log.info("负载均衡跳转地址:{}", socketAddress.host() + ";" + socketAddress.port()); - return socketAddress; - } - // TODO 如果没找到, 如何处理 - return null; } -// private String getUriAppCode(String uri) { -// // 1、判断appCode -// int count = 0; -// int appCodeLen = 16; -// StringBuffer appCode = new StringBuffer(); -// for (int i = 0; i < uri.length(); i++) { -// // 限制长度 -// if (appCode.length() == appCodeLen) { -// break; -// } -// char ch = uri.charAt(i); -// if (ch == '/') { -// if (++count == 2) { -// break; -// } -// } else { -// appCode.append(ch); -// } -// } -// return appCode.toString(); -// } - - private static boolean regexMatch(String pattern, String target) { - return Pattern.matches(pattern, target); - } - - /*** - * 解密 - */ - private static String decode(AppConfig appConfig, String bodyJson) { - String algorithm = appConfig.getDataSecurity().getAlgorithm(); - String data = null; - switch (algorithm) { - case "aes": - // 解密 - String key = appConfig.getDataSecurity().getKey(); - data = MainSecurity.aesDecrypt(bodyJson, key); - break; - default: - break; - } - return data; - } - - /*** - * 加密 - */ - private static String encryption(AppConfig appConfig, String responseData) { - String algorithm = appConfig.getDataSecurity().getAlgorithm(); - String data = null; - switch (algorithm) { - case "aes": - String key = appConfig.getDataSecurity().getKey(); - data = MainSecurity.aesEncrypt(responseData.toString(), key); - break; - default: - break; - } - return data; - } - - private static SacLoadBalancing roundRobin(List nodeList) { - WeightedRoundRobin weightedRoundRobin = new WeightedRoundRobin(); - for (Node node : nodeList) { - int weight = node.getWeight() != null ? node.getWeight() : 1; - node.setWeight(weight); - node.setCurrentWeight(weight); - node.setEffectiveWeight(weight); - WeightedRoundRobin.totalWeight += node.getEffectiveWeight(); - } - weightedRoundRobin.init(nodeList); - return weightedRoundRobin; - } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/service/AppConfigService.java b/sf-vertx/src/main/java/com/sf/vertx/service/AppConfigService.java index 9b30758..af4a9d9 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/service/AppConfigService.java +++ b/sf-vertx/src/main/java/com/sf/vertx/service/AppConfigService.java @@ -1,18 +1,18 @@ package com.sf.vertx.service; -import java.util.concurrent.ConcurrentHashMap; - import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.VertxConfig; +import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; public interface AppConfigService { - ConcurrentHashMap loadAllConfig(); + + void loadAppConfig() throws Exception; void addAppConfig(String appConfig); - + void deleteAppConfig(AppConfig appConfig); - - VertxConfig loadVertxConfig(); - + + void loadVertxConfig(); + void addVertxConfig(VertxConfig vertxConfig); } diff --git a/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java b/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java index e9c8a19..e21b512 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java @@ -1,7 +1,10 @@ package com.sf.vertx.service.impl; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -15,9 +18,15 @@ import com.alibaba.fastjson2.TypeReference; import com.sf.vertx.api.pojo.ApiCurrentLimitingConfig; import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.AppCurrentLimitingConfig; +import com.sf.vertx.api.pojo.Node; +import com.sf.vertx.api.pojo.RouteContent; +import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.VertxConfig; +import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.service.AppConfigService; +import com.sf.vertx.utils.ProxyTool; +import com.sf.vertx.utils.SpringUtils; import lombok.extern.slf4j.Slf4j; @@ -28,9 +37,36 @@ public class AppConfigServiceImpl implements AppConfigService { private String vertxEnvironment; @Autowired private RedisTemplate redisTemplate; + private static VertxConfig VERTX_CONFIG = new VertxConfig(); private static final ConcurrentHashMap CACHE_APP_CONFIG = new ConcurrentHashMap<>(); private static final ConcurrentHashMap CACHE_APP_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>(); private static final ConcurrentHashMap CACHE_API_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap CACHE_APP_SERVICE = new ConcurrentHashMap<>(); + private static ConcurrentHashMap SAC_LOADBALANCING_MAP = new ConcurrentHashMap(); + + @SuppressWarnings("unchecked") + public static void addAddressRetryStrategy(String address, String appCode) { + String setKey = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode; + String key = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":" + appCode + ":" + address; + RedisTemplate redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class); + Long thresholdCount = redisTemplate.opsForValue().increment(key); + // log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key)); + Integer timeWindow = VERTX_CONFIG.getAddressRetryStrategy() != null + && VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow() > 0 + ? VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow() + : 20; + if(thresholdCount == 1) { // 创建,才设置时间窗口 + redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS); + } + Integer threshold = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null + && AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold() > 0 + ? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold() + : 3; + if(thresholdCount > threshold) { + // 设置服务不可用 + redisTemplate.opsForZSet().add(setKey, appCode + ":" + address, 0); + } + } public static boolean appDataSecurity(String appCode) { return CACHE_APP_CONFIG.get(appCode) != null && CACHE_APP_CONFIG.get(appCode).getDataSecurity() != null ? true @@ -40,15 +76,45 @@ public class AppConfigServiceImpl implements AppConfigService { public static AppCurrentLimitingConfig getAppCurrentLimitingConfig(String appCode) { return CACHE_APP_CURRENT_LIMITING_CONFIG.get(appCode); } - + public static ApiCurrentLimitingConfig getApiCurrentLimitingConfig(String appCode) { return CACHE_API_CURRENT_LIMITING_CONFIG.get(appCode); } + public static AppConfig getAppConfig(String appCode) { + return CACHE_APP_CONFIG.get(appCode); + } + + public static VertxConfig getVertxConfig() { + return VERTX_CONFIG; + } + + public static String getSacAppHeaderKey() { + return VERTX_CONFIG.getAppHeaderKey() != null ? VERTX_CONFIG.getAppHeaderKey() : "sacAppCode"; + } + + public static String getAppHeaderServiceName() { + return VERTX_CONFIG.getAppHeaderServiceName() != null ? VERTX_CONFIG.getAppHeaderServiceName() + : "sacAppServiceName"; + } + + /*** + * 加载vertx配置 + */ + public void loadVertxConfig() { + String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY; + String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey); + if (StringUtils.isNotBlank(vertxConfigValue)) { + VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class); + } + } + /*** * 从redis加载数据 + * + * @throws Exception */ - public ConcurrentHashMap loadAllConfig() { + public void loadAppConfig() throws Exception { Set set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1); for (String appCode : set) { String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; @@ -58,39 +124,56 @@ public class AppConfigServiceImpl implements AppConfigService { }); CACHE_APP_CONFIG.put(appCode, appConfig); + // app、api默认限流 + CACHE_API_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getApiCurrentLimitingConfig()); + CACHE_APP_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getAppCurrentLimitingConfig()); + + // app router负载均衡 + for (SacService sacService : appConfig.getService()) { + CACHE_APP_SERVICE.put(appCode + ";" + sacService.getServiceName(), sacService); + List nodeList = new ArrayList<>(); + // 获取service模式 + if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) { + Node node = new Node(); + node.setIp(sacService.getServerAddress().getHost()); + node.setPort(sacService.getServerAddress().getPort()); + node.setWeight(0); + node.setProtocol(sacService.getServerAddress().getProtocol()); + nodeList.add(node); + } else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { + if (sacService.getRouteConfig() != null + && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) { + for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { + Node node = new Node(); + node.setIp(routeContent.getServerAddress().getHost()); + node.setPort(routeContent.getServerAddress().getPort()); + node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 + ? routeContent.getWeight() + : 0); + node.setProtocol(sacService.getServerAddress().getProtocol()); + nodeList.add(node); + } + } + } + + if (nodeList.size() > 0) { + // 初始化负载均衡算法 + String key = appCode + ";" + sacService.getServiceName(); + SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); + SAC_LOADBALANCING_MAP.put(key, sacLoadBalancing); + } + + } } } - - // TODO 限流 - ApiCurrentLimitingConfig apiCurrentLimitingConfig = new ApiCurrentLimitingConfig(); - apiCurrentLimitingConfig.setThreshold(1); - apiCurrentLimitingConfig.setTimeWindow(10); - apiCurrentLimitingConfig.setDefaultResponse( - "{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}"); - CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", apiCurrentLimitingConfig); - CACHE_API_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", apiCurrentLimitingConfig); - - AppCurrentLimitingConfig appCurrentLimitingConfig = new AppCurrentLimitingConfig(); - appCurrentLimitingConfig.setThreshold(1); - appCurrentLimitingConfig.setTimeWindow(10); - appCurrentLimitingConfig.setDefaultResponse( - "{\n" + " \"msg\": \"接口繁忙请重试\",\n" + " \"code\": 501,\n" + " \"data\": \"到达限流阈值\"\n" + "}"); - CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappd", appCurrentLimitingConfig); - CACHE_APP_CURRENT_LIMITING_CONFIG.put("dsafdsfadafhappC", appCurrentLimitingConfig); - - log.info("cacheAppConfig:{}", JSON.toJSONString(CACHE_APP_CONFIG)); - - return CACHE_APP_CONFIG; } - public AppConfig getAppConfig(String appCode) { - String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode; - String appCodeValue = redisTemplate.opsForValue().get(appCodeKey); - if (StringUtils.isNotBlank(appCodeValue)) { - AppConfig appConfig = JSONObject.parseObject(appCodeValue, AppConfig.class); - return appConfig; - } - return null; + public static SacLoadBalancing getSacLoadBalancing(String appCode, String serviceName) { + return SAC_LOADBALANCING_MAP.get(appCode + ";" + serviceName); + } + + public static SacService getSacService(String appCode, String serviceName) { + return CACHE_APP_SERVICE.get(appCode + ";" + serviceName); } /*** @@ -123,19 +206,6 @@ public class AppConfigServiceImpl implements AppConfigService { // String queue = RedisKeyConfig.VETX_ENVIRONMENT_KEY+vertxEnvironment+":list"; } - /*** - * 加载vertx配置 - */ - public VertxConfig loadVertxConfig() { - String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY; - String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey); - if (StringUtils.isNotBlank(vertxConfigValue)) { - VertxConfig vertxConfig = JSONObject.parseObject(vertxConfigValue, VertxConfig.class); - return vertxConfig; - } - return null; - } - /*** * 新增、修改 * diff --git a/sf-vertx/src/main/java/com/sf/vertx/utils/Filter.java b/sf-vertx/src/main/java/com/sf/vertx/utils/Filter.java new file mode 100644 index 0000000..638c1da --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/utils/Filter.java @@ -0,0 +1,91 @@ +package com.sf.vertx.utils; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Filter implements ReadStream { + + private final AtomicBoolean paused = new AtomicBoolean(); + private ReadStream stream; + private Buffer expected = Buffer.buffer(); + private Handler dataHandler; + private Handler exceptionHandler; + private Handler endHandler; + + /*** + * + * @param s + * @param encryption true: 请求参数解密, false: 返回数据 加密 + * @return + */ + public ReadStream init(String uri, ReadStream s, Boolean encryption) { + stream = s; + stream.handler(buff -> { + if (dataHandler != null) { + byte[] bytes = new byte[buff.length()]; + for (int i = 0; i < bytes.length; i++) { + // bytes[i] = (byte) (('a' - 'A') + buff.getByte(i)); + bytes[i] = (byte) (buff.getByte(i)); + } + + String res = new String(bytes); + log.info("request uri:{}, return data:{}", uri, res); + expected.appendBytes(bytes); + dataHandler.handle(Buffer.buffer(bytes)); + } + }); + stream.exceptionHandler(err -> { + if (exceptionHandler != null) { + exceptionHandler.handle(err); + } + }); + stream.endHandler(v -> { + if (endHandler != null) { + endHandler.handle(v); + } + }); + return this; + } + + @Override + public ReadStream pause() { + paused.set(true); + stream.pause(); + return this; + } + + @Override + public ReadStream resume() { + stream.resume(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + stream.fetch(amount); + return this; + } + + @Override + public ReadStream exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public ReadStream handler(Handler handler) { + dataHandler = handler; + return this; + } + + @Override + public ReadStream endHandler(Handler handler) { + endHandler = handler; + return this; + } +} \ No newline at end of file diff --git a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java new file mode 100644 index 0000000..2770733 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java @@ -0,0 +1,123 @@ +package com.sf.vertx.utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; + +import com.sf.vertx.api.pojo.AppConfig; +import com.sf.vertx.api.pojo.Node; +import com.sf.vertx.api.pojo.RouteContent; +import com.sf.vertx.api.pojo.SacService; +import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; +import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin; +import com.sf.vertx.service.AppConfigService; +import com.sf.vertx.service.impl.AppConfigServiceImpl; + +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.handler.HttpException; +import lombok.extern.slf4j.Slf4j; + +/*** + * 反向代理工具类 + * + * @author xy + * + */ +@Slf4j +public class ProxyTool { + public final static Map _ERROR = new HashMap<>(); + static { + _ERROR.put(400, "Bad Request"); + _ERROR.put(401, "Unauthorized"); + _ERROR.put(403, "Forbidden"); + _ERROR.put(404, "Not Found"); + _ERROR.put(413, "Request Entity Too Large"); + _ERROR.put(415, "Unsupported Media Type"); + _ERROR.put(500, "Internal Server Error"); + _ERROR.put(502, "Bad Gateway"); + _ERROR.put(503, "Service Unavailable"); + _ERROR.put(504, "Gateway Timeout"); + _ERROR.put(504, "Gateway Timeout"); + _ERROR.put(10000, "无法找到路由地址"); + _ERROR.put(10001, "加解密算法传递错误"); + }; + + public static SocketAddress resolveOriginAddress(HttpServerRequest request) { + String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey()); + String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName()); + log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName); + AppConfig appConfig = AppConfigServiceImpl.getAppConfig(appCode); + if (appConfig != null) { + SacService sacService = AppConfigServiceImpl.getSacService(appCode, appHeaderServiceName); + if (sacService != null) { + SacLoadBalancing sacLoadBalancing = null; + // 获取service模式 + if (StringUtils.equals(sacService.getServiceModel(), "NORMAL") + || StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { + sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName); + } else if (sacService.getRouteConfig() != null + && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) { + List nodeList = new ArrayList<>(); + for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { + // 判断是否uri匹配 + String headerRouteKey = request.getHeader(routeContent.getHeaderKey()); + if (routeContent.getHeaderValues() != null + && routeContent.getHeaderValues().contains(headerRouteKey)) { + Node node = new Node(); + node.setIp(routeContent.getServerAddress().getHost()); + node.setPort(routeContent.getServerAddress().getPort()); + node.setWeight(0); + node.setProtocol(sacService.getServerAddress().getProtocol()); + nodeList.add(node); + break; + } + } + + if (nodeList.size() > 0) { + sacLoadBalancing = ProxyTool.roundRobin(nodeList); + } + } + + if (sacLoadBalancing == null) { + log.error("app config error. appCode:{},serviceName:{},RouteType:{}, not find config.", appCode, + appHeaderServiceName, sacService.getRouteConfig().getRouteType()); + throw new HttpException(10000, _ERROR.get(10000)); + } + // TODO 区分https、http + Node node = sacLoadBalancing.selectNode(); + SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); + log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); + return socketAddress; + } else { + log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode, + appHeaderServiceName); + throw new HttpException(10000, _ERROR.get(10000)); + } + } + log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode, + appHeaderServiceName); + throw new HttpException(10000, _ERROR.get(10000)); + } + + private static boolean regexMatch(String pattern, String target) { + return Pattern.matches(pattern, target); + } + + public static SacLoadBalancing roundRobin(List nodeList) { + WeightedRoundRobin weightedRoundRobin = new WeightedRoundRobin(); + for (Node node : nodeList) { + int weight = node.getWeight() != null ? node.getWeight() : 1; + node.setWeight(weight); + node.setCurrentWeight(weight); + node.setEffectiveWeight(weight); + WeightedRoundRobin.totalWeight += node.getEffectiveWeight(); + } + weightedRoundRobin.init(nodeList); + return weightedRoundRobin; + } +} diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index b39373b..30c6708 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -10,6 +10,7 @@ */ package io.vertx.httpproxy.impl; +import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -19,10 +20,13 @@ import java.util.function.BiFunction; import org.apache.commons.lang3.StringUtils; -import com.sf.vertx.init.DynamicBuildServer; +import com.sf.vertx.api.pojo.DataSecurity; +import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.security.MainSecurity; import com.sf.vertx.service.impl.AppConfigServiceImpl; +import com.sf.vertx.utils.ProxyTool; +import cn.hutool.core.thread.ThreadUtil; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; @@ -35,8 +39,10 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetSocket; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.handler.HttpException; import io.vertx.httpproxy.Body; import io.vertx.httpproxy.HttpProxy; import io.vertx.httpproxy.ProxyContext; @@ -163,9 +169,30 @@ public class ReverseProxy implements HttpProxy { }); } + /*** + * 返回错误 + * + * @param proxyRequest + * @param sc + */ private void end(ProxyRequest proxyRequest, int sc) { - proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null) - .send(); + // TODO 处理反向代理返回结果 + if (sc == 502) { + JsonObject dataJson = new JsonObject( + "{\n" + " \"msg\": \"服务连接失败\",\n" + " \"code\": 502,\n" + " \"data\": \"服务连接失败\"\n" + "}"); + proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json") + .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(dataJson.size())) + .setBody(Body.body(dataJson.toBuffer())).send(); + } else { +// proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null) +// .send(); + String commonError = "{\n" + " \"msg\": \"网关执行失败,默认错误信息\",\n" + " \"code\": " + sc + ",\n" + + " \"data\": \"网关执行失败,默认错误信息\"\n" + "}"; + Buffer buffer = Buffer.buffer(commonError); + proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json") + .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(commonError.length())) + .setBody(Body.body(buffer)).send(); + } } private Future resolveOrigin(HttpServerRequest proxiedRequest) { @@ -225,39 +252,62 @@ public class ReverseProxy implements HttpProxy { } private Future sendProxyRequest(ProxyRequest proxyRequest) { - // TODO 改造了这个地方 - // 创建一个响应并设置参数 -// ctx.request().response().setStatusCode(200) -// .putHeader("content-type", "text/plain").end(body); + // TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略 + // 发起一个请求 - String sacAppHeaderKey = proxyRequest.headers().get(DynamicBuildServer.SAC_APP_HEADER_KEY); + String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey()); if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) { String body = ctx.getBodyAsString(); - String bodyData = MainSecurity.aesDecrypt(body, "dadddsdfadfadsfa33323223"); + String bodyData = bodyDecrypt(body, sacAppHeaderKey); + VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig(); + long timeout = vertxConfig.getHttpClientOptionsConfig() != null + && vertxConfig.getHttpClientOptionsConfig().getTimeout() > 0 + ? vertxConfig.getHttpClientOptionsConfig().getTimeout() + : 1000; + long idleTimeout = vertxConfig.getHttpClientOptionsConfig() != null + && vertxConfig.getHttpClientOptionsConfig().getIdleTimeout() > 0 + ? vertxConfig.getHttpClientOptionsConfig().getTimeout() + : 1000; return Future.future(p -> { - mainWebClient.post(9198, "127.0.0.1", "/vertx/body").sendJson(bodyData, h -> { - if (h.succeeded()) { - // 释放资源 - proxyRequest.release(); - JsonObject responseData = h.result().bodyAsJsonObject(); - log.info("responseData:{}", responseData); - // 加密 - String dataStr = MainSecurity.aesEncrypt(responseData.toString(), "dadddsdfadfadsfa33323223"); - log.info("aesEncrypt dataStr:{}", dataStr); - ctx.request().response().setStatusCode(200).putHeader("content-type", "application/json") - .end(dataStr); - Buffer buffer = Buffer.buffer(dataStr); - ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) - .putHeader("content-type", "application/json").setBody(Body.body(buffer)); - p.complete(proxyResponse); - } else { - p.fail(h.cause()); - } - }); + SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); + mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) + .putHeaders(proxyRequest.headers()).timeout(timeout).idleTimeout(idleTimeout) + .sendJson(bodyData, h -> { + if (h.succeeded()) { + // 释放资源 + proxyRequest.release(); + JsonObject responseData = h.result().bodyAsJsonObject(); + log.info("responseData:{}", responseData); + // 加密 + String dataStr = bodyEncrypt(bodyData, sacAppHeaderKey); + log.info("aesEncrypt dataStr:{}", dataStr); + Buffer buffer = Buffer.buffer(dataStr); + ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) + .putHeader("content-type", "application/json").setBody(Body.body(buffer)); + p.complete(proxyResponse); + } else { + log.info("error: {}", h.cause()); + if (h.cause() instanceof ConnectException) { + log.info("connection url is error:{}", h.getClass()); + // TODO 是否开启健康检测, 需要做重试,熔断 + ThreadUtil.execAsync(() -> { + String appCode = proxyRequest.proxiedRequest().getHeader(AppConfigServiceImpl.getSacAppHeaderKey()); + AppConfigServiceImpl.addAddressRetryStrategy(socketAddress.hostAddress(), appCode); + }); + } + end(proxyRequest, 502); + } + }); }); } else { Future f = resolveOrigin(proxyRequest.proxiedRequest()); f.onFailure(err -> { + log.info("error:{}", err); + if (err instanceof ConnectException) { + // TODO 配置重试策略 + // Connection refused: /127.0.0.1:9199 + log.info("connection url is error:{}", err.getMessage()); + } // Should this be done here ? I don't think so HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest(); proxiedRequest.resume(); @@ -295,5 +345,29 @@ public class ReverseProxy implements HttpProxy { return sendResponse(); } + + private String bodyEncrypt(String body, String sacAppHeaderKey) { + DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity(); + switch (dataSecurity.getAlgorithm()) { + case "AES": + return MainSecurity.aesEncrypt(body, dataSecurity.getPrivateKey()); + default: + break; + } + log.info(" appcode:{}, encrypt key config is error.", sacAppHeaderKey); + throw new HttpException(10001); + } + + private String bodyDecrypt(String body, String sacAppHeaderKey) { + DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity(); + switch (dataSecurity.getAlgorithm()) { + case "AES": + return MainSecurity.aesDecrypt(body, dataSecurity.getPrivateKey()); + default: + break; + } + log.info(" appcode:{}, decrypt key config is error.", sacAppHeaderKey); + throw new HttpException(10001); + } } }