From 4d295f38cf8484b9f765e4a7c1bce2504e2235ff Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Mon, 29 Apr 2024 16:15:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=AF=86=E7=9B=B4=E6=8E=A5=E9=99=90?= =?UTF-8?q?=E6=B5=81=E7=86=94=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../roundRobin/WeightedRoundRobin.java | 8 +-- .../com/sf/vertx/handle/ProxyHandler.java | 6 +- .../com/sf/vertx/handle/ProxyHandlerImpl.java | 10 ++- .../com/sf/vertx/init/DynamicBuildServer.java | 4 +- .../java/com/sf/vertx/utils/ProxyTool.java | 60 +++--------------- .../java/io/vertx/httpproxy/HttpProxy.java | 10 +-- .../io/vertx/httpproxy/impl/ReverseProxy.java | 61 +++++++++---------- .../java/com/sf/vertx/TestCircuitBreaker.java | 60 ++++++++++++++++++ 8 files changed, 121 insertions(+), 98 deletions(-) create mode 100644 sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java diff --git a/sf-vertx-api/src/main/java/com/sf/vertx/arithmetic/roundRobin/WeightedRoundRobin.java b/sf-vertx-api/src/main/java/com/sf/vertx/arithmetic/roundRobin/WeightedRoundRobin.java index c990d6f..8aa0ee5 100644 --- a/sf-vertx-api/src/main/java/com/sf/vertx/arithmetic/roundRobin/WeightedRoundRobin.java +++ b/sf-vertx-api/src/main/java/com/sf/vertx/arithmetic/roundRobin/WeightedRoundRobin.java @@ -11,9 +11,9 @@ import com.sf.vertx.api.pojo.Node; */ public class WeightedRoundRobin implements SacLoadBalancing { - private static List nodes = new ArrayList<>(); + private List nodes = new ArrayList<>(); // 权重之和 - public static Integer totalWeight = 0; + public Integer totalWeight = 0; // 准备模拟数据 // static { // nodes.add(new Node("192.168.1.101", 1)); @@ -91,7 +91,7 @@ public class WeightedRoundRobin implements SacLoadBalancing { Thread thread = new Thread(() -> { WeightedRoundRobin weightedRoundRobin1 = new WeightedRoundRobin(); weightedRoundRobin1.init(serverAddressList); - for (int i = 1; i <= totalWeight; i++) { + for (int i = 1; i <= weightedRoundRobin1.totalWeight; i++) { Node node = weightedRoundRobin1.selectNode(); System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n"); } @@ -99,7 +99,7 @@ public class WeightedRoundRobin implements SacLoadBalancing { thread.start(); WeightedRoundRobin weightedRoundRobin2 = new WeightedRoundRobin(); weightedRoundRobin2.init(serverAddressList); - for (int i = 1; i <= totalWeight; i++) { + for (int i = 1; i <= weightedRoundRobin2.totalWeight; i++) { Node node = weightedRoundRobin2.selectNode(); System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n"); } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java index 29d3ba5..98bf9a3 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java @@ -1,7 +1,9 @@ package com.sf.vertx.handle; +import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; @@ -14,8 +16,8 @@ import io.vertx.httpproxy.HttpProxy; @VertxGen public interface ProxyHandler extends Handler { - static ProxyHandler create(WebClient mainWebClient, HttpProxy httpProxy) { - return new ProxyHandlerImpl(mainWebClient, httpProxy); + static ProxyHandler create(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { + return new ProxyHandlerImpl(vertx,mainWebClient, httpProxy, breaker); } static ProxyHandler create(HttpProxy httpProxy) { diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java index 34a168f..b158594 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java @@ -1,5 +1,7 @@ package com.sf.vertx.handle; +import io.vertx.circuitbreaker.CircuitBreaker; +import io.vertx.core.Vertx; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.HttpProxy; @@ -10,10 +12,14 @@ import io.vertx.httpproxy.HttpProxy; public class ProxyHandlerImpl implements ProxyHandler { private final HttpProxy httpProxy; + private Vertx vertx; + private CircuitBreaker breaker; private WebClient mainWebClient; - public ProxyHandlerImpl(WebClient mainWebClient, HttpProxy httpProxy) { + public ProxyHandlerImpl(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { this.httpProxy = httpProxy; + this.vertx = vertx; + this.breaker = breaker; this.mainWebClient = mainWebClient; } @@ -28,7 +34,7 @@ public class ProxyHandlerImpl implements ProxyHandler { @Override public void handle(RoutingContext ctx) { // TODO 改造了这个地方 - httpProxy.handle(ctx, mainWebClient); + httpProxy.handle(mainWebClient, ctx, vertx, breaker); // 原始代码只有如下一句 // httpProxy.handle(ctx.request()); } diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java index d1979e8..bfb5fa6 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java @@ -82,7 +82,7 @@ public class DynamicBuildServer implements ApplicationRunner { Vertx VERTX = Vertx.vertx(vertxOptions); - CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(5).setTimeout(5000) + CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(2).setTimeout(1000) .setFallbackOnFailure(true); CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> { @@ -143,7 +143,7 @@ public class DynamicBuildServer implements ApplicationRunner { String rateLimitModel = vertxConfig.getRateLimitModel() != null && StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local"; mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) - .handler(ProxyHandler.create(mainWebClient, proxy, breaker)).failureHandler(RestfulFailureHandler.create()); + .handler(ProxyHandler.create(VERTX,mainWebClient, proxy, breaker)).failureHandler(RestfulFailureHandler.create()); // mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy)); // 服务健康检测重试 diff --git a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java index cab26ab..3a33570 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java +++ b/sf-vertx/src/main/java/com/sf/vertx/utils/ProxyTool.java @@ -59,57 +59,13 @@ public class ProxyTool { String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey()); String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName()); log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName); - AppConfig appConfig = AppConfigServiceImpl.getAppConfig(appCode); - if (appConfig != null) { - SacService sacService = AppConfigServiceImpl.getSacService(appCode, appHeaderServiceName); - if (sacService != null) { - SacLoadBalancing sacLoadBalancing = null; - // 获取service模式 - if (StringUtils.equals(sacService.getServiceModel(), "NORMAL") - || StringUtils.equals(sacService.getServiceModel(), "ROUTE")) { - sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName); - } else if (sacService.getRouteConfig() != null - && StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) { - List nodeList = new ArrayList<>(); - for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) { - // 判断是否uri匹配 - String headerRouteKey = request.getHeader(routeContent.getHeaderKey()); - if (routeContent.getHeaderValues() != null - && routeContent.getHeaderValues().contains(headerRouteKey)) { - Node node = new Node(); - node.setIp(routeContent.getServerAddress().getHost()); - node.setPort(routeContent.getServerAddress().getPort()); - node.setWeight(0); - node.setProtocol(sacService.getServerAddress().getProtocol()); - nodeList.add(node); - break; - } - } - - if (nodeList.size() > 0) { - sacLoadBalancing = ProxyTool.roundRobin(nodeList); - } - } - - if (sacLoadBalancing == null) { - log.error("app config error. appCode:{},serviceName:{},RouteType:{}, not find config.", appCode, - appHeaderServiceName, sacService.getRouteConfig().getRouteType()); - throw new HttpException(10000, _ERROR.get(10000)); - } - // TODO 区分https、http - Node node = sacLoadBalancing.selectNode(); - SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); - log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); - return socketAddress; - } else { - log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode, - appHeaderServiceName); - throw new HttpException(10000, _ERROR.get(10000)); - } - } - log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode, - appHeaderServiceName); - throw new HttpException(10000, _ERROR.get(10000)); + + SacLoadBalancing sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName); + // TODO 区分https、http + Node node = sacLoadBalancing.selectNode(); + SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp()); + log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port()); + return socketAddress; } public static boolean regexMatch(String pattern, String target) { @@ -123,7 +79,7 @@ public class ProxyTool { node.setWeight(weight); node.setCurrentWeight(weight); node.setEffectiveWeight(weight); - WeightedRoundRobin.totalWeight += node.getEffectiveWeight(); + weightedRoundRobin.totalWeight += node.getEffectiveWeight(); } weightedRoundRobin.init(nodeList); return weightedRoundRobin; diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java index 1c1b2a7..c73902b 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java @@ -10,11 +10,16 @@ */ package io.vertx.httpproxy; +import java.util.function.BiFunction; +import java.util.function.Function; + +import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.codegen.annotations.Fluent; import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpServerRequest; @@ -24,9 +29,6 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.impl.ReverseProxy; -import java.util.function.BiFunction; -import java.util.function.Function; - /** * Handles the HTTP reverse proxy logic between the user agent and the origin. *

@@ -118,5 +120,5 @@ public interface HttpProxy extends Handler { */ void handle(HttpServerRequest request); - void handle(RoutingContext ctx, WebClient mainWebClient); + void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker); } diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index 10ee072..c1d9a33 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -21,14 +21,14 @@ import java.util.function.BiFunction; import org.apache.commons.lang3.StringUtils; import com.sf.vertx.api.pojo.DataSecurity; -import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.security.MainSecurity; import com.sf.vertx.service.impl.AppConfigServiceImpl; import com.sf.vertx.utils.ProxyTool; -import cn.hutool.core.thread.ThreadUtil; +import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.core.Future; import io.vertx.core.Promise; +import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; @@ -63,6 +63,8 @@ public class ReverseProxy implements HttpProxy { .failedFuture("No origin available"); private final List interceptors = new ArrayList<>(); private RoutingContext ctx; + private Vertx vertx; + private CircuitBreaker breaker; private WebClient mainWebClient; public ReverseProxy(ProxyOptions options, HttpClient client) { @@ -89,9 +91,11 @@ public class ReverseProxy implements HttpProxy { } @Override - public void handle(RoutingContext ctx, WebClient mainWebClient) { + public void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker) { // TODO 改造了这个地方 this.ctx = ctx; + this.vertx = vertx; + this.breaker = breaker; this.mainWebClient = mainWebClient; handle(ctx.request()); } @@ -178,10 +182,10 @@ public class ReverseProxy implements HttpProxy { private void end(ProxyRequest proxyRequest, int sc) { // TODO 处理反向代理返回结果 if(ProxyTool._ERROR.containsKey(sc)) { - JsonObject dataJson = new JsonObject(ProxyTool._ERROR.get(sc)); + Buffer buffer = Buffer.buffer(ProxyTool._ERROR.get(sc)); proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json") - .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(dataJson.size())) - .setBody(Body.body(dataJson.toBuffer())).send(); + .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length())) + .setBody(Body.body(buffer)).send(); } else { // proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null) // .send(); @@ -250,51 +254,44 @@ public class ReverseProxy implements HttpProxy { private Future sendProxyRequest(ProxyRequest proxyRequest) { // TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略 - // 发起一个请求 String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey()); if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) { - String body = ctx.getBodyAsString(); - String bodyData = bodyDecrypt(body, sacAppHeaderKey); - VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig(); - long timeout = vertxConfig.getHttpClientOptionsConfig() != null - && vertxConfig.getHttpClientOptionsConfig().getTimeout() > 0 - ? vertxConfig.getHttpClientOptionsConfig().getTimeout() - : 1000; - long idleTimeout = vertxConfig.getHttpClientOptionsConfig() != null - && vertxConfig.getHttpClientOptionsConfig().getIdleTimeout() > 0 - ? vertxConfig.getHttpClientOptionsConfig().getTimeout() - : 1000; return Future.future(p -> { - SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); - mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) - .putHeaders(proxyRequest.headers()).timeout(timeout).idleTimeout(idleTimeout) - .sendJson(bodyData, h -> { + breaker.executeWithFallback(promise -> { + SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); + mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) + .putHeaders(proxyRequest.headers()) + .sendJson(bodyDecrypt(ctx.getBodyAsString(), sacAppHeaderKey), h -> { if (h.succeeded()) { // 释放资源 proxyRequest.release(); JsonObject responseData = h.result().bodyAsJsonObject(); log.info("responseData:{}", responseData); // 加密 - String dataStr = bodyEncrypt(bodyData, sacAppHeaderKey); + String dataStr = responseData.toString(); //bodyEncrypt(responseData.toString(), sacAppHeaderKey); log.info("aesEncrypt dataStr:{}", dataStr); Buffer buffer = Buffer.buffer(dataStr); ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) .putHeader("content-type", "application/json").setBody(Body.body(buffer)); p.complete(proxyResponse); + promise.complete("1"); } else { log.info("error: {}", h.cause()); - if (h.cause() instanceof ConnectException) { - log.info("connection url is error:{}", h.getClass()); - // TODO 是否开启健康检测, 需要做重试,熔断 - ThreadUtil.execAsync(() -> { - String appCode = proxyRequest.proxiedRequest().getHeader(AppConfigServiceImpl.getSacAppHeaderKey()); - AppConfigServiceImpl.addAddressRetryStrategy(socketAddress.hostAddress(), appCode); - }); - } - end(proxyRequest, 502); + promise.fail("2"); } }); + }, v -> { + // Executed when the circuit is opened + log.info("Executed when the circuit is opened:{}", v); + return "3"; + }, ar -> { + // Do something with the result + log.info("failed:{}, Result:{}", ar.failed(),ar.result()); + if(StringUtils.equals(ar.result(), "1") == false) { + end(proxyRequest, 502); + } + }); }); } else { Future f = resolveOrigin(proxyRequest.proxiedRequest()); diff --git a/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java b/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java new file mode 100644 index 0000000..b3a4a3a --- /dev/null +++ b/sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java @@ -0,0 +1,60 @@ +package com.sf.vertx; + +import com.sf.vertx.api.pojo.VertxConfig; +import com.sf.vertx.service.impl.AppConfigServiceImpl; + +import io.vertx.circuitbreaker.CircuitBreaker; +import io.vertx.circuitbreaker.CircuitBreakerOptions; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpMethod; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TestCircuitBreaker { + private static int port; + public static void main(String[] args) { + + VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig(); + // TODO 编解码线程池,后面优化协程等方式 + VertxOptions vertxOptions = new VertxOptions(); + + Vertx VERTX = Vertx.vertx(vertxOptions); + + CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(3).setTimeout(5000) + .setFallbackOnFailure(true); + + CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> { + System.out.println("Circuit opened"); + }).closeHandler(v -> { + System.out.println("Circuit closed"); + }); + for (int i = 0; i < 20; i++) { + port = 9199; + if(i % 2 == 0) { + port = 9198; + log.info("i:{},port:{}", i, port); + } + breaker.executeWithFallback(promise -> { + VERTX.createHttpClient().request(HttpMethod.POST, port, "localhost", "/vertx/body").compose(req -> { + return req.send("body").compose(resp -> { + if (resp.statusCode() != 200) { + return Future.failedFuture("HTTP error"); + } else { + log.info("success req:{}", req.getPort()); + return resp.body().map(Buffer::toString); + } + }); + }).onComplete(promise); + }, v -> { + // Executed when the circuit is opened + return "Hello (fallback)"; + }, ar -> { + // Do something with the result + System.out.println("Result: " + ar.result()); + }); + } + } +}