From 9e0de4ba4fc7cfaf4724659acaf9a52690a46d2b Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Tue, 14 May 2024 17:56:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E8=B6=85=E6=97=B6=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=20=E6=94=AF=E6=8C=81=E4=B8=8D=E5=90=8C=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E6=A8=A1=E5=BC=8Fopen,sac?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/sf/vertx/api/pojo/ApiConfig.java | 1 + .../com/sf/vertx/api/pojo/SacService.java | 1 + .../com/sf/vertx/api/pojo/VertxConfig.java | 2 +- .../main/java/com/sf/AdminApplication.java | 3 - .../com/sf/vertx/handle/AppConfigHandler.java | 66 +++++++++++-------- .../handle/ParameterCheckHandlerImpl.java | 5 +- .../com/sf/vertx/init/SacVertxConfig.java | 3 - .../java/com/sf/vertx/utils/ProxyTool.java | 2 +- .../io/vertx/httpproxy/impl/ReverseProxy.java | 26 ++++++-- sf-vertx/src/main/resources/application.yml | 1 - 10 files changed, 67 insertions(+), 43 deletions(-) diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ApiConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ApiConfig.java index 5985a7e..e8b4a7b 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ApiConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/ApiConfig.java @@ -11,6 +11,7 @@ public class ApiConfig implements Serializable { private String apiCode; private String uri; private String method; // 大写 + private Long timeout = 3000L; // 超时时间,单位毫秒 private String mockResponse; private List strategy; // 策略 } diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java index 55b35a1..2adfb26 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/SacService.java @@ -9,6 +9,7 @@ import lombok.Data; public class SacService implements Serializable { private static final long serialVersionUID = -5171112142954536813L; private String serviceName; // 服务名 + private String serviceType; // 服务类型,SAC=SAC规范服务,OPEN=开放服务 private String serviceModel; // 模式, NORMAL, ROUTE private ServerAddress serverAddress; // NORMAL模式的服务地址 private List apiConfig; // request set header sacApiCode diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java index 4532615..f71aa38 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/api/pojo/VertxConfig.java @@ -10,7 +10,7 @@ public class VertxConfig implements Serializable { private Integer port; // 启动端口 private String appCodeHeaderKey = "sacAppCode"; private String apiCodeHeaderKey = "sacApiCode"; - private String rateLimitModel = "redis"; // local,redis 负载均衡模式 + private String rateLimitModel = "local"; // local,redis 负载均衡模式 private VertxOptionsConfig vertxOptionsConfig; private HttpClientOptionsConfig httpClientOptionsConfig; // 配置Vert端口连接池 private AddressRetryStrategy addressRetryStrategy; diff --git a/sf-vertx/src/main/java/com/sf/AdminApplication.java b/sf-vertx/src/main/java/com/sf/AdminApplication.java index dc7d51c..4407402 100644 --- a/sf-vertx/src/main/java/com/sf/AdminApplication.java +++ b/sf-vertx/src/main/java/com/sf/AdminApplication.java @@ -5,14 +5,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.ApplicationContext; -import lombok.extern.slf4j.Slf4j; - /** * 启动程序 * * @author ztzh */ -@Slf4j @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) public class AdminApplication { // 全局 ApplicationContext 实例,方便 getBean 拿到 Spring/SpringBoot 注入的类实例 diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java index ba81bdf..ced4047 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java @@ -80,7 +80,8 @@ public class AppConfigHandler { private static ConcurrentHashMap APICODE_CONFIG_MAP = new ConcurrentHashMap<>(); // apiCode限流配置 appCode:apiCode - RateLimiterRegistry private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); - + // 服务类型, apiCode限流配置 appCode:apiCode - 服务类型,SAC=SAC规范服务,OPEN=开放服务 + private static ConcurrentHashMap APICODE_CONFIG_SERVICE_TYPE_MAP = new ConcurrentHashMap<>(); // 负载均衡路由类型 appCode:apiCode - routerType // 执行流程 routerType=
// 1、serviceNodel="NORMAL", serviceNodel="ROUTE" and RouteType = "WEIGHT_ROUTE" @@ -90,7 +91,7 @@ public class AppConfigHandler { // return APICODE_CONFIG_ROUTERCONENT_MAP private static ConcurrentHashMap APICODE_CONFIG_ROUTERTYPE_MAP = new ConcurrentHashMap<>(); - private static ConcurrentHashMap> APICODE_CONFIG_ROUTERCONENT_MAP = new ConcurrentHashMap<>(); + private static ConcurrentHashMap> APICODE_CONFIG_HEADER_ROUTERCONENT_MAP = new ConcurrentHashMap<>(); // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); @@ -101,12 +102,18 @@ public class AppConfigHandler { return APICODE_CONFIG_ROUTERTYPE_MAP.get(key) != null ? APICODE_CONFIG_ROUTERTYPE_MAP.get(key) : 1; } - public static List routerConentList(String key) { - return APICODE_CONFIG_ROUTERCONENT_MAP.get(key); + public static List routerHeaderConentList(String key) { + return APICODE_CONFIG_HEADER_ROUTERCONENT_MAP.get(key); } - public static Integer requestModel() { - return sacVertxConfig.getRequestModel(); + public static Boolean isServiceTypeOpen(String key) { + return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key) != null + && StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), "OPEN"); + } + + public static Boolean isServiceTypeSac(String key) { + return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key) != null + && StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), "SAC"); } public static String sacResponseHeaderKey() { @@ -199,12 +206,20 @@ public class AppConfigHandler { return LOADBALANCING_MAP.get(key); } - public static ApiConfig getApicodeConfigMap(String key) { + public static ApiConfig getApicodeConfig(String key) { return APICODE_CONFIG_MAP.get(key); } + + public static Long getApicodeConfigTimeOut(String key) { + return APICODE_CONFIG_MAP.get(key) != null ? APICODE_CONFIG_MAP.get(key).getTimeout() : 3000L; + } public static boolean isApicodeUri(String key, String uri, String httpMethod) { - return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri) + String apiCodeUri = APICODE_CONFIG_MAP.get(key).getUri(); + if (StringUtils.equals(apiCodeUri, "*")) { + return true; + } + return StringUtils.equals(apiCodeUri, uri) && StringUtils.equals(httpMethod, APICODE_CONFIG_MAP.get(key).getMethod()); } @@ -264,6 +279,7 @@ public class AppConfigHandler { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.remove(key); LOADBALANCING_MAP.remove(key); + APICODE_CONFIG_SERVICE_TYPE_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); @@ -339,7 +355,8 @@ public class AppConfigHandler { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.put(key, apiConfig); - + APICODE_CONFIG_SERVICE_TYPE_MAP.put(key, sacService.getServiceType()); + // 负载均衡模式 APICODE_CONFIG_ROUTERTYPE_MAP.put(key, routerType); switch (routerType) { @@ -351,14 +368,13 @@ public class AppConfigHandler { } break; case 2: - APICODE_CONFIG_ROUTERCONENT_MAP.put(key, routeContentList); + APICODE_CONFIG_HEADER_ROUTERCONENT_MAP.put(key, routeContentList); default: break; } if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { for (Strategy strategy : apiConfig.getStrategy()) { - if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) { RateLimiterRegistry registry = createRateLimiter(strategy); SacCurrentLimiting sacCurrentLimiting = new SacCurrentLimiting(); @@ -381,7 +397,7 @@ public class AppConfigHandler { .create(keyCircuitBreaker + "-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数 .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 - // .setTimeout(2000) // 超时时间 + //.setTimeout(apiConfig.getTimeout()) // 超时时间,不要开启, 配置会设置接口超时, 这个参数有bug,半开超时会卡死 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(strategy.getRecovery_interval() * 1000) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { @@ -534,16 +550,14 @@ public class AppConfigHandler { // // 会跳转到 RestfulFailureHandlerImpl // throw new HttpException(10003); // } - - if (AppConfigHandler.requestModel() == 2) { - String appCode = context.request().headers().get(getAppCodeHeaderKey()); - String apiCode = context.request().headers().get(getApiCodeHeaderKey()); - String key = appCode + ":" + apiCode; + String appCode = context.request().headers().get(getAppCodeHeaderKey()); + String apiCode = context.request().headers().get(getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; + if (isServiceTypeSac(key)) { String uri = APICODE_CONFIG_MAP.get(key).getUri(); String method = APICODE_CONFIG_MAP.get(key).getMethod(); context.request().setURI(uri).setMethod(HttpMethod.valueOf(method)); } - return context.sendRequest(); } @@ -559,19 +573,17 @@ public class AppConfigHandler { } }); WebClient mainWebClient = WebClient.create(VERTX); - String rateLimitModel = vertxConfig.getRateLimitModel(); - rateLimitModel = "local"; - Route route = null; - if (requestModel() == 2) { - route = mainHttpRouter.route(rpcUri()); - } else { - route = mainHttpRouter.route(); - } - route.handler(ParameterCheckHandler.create()).handler(AppRateLimitHandler.create(rateLimitModel)) + Route routeSac = mainHttpRouter.route(rpcUri()); + Route routeOpen = mainHttpRouter.route(); + routeSac.handler(ParameterCheckHandler.create()).handler(AppRateLimitHandler.create(rateLimitModel)) .handler(ApiRateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) .handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create()); // mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy)); + + routeOpen.handler(ParameterCheckHandler.create()).handler(AppRateLimitHandler.create(rateLimitModel)) + .handler(ApiRateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) + .handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create()); } /*** diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ParameterCheckHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ParameterCheckHandlerImpl.java index e47a24d..85e370b 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/ParameterCheckHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ParameterCheckHandlerImpl.java @@ -12,13 +12,12 @@ public class ParameterCheckHandlerImpl implements ParameterCheckHandler { @Override public void handle(RoutingContext rc) { try { - // TODO 测试异常 String appCode = rc.request().headers().get(AppConfigHandler.getAppCodeHeaderKey()); String apiCode = rc.request().headers().get(AppConfigHandler.getApiCodeHeaderKey()); String key = appCode + ":" + apiCode; if (StringUtils.isBlank(appCode) || StringUtils.isBlank(apiCode) || AppConfigHandler.getAppConfig(appCode) == null - || AppConfigHandler.getApicodeConfigMap(key) == null) { + || AppConfigHandler.getApicodeConfig(key) == null) { rc.fail(new HttpException(10012)); return; } @@ -31,7 +30,7 @@ public class ParameterCheckHandlerImpl implements ParameterCheckHandler { // 通过请求模式来区分 String uri = rc.request().uri(); String httpMethod = rc.request().method().name(); - if(AppConfigHandler.requestModel() == 1 && AppConfigHandler.isApicodeUri(key, uri, httpMethod) == false) { + if(AppConfigHandler.isServiceTypeOpen(key) && AppConfigHandler.isApicodeUri(key, uri, httpMethod) == false) { rc.fail(new HttpException(10018)); return; } diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java b/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java index 2744792..799c18f 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java @@ -28,9 +28,6 @@ public class SacVertxConfig { @Value("${server.vertx.sacResponseHeaderKey:sacErrorCode}") private String sacResponseHeaderKey; - @Value("${server.vertx.requestModel:2}") - private Integer requestModel; - @Value("${server.vertx.rpcUri:/rpc}") private String rpcUri; diff --git a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java index 5113e96..e5f8305 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java +++ b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java @@ -39,7 +39,7 @@ public class ProxyTool { log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); return socketAddress; case 2: - List routeContentList = AppConfigHandler.routerConentList(key); + List routeContentList = AppConfigHandler.routerHeaderConentList(key); if(routeContentList != null && routeContentList.size() > 0) { for (RouteContent routeContent : routeContentList) { String headerValue = request.getHeader(routeContent.getHeaderKey()); diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index 62ab494..a50dbff 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -22,7 +22,6 @@ import org.apache.commons.lang3.StringUtils; import com.sf.vertx.api.pojo.DataSecurity; import com.sf.vertx.constans.SacErrorCode; import com.sf.vertx.handle.AppConfigHandler; -import com.sf.vertx.init.SacVertxConfig; import com.sf.vertx.security.MainSecurity; import com.sf.vertx.utils.ProxyTool; @@ -245,9 +244,12 @@ public class ReverseProxy implements HttpProxy { private Future breakerAndSecurity(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); + String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; return Future.future(p -> { circuitBreaker.executeWithFallback(promise -> { HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key)); requestBuffer.putHeaders(proxyRequest.headers()) .sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> { if (h.succeeded()) { @@ -302,13 +304,17 @@ public class ReverseProxy implements HttpProxy { } private Future breaker(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { + String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); + String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; return Future.future(p -> { circuitBreaker.executeWithFallback(promise -> { HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key)); requestBuffer.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> { - log.info("==========uri:{},response http code:{}, succeeded:{}", proxyRequest.getURI(), - h.result().statusCode(), h.succeeded()); if (h.succeeded()) { + log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), + h.result().statusCode()); if (h.result().statusCode() == 200) { // promise.complete(); promise.complete("1"); @@ -351,8 +357,11 @@ public class ReverseProxy implements HttpProxy { private Future security(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); + String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; return Future.future(p -> { HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key)); requestBuffer.putHeaders(proxyRequest.headers()).sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> { if (h.succeeded()) { @@ -451,9 +460,18 @@ public class ReverseProxy implements HttpProxy { } private Future sendProxyRequest(ProxyRequest proxyRequest, HttpClientRequest request) { + String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); + String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; + request.setTimeout(AppConfigHandler.getApicodeConfigTimeOut(key)); Future fut = proxyRequest.send(request); fut.onFailure(err -> { - proxyRequest.proxiedRequest().response().setStatusCode(502).end(); + JsonObject json = SacErrorCode.returnErrorMsg(502); + proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json") + .putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(502)) + .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(json.size())).setBody(Body.body(json.toBuffer())) + .send(); + //proxyRequest.proxiedRequest().response().setStatusCode(502).end(); }); return fut; } diff --git a/sf-vertx/src/main/resources/application.yml b/sf-vertx/src/main/resources/application.yml index adb9c17..62896e5 100644 --- a/sf-vertx/src/main/resources/application.yml +++ b/sf-vertx/src/main/resources/application.yml @@ -2,7 +2,6 @@ server: vertx: deploymentMode: 1 # 1:单机 2:集群 - requestModel: 2 # 1: 客户端传递uri. 2: uri vertx代理,不对客户端暴露uri isSSL: false # vertx https或者http启动, ssl 证书有过期时间 rpcUri: /rpc sacResponseHeaderKey: sacErrorCode