diff --git a/sf-vertx/src/main/java/com/sf/vertx/constans/SacErrorCode.java b/sf-vertx/src/main/java/com/sf/vertx/constans/SacErrorCode.java index 2160364..f2a85a6 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/constans/SacErrorCode.java +++ b/sf-vertx/src/main/java/com/sf/vertx/constans/SacErrorCode.java @@ -35,6 +35,7 @@ public class SacErrorCode { _ERROR.put(10018, "apiCode与uri不匹配"); _ERROR.put(10019, "请求不支持conetnt-type类型"); _ERROR.put(10020, "uri返回mock数据"); + _ERROR.put(10021, "无法找到负载均衡路由节点"); }; public static JsonObject returnErrorMsg(Integer errorCode) { diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java index cd3a8bc..f81abd6 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java @@ -3,7 +3,6 @@ package com.sf.vertx.handle; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -16,7 +15,6 @@ import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.TypeReference; import com.hazelcast.config.Config; import com.hazelcast.config.JoinConfig; -import com.hazelcast.config.ManagementCenterConfig; import com.hazelcast.config.NetworkConfig; import com.hazelcast.config.TcpIpConfig; import com.sf.vertx.api.pojo.ApiConfig; @@ -82,16 +80,38 @@ public class AppConfigHandler { // apiCode限流配置 appCode:apiCode - RateLimiterRegistry private static ConcurrentHashMap APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); + // 负载均衡路由类型 appCode:apiCode - routerType + // 执行流程 routerType=
+ // 1、serviceNodel="NORMAL", serviceNodel="ROUTE" and RouteType = "WEIGHT_ROUTE"
+ // return LOADBALANCING_MAP + // 2、serviceNodel="ROUTE", RouteType = "HEADER_ROUTE"
+ // return APICODE_CONFIG_ROUTERCONENT_MAP + private static ConcurrentHashMap APICODE_CONFIG_ROUTERTYPE_MAP = new ConcurrentHashMap<>(); + + private static ConcurrentHashMap> APICODE_CONFIG_ROUTERCONENT_MAP = new ConcurrentHashMap<>(); // apiCode熔断配置 appCode:apiCode - CircuitBreaker private static ConcurrentHashMap APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>(); // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); + + public static Integer routerType(String key) { + return APICODE_CONFIG_ROUTERTYPE_MAP.get(key) != null ? APICODE_CONFIG_ROUTERTYPE_MAP.get(key) : 1; + } + + public static List routerConentList(String key) { + return APICODE_CONFIG_ROUTERCONENT_MAP.get(key); + } + public static Integer requestModel() { return sacVertxConfig.getRequestModel(); } + public static String sacResponseHeaderKey() { + return sacVertxConfig.getSacResponseHeaderKey(); + } + public static String rpcUri() { return sacVertxConfig.getRpcUri(); } @@ -279,8 +299,10 @@ public class AppConfigHandler { if (appConfig.getAppCurrentLimitingConfig() != null) { initRateLimiter(appCode, appConfig.getAppCurrentLimitingConfig(), GLOBAL_APP_CURRENT_LIMITING_MAP); } - + // app router负载均衡 + int routerType = 1; + List routeContentList = null; for (SacService sacService : appConfig.getService()) { List nodeList = new ArrayList<>(); // 获取service模式 @@ -300,10 +322,14 @@ public class AppConfigHandler { node.setPort(routeContent.getServerAddress().getPort()); node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 ? routeContent.getWeight() - : 0); + : 1); node.setProtocol(routeContent.getServerAddress().getProtocol()); nodeList.add(node); } + } else if (sacService.getRouteConfig() != null + && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) { + routerType = 2; + routeContentList = sacService.getRouteConfig().getRouteContent(); } } @@ -312,10 +338,21 @@ public class AppConfigHandler { for (ApiConfig apiConfig : sacService.getApiConfig()) { String key = appCode + ":" + apiConfig.getApiCode(); APICODE_CONFIG_MAP.put(key, apiConfig); - if (nodeList.size() > 0) { - // 初始化负载均衡算法 - SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); - LOADBALANCING_MAP.put(key, sacLoadBalancing); + + // 负载均衡模式 + APICODE_CONFIG_ROUTERTYPE_MAP.put(key, routerType); + switch (routerType) { + case 1: + if (nodeList.size() > 0) { + // 初始化负载均衡算法 + SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList); + LOADBALANCING_MAP.put(key, sacLoadBalancing); + } + break; + case 2: + APICODE_CONFIG_ROUTERCONENT_MAP.put(key, routeContentList); + default: + break; } if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) { @@ -345,7 +382,7 @@ public class AppConfigHandler { .setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒 // .setTimeout(2000) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) - .setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下,尝试重试之前所需时间 + .setResetTimeout(strategy.getRecovery_interval() * 1000) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info(keyCircuitBreaker + " Circuit open"); }).halfOpenHandler(v -> { diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java index 0deebe7..1b2401c 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/RestfulFailureHandlerImpl.java @@ -17,10 +17,12 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler { public void handle(RoutingContext frc) { int statusCode = 500; JsonObject errorJson = null; + Integer sc = SacErrorCode.DEFAULT_ERROR_CODE; try { Throwable failure = frc.failure(); if (failure instanceof HttpException) { HttpException httpException = (HttpException) failure; + sc = httpException.getStatusCode(); if (StringUtils.isNoneBlank(httpException.getPayload())) { errorJson = new JsonObject(httpException.getPayload()); } else { @@ -28,6 +30,7 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler { } } else if (failure instanceof MockException) { MockException httpException = (MockException) failure; + sc = httpException.getStatusCode(); if (StringUtils.isNoneBlank(httpException.getPayload())) { statusCode = 200; errorJson = new JsonObject(httpException.getPayload()); @@ -44,6 +47,7 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler { } frc.response().setChunked(true).setStatusCode(statusCode).putHeader("Content-Type", "application/json") + .putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(sc)) .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(errorJson.size())).end(errorJson.toBuffer()); return; } diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java b/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java index 196a316..f2472bf 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java @@ -25,6 +25,9 @@ public class SacVertxConfig { @Value("${server.vertx.cluster.portAutoIncrement:false}") private boolean portAutoIncrement; + @Value("${server.vertx.sacResponseHeaderKey:sacErrorCode}") + private String sacResponseHeaderKey; + @Value("${server.vertx.requestModel:2}") private Integer requestModel; diff --git a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java index 7ac7a60..61ddaa3 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java +++ b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java @@ -3,13 +3,17 @@ package com.sf.vertx.utils; import java.util.List; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; + import com.sf.vertx.api.pojo.Node; +import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin; import com.sf.vertx.handle.AppConfigHandler; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.handler.HttpException; import lombok.extern.slf4j.Slf4j; /*** @@ -25,12 +29,36 @@ public class ProxyTool { String appCode = request.getHeader(AppConfigHandler.getAppCodeHeaderKey()); String apiCode = request.getHeader(AppConfigHandler.getApiCodeHeaderKey()); log.info("uri:{}, header appCode:{},apiCode:{}", request.uri(), appCode, apiCode); - SacLoadBalancing sacLoadBalancing = AppConfigHandler.getLoadBalancing(appCode + ":" + apiCode); - // TODO 区分https、http - Node node = sacLoadBalancing.selectNode(); - SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); - log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); - return socketAddress; + // 判断 "routeType": "WEIGHT_ROUTE", // 路由类型 WEIGHT_ROUTE ,HEADER_ROUTE + String key = appCode + ":" + apiCode; + Integer routerType = AppConfigHandler.routerType(key); + SocketAddress socketAddress = null; + switch (routerType) { + case 1: + SacLoadBalancing sacLoadBalancing = AppConfigHandler.getLoadBalancing(key); + Node node = sacLoadBalancing.selectNode(); + socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); + log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); + return socketAddress; + case 2: + List routeContentList = AppConfigHandler.routerConentList(key); + if(routeContentList != null && routeContentList.size() > 0) { + for (RouteContent routeContent : routeContentList) { + String headerValue = request.getHeader(routeContent.getHeaderKey()); + List headerValues = routeContent.getHeaderValues(); + // String matchType = routeContent.getMatchType(); + if(headerValues.contains(headerValue)) { + socketAddress = SocketAddress.inetSocketAddress(routeContent.getServerAddress().getPort(), routeContent.getServerAddress().getHost()); + log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); + return socketAddress; + } + } + } + break; + } + + // 抛出异常,无法找到负载均衡node节点 + throw new HttpException(10021); } public static boolean regexMatch(String pattern, String target) { diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index ee94b51..62ab494 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import com.sf.vertx.api.pojo.DataSecurity; import com.sf.vertx.constans.SacErrorCode; import com.sf.vertx.handle.AppConfigHandler; +import com.sf.vertx.init.SacVertxConfig; import com.sf.vertx.security.MainSecurity; import com.sf.vertx.utils.ProxyTool; @@ -181,6 +182,7 @@ public class ReverseProxy implements HttpProxy { private void end(ProxyRequest proxyRequest, int sc) { JsonObject json = SacErrorCode.returnErrorMsg(sc); proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json") + .putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(sc)) .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(json.size())).setBody(Body.body(json.toBuffer())) .send(); } @@ -254,28 +256,27 @@ public class ReverseProxy implements HttpProxy { if (h.result().statusCode() == 200) { // promise.complete(); promise.complete("1"); - // 释放资源 - proxyRequest.release(); - JsonObject responseData = h.result().bodyAsJsonObject(); - log.info("responseData:{}", responseData); - // 加密 - String dataStr = bodyEncrypt(responseData.toString(), appCode); - log.info("aesEncrypt dataStr:{}", dataStr); - Buffer buffer = Buffer.buffer(dataStr); - ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) - .putHeader("content-type", "application/json") - .setBody(Body.body(buffer)); - p.complete(proxyResponse); } else { // Throwable throwable = new Throwable("error port"); // promise.fail(throwable); promise.fail("2"); } + // 释放资源 + proxyRequest.release(); + JsonObject responseData = h.result().bodyAsJsonObject(); + log.info("responseData:{}", responseData); + // 加密 + String dataStr = bodyEncrypt(responseData.toString(), appCode); + log.info("aesEncrypt dataStr:{}", dataStr); + Buffer buffer = Buffer.buffer(dataStr); + ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) + .putHeader("content-type", "application/json").setBody(Body.body(buffer)); + p.complete(proxyResponse); } else { // end(proxyRequest, 502); // Throwable throwable = new Throwable("error port"); // promise.fail(throwable); - promise.fail("2"); + promise.fail("3"); } }); }, v -> { @@ -283,23 +284,19 @@ public class ReverseProxy implements HttpProxy { log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage()); if (v instanceof HalfOpenCircuitException) { log.info(circuitBreaker.name() + " half open circuit"); + return v.getMessage(); } else if (v instanceof OpenCircuitException) { log.info(circuitBreaker.name() + " open circuit"); } else if (v instanceof NoStackTraceThrowable) { log.info(circuitBreaker.name() + " close circuit"); + return v.getMessage(); } return "3"; }, ar -> { - // Do something with the result log.info(circuitBreaker.name() + " interface failed result.{} ", ar); -// String - if (StringUtils.equals(ar.result(), "1") == false) { + if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断 end(proxyRequest, 10016); } - // Throwable -// if(ar.result() != null) { -// end(proxyRequest, 502); -// } }); }); } @@ -309,25 +306,25 @@ public class ReverseProxy implements HttpProxy { circuitBreaker.executeWithFallback(promise -> { HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); requestBuffer.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> { + log.info("==========uri:{},response http code:{}, succeeded:{}", proxyRequest.getURI(), + h.result().statusCode(), h.succeeded()); if (h.succeeded()) { - log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), - h.result().statusCode()); if (h.result().statusCode() == 200) { // promise.complete(); promise.complete("1"); - // 释放资源 - proxyRequest.release(); - JsonObject responseData = h.result().bodyAsJsonObject(); - log.info("responseData:{}", responseData); - ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) - .putHeader("content-type", "application/json") - .setBody(Body.body(responseData.toBuffer())); - p.complete(proxyResponse); } else { promise.fail("2"); } + // 释放资源 + proxyRequest.release(); + JsonObject responseData = h.result().bodyAsJsonObject(); + log.info("responseData:{}", responseData); + ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) + .putHeader("content-type", "application/json") + .setBody(Body.body(responseData.toBuffer())); + p.complete(proxyResponse); } else { - promise.fail("2"); + promise.fail("3"); } }); }, v -> { @@ -335,15 +332,17 @@ public class ReverseProxy implements HttpProxy { log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage()); if (v instanceof HalfOpenCircuitException) { log.info(circuitBreaker.name() + " half open circuit"); + return v.getMessage(); } else if (v instanceof OpenCircuitException) { log.info(circuitBreaker.name() + " open circuit"); } else if (v instanceof NoStackTraceThrowable) { log.info(circuitBreaker.name() + " close circuit"); + return v.getMessage(); } return "3"; }, ar -> { log.info(circuitBreaker.name() + " interface failed result.{} ", ar); - if (StringUtils.equals(ar.result(), "1") == false) { + if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断 end(proxyRequest, 10016); } }); @@ -379,26 +378,25 @@ public class ReverseProxy implements HttpProxy { }); }); } - - private HttpRequest methodGetRequestBuffer(ProxyRequest proxyRequest){ + + private HttpRequest methodGetRequestBuffer(ProxyRequest proxyRequest) { SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); HttpRequest requestBuffer = null; switch (proxyRequest.getMethod().name()) { case "PUT": - requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), - proxyRequest.getURI()); + requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()); break; case "DELETE": - requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), - proxyRequest.getURI()); + requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()); + break; + case "HEAD": + requestBuffer = mainWebClient.head(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()); break; case "GET": - requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), - proxyRequest.getURI()); + requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()); break; default: - requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), - proxyRequest.getURI()); + requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()); break; } return requestBuffer; diff --git a/sf-vertx/src/main/resources/application.yml b/sf-vertx/src/main/resources/application.yml index 6511868..0a6405d 100644 --- a/sf-vertx/src/main/resources/application.yml +++ b/sf-vertx/src/main/resources/application.yml @@ -4,6 +4,7 @@ server: deploymentMode: 1 # 1:单机 2:集群 requestModel: 2 # 1: 客户端传递uri. 2: uri vertx代理,不对客户端暴露uri rpcUri: /rpc + sacResponseHeaderKey: sacErrorCode environment: dev server: default: