From b0fce77c05194d9e3db52b83d99508bbfdeef602 Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Sat, 11 May 2024 17:07:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8E=A5=E5=8F=A3=E8=BD=AC?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io/vertx/httpproxy/impl/ReverseProxy.java | 83 ++++++++++++------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java index ca54f79..ee94b51 100644 --- a/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java +++ b/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java @@ -43,6 +43,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.client.HttpRequest; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.handler.HttpException; import io.vertx.httpproxy.Body; @@ -242,11 +243,10 @@ public class ReverseProxy implements HttpProxy { private Future breakerAndSecurity(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); - SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); return Future.future(p -> { circuitBreaker.executeWithFallback(promise -> { - mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) - .putHeaders(proxyRequest.headers()) + HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.putHeaders(proxyRequest.headers()) .sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> { if (h.succeeded()) { log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), @@ -305,32 +305,31 @@ public class ReverseProxy implements HttpProxy { } private Future breaker(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { - SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); return Future.future(p -> { circuitBreaker.executeWithFallback(promise -> { - mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) - .putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> { - if (h.succeeded()) { - log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), - h.result().statusCode()); - if (h.result().statusCode() == 200) { - // promise.complete(); - promise.complete("1"); - // 释放资源 - proxyRequest.release(); - JsonObject responseData = h.result().bodyAsJsonObject(); - log.info("responseData:{}", responseData); - ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) - .putHeader("content-type", "application/json") - .setBody(Body.body(responseData.toBuffer())); - p.complete(proxyResponse); - } else { - promise.fail("2"); - } - } else { - promise.fail("2"); - } - }); + HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> { + if (h.succeeded()) { + log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), + h.result().statusCode()); + if (h.result().statusCode() == 200) { + // promise.complete(); + promise.complete("1"); + // 释放资源 + proxyRequest.release(); + JsonObject responseData = h.result().bodyAsJsonObject(); + log.info("responseData:{}", responseData); + ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200) + .putHeader("content-type", "application/json") + .setBody(Body.body(responseData.toBuffer())); + p.complete(proxyResponse); + } else { + promise.fail("2"); + } + } else { + promise.fail("2"); + } + }); }, v -> { // 需要传递当前状态half-open , close, 还是统计失败次数 log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage()); @@ -353,10 +352,10 @@ public class ReverseProxy implements HttpProxy { private Future security(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) { String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); - SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); return Future.future(p -> { - mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI()) - .putHeaders(proxyRequest.headers()).sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> { + HttpRequest requestBuffer = methodGetRequestBuffer(proxyRequest); + requestBuffer.putHeaders(proxyRequest.headers()).sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), + h -> { if (h.succeeded()) { log.info("==========uri:{},response http code:{}", proxyRequest.getURI(), h.result().statusCode()); @@ -380,6 +379,30 @@ public class ReverseProxy implements HttpProxy { }); }); } + + private HttpRequest methodGetRequestBuffer(ProxyRequest proxyRequest){ + SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest()); + HttpRequest requestBuffer = null; + switch (proxyRequest.getMethod().name()) { + case "PUT": + requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), + proxyRequest.getURI()); + break; + case "DELETE": + requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), + proxyRequest.getURI()); + break; + case "GET": + requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), + proxyRequest.getURI()); + break; + default: + requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), + proxyRequest.getURI()); + break; + } + return requestBuffer; + } private Future sendProxyRequest(ProxyRequest proxyRequest) { // 判断