加密直接限流熔断
This commit is contained in:
parent
cc1929eb77
commit
4d295f38cf
@ -11,9 +11,9 @@ import com.sf.vertx.api.pojo.Node;
|
|||||||
*/
|
*/
|
||||||
public class WeightedRoundRobin implements SacLoadBalancing {
|
public class WeightedRoundRobin implements SacLoadBalancing {
|
||||||
|
|
||||||
private static List<Node> nodes = new ArrayList<>();
|
private List<Node> nodes = new ArrayList<>();
|
||||||
// 权重之和
|
// 权重之和
|
||||||
public static Integer totalWeight = 0;
|
public Integer totalWeight = 0;
|
||||||
// 准备模拟数据
|
// 准备模拟数据
|
||||||
// static {
|
// static {
|
||||||
// nodes.add(new Node("192.168.1.101", 1));
|
// nodes.add(new Node("192.168.1.101", 1));
|
||||||
@ -91,7 +91,7 @@ public class WeightedRoundRobin implements SacLoadBalancing {
|
|||||||
Thread thread = new Thread(() -> {
|
Thread thread = new Thread(() -> {
|
||||||
WeightedRoundRobin weightedRoundRobin1 = new WeightedRoundRobin();
|
WeightedRoundRobin weightedRoundRobin1 = new WeightedRoundRobin();
|
||||||
weightedRoundRobin1.init(serverAddressList);
|
weightedRoundRobin1.init(serverAddressList);
|
||||||
for (int i = 1; i <= totalWeight; i++) {
|
for (int i = 1; i <= weightedRoundRobin1.totalWeight; i++) {
|
||||||
Node node = weightedRoundRobin1.selectNode();
|
Node node = weightedRoundRobin1.selectNode();
|
||||||
System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n");
|
System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n");
|
||||||
}
|
}
|
||||||
@ -99,7 +99,7 @@ public class WeightedRoundRobin implements SacLoadBalancing {
|
|||||||
thread.start();
|
thread.start();
|
||||||
WeightedRoundRobin weightedRoundRobin2 = new WeightedRoundRobin();
|
WeightedRoundRobin weightedRoundRobin2 = new WeightedRoundRobin();
|
||||||
weightedRoundRobin2.init(serverAddressList);
|
weightedRoundRobin2.init(serverAddressList);
|
||||||
for (int i = 1; i <= totalWeight; i++) {
|
for (int i = 1; i <= weightedRoundRobin2.totalWeight; i++) {
|
||||||
Node node = weightedRoundRobin2.selectNode();
|
Node node = weightedRoundRobin2.selectNode();
|
||||||
System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n");
|
System.out.println(Thread.currentThread().getName() + "==第" + i + "次轮询选中[当前权重最大]的节点:" + node + "\n");
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
package com.sf.vertx.handle;
|
package com.sf.vertx.handle;
|
||||||
|
|
||||||
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||||
import io.vertx.codegen.annotations.VertxGen;
|
import io.vertx.codegen.annotations.VertxGen;
|
||||||
import io.vertx.core.Handler;
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.ext.web.Router;
|
import io.vertx.ext.web.Router;
|
||||||
import io.vertx.ext.web.RoutingContext;
|
import io.vertx.ext.web.RoutingContext;
|
||||||
import io.vertx.ext.web.client.WebClient;
|
import io.vertx.ext.web.client.WebClient;
|
||||||
@ -14,8 +16,8 @@ import io.vertx.httpproxy.HttpProxy;
|
|||||||
@VertxGen
|
@VertxGen
|
||||||
public interface ProxyHandler extends Handler<RoutingContext> {
|
public interface ProxyHandler extends Handler<RoutingContext> {
|
||||||
|
|
||||||
static ProxyHandler create(WebClient mainWebClient, HttpProxy httpProxy) {
|
static ProxyHandler create(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) {
|
||||||
return new ProxyHandlerImpl(mainWebClient, httpProxy);
|
return new ProxyHandlerImpl(vertx,mainWebClient, httpProxy, breaker);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ProxyHandler create(HttpProxy httpProxy) {
|
static ProxyHandler create(HttpProxy httpProxy) {
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
package com.sf.vertx.handle;
|
package com.sf.vertx.handle;
|
||||||
|
|
||||||
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.ext.web.RoutingContext;
|
import io.vertx.ext.web.RoutingContext;
|
||||||
import io.vertx.ext.web.client.WebClient;
|
import io.vertx.ext.web.client.WebClient;
|
||||||
import io.vertx.httpproxy.HttpProxy;
|
import io.vertx.httpproxy.HttpProxy;
|
||||||
@ -10,10 +12,14 @@ import io.vertx.httpproxy.HttpProxy;
|
|||||||
public class ProxyHandlerImpl implements ProxyHandler {
|
public class ProxyHandlerImpl implements ProxyHandler {
|
||||||
|
|
||||||
private final HttpProxy httpProxy;
|
private final HttpProxy httpProxy;
|
||||||
|
private Vertx vertx;
|
||||||
|
private CircuitBreaker breaker;
|
||||||
private WebClient mainWebClient;
|
private WebClient mainWebClient;
|
||||||
|
|
||||||
public ProxyHandlerImpl(WebClient mainWebClient, HttpProxy httpProxy) {
|
public ProxyHandlerImpl(Vertx vertx,WebClient mainWebClient, HttpProxy httpProxy, CircuitBreaker breaker) {
|
||||||
this.httpProxy = httpProxy;
|
this.httpProxy = httpProxy;
|
||||||
|
this.vertx = vertx;
|
||||||
|
this.breaker = breaker;
|
||||||
this.mainWebClient = mainWebClient;
|
this.mainWebClient = mainWebClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,7 +34,7 @@ public class ProxyHandlerImpl implements ProxyHandler {
|
|||||||
@Override
|
@Override
|
||||||
public void handle(RoutingContext ctx) {
|
public void handle(RoutingContext ctx) {
|
||||||
// TODO 改造了这个地方
|
// TODO 改造了这个地方
|
||||||
httpProxy.handle(ctx, mainWebClient);
|
httpProxy.handle(mainWebClient, ctx, vertx, breaker);
|
||||||
// 原始代码只有如下一句
|
// 原始代码只有如下一句
|
||||||
// httpProxy.handle(ctx.request());
|
// httpProxy.handle(ctx.request());
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ public class DynamicBuildServer implements ApplicationRunner {
|
|||||||
|
|
||||||
Vertx VERTX = Vertx.vertx(vertxOptions);
|
Vertx VERTX = Vertx.vertx(vertxOptions);
|
||||||
|
|
||||||
CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(5).setTimeout(5000)
|
CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(2).setTimeout(1000)
|
||||||
.setFallbackOnFailure(true);
|
.setFallbackOnFailure(true);
|
||||||
|
|
||||||
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> {
|
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> {
|
||||||
@ -143,7 +143,7 @@ public class DynamicBuildServer implements ApplicationRunner {
|
|||||||
String rateLimitModel = vertxConfig.getRateLimitModel() != null
|
String rateLimitModel = vertxConfig.getRateLimitModel() != null
|
||||||
&& StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local";
|
&& StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local";
|
||||||
mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create())
|
mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create())
|
||||||
.handler(ProxyHandler.create(mainWebClient, proxy, breaker)).failureHandler(RestfulFailureHandler.create());
|
.handler(ProxyHandler.create(VERTX,mainWebClient, proxy, breaker)).failureHandler(RestfulFailureHandler.create());
|
||||||
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
|
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
|
||||||
|
|
||||||
// 服务健康检测重试
|
// 服务健康检测重试
|
||||||
|
@ -59,57 +59,13 @@ public class ProxyTool {
|
|||||||
String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
|
String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
|
||||||
String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
|
String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
|
||||||
log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName);
|
log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName);
|
||||||
AppConfig appConfig = AppConfigServiceImpl.getAppConfig(appCode);
|
|
||||||
if (appConfig != null) {
|
SacLoadBalancing sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName);
|
||||||
SacService sacService = AppConfigServiceImpl.getSacService(appCode, appHeaderServiceName);
|
// TODO 区分https、http
|
||||||
if (sacService != null) {
|
Node node = sacLoadBalancing.selectNode();
|
||||||
SacLoadBalancing sacLoadBalancing = null;
|
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
|
||||||
// 获取service模式
|
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
|
||||||
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")
|
return socketAddress;
|
||||||
|| StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
|
|
||||||
sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName);
|
|
||||||
} else if (sacService.getRouteConfig() != null
|
|
||||||
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) {
|
|
||||||
List<Node> nodeList = new ArrayList<>();
|
|
||||||
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
|
|
||||||
// 判断是否uri匹配
|
|
||||||
String headerRouteKey = request.getHeader(routeContent.getHeaderKey());
|
|
||||||
if (routeContent.getHeaderValues() != null
|
|
||||||
&& routeContent.getHeaderValues().contains(headerRouteKey)) {
|
|
||||||
Node node = new Node();
|
|
||||||
node.setIp(routeContent.getServerAddress().getHost());
|
|
||||||
node.setPort(routeContent.getServerAddress().getPort());
|
|
||||||
node.setWeight(0);
|
|
||||||
node.setProtocol(sacService.getServerAddress().getProtocol());
|
|
||||||
nodeList.add(node);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nodeList.size() > 0) {
|
|
||||||
sacLoadBalancing = ProxyTool.roundRobin(nodeList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sacLoadBalancing == null) {
|
|
||||||
log.error("app config error. appCode:{},serviceName:{},RouteType:{}, not find config.", appCode,
|
|
||||||
appHeaderServiceName, sacService.getRouteConfig().getRouteType());
|
|
||||||
throw new HttpException(10000, _ERROR.get(10000));
|
|
||||||
}
|
|
||||||
// TODO 区分https、http
|
|
||||||
Node node = sacLoadBalancing.selectNode();
|
|
||||||
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
|
|
||||||
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
|
|
||||||
return socketAddress;
|
|
||||||
} else {
|
|
||||||
log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode,
|
|
||||||
appHeaderServiceName);
|
|
||||||
throw new HttpException(10000, _ERROR.get(10000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.error("app config error. appCode:{},serviceName:{}, appCode, serviceName not find config.", appCode,
|
|
||||||
appHeaderServiceName);
|
|
||||||
throw new HttpException(10000, _ERROR.get(10000));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean regexMatch(String pattern, String target) {
|
public static boolean regexMatch(String pattern, String target) {
|
||||||
@ -123,7 +79,7 @@ public class ProxyTool {
|
|||||||
node.setWeight(weight);
|
node.setWeight(weight);
|
||||||
node.setCurrentWeight(weight);
|
node.setCurrentWeight(weight);
|
||||||
node.setEffectiveWeight(weight);
|
node.setEffectiveWeight(weight);
|
||||||
WeightedRoundRobin.totalWeight += node.getEffectiveWeight();
|
weightedRoundRobin.totalWeight += node.getEffectiveWeight();
|
||||||
}
|
}
|
||||||
weightedRoundRobin.init(nodeList);
|
weightedRoundRobin.init(nodeList);
|
||||||
return weightedRoundRobin;
|
return weightedRoundRobin;
|
||||||
|
@ -10,11 +10,16 @@
|
|||||||
*/
|
*/
|
||||||
package io.vertx.httpproxy;
|
package io.vertx.httpproxy;
|
||||||
|
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||||
import io.vertx.codegen.annotations.Fluent;
|
import io.vertx.codegen.annotations.Fluent;
|
||||||
import io.vertx.codegen.annotations.GenIgnore;
|
import io.vertx.codegen.annotations.GenIgnore;
|
||||||
import io.vertx.codegen.annotations.VertxGen;
|
import io.vertx.codegen.annotations.VertxGen;
|
||||||
import io.vertx.core.Future;
|
import io.vertx.core.Future;
|
||||||
import io.vertx.core.Handler;
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.core.http.HttpClient;
|
import io.vertx.core.http.HttpClient;
|
||||||
import io.vertx.core.http.HttpClientRequest;
|
import io.vertx.core.http.HttpClientRequest;
|
||||||
import io.vertx.core.http.HttpServerRequest;
|
import io.vertx.core.http.HttpServerRequest;
|
||||||
@ -24,9 +29,6 @@ import io.vertx.ext.web.RoutingContext;
|
|||||||
import io.vertx.ext.web.client.WebClient;
|
import io.vertx.ext.web.client.WebClient;
|
||||||
import io.vertx.httpproxy.impl.ReverseProxy;
|
import io.vertx.httpproxy.impl.ReverseProxy;
|
||||||
|
|
||||||
import java.util.function.BiFunction;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles the HTTP reverse proxy logic between the <i><b>user agent</b></i> and the <i><b>origin</b></i>.
|
* Handles the HTTP reverse proxy logic between the <i><b>user agent</b></i> and the <i><b>origin</b></i>.
|
||||||
* <p>
|
* <p>
|
||||||
@ -118,5 +120,5 @@ public interface HttpProxy extends Handler<HttpServerRequest> {
|
|||||||
*/
|
*/
|
||||||
void handle(HttpServerRequest request);
|
void handle(HttpServerRequest request);
|
||||||
|
|
||||||
void handle(RoutingContext ctx, WebClient mainWebClient);
|
void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker);
|
||||||
}
|
}
|
||||||
|
@ -21,14 +21,14 @@ import java.util.function.BiFunction;
|
|||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import com.sf.vertx.api.pojo.DataSecurity;
|
import com.sf.vertx.api.pojo.DataSecurity;
|
||||||
import com.sf.vertx.api.pojo.VertxConfig;
|
|
||||||
import com.sf.vertx.security.MainSecurity;
|
import com.sf.vertx.security.MainSecurity;
|
||||||
import com.sf.vertx.service.impl.AppConfigServiceImpl;
|
import com.sf.vertx.service.impl.AppConfigServiceImpl;
|
||||||
import com.sf.vertx.utils.ProxyTool;
|
import com.sf.vertx.utils.ProxyTool;
|
||||||
|
|
||||||
import cn.hutool.core.thread.ThreadUtil;
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||||
import io.vertx.core.Future;
|
import io.vertx.core.Future;
|
||||||
import io.vertx.core.Promise;
|
import io.vertx.core.Promise;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
import io.vertx.core.buffer.Buffer;
|
import io.vertx.core.buffer.Buffer;
|
||||||
import io.vertx.core.http.HttpClient;
|
import io.vertx.core.http.HttpClient;
|
||||||
import io.vertx.core.http.HttpClientRequest;
|
import io.vertx.core.http.HttpClientRequest;
|
||||||
@ -63,6 +63,8 @@ public class ReverseProxy implements HttpProxy {
|
|||||||
.failedFuture("No origin available");
|
.failedFuture("No origin available");
|
||||||
private final List<ProxyInterceptor> interceptors = new ArrayList<>();
|
private final List<ProxyInterceptor> interceptors = new ArrayList<>();
|
||||||
private RoutingContext ctx;
|
private RoutingContext ctx;
|
||||||
|
private Vertx vertx;
|
||||||
|
private CircuitBreaker breaker;
|
||||||
private WebClient mainWebClient;
|
private WebClient mainWebClient;
|
||||||
|
|
||||||
public ReverseProxy(ProxyOptions options, HttpClient client) {
|
public ReverseProxy(ProxyOptions options, HttpClient client) {
|
||||||
@ -89,9 +91,11 @@ public class ReverseProxy implements HttpProxy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(RoutingContext ctx, WebClient mainWebClient) {
|
public void handle(WebClient mainWebClient, RoutingContext ctx, Vertx vertx, CircuitBreaker breaker) {
|
||||||
// TODO 改造了这个地方
|
// TODO 改造了这个地方
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
|
this.vertx = vertx;
|
||||||
|
this.breaker = breaker;
|
||||||
this.mainWebClient = mainWebClient;
|
this.mainWebClient = mainWebClient;
|
||||||
handle(ctx.request());
|
handle(ctx.request());
|
||||||
}
|
}
|
||||||
@ -178,10 +182,10 @@ public class ReverseProxy implements HttpProxy {
|
|||||||
private void end(ProxyRequest proxyRequest, int sc) {
|
private void end(ProxyRequest proxyRequest, int sc) {
|
||||||
// TODO 处理反向代理返回结果
|
// TODO 处理反向代理返回结果
|
||||||
if(ProxyTool._ERROR.containsKey(sc)) {
|
if(ProxyTool._ERROR.containsKey(sc)) {
|
||||||
JsonObject dataJson = new JsonObject(ProxyTool._ERROR.get(sc));
|
Buffer buffer = Buffer.buffer(ProxyTool._ERROR.get(sc));
|
||||||
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
|
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
|
||||||
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(dataJson.size()))
|
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()))
|
||||||
.setBody(Body.body(dataJson.toBuffer())).send();
|
.setBody(Body.body(buffer)).send();
|
||||||
} else {
|
} else {
|
||||||
// proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null)
|
// proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null)
|
||||||
// .send();
|
// .send();
|
||||||
@ -250,51 +254,44 @@ public class ReverseProxy implements HttpProxy {
|
|||||||
|
|
||||||
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
|
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
|
||||||
// TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略
|
// TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略
|
||||||
|
|
||||||
// 发起一个请求
|
// 发起一个请求
|
||||||
String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
|
String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
|
||||||
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
|
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
|
||||||
String body = ctx.getBodyAsString();
|
|
||||||
String bodyData = bodyDecrypt(body, sacAppHeaderKey);
|
|
||||||
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
|
|
||||||
long timeout = vertxConfig.getHttpClientOptionsConfig() != null
|
|
||||||
&& vertxConfig.getHttpClientOptionsConfig().getTimeout() > 0
|
|
||||||
? vertxConfig.getHttpClientOptionsConfig().getTimeout()
|
|
||||||
: 1000;
|
|
||||||
long idleTimeout = vertxConfig.getHttpClientOptionsConfig() != null
|
|
||||||
&& vertxConfig.getHttpClientOptionsConfig().getIdleTimeout() > 0
|
|
||||||
? vertxConfig.getHttpClientOptionsConfig().getTimeout()
|
|
||||||
: 1000;
|
|
||||||
return Future.future(p -> {
|
return Future.future(p -> {
|
||||||
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
|
breaker.executeWithFallback(promise -> {
|
||||||
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
|
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
|
||||||
.putHeaders(proxyRequest.headers()).timeout(timeout).idleTimeout(idleTimeout)
|
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
|
||||||
.sendJson(bodyData, h -> {
|
.putHeaders(proxyRequest.headers())
|
||||||
|
.sendJson(bodyDecrypt(ctx.getBodyAsString(), sacAppHeaderKey), h -> {
|
||||||
if (h.succeeded()) {
|
if (h.succeeded()) {
|
||||||
// 释放资源
|
// 释放资源
|
||||||
proxyRequest.release();
|
proxyRequest.release();
|
||||||
JsonObject responseData = h.result().bodyAsJsonObject();
|
JsonObject responseData = h.result().bodyAsJsonObject();
|
||||||
log.info("responseData:{}", responseData);
|
log.info("responseData:{}", responseData);
|
||||||
// 加密
|
// 加密
|
||||||
String dataStr = bodyEncrypt(bodyData, sacAppHeaderKey);
|
String dataStr = responseData.toString(); //bodyEncrypt(responseData.toString(), sacAppHeaderKey);
|
||||||
log.info("aesEncrypt dataStr:{}", dataStr);
|
log.info("aesEncrypt dataStr:{}", dataStr);
|
||||||
Buffer buffer = Buffer.buffer(dataStr);
|
Buffer buffer = Buffer.buffer(dataStr);
|
||||||
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
|
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
|
||||||
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
|
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
|
||||||
p.complete(proxyResponse);
|
p.complete(proxyResponse);
|
||||||
|
promise.complete("1");
|
||||||
} else {
|
} else {
|
||||||
log.info("error: {}", h.cause());
|
log.info("error: {}", h.cause());
|
||||||
if (h.cause() instanceof ConnectException) {
|
promise.fail("2");
|
||||||
log.info("connection url is error:{}", h.getClass());
|
|
||||||
// TODO 是否开启健康检测, 需要做重试,熔断
|
|
||||||
ThreadUtil.execAsync(() -> {
|
|
||||||
String appCode = proxyRequest.proxiedRequest().getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
|
|
||||||
AppConfigServiceImpl.addAddressRetryStrategy(socketAddress.hostAddress(), appCode);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
end(proxyRequest, 502);
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}, v -> {
|
||||||
|
// Executed when the circuit is opened
|
||||||
|
log.info("Executed when the circuit is opened:{}", v);
|
||||||
|
return "3";
|
||||||
|
}, ar -> {
|
||||||
|
// Do something with the result
|
||||||
|
log.info("failed:{}, Result:{}", ar.failed(),ar.result());
|
||||||
|
if(StringUtils.equals(ar.result(), "1") == false) {
|
||||||
|
end(proxyRequest, 502);
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
|
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
|
||||||
|
60
sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java
Normal file
60
sf-vertx/src/test/java/com/sf/vertx/TestCircuitBreaker.java
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package com.sf.vertx;
|
||||||
|
|
||||||
|
import com.sf.vertx.api.pojo.VertxConfig;
|
||||||
|
import com.sf.vertx.service.impl.AppConfigServiceImpl;
|
||||||
|
|
||||||
|
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||||
|
import io.vertx.circuitbreaker.CircuitBreakerOptions;
|
||||||
|
import io.vertx.core.Future;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
|
import io.vertx.core.VertxOptions;
|
||||||
|
import io.vertx.core.buffer.Buffer;
|
||||||
|
import io.vertx.core.http.HttpMethod;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class TestCircuitBreaker {
|
||||||
|
private static int port;
|
||||||
|
public static void main(String[] args) {
|
||||||
|
|
||||||
|
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
|
||||||
|
// TODO 编解码线程池,后面优化协程等方式
|
||||||
|
VertxOptions vertxOptions = new VertxOptions();
|
||||||
|
|
||||||
|
Vertx VERTX = Vertx.vertx(vertxOptions);
|
||||||
|
|
||||||
|
CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(3).setTimeout(5000)
|
||||||
|
.setFallbackOnFailure(true);
|
||||||
|
|
||||||
|
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> {
|
||||||
|
System.out.println("Circuit opened");
|
||||||
|
}).closeHandler(v -> {
|
||||||
|
System.out.println("Circuit closed");
|
||||||
|
});
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
port = 9199;
|
||||||
|
if(i % 2 == 0) {
|
||||||
|
port = 9198;
|
||||||
|
log.info("i:{},port:{}", i, port);
|
||||||
|
}
|
||||||
|
breaker.executeWithFallback(promise -> {
|
||||||
|
VERTX.createHttpClient().request(HttpMethod.POST, port, "localhost", "/vertx/body").compose(req -> {
|
||||||
|
return req.send("body").compose(resp -> {
|
||||||
|
if (resp.statusCode() != 200) {
|
||||||
|
return Future.failedFuture("HTTP error");
|
||||||
|
} else {
|
||||||
|
log.info("success req:{}", req.getPort());
|
||||||
|
return resp.body().map(Buffer::toString);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).onComplete(promise);
|
||||||
|
}, v -> {
|
||||||
|
// Executed when the circuit is opened
|
||||||
|
return "Hello (fallback)";
|
||||||
|
}, ar -> {
|
||||||
|
// Do something with the result
|
||||||
|
System.out.println("Result: " + ar.result());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user