From 78f816ce3e065dac5eb42284a9358a23d6c972ad Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Tue, 30 Apr 2024 09:25:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=86=97=E4=BD=99=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/sf/vertx/handle/ProxyHandler.java | 6 +- .../com/sf/vertx/handle/ProxyHandlerImpl.java | 7 +-- .../com/sf/vertx/init/DynamicBuildServer.java | 38 ++++++++---- .../service/impl/AppConfigServiceImpl.java | 2 +- .../java/io/vertx/httpproxy/HttpProxy.java | 3 +- .../io/vertx/httpproxy/impl/ReverseProxy.java | 59 ++++++++++--------- 6 files changed, 64 insertions(+), 51 deletions(-) diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java index 98bf9a3..7ba60fd 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java @@ -3,8 +3,6 @@ package com.sf.vertx.handle; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.HttpProxy; @@ -16,8 +14,8 @@ import io.vertx.httpproxy.HttpProxy; @VertxGen public interface ProxyHandler extends Handler { - static ProxyHandler create(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { - return new ProxyHandlerImpl(vertx,mainWebClient, httpProxy, breaker); + static ProxyHandler create(WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { + return new ProxyHandlerImpl(mainWebClient, httpProxy, breaker); } static ProxyHandler create(HttpProxy httpProxy) { diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java index b158594..bbd861e 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java @@ -1,7 +1,6 @@ package com.sf.vertx.handle; import io.vertx.circuitbreaker.CircuitBreaker; -import io.vertx.core.Vertx; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.HttpProxy; @@ -12,13 +11,11 @@ import io.vertx.httpproxy.HttpProxy; public class ProxyHandlerImpl implements ProxyHandler { private final HttpProxy httpProxy; - private Vertx vertx; private CircuitBreaker breaker; private WebClient mainWebClient; - public ProxyHandlerImpl(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { + public ProxyHandlerImpl(WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) { this.httpProxy = httpProxy; - this.vertx = vertx; this.breaker = breaker; this.mainWebClient = mainWebClient; } @@ -34,7 +31,7 @@ public class ProxyHandlerImpl implements ProxyHandler { @Override public void handle(RoutingContext ctx) { // TODO 改造了这个地方 - httpProxy.handle(mainWebClient, ctx, vertx, breaker); + httpProxy.handle(mainWebClient, ctx, breaker); // 原始代码只有如下一句 // httpProxy.handle(ctx.request()); } diff --git a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java index bfb5fa6..36d5171 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java +++ b/sf-vertx/src/main/java/com/sf/vertx/init/DynamicBuildServer.java @@ -25,9 +25,14 @@ import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.Router; import io.vertx.ext.web.client.WebClient; import io.vertx.httpproxy.Body; @@ -82,14 +87,14 @@ public class DynamicBuildServer implements ApplicationRunner { Vertx VERTX = Vertx.vertx(vertxOptions); - CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(2).setTimeout(1000) - .setFallbackOnFailure(true); - - CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> { - System.out.println("Circuit opened"); - }).closeHandler(v -> { - System.out.println("Circuit closed"); - }); + CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, + new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000) + .setFallbackOnFailure(true) + ).openHandler(v -> { + log.info("Circuit opened"); + }).closeHandler(v -> { + log.info("Circuit closed"); + }).retryPolicy(retryCount -> retryCount * 100L); // 创建HTTP监听 // 所有ip都能访问 @@ -143,7 +148,8 @@ public class DynamicBuildServer implements ApplicationRunner { String rateLimitModel = vertxConfig.getRateLimitModel() != null && StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local"; mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create()) - .handler(ProxyHandler.create(VERTX,mainWebClient, proxy, breaker)).failureHandler(RestfulFailureHandler.create()); + .handler(ProxyHandler.create(mainWebClient, proxy, breaker)) + .failureHandler(RestfulFailureHandler.create()); // mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy)); // 服务健康检测重试 @@ -165,7 +171,19 @@ public class DynamicBuildServer implements ApplicationRunner { // // } // }); - + + breaker.executeWithFallback(promise -> { + promise.complete("1"); + }, v -> { + // Executed when the circuit is opened + log.info("Executed when the circuit is opened:{}", v); + return "3"; + }, ar -> { + // Do something with the result + log.info("failed:{}, Result:{}", ar.failed(), ar.result()); + if (StringUtils.equals(ar.result(), "1") == false) { + } + }); } private void loadVertxOptions(VertxConfig vertxConfig, VertxOptions vertxOptions) { diff --git a/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java b/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java index 6771b02..236a23c 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/service/impl/AppConfigServiceImpl.java @@ -150,7 +150,7 @@ public class AppConfigServiceImpl implements AppConfigService { node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0 ? routeContent.getWeight() : 0); - node.setProtocol(sacService.getServerAddress().getProtocol()); + node.setProtocol(routeContent.getServerAddress().getProtocol()); nodeList.add(node); } } diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java index c73902b..816a9a3 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/HttpProxy.java @@ -19,7 +19,6 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpServerRequest; @@ -120,5 +119,5 @@ public interface HttpProxy extends Handler { */ void handle(HttpServerRequest request); - void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker); + void handle(WebClient mainWebClient, RoutingContext ctx, CircuitBreaker breaker); } diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index c1d9a33..9b3ec9d 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -28,7 +28,6 @@ import com.sf.vertx.utils.ProxyTool; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.core.Future; import io.vertx.core.Promise; -import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; @@ -63,7 +62,6 @@ public class ReverseProxy implements HttpProxy { .failedFuture("No origin available"); private final List interceptors = new ArrayList<>(); private RoutingContext ctx; - private Vertx vertx; private CircuitBreaker breaker; private WebClient mainWebClient; @@ -91,10 +89,9 @@ public class ReverseProxy implements HttpProxy { } @Override - public void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker) { + public void handle(WebClient mainWebClient, RoutingContext ctx, CircuitBreaker breaker) { // TODO 改造了这个地方 this.ctx = ctx; - this.vertx = vertx; this.breaker = breaker; this.mainWebClient = mainWebClient; handle(ctx.request()); @@ -181,11 +178,11 @@ public class ReverseProxy implements HttpProxy { */ private void end(ProxyRequest proxyRequest, int sc) { // TODO 处理反向代理返回结果 - if(ProxyTool._ERROR.containsKey(sc)) { + if (ProxyTool._ERROR.containsKey(sc)) { Buffer buffer = Buffer.buffer(ProxyTool._ERROR.get(sc)); proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json") - .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length())) - .setBody(Body.body(buffer)).send(); + .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length())).setBody(Body.body(buffer)) + .send(); } else { // proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null) // .send(); @@ -261,34 +258,37 @@ public class ReverseProxy implements HttpProxy { breaker.executeWithFallback(promise -> { SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) - .putHeaders(proxyRequest.headers()) - .sendJson(bodyDecrypt(ctx.getBodyAsString(), sacAppHeaderKey), h -> { - if (h.succeeded()) { - // 释放资源 - proxyRequest.release(); - JsonObject responseData = h.result().bodyAsJsonObject(); - log.info("responseData:{}", responseData); - // 加密 - String dataStr = responseData.toString(); //bodyEncrypt(responseData.toString(), sacAppHeaderKey); - log.info("aesEncrypt dataStr:{}", dataStr); - Buffer buffer = Buffer.buffer(dataStr); - ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) - .putHeader("content-type", "application/json").setBody(Body.body(buffer)); - p.complete(proxyResponse); - promise.complete("1"); - } else { - log.info("error: {}", h.cause()); - promise.fail("2"); - } - }); + .putHeaders(proxyRequest.headers()) + .sendJson(bodyDecrypt(ctx.getBodyAsString(), sacAppHeaderKey), h -> { + if (h.succeeded()) { + log.info("begin date:{}", System.currentTimeMillis()); + // 释放资源 + proxyRequest.release(); + JsonObject responseData = h.result().bodyAsJsonObject(); + log.info("responseData:{}", responseData); + // 加密 + String dataStr = bodyEncrypt(responseData.toString(), sacAppHeaderKey); + log.info("aesEncrypt dataStr:{}", dataStr); + Buffer buffer = Buffer.buffer(dataStr); + ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) + .putHeader("content-type", "application/json") + .setBody(Body.body(buffer)); + p.complete(proxyResponse); + promise.complete("1"); + log.info("end date:{}", System.currentTimeMillis()); + } else { + log.info("error: {}", h.cause()); + promise.fail("2"); + } + }); }, v -> { // Executed when the circuit is opened log.info("Executed when the circuit is opened:{}", v); return "3"; }, ar -> { // Do something with the result - log.info("failed:{}, Result:{}", ar.failed(),ar.result()); - if(StringUtils.equals(ar.result(), "1") == false) { + log.info("failed:{}, Result:{}", ar.failed(), ar.result()); + if (StringUtils.equals(ar.result(), "1") == false) { end(proxyRequest, 502); } }); @@ -312,6 +312,7 @@ public class ReverseProxy implements HttpProxy { end(proxyRequest, 502); }); }); + return f.compose(a -> sendProxyRequest(proxyRequest, a)); }