保留了ReverseProxy反向代理区分body请求,这块代码可以去掉
ReverseProxy 源码部分仅使用了连接超时 request.setTimeout(AppConfigHandler.getApicodeConfigTimeOut(key)); vertx有一个使用问题,一定要注意, 异常情况可能不会抛出, 导致后台一直不返回. 很可能是业务代码逻辑问题, 比如空异常等等
This commit is contained in:
parent
17da90448e
commit
6fac2783bf
@ -31,6 +31,7 @@ import com.sf.vertx.enums.GatewayServiceType;
|
||||
import com.sf.vertx.httpproxy.HttpProxy;
|
||||
import com.sf.vertx.init.SacVertxConfig;
|
||||
import com.sf.vertx.pojo.ClusterEventMsg;
|
||||
import com.sf.vertx.pojo.SacCircuitBreaker;
|
||||
import com.sf.vertx.pojo.SacCurrentLimiting;
|
||||
import com.sf.vertx.security.MainSecurity;
|
||||
import com.sf.vertx.utils.ProxyTool;
|
||||
@ -96,7 +97,7 @@ public class AppConfigHandler {
|
||||
|
||||
private static ConcurrentHashMap<String, List<RouteContent>> APICODE_CONFIG_HEADER_ROUTERCONENT_MAP = new ConcurrentHashMap<>();
|
||||
// apiCode熔断配置 appCode:apiCode - CircuitBreaker
|
||||
private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
|
||||
private static ConcurrentHashMap<String, SacCircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
// apicode uri = * - appConfig
|
||||
private static ConcurrentHashMap<String, AppConfig> APICODE_APPCONFIG_MAP = new ConcurrentHashMap<>();
|
||||
@ -118,12 +119,12 @@ public class AppConfigHandler {
|
||||
|
||||
public static Boolean isServiceTypeOpen(String key) {
|
||||
return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key) != null
|
||||
&& StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), "OPEN");
|
||||
&& StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), GatewayServiceType.OPEN.getCode());
|
||||
}
|
||||
|
||||
public static Boolean isServiceTypeSac(String key) {
|
||||
return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key) != null
|
||||
&& StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), "SAC");
|
||||
&& StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), GatewayServiceType.SAC.getCode());
|
||||
}
|
||||
|
||||
public static String getServiceType(String key) {
|
||||
@ -171,7 +172,7 @@ public class AppConfigHandler {
|
||||
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null;
|
||||
}
|
||||
|
||||
public static CircuitBreaker getApiCodeCircuitBreaker(String key) {
|
||||
public static SacCircuitBreaker getApiCodeCircuitBreaker(String key) {
|
||||
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key);
|
||||
}
|
||||
|
||||
@ -273,9 +274,11 @@ public class AppConfigHandler {
|
||||
APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key);
|
||||
APICODE_APPCONFIG_MAP.remove(apiConfig.getApiCode());
|
||||
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
|
||||
CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker);
|
||||
if (circuitBreaker != null) {
|
||||
circuitBreaker.close();
|
||||
SacCircuitBreaker sacCircuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key);
|
||||
if (sacCircuitBreaker != null) {
|
||||
if(sacCircuitBreaker.getCircuitBreaker() != null) {
|
||||
sacCircuitBreaker.getCircuitBreaker().close();
|
||||
}
|
||||
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.remove(keyCircuitBreaker);
|
||||
}
|
||||
}
|
||||
@ -351,7 +354,7 @@ public class AppConfigHandler {
|
||||
}
|
||||
|
||||
// OPEN模式, 域名映射
|
||||
if (isServiceTypeOpen(key) && StringUtils.equals(apiConfig.getUri(), "*")) {
|
||||
if (isServiceTypeOpen(key)) {
|
||||
APICODE_APPCONFIG_MAP.put(apiConfig.getApiCode(), appConfig);
|
||||
}
|
||||
|
||||
@ -406,7 +409,10 @@ public class AppConfigHandler {
|
||||
}).closeHandler(v -> {
|
||||
log.info(keyCircuitBreaker + " Circuit close");
|
||||
});
|
||||
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker);
|
||||
SacCircuitBreaker sacCircuitBreaker = new SacCircuitBreaker();
|
||||
sacCircuitBreaker.setCircuitBreaker(circuitBreaker);
|
||||
sacCircuitBreaker.setStrategy(strategy);
|
||||
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(key, sacCircuitBreaker);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -565,19 +571,27 @@ public class AppConfigHandler {
|
||||
Route routeSac = mainHttpRouter.post(rpcUri());
|
||||
routeSac.handler(CorsHandler.create().addRelativeOrigin(".*"))
|
||||
.handler(ParameterCheckHandler.create(GatewayServiceType.SAC))
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false))
|
||||
.handler(BodyPostAnalysisHandler.create()).handler(BodyPostCheckHandler.create())
|
||||
.handler(SacRouteRequestHandler.create(mainWebClient)).handler(ProxyHandler.create(proxy))
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel))
|
||||
.handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyPreCheckHandler.create())
|
||||
.handler(BodyHandler.create().setHandleFileUploads(false))
|
||||
.handler(BodyPostAnalysisHandler.create())
|
||||
.handler(BodyPostCheckHandler.create())
|
||||
.handler(SacRouteRequestHandler.create(mainWebClient))
|
||||
.handler(ProxyHandler.create(mainWebClient, proxy))
|
||||
.failureHandler(RestfulFailureHandler.create());
|
||||
|
||||
Route routeOpen = mainHttpRouter.route();
|
||||
routeOpen.handler(CorsHandler.create().addRelativeOrigin(".*"))
|
||||
.handler(ParameterCheckHandler.create(GatewayServiceType.OPEN))
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false))
|
||||
.handler(BodyPostCheckHandler.create()).handler(SacRouteRequestHandler.create(mainWebClient))
|
||||
.handler(ProxyHandler.create(proxy)).failureHandler(RestfulFailureHandler.create());
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel))
|
||||
.handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyPreCheckHandler.create())
|
||||
.handler(BodyHandler.create().setHandleFileUploads(false))
|
||||
.handler(BodyPostCheckHandler.create())
|
||||
.handler(SacRouteRequestHandler.create(mainWebClient))
|
||||
.handler(ProxyHandler.create(mainWebClient, proxy))
|
||||
.failureHandler(RestfulFailureHandler.create());
|
||||
}
|
||||
|
||||
private static void loadVertxOptions(VertxOptions vertxOptions) {
|
||||
@ -590,7 +604,7 @@ public class AppConfigHandler {
|
||||
}
|
||||
|
||||
// TODO
|
||||
blockedThreadCheckInterval = 1000000L;
|
||||
// blockedThreadCheckInterval = 1000000L;
|
||||
if (blockedThreadCheckInterval != -1) {
|
||||
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
|
||||
}
|
||||
|
@ -134,8 +134,9 @@ public class BodyHandlerImpl implements BodyHandler {
|
||||
// TODO 改造了这个地方 在真正解析body之前验证是否需要继续解析
|
||||
AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(context);
|
||||
ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(context);
|
||||
String contentType = context.request().headers().get(HttpHeaders.CONTENT_TYPE);
|
||||
String apiServiceType = context.get(API_SERVICE_TYPE);
|
||||
if (!AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType)){
|
||||
if (!AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType, contentType)){
|
||||
context.next();
|
||||
return;
|
||||
}
|
||||
|
@ -1,14 +1,3 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
*/
|
||||
|
||||
package com.sf.vertx.handle;
|
||||
|
||||
import com.sf.vertx.httpproxy.HttpProxy;
|
||||
@ -16,6 +5,7 @@ import com.sf.vertx.httpproxy.HttpProxy;
|
||||
import io.vertx.codegen.annotations.VertxGen;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
|
||||
@ -24,6 +14,10 @@ import io.vertx.ext.web.RoutingContext;
|
||||
@VertxGen
|
||||
public interface ProxyHandler extends Handler<RoutingContext> {
|
||||
|
||||
static ProxyHandler create(WebClient mainWebClient, HttpProxy httpProxy) {
|
||||
return new ProxyHandlerImpl(mainWebClient, httpProxy);
|
||||
}
|
||||
|
||||
static ProxyHandler create(HttpProxy httpProxy) {
|
||||
return new ProxyHandlerImpl(httpProxy);
|
||||
}
|
||||
|
@ -3,14 +3,20 @@ package com.sf.vertx.handle;
|
||||
import com.sf.vertx.httpproxy.HttpProxy;
|
||||
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
|
||||
*/
|
||||
|
||||
public class ProxyHandlerImpl implements ProxyHandler {
|
||||
|
||||
private final HttpProxy httpProxy;
|
||||
private WebClient mainWebClient;
|
||||
|
||||
public ProxyHandlerImpl(WebClient mainWebClient, HttpProxy httpProxy) {
|
||||
this.httpProxy = httpProxy;
|
||||
this.mainWebClient = mainWebClient;
|
||||
}
|
||||
|
||||
public ProxyHandlerImpl(HttpProxy httpProxy) {
|
||||
this.httpProxy = httpProxy;
|
||||
@ -22,6 +28,9 @@ public class ProxyHandlerImpl implements ProxyHandler {
|
||||
|
||||
@Override
|
||||
public void handle(RoutingContext ctx) {
|
||||
httpProxy.handle(ctx.request());
|
||||
// TODO 改造了这个地方
|
||||
httpProxy.handle(mainWebClient, ctx);
|
||||
// 原始代码只有如下一句
|
||||
// httpProxy.handle(ctx.request());
|
||||
}
|
||||
}
|
@ -1,8 +1,9 @@
|
||||
package com.sf.vertx.handle;
|
||||
|
||||
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE;
|
||||
import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR;
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE_SAC;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@ -13,13 +14,16 @@ import com.sf.vertx.api.pojo.AppConfig;
|
||||
import com.sf.vertx.enums.GatewayError;
|
||||
import com.sf.vertx.enums.RequestMethod;
|
||||
import com.sf.vertx.exception.ServiceException;
|
||||
import com.sf.vertx.pojo.SacCircuitBreaker;
|
||||
import com.sf.vertx.utils.AppUtils;
|
||||
import com.sf.vertx.utils.ProxyTool;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||
import io.vertx.circuitbreaker.OpenCircuitException;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpHeaders;
|
||||
import io.vertx.core.impl.NoStackTraceThrowable;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
@ -36,10 +40,10 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
|
||||
this.mainWebClient = mainWebClient;
|
||||
}
|
||||
|
||||
private void breakerCall(RoutingContext ctx, CircuitBreaker circuitBreaker, HttpRequest<Buffer> requestBuffer,
|
||||
private void breakerCall(RoutingContext ctx, SacCircuitBreaker sacCircuitBreaker, HttpRequest<Buffer> requestBuffer,
|
||||
String body) {
|
||||
String appCode = AppUtils.getAppConfigFromRoutingContext(ctx).getAppCode();
|
||||
circuitBreaker.executeWithFallback(promise -> {
|
||||
sacCircuitBreaker.getCircuitBreaker().executeWithFallback(promise -> {
|
||||
requestBuffer.sendJson(body, h -> {
|
||||
if (h.succeeded()) {
|
||||
log.info("==========uri:{},response http code:{}", ctx.request().uri(), h.result().statusCode());
|
||||
@ -52,18 +56,25 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
|
||||
promise.fail("2");
|
||||
}
|
||||
JsonObject responseData = h.result().bodyAsJsonObject();
|
||||
log.info("responseData:{}", responseData);
|
||||
//log.info("responseData:{}", responseData);
|
||||
Buffer buffer;
|
||||
// 加密
|
||||
if (AppConfigHandler.isDataSecurity(appCode)) {
|
||||
String dataStr = AppConfigHandler.bodyEncrypt(responseData.toString(), appCode);
|
||||
log.info("encrypt dataStr:{}", dataStr);
|
||||
//log.info("encrypt dataStr:{}", dataStr);
|
||||
buffer = Buffer.buffer(dataStr);
|
||||
} else {
|
||||
buffer = responseData.toBuffer();
|
||||
}
|
||||
ctx.response().setChunked(true).setStatusCode(200).putHeader("Content-Type", "application/json")
|
||||
.end(buffer);
|
||||
|
||||
Integer statusCode = h.result().statusCode() > 0 ? h.result().statusCode() : 200;
|
||||
// TODO 和客户端组件沟通
|
||||
// String contentType = h.result().headers().get(HttpHeaders.CONTENT_TYPE) != null
|
||||
// ? h.result().headers().get(HttpHeaders.CONTENT_TYPE)
|
||||
// : "application/json";
|
||||
String contentType = "application/json";
|
||||
ctx.response().setChunked(true).setStatusCode(statusCode)
|
||||
.putHeader(HttpHeaders.CONTENT_TYPE, contentType).end(buffer);
|
||||
} else {
|
||||
// end(proxyRequest, 502);
|
||||
// Throwable throwable = new Throwable("error port");
|
||||
@ -72,18 +83,27 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
|
||||
}
|
||||
});
|
||||
}, v -> {
|
||||
// 需要传递当前状态half-open , close, 还是统计失败次数
|
||||
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
|
||||
log.info(sacCircuitBreaker.getCircuitBreaker().name() + " executed when the circuit is opened:{}", v);
|
||||
if (v instanceof OpenCircuitException) {
|
||||
// v value is null, default return 3
|
||||
log.info(sacCircuitBreaker.getCircuitBreaker().name() + " open circuit");
|
||||
} else if (v instanceof NoStackTraceThrowable) {
|
||||
log.info(sacCircuitBreaker.getCircuitBreaker().name() + " close circuit");
|
||||
return v.getMessage();
|
||||
} else {
|
||||
log.info(sacCircuitBreaker.getCircuitBreaker().name() + " half open circuit");
|
||||
return v.getMessage();
|
||||
}
|
||||
return "3";
|
||||
}, ar -> {
|
||||
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
|
||||
log.info(sacCircuitBreaker.getCircuitBreaker().name() + " interface failed result.{} ", ar);
|
||||
if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断
|
||||
ctx.fail(new ServiceException(GatewayError.BAD_GATEWAY));
|
||||
ctx.fail(new ServiceException(GatewayError.REQUEST_URL_IS_BROKEN, sacCircuitBreaker.getStrategy().getDefaultResponse()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void jsonCall(RoutingContext ctx, HttpRequest<Buffer> requestBuffer, String body) {
|
||||
private void normalCall(RoutingContext ctx, HttpRequest<Buffer> requestBuffer, String body) {
|
||||
String appCode = AppUtils.getAppConfigFromRoutingContext(ctx).getAppCode();
|
||||
requestBuffer.sendJson(body, h -> {
|
||||
if (h.succeeded()) {
|
||||
@ -131,7 +151,7 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
|
||||
}
|
||||
requestBuffer.timeout(apiConfig.getTimeout()); // 超时
|
||||
requestBuffer.putHeaders(ctx.request().headers());
|
||||
// 处理sac请求方式
|
||||
// 处理sac请求方式/{a}/{b}
|
||||
RequestMethod requestMethod = RequestMethod.getByCode(ctx.request().method().name());
|
||||
if (RequestMethod.GET.equals(requestMethod) || RequestMethod.DELETE.equals(requestMethod)
|
||||
|| RequestMethod.HEAD.equals(requestMethod)) {
|
||||
@ -154,19 +174,20 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
|
||||
// 判断body是否解析
|
||||
AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(ctx);
|
||||
ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx);
|
||||
String apiServiceType = ctx.get(API_SERVICE_TYPE);
|
||||
String contentType = ctx.request().headers().get(HttpHeaders.CONTENT_TYPE);
|
||||
if (AppUtils.isAnalysisBody(appConfig, apiConfig, contentType)) {
|
||||
String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER";
|
||||
CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
|
||||
if (AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType, contentType)) {
|
||||
String keyCircuitBreaker = appConfig.getAppCode() + CACHE_KEY_CONNECTOR + apiConfig.getApiCode();
|
||||
SacCircuitBreaker sacCircuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
|
||||
String body = AppUtils.isEnableDataSecurity(appConfig)
|
||||
? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appConfig.getAppCode())
|
||||
: ctx.body().asString();
|
||||
HttpRequest<Buffer> requestBuffer = buildCallRequestBuffer(ctx, body, apiConfig);
|
||||
try {
|
||||
if (circuitBreaker != null) {
|
||||
breakerCall(ctx, circuitBreaker, requestBuffer, body);
|
||||
if (sacCircuitBreaker != null && sacCircuitBreaker.getCircuitBreaker() != null) {
|
||||
breakerCall(ctx, sacCircuitBreaker, requestBuffer, body);
|
||||
} else {
|
||||
jsonCall(ctx, requestBuffer, body);
|
||||
normalCall(ctx, requestBuffer, body);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -10,6 +10,11 @@
|
||||
*/
|
||||
package com.sf.vertx.httpproxy;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.sf.vertx.httpproxy.impl.ReverseProxy;
|
||||
|
||||
import io.vertx.codegen.annotations.Fluent;
|
||||
import io.vertx.codegen.annotations.GenIgnore;
|
||||
import io.vertx.codegen.annotations.VertxGen;
|
||||
@ -20,13 +25,11 @@ import io.vertx.core.http.HttpClientRequest;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
import io.vertx.core.http.RequestOptions;
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
import io.vertx.httpproxy.ProxyInterceptor;
|
||||
import io.vertx.httpproxy.ProxyOptions;
|
||||
|
||||
import com.sf.vertx.httpproxy.impl.ReverseProxy;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Handles the HTTP reverse proxy logic between the <i><b>user agent</b></i> and the <i><b>origin</b></i>.
|
||||
* <p>
|
||||
@ -118,4 +121,5 @@ public interface HttpProxy extends Handler<HttpServerRequest> {
|
||||
*/
|
||||
void handle(HttpServerRequest request);
|
||||
|
||||
void handle(WebClient mainWebClient, RoutingContext ctx);
|
||||
}
|
||||
|
@ -10,6 +10,8 @@
|
||||
*/
|
||||
package com.sf.vertx.httpproxy.impl;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.EMPTY;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -17,11 +19,21 @@ import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import com.sf.vertx.enums.GatewayError;
|
||||
import com.sf.vertx.enums.RequestMethod;
|
||||
import com.sf.vertx.handle.AppConfigHandler;
|
||||
import com.sf.vertx.httpproxy.HttpProxy;
|
||||
import com.sf.vertx.utils.AppUtils;
|
||||
import com.sf.vertx.utils.ProxyTool;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||
import io.vertx.circuitbreaker.OpenCircuitException;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Promise;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpClient;
|
||||
import io.vertx.core.http.HttpClientRequest;
|
||||
import io.vertx.core.http.HttpClientResponse;
|
||||
@ -29,7 +41,14 @@ import io.vertx.core.http.HttpHeaders;
|
||||
import io.vertx.core.http.HttpMethod;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
import io.vertx.core.http.HttpServerResponse;
|
||||
import io.vertx.core.impl.NoStackTraceThrowable;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.core.net.NetSocket;
|
||||
import io.vertx.core.net.SocketAddress;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.client.HttpRequest;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
import io.vertx.httpproxy.Body;
|
||||
import io.vertx.httpproxy.ProxyContext;
|
||||
import io.vertx.httpproxy.ProxyInterceptor;
|
||||
import io.vertx.httpproxy.ProxyOptions;
|
||||
@ -37,13 +56,18 @@ import io.vertx.httpproxy.ProxyRequest;
|
||||
import io.vertx.httpproxy.ProxyResponse;
|
||||
import io.vertx.httpproxy.cache.CacheOptions;
|
||||
import io.vertx.httpproxy.spi.cache.Cache;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class ReverseProxy implements HttpProxy {
|
||||
|
||||
private final HttpClient client;
|
||||
private final boolean supportWebSocket;
|
||||
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
|
||||
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future
|
||||
.failedFuture("No origin available");
|
||||
private final List<ProxyInterceptor> interceptors = new ArrayList<>();
|
||||
private RoutingContext ctx;
|
||||
private WebClient mainWebClient;
|
||||
|
||||
public ReverseProxy(ProxyOptions options, HttpClient client) {
|
||||
CacheOptions cacheOptions = options.getCacheOptions();
|
||||
@ -56,7 +80,8 @@ public class ReverseProxy implements HttpProxy {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
|
||||
public HttpProxy originRequestProvider(
|
||||
BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
|
||||
selector = provider;
|
||||
return this;
|
||||
}
|
||||
@ -67,6 +92,13 @@ public class ReverseProxy implements HttpProxy {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(WebClient mainWebClient, RoutingContext ctx) {
|
||||
// TODO 改造了这个地方
|
||||
this.ctx = ctx;
|
||||
this.mainWebClient = mainWebClient;
|
||||
handle(ctx.request());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpServerRequest request) {
|
||||
@ -141,13 +173,17 @@ public class ReverseProxy implements HttpProxy {
|
||||
});
|
||||
}
|
||||
|
||||
/***
|
||||
* 返回错误
|
||||
*
|
||||
* @param proxyRequest
|
||||
* @param sc
|
||||
*/
|
||||
private void end(ProxyRequest proxyRequest, int sc) {
|
||||
proxyRequest
|
||||
.response()
|
||||
.release()
|
||||
.setStatusCode(sc)
|
||||
.putHeader(HttpHeaders.CONTENT_LENGTH, "0")
|
||||
.setBody(null)
|
||||
JsonObject json = AppUtils.getResponseJsonByGatewayError(GatewayError.getByCode(sc));
|
||||
proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json")
|
||||
.putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(sc))
|
||||
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(json.size())).setBody(Body.body(json.toBuffer()))
|
||||
.send();
|
||||
}
|
||||
|
||||
@ -207,9 +243,175 @@ public class ReverseProxy implements HttpProxy {
|
||||
}
|
||||
}
|
||||
|
||||
private Future<ProxyResponse> breakerCall(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest,
|
||||
HttpRequest<Buffer> requestBuffer, String body) {
|
||||
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
|
||||
return Future.future(p -> {
|
||||
circuitBreaker.executeWithFallback(promise -> {
|
||||
requestBuffer.sendJson(body, h -> {
|
||||
if (h.succeeded()) {
|
||||
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
|
||||
h.result().statusCode());
|
||||
if (h.result().statusCode() == 200) {
|
||||
// promise.complete();
|
||||
promise.complete("1");
|
||||
} else {
|
||||
// Throwable throwable = new Throwable("error port");
|
||||
// promise.fail(throwable);
|
||||
promise.fail("2");
|
||||
}
|
||||
// 释放资源
|
||||
proxyRequest.release();
|
||||
JsonObject responseData = h.result().bodyAsJsonObject();
|
||||
log.info("responseData:{}", responseData);
|
||||
Buffer buffer;
|
||||
// 加密
|
||||
if (AppConfigHandler.isDataSecurity(appCode)) {
|
||||
String dataStr = AppConfigHandler.bodyEncrypt(responseData.toString(), appCode);
|
||||
log.info("encrypt dataStr:{}", dataStr);
|
||||
buffer = Buffer.buffer(dataStr);
|
||||
} else {
|
||||
buffer = responseData.toBuffer();
|
||||
}
|
||||
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
|
||||
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
|
||||
p.complete(proxyResponse);
|
||||
} else {
|
||||
// end(proxyRequest, 502);
|
||||
// Throwable throwable = new Throwable("error port");
|
||||
// promise.fail(throwable);
|
||||
promise.fail("3");
|
||||
}
|
||||
});
|
||||
}, v -> {
|
||||
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v);
|
||||
if (v instanceof OpenCircuitException) {
|
||||
log.info(circuitBreaker.name() + " open circuit");
|
||||
} else if (v instanceof NoStackTraceThrowable) {
|
||||
log.info(circuitBreaker.name() + " close circuit");
|
||||
return v.getMessage();
|
||||
} else {
|
||||
log.info(circuitBreaker.name() + " half open circuit");
|
||||
return v.getMessage();
|
||||
}
|
||||
return "3";
|
||||
}, ar -> {
|
||||
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
|
||||
if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断
|
||||
end(proxyRequest, 10016);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private Future<ProxyResponse> normalCall(ProxyRequest proxyRequest, HttpRequest<Buffer> requestBuffer,
|
||||
String body) {
|
||||
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
|
||||
return Future.future(p -> {
|
||||
requestBuffer.sendJson(body, h -> {
|
||||
if (h.succeeded()) {
|
||||
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
|
||||
h.result().statusCode());
|
||||
if (h.result().statusCode() == 200) {
|
||||
// 释放资源
|
||||
proxyRequest.release();
|
||||
JsonObject responseData = h.result().bodyAsJsonObject();
|
||||
log.info("responseData:{}", responseData);
|
||||
Buffer bodyBuffer = responseData.toBuffer();
|
||||
// 加密
|
||||
if (AppConfigHandler.isDataSecurity(appCode)) {
|
||||
String dataStr = AppConfigHandler.bodyEncrypt(responseData.toString(), appCode);
|
||||
log.info("encrypt dataStr:{}", dataStr);
|
||||
bodyBuffer = Buffer.buffer(dataStr);
|
||||
}
|
||||
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
|
||||
.putHeader("content-type", "application/json").setBody(Body.body(bodyBuffer));
|
||||
p.complete(proxyResponse);
|
||||
}
|
||||
} else {
|
||||
log.info("interface retrun error.{}", proxyRequest.getURI());
|
||||
end(proxyRequest, 502);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private HttpRequest<Buffer> buildCallRequestBuffer(ProxyRequest proxyRequest, String body) {
|
||||
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
|
||||
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
|
||||
String key = appCode + ":" + apiCode;
|
||||
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
|
||||
HttpRequest<Buffer> requestBuffer;
|
||||
String requestURI = proxyRequest.getURI();
|
||||
switch (proxyRequest.getMethod().name()) {
|
||||
case "PUT":
|
||||
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), requestURI);
|
||||
break;
|
||||
case "DELETE":
|
||||
requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), requestURI);
|
||||
break;
|
||||
case "HEAD":
|
||||
requestBuffer = mainWebClient.head(socketAddress.port(), socketAddress.host(), requestURI);
|
||||
break;
|
||||
case "GET":
|
||||
requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), requestURI);
|
||||
break;
|
||||
default:
|
||||
requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), requestURI);
|
||||
break;
|
||||
}
|
||||
requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key));
|
||||
requestBuffer.putHeaders(proxyRequest.headers());
|
||||
// 处理sac请求方式
|
||||
RequestMethod requestMethod = RequestMethod.getByCode(proxyRequest.getMethod().name());
|
||||
if (RequestMethod.GET.equals(requestMethod) || RequestMethod.DELETE.equals(requestMethod)
|
||||
|| RequestMethod.HEAD.equals(requestMethod)) {
|
||||
JsonObject jsonBody = new JsonObject(body);
|
||||
Map<String, String> queryParams = new HashMap<>();
|
||||
for (Map.Entry<String, Object> entry : jsonBody) {
|
||||
String val = entry.getValue() != null ? entry.getValue().toString() : EMPTY;
|
||||
queryParams.put(entry.getKey(), val);
|
||||
requestURI = StrUtil.replace(requestURI, "{" + entry.getKey() + "}", val);
|
||||
}
|
||||
requestBuffer.uri(requestURI);
|
||||
requestBuffer.queryParams().addAll(queryParams);
|
||||
}
|
||||
return requestBuffer;
|
||||
}
|
||||
|
||||
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
|
||||
// 判断<br/>
|
||||
// 1、是否配置全局加解密.<br/>
|
||||
// 2、apiCode 配置熔断
|
||||
// AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(ctx);
|
||||
// ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx);
|
||||
// String apiServiceType = ctx.get(API_SERVICE_TYPE);
|
||||
// String contentType = ctx.request().headers().get(HttpHeaders.CONTENT_TYPE);
|
||||
// String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER";
|
||||
// CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
|
||||
// if (AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType, contentType)) {
|
||||
// String body = AppConfigHandler.isDataSecurity(appConfig.getAppCode())
|
||||
// ? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appConfig.getAppCode())
|
||||
// : ctx.body().asString();
|
||||
// HttpRequest<Buffer> requestBuffer = buildCallRequestBuffer(proxyRequest, body);
|
||||
// try {
|
||||
// if (circuitBreaker != null) {
|
||||
// return breakerCall(circuitBreaker, proxyRequest, requestBuffer, body);
|
||||
// } else {
|
||||
// return normalCall(proxyRequest, requestBuffer, body);
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// int errorCode = 10014;
|
||||
// if (e instanceof HttpException) {
|
||||
// errorCode = ((HttpException) e).getStatusCode();
|
||||
// }
|
||||
// throw new HttpException(errorCode);
|
||||
// }
|
||||
// } else {
|
||||
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
|
||||
f.onFailure(err -> {
|
||||
log.info("error:", err);
|
||||
// Should this be done here ? I don't think so
|
||||
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
|
||||
proxiedRequest.resume();
|
||||
@ -221,21 +423,24 @@ public class ReverseProxy implements HttpProxy {
|
||||
});
|
||||
});
|
||||
return f.compose(a -> sendProxyRequest(proxyRequest, a));
|
||||
}
|
||||
// }
|
||||
|
||||
private void setConnectionTimeout(ProxyRequest proxyRequest, HttpClientRequest request) {
|
||||
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
|
||||
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
|
||||
String key = appCode + ":" + apiCode;
|
||||
request.idleTimeout(AppConfigHandler.getApicodeConfigTimeOut(key));
|
||||
}
|
||||
|
||||
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest, HttpClientRequest request) {
|
||||
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
|
||||
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
|
||||
String key = appCode + ":" + apiCode;
|
||||
request.setTimeout(AppConfigHandler.getApicodeConfigTimeOut(key));
|
||||
Future<ProxyResponse> fut = proxyRequest.send(request);
|
||||
// 超时
|
||||
setConnectionTimeout(proxyRequest, request);
|
||||
fut.onFailure(err -> {
|
||||
proxyRequest.proxiedRequest().response().setStatusCode(502).end();
|
||||
JsonObject errorJson = AppUtils.getResponseJsonByGatewayError(GatewayError.BAD_GATEWAY);
|
||||
proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json")
|
||||
.putHeader(AppConfigHandler.sacResponseHeaderKey(),
|
||||
String.valueOf(GatewayError.BAD_GATEWAY.getCode()))
|
||||
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(errorJson.size()))
|
||||
.setBody(Body.body(errorJson.toBuffer())).send();
|
||||
// proxyRequest.proxiedRequest().response().setStatusCode(502).end();
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
@ -254,6 +459,7 @@ public class ReverseProxy implements HttpProxy {
|
||||
|
||||
return sendResponse();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,13 @@
|
||||
package com.sf.vertx.pojo;
|
||||
|
||||
import com.sf.vertx.api.pojo.Strategy;
|
||||
|
||||
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class SacCircuitBreaker {
|
||||
|
||||
private CircuitBreaker circuitBreaker;
|
||||
private Strategy strategy;
|
||||
}
|
File diff suppressed because one or more lines are too long
@ -1,5 +1,6 @@
|
||||
package com.sf.vertx.utils;
|
||||
|
||||
import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR;
|
||||
import static com.sf.vertx.constans.SACConstants.API_CONFIG;
|
||||
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE;
|
||||
import static com.sf.vertx.constans.SACConstants.APP_CONFIG;
|
||||
@ -18,10 +19,14 @@ import com.sf.vertx.api.pojo.MockResponse;
|
||||
import com.sf.vertx.enums.GatewayError;
|
||||
import com.sf.vertx.enums.GatewayServiceType;
|
||||
import com.sf.vertx.enums.MatchType;
|
||||
import com.sf.vertx.enums.RequestMethod;
|
||||
import com.sf.vertx.exception.ServiceException;
|
||||
import com.sf.vertx.handle.AppConfigHandler;
|
||||
import com.sf.vertx.pojo.SacCircuitBreaker;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.NumberUtil;
|
||||
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.experimental.UtilityClass;
|
||||
@ -250,7 +255,12 @@ public class AppUtils {
|
||||
return getResponseJsonByGatewayError(GatewayError.DEFAULT_SERVICE_ERROR);
|
||||
}
|
||||
|
||||
public static boolean isAnalysisBody(AppConfig appConfig, ApiConfig apiConfig, String contentType) {
|
||||
public static boolean isAnalysisBody(AppConfig appConfig, ApiConfig apiConfig,String apiServiceType, String contentType) {
|
||||
// 不是Mock(是mock模式的话,存在body就解析。)
|
||||
if (AppUtils.isEnableMockApi(apiConfig)) {
|
||||
return true;
|
||||
}
|
||||
// 文件上传不走加解密
|
||||
if (StringUtils.startsWith(contentType, "multipart")) {
|
||||
return false;
|
||||
}
|
||||
@ -260,6 +270,20 @@ public class AppUtils {
|
||||
return false;
|
||||
}
|
||||
|
||||
return isEnableDataSecurity(appConfig);
|
||||
// SAC模式,实际API请求方式GET,DELETE,HEAD请求需要解析body,可能需要二次处理。(SAC模式请求方式参数统一使用body传递)
|
||||
// SAC模式,如果是POST或者PUT,并且没有设置数据加密则跳过
|
||||
if (AppUtils.isSACServiceType(apiServiceType)) {
|
||||
RequestMethod requestMethod = RequestMethod.getByCode(apiConfig.getMethod());
|
||||
if (RequestMethod.GET.equals(requestMethod)
|
||||
|| RequestMethod.DELETE.equals(requestMethod)
|
||||
|| RequestMethod.HEAD.equals(requestMethod)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
String keyCircuitBreaker = appConfig.getAppCode() + CACHE_KEY_CONNECTOR + apiConfig.getApiCode();
|
||||
SacCircuitBreaker sacCircuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
|
||||
boolean isDataSecurity = AppUtils.isEnableDataSecurity(appConfig);
|
||||
return (isDataSecurity || sacCircuitBreaker != null);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user