反向代理-接口连接超时

"apiConfig": [
                {
                    "apiCode": "sac6872cb7daf0249e998f11f4b7686f548",
                    "method": "POST",
                    "strategy": [],
                    "timeout": 300,
                    "uri": "*"
                },

ReverseProxy setConnectionTimeout方法
This commit is contained in:
ztzh_xieyun 2024-05-31 16:15:08 +08:00
parent 49a797870e
commit 17da90448e
22 changed files with 1618 additions and 182 deletions

View File

@ -6,14 +6,14 @@ public class SACConstants {
public static final String APP_CONFIG = "appConfig"; public static final String APP_CONFIG = "appConfig";
public static final String MOCK_PRE = "mockPre";
public static final String MOCK_POST = "mockPost";
public static final String API_CONFIG = "apiConfig"; public static final String API_CONFIG = "apiConfig";
public static final String API_SERVICE_TYPE = "apiServiceType"; public static final String API_SERVICE_TYPE = "apiServiceType";
public static final String API_SERVICE_TYPE_OPEN = "OPEN";
public static final String API_SERVICE_TYPE_SAC = "SAC";
public static final String CIRCUIT_BREAKER = "CIRCUIT_BREAKER"; public static final String CIRCUIT_BREAKER = "CIRCUIT_BREAKER";
/** /**

View File

@ -1,9 +1,5 @@
package com.sf.vertx.handle; package com.sf.vertx.handle;
import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR;
import static com.sf.vertx.constans.SACConstants.MOCK_POST;
import static com.sf.vertx.constans.SACConstants.MOCK_PRE;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -11,7 +7,6 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -25,8 +20,6 @@ import com.hazelcast.config.TcpIpConfig;
import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.ApiConfig;
import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.DataSecurity; import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.api.pojo.MockExpectation;
import com.sf.vertx.api.pojo.MockMatchCondition;
import com.sf.vertx.api.pojo.Node; import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService; import com.sf.vertx.api.pojo.SacService;
@ -35,6 +28,7 @@ import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.enums.GatewayServiceType; import com.sf.vertx.enums.GatewayServiceType;
import com.sf.vertx.httpproxy.HttpProxy;
import com.sf.vertx.init.SacVertxConfig; import com.sf.vertx.init.SacVertxConfig;
import com.sf.vertx.pojo.ClusterEventMsg; import com.sf.vertx.pojo.ClusterEventMsg;
import com.sf.vertx.pojo.SacCurrentLimiting; import com.sf.vertx.pojo.SacCurrentLimiting;
@ -50,7 +44,6 @@ import io.vertx.core.Future;
import io.vertx.core.Vertx; import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions; import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.JksOptions; import io.vertx.core.net.JksOptions;
@ -60,8 +53,6 @@ import io.vertx.ext.web.Router;
import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.handler.CorsHandler; import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.handler.HttpException; import io.vertx.ext.web.handler.HttpException;
import io.vertx.ext.web.proxy.handler.ProxyHandler;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext; import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor; import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyResponse; import io.vertx.httpproxy.ProxyResponse;
@ -80,8 +71,6 @@ public class AppConfigHandler {
public static Vertx VERTX; public static Vertx VERTX;
private static SacVertxConfig sacVertxConfig; private static SacVertxConfig sacVertxConfig;
private static RedisTemplate<String, String> redisTemplate; private static RedisTemplate<String, String> redisTemplate;
public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER;
// global cache app config
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>();
// global api config appCode - RateLimiterRegistry // global api config appCode - RateLimiterRegistry
private static final ConcurrentHashMap<String, SacCurrentLimiting> GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, SacCurrentLimiting> GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
@ -112,16 +101,9 @@ public class AppConfigHandler {
// apicode uri = * - appConfig // apicode uri = * - appConfig
private static ConcurrentHashMap<String, AppConfig> APICODE_APPCONFIG_MAP = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, AppConfig> APICODE_APPCONFIG_MAP = new ConcurrentHashMap<>();
// apiCode:appCode:preMock/postMock
private static ConcurrentHashMap<String, ApiConfig> MOCK_EXPECTATION_MAP = new ConcurrentHashMap<>();
// 禁用appCode // 禁用appCode
private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>(); private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>();
public static ApiConfig getMockApiConfig(String key) {
return MOCK_EXPECTATION_MAP.get(key);
}
public static AppConfig getAppConfigByDomain(String domain) { public static AppConfig getAppConfigByDomain(String domain) {
return APICODE_APPCONFIG_MAP.get(domain); return APICODE_APPCONFIG_MAP.get(domain);
} }
@ -290,8 +272,6 @@ public class AppConfigHandler {
APICODE_CONFIG_SERVICE_TYPE_MAP.remove(key); APICODE_CONFIG_SERVICE_TYPE_MAP.remove(key);
APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key);
APICODE_APPCONFIG_MAP.remove(apiConfig.getApiCode()); APICODE_APPCONFIG_MAP.remove(apiConfig.getApiCode());
MOCK_EXPECTATION_MAP.remove(key + CACHE_KEY_CONNECTOR + MOCK_PRE);
MOCK_EXPECTATION_MAP.remove(key + CACHE_KEY_CONNECTOR + MOCK_POST);
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker);
if (circuitBreaker != null) { if (circuitBreaker != null) {
@ -430,25 +410,6 @@ public class AppConfigHandler {
} }
} }
} }
// mock
if (apiConfig.getMockDefaultHttpStatus() != null) {
try {
ApiConfig apiConfigMockPre = new ApiConfig();
ApiConfig apiConfigMockPost = new ApiConfig();
BeanUtils.copyProperties(apiConfigMockPre, apiConfig);
BeanUtils.copyProperties(apiConfigMockPost, apiConfig);
// pre
buidMockCacheData(apiConfigMockPre, key, true);
// post
buidMockCacheData(apiConfigMockPost, key, false);
// local cache
MOCK_EXPECTATION_MAP.put(key + CACHE_KEY_CONNECTOR + MOCK_PRE, apiConfigMockPre);
MOCK_EXPECTATION_MAP.put(key + CACHE_KEY_CONNECTOR + MOCK_POST, apiConfigMockPost);
} catch (Exception e) {
e.printStackTrace();
}
}
} }
} }
@ -456,46 +417,11 @@ public class AppConfigHandler {
} }
} }
private static void buidMockCacheData(ApiConfig apiConfig, String key, boolean isPre) {
List<MockExpectation> mockExpectations = apiConfig.getMockExpectations();
if (mockExpectations != null && !mockExpectations.isEmpty()) {
for (MockExpectation mockExpectation : mockExpectations) {
List<MockMatchCondition> matchConditions = new ArrayList<>();
for (MockMatchCondition matchCondition : mockExpectation.getMatchConditions()) {
switch (matchCondition.getParameterPosition()) {
case "query":
if (isServiceTypeSac(key) && isPre == false) {
// 后置
matchConditions.add(matchCondition);
}
// 前置
if (isPre) {
matchConditions.add(matchCondition);
}
case "header":
// 前置
matchConditions.add(matchCondition);
case "body":
// 后置
if (isPre == false) {
matchConditions.add(matchCondition);
}
default:
break;
}
}
mockExpectation.setMatchConditions(matchConditions);
}
}
}
public static void createVertx() { public static void createVertx() {
// TODO 编解码线程池,后面优化协程等方式 // TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions(); VertxOptions vertxOptions = new VertxOptions();
loadVertxOptions(vertxOptions); loadVertxOptions(vertxOptions);
VERTX = Vertx.vertx(vertxOptions); VERTX = Vertx.vertx(vertxOptions);
initConnectionCircuitBreaker();
createVertxRouter(); createVertxRouter();
consumerClusterEventMsg(); consumerClusterEventMsg();
} }
@ -538,7 +464,6 @@ public class AppConfigHandler {
if (res.succeeded()) { if (res.succeeded()) {
VERTX = res.result(); VERTX = res.result();
log.info("hazelcastClusterManager create success"); log.info("hazelcastClusterManager create success");
initConnectionCircuitBreaker();
createVertxRouter(); createVertxRouter();
consumerClusterEventMsg(); consumerClusterEventMsg();
} else { } else {
@ -610,13 +535,6 @@ public class AppConfigHandler {
} }
}); });
// open反向代理
// HttpClientOptions clientOptions = new HttpClientOptions();
// clientOptions.setMaxPoolSize(20); // 最大连接池大小
// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒
// clientOptions.setHttp2KeepAliveTimeout(1);
// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒
// HttpClient proxyClient = VERTX.createHttpClient(clientOptions);
HttpClient proxyClient = VERTX.createHttpClient(); HttpClient proxyClient = VERTX.createHttpClient();
HttpProxy proxy = HttpProxy.reverseProxy(proxyClient); HttpProxy proxy = HttpProxy.reverseProxy(proxyClient);
proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request))); proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request)));
@ -627,14 +545,6 @@ public class AppConfigHandler {
// // 会跳转到 RestfulFailureHandlerImpl // // 会跳转到 RestfulFailureHandlerImpl
// throw new HttpException(10003); // throw new HttpException(10003);
// } // }
String appCode = context.request().headers().get(getAppCodeHeaderKey());
String apiCode = context.request().headers().get(getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
if (isServiceTypeSac(key)) {
String uri = APICODE_CONFIG_MAP.get(key).getUri();
String method = APICODE_CONFIG_MAP.get(key).getMethod();
context.request().setURI(uri).setMethod(HttpMethod.valueOf(method));
}
return context.sendRequest(); return context.sendRequest();
} }
@ -655,45 +565,19 @@ public class AppConfigHandler {
Route routeSac = mainHttpRouter.post(rpcUri()); Route routeSac = mainHttpRouter.post(rpcUri());
routeSac.handler(CorsHandler.create().addRelativeOrigin(".*")) routeSac.handler(CorsHandler.create().addRelativeOrigin(".*"))
.handler(ParameterCheckHandler.create(GatewayServiceType.SAC)) .handler(ParameterCheckHandler.create(GatewayServiceType.SAC))
.handler(AppRateLimitHandler.create(rateLimitModel)) .handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel))
.handler(ApiRateLimitHandler.create(rateLimitModel)) .handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false))
.handler(BodyPreCheckHandler.create()) .handler(BodyPostAnalysisHandler.create()).handler(BodyPostCheckHandler.create())
.handler(BodyHandler.create().setHandleFileUploads(false)) .handler(SacRouteRequestHandler.create(mainWebClient)).handler(ProxyHandler.create(proxy))
.handler(BodyPostAnalysisHandler.create())
.handler(BodyPostCheckHandler.create())
.handler(SacRouteRequestHandler.create(mainWebClient))
.handler(ProxyHandler.create(proxy))
.failureHandler(RestfulFailureHandler.create()); .failureHandler(RestfulFailureHandler.create());
Route routeOpen = mainHttpRouter.route(); Route routeOpen = mainHttpRouter.route();
routeOpen.handler(CorsHandler.create().addRelativeOrigin(".*")) routeOpen.handler(CorsHandler.create().addRelativeOrigin(".*"))
.handler(ParameterCheckHandler.create(GatewayServiceType.OPEN)) .handler(ParameterCheckHandler.create(GatewayServiceType.OPEN))
.handler(AppRateLimitHandler.create(rateLimitModel)) .handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel))
.handler(ApiRateLimitHandler.create(rateLimitModel)) .handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false))
.handler(BodyPreCheckHandler.create()) .handler(BodyPostCheckHandler.create()).handler(SacRouteRequestHandler.create(mainWebClient))
.handler(BodyHandler.create().setHandleFileUploads(false)) .handler(ProxyHandler.create(proxy)).failureHandler(RestfulFailureHandler.create());
.handler(BodyPostCheckHandler.create())
.handler(SacRouteRequestHandler.create(mainWebClient))
.handler(ProxyHandler.create(proxy))
.failureHandler(RestfulFailureHandler.create());
}
/***
* 初始化connection Breaker
*/
private static void initConnectionCircuitBreaker() {
CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX,
new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(10000) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info("connectionCircuitBreaker Circuit open");
}).halfOpenHandler(v -> {
log.info("connectionCircuitBreaker Circuit halfOpen");
}).closeHandler(v -> {
log.info("connectionCircuitBreaker Circuit close");
});
} }
private static void loadVertxOptions(VertxOptions vertxOptions) { private static void loadVertxOptions(VertxOptions vertxOptions) {

View File

@ -1,9 +1,6 @@
package com.sf.vertx.handle; package com.sf.vertx.handle;
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE; import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE;
import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR;
import static com.sf.vertx.constans.SACConstants.MOCK_POST;
import static com.sf.vertx.constans.SACConstants.MOCK_PRE;
import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.ApiConfig;
import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.AppConfig;
@ -18,16 +15,5 @@ public abstract class BaseRoutingContextDataHandler {
AppUtils.setAppConfigToRoutingContext(appConfig, rc); AppUtils.setAppConfigToRoutingContext(appConfig, rc);
AppUtils.setApiConfigIntoRoutingContext(apiConfig, rc); AppUtils.setApiConfigIntoRoutingContext(apiConfig, rc);
rc.put(API_SERVICE_TYPE, AppConfigHandler.getServiceType(apiCacheKey)); rc.put(API_SERVICE_TYPE, AppConfigHandler.getServiceType(apiCacheKey));
// mock
ApiConfig mockPre = AppConfigHandler.getMockApiConfig(apiCacheKey + CACHE_KEY_CONNECTOR + MOCK_PRE);
if (mockPre != null) {
rc.put(MOCK_PRE, mockPre);
}
ApiConfig mockPost = AppConfigHandler.getMockApiConfig(apiCacheKey + CACHE_KEY_CONNECTOR + MOCK_POST);
if (mockPre != null) {
rc.put(MOCK_POST, mockPost);
}
} }
} }

View File

@ -1,6 +1,5 @@
package com.sf.vertx.handle; package com.sf.vertx.handle;
import static com.sf.vertx.constans.SACConstants.MOCK_POST;
import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.ApiConfig;
import com.sf.vertx.api.pojo.MockResponse; import com.sf.vertx.api.pojo.MockResponse;
@ -15,22 +14,20 @@ import lombok.extern.slf4j.Slf4j;
public class BodyPostCheckHandlerImpl implements BodyPostCheckHandler { public class BodyPostCheckHandlerImpl implements BodyPostCheckHandler {
@Override @Override
public void handle(RoutingContext rc) { public void handle(RoutingContext ctx) {
try { try {
ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx);
// mock // mock
ApiConfig mock = rc.get(MOCK_POST); MockResponse mockResponse = AppUtils.mock(ctx, apiConfig);
if(mock != null) { if (mockResponse != null) {
MockResponse mockResponse = AppUtils.mock(rc, mock); ctx.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse()));
if (mockResponse != null) { return;
rc.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse())); }
return;
}
}
} catch (Exception e) { } catch (Exception e) {
log.error("BodyPreCheckHandlerImpl:",e); log.error("BodyPostCheckHandlerImpl:",e);
rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); ctx.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR));
return; return;
} }
rc.next(); ctx.next();
} }
} }

View File

@ -1,13 +1,7 @@
package com.sf.vertx.handle; package com.sf.vertx.handle;
import static com.sf.vertx.constans.SACConstants.MOCK_PRE;
import com.sf.vertx.api.pojo.ApiConfig;
import com.sf.vertx.api.pojo.MockResponse;
import com.sf.vertx.enums.GatewayError; import com.sf.vertx.enums.GatewayError;
import com.sf.vertx.exception.MockException;
import com.sf.vertx.exception.ServiceException; import com.sf.vertx.exception.ServiceException;
import com.sf.vertx.utils.AppUtils;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -16,22 +10,13 @@ import lombok.extern.slf4j.Slf4j;
public class BodyPreCheckHandlerImpl implements BodyPreCheckHandler { public class BodyPreCheckHandlerImpl implements BodyPreCheckHandler {
@Override @Override
public void handle(RoutingContext rc) { public void handle(RoutingContext ctx) {
try { try {
// mock
ApiConfig mock = rc.get(MOCK_PRE);
if(mock != null) {
MockResponse mockResponse = AppUtils.mock(rc, mock);
if (mockResponse != null) {
rc.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse()));
return;
}
}
} catch (Exception e) { } catch (Exception e) {
log.error("BodyPreCheckHandlerImpl:",e); log.error("BodyPreCheckHandlerImpl:",e);
rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); ctx.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR));
return; return;
} }
rc.next(); ctx.next();
} }
} }

View File

@ -34,6 +34,10 @@ public class OpenParameterCheckHandlerImpl extends BaseRoutingContextDataHandler
// 设置上下午数据 // 设置上下午数据
this.ctxData(rc, appConfig, apiConfig, apiCacheKey); this.ctxData(rc, appConfig, apiConfig, apiCacheKey);
// 反向代理负载均衡,需要从header拿到appCode,apiCode
rc.request().headers().add(AppConfigHandler.getAppCodeHeaderKey(), appConfig.getAppCode());
rc.request().headers().add(AppConfigHandler.getApiCodeHeaderKey(), domain);
} catch (Exception e) { } catch (Exception e) {
log.error("OpenParameterCheckHandlerImpl Error:", e); log.error("OpenParameterCheckHandlerImpl Error:", e);
rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR));

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.handle;
import com.sf.vertx.httpproxy.HttpProxy;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
/**
* @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
*/
@VertxGen
public interface ProxyHandler extends Handler<RoutingContext> {
static ProxyHandler create(HttpProxy httpProxy) {
return new ProxyHandlerImpl(httpProxy);
}
static ProxyHandler create(HttpProxy httpProxy, int port, String host) {
return new ProxyHandlerImpl(httpProxy, port, host);
}
}

View File

@ -0,0 +1,27 @@
package com.sf.vertx.handle;
import com.sf.vertx.httpproxy.HttpProxy;
import io.vertx.ext.web.RoutingContext;
/**
* @author <a href="mailto:emad.albloushi@gmail.com">Emad Alblueshi</a>
*/
public class ProxyHandlerImpl implements ProxyHandler {
private final HttpProxy httpProxy;
public ProxyHandlerImpl(HttpProxy httpProxy) {
this.httpProxy = httpProxy;
}
public ProxyHandlerImpl(HttpProxy httpProxy, int port, String host) {
this.httpProxy = httpProxy.origin(port, host);
}
@Override
public void handle(RoutingContext ctx) {
httpProxy.handle(ctx.request());
}
}

View File

@ -2,7 +2,7 @@ package com.sf.vertx.handle;
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE; import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE;
import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.commons.lang3.StringUtils.EMPTY;
import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE_SAC;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -19,6 +19,7 @@ import com.sf.vertx.utils.ProxyTool;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.json.JsonObject; import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
@ -110,8 +111,8 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
private HttpRequest<Buffer> buildCallRequestBuffer(RoutingContext ctx, String body, ApiConfig apiConfig) { private HttpRequest<Buffer> buildCallRequestBuffer(RoutingContext ctx, String body, ApiConfig apiConfig) {
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(ctx); SocketAddress socketAddress = ProxyTool.resolveOriginAddress(ctx);
HttpRequest<Buffer> requestBuffer; HttpRequest<Buffer> requestBuffer;
String requestURI = ctx.request().uri(); String requestURI = apiConfig.getUri();
switch (ctx.request().method().name()) { switch (apiConfig.getMethod()) {
case "PUT": case "PUT":
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), requestURI); requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), requestURI);
break; break;
@ -153,10 +154,10 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler {
// 判断body是否解析 // 判断body是否解析
AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(ctx); AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(ctx);
ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx); ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx);
String apiServiceType = ctx.get(API_SERVICE_TYPE); String contentType = ctx.request().headers().get(HttpHeaders.CONTENT_TYPE);
String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER"; if (AppUtils.isAnalysisBody(appConfig, apiConfig, contentType)) {
CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker); String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER";
if (AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType)) { CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
String body = AppUtils.isEnableDataSecurity(appConfig) String body = AppUtils.isEnableDataSecurity(appConfig)
? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appConfig.getAppCode()) ? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appConfig.getAppCode())
: ctx.body().asString(); : ctx.body().asString();

View File

@ -0,0 +1,121 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyOptions;
import com.sf.vertx.httpproxy.impl.ReverseProxy;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Handles the HTTP reverse proxy logic between the <i><b>user agent</b></i> and the <i><b>origin</b></i>.
* <p>
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@VertxGen
public interface HttpProxy extends Handler<HttpServerRequest> {
/**
* Create a new {@code HttpProxy} instance.
*
* @param client the {@code HttpClient} that forwards <i><b>outbound</b></i> requests to the <i><b>origin</b></i>.
* @return a reference to this, so the API can be used fluently.
*/
static HttpProxy reverseProxy(HttpClient client) {
return new ReverseProxy(new ProxyOptions(), client);
}
/**
* Create a new {@code HttpProxy} instance.
*
* @param client the {@code HttpClient} that forwards <i><b>outbound</b></i> requests to the <i><b>origin</b></i>.
* @return a reference to this, so the API can be used fluently.
*/
static HttpProxy reverseProxy(ProxyOptions options, HttpClient client) {
return new ReverseProxy(options, client);
}
/**
* Set the {@code SocketAddress} of the <i><b>origin</b></i>.
*
* @param address the {@code SocketAddress} of the <i><b>origin</b></i>
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy origin(SocketAddress address) {
return originSelector(req -> Future.succeededFuture(address));
}
/**
* Set the host name and port number of the <i><b>origin</b></i>.
*
* @param port the port number of the <i><b>origin</b></i> server
* @param host the host name of the <i><b>origin</b></i> server
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy origin(int port, String host) {
return origin(SocketAddress.inetSocketAddress(port, host));
}
/**
* Set a selector that resolves the <i><b>origin</b></i> address based on the incoming HTTP request.
*
* @param selector the selector
* @return a reference to this, so the API can be used fluently
*/
@Fluent
default HttpProxy originSelector(Function<HttpServerRequest, Future<SocketAddress>> selector) {
return originRequestProvider((req, client) -> selector
.apply(req)
.flatMap(server -> client.request(new RequestOptions().setServer(server))));
}
/**
* Set a provider that creates the request to the <i><b>origin</b></i> server based the incoming HTTP request.
* Setting a provider overrides any origin selector previously set.
*
* @param provider the provider
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore()
@Fluent
HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider);
/**
* Add an interceptor to the interceptor chain.
*
* @param interceptor
* @return a reference to this, so the API can be used fluently
*/
@Fluent
HttpProxy addInterceptor(ProxyInterceptor interceptor);
/**
* Handle the <i><b>outbound</b></i> {@code HttpServerRequest}.
*
* @param request the outbound {@code HttpServerRequest}
*/
void handle(HttpServerRequest request);
}

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
public class BufferedReadStream implements ReadStream<Buffer> {
private long demand = 0L;
private Handler<Void> endHandler;
private Handler<Buffer> handler;
private boolean ended = false;
private final Buffer content;
public BufferedReadStream() {
this.content = Buffer.buffer();
}
public BufferedReadStream(Buffer content) {
this.content = content;
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
this.handler = handler;
return this;
}
@Override
public ReadStream<Buffer> pause() {
demand = 0L;
return this;
}
@Override
public ReadStream<Buffer> resume() {
fetch(Long.MAX_VALUE);
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
if (!ended && amount > 0) {
ended = true;
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
if (demand != Long.MAX_VALUE) {
demand--;
}
if (handler != null && content.length() > 0) {
handler.handle(content);
}
if (endHandler != null) {
endHandler.handle(null);
}
}
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
this.endHandler = endHandler;
return this;
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
class BufferingReadStream implements ReadStream<Buffer> {
private final ReadStream<Buffer> stream;
private final Buffer content;
private Handler<Void> endHandler;
public BufferingReadStream(ReadStream<Buffer> stream, Buffer content) {
this.stream = stream;
this.content = content;
}
@Override
public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
stream.exceptionHandler(handler);
return this;
}
@Override
public ReadStream<Buffer> handler(Handler<Buffer> handler) {
if (handler != null) {
stream.handler(buff -> {
content.appendBuffer(buff);
handler.handle(buff);
});
} else {
stream.handler(null);
}
return this;
}
@Override
public ReadStream<Buffer> pause() {
stream.pause();
return this;
}
@Override
public ReadStream<Buffer> resume() {
stream.resume();
return this;
}
@Override
public ReadStream<Buffer> fetch(long amount) {
stream.fetch(amount);
return this;
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
if (endHandler != null) {
stream.endHandler(v -> {
endHandler.handle(null);
});
} else {
stream.endHandler(null);
}
return this;
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
class BufferingWriteStream implements WriteStream<Buffer> {
private final Buffer content;
public BufferingWriteStream() {
this.content = Buffer.buffer();
}
public Buffer content() {
return content;
}
@Override
public WriteStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public Future<Void> write(Buffer data) {
content.appendBuffer(data);
return Future.succeededFuture();
}
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
content.appendBuffer(data);
handler.handle(Future.succeededFuture());
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
handler.handle(Future.succeededFuture());
}
@Override
public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
return this;
}
@Override
public boolean writeQueueFull() {
return false;
}
@Override
public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
return this;
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class CacheControl {
private int maxAge;
private boolean _public;
public CacheControl parse(String header) {
maxAge = -1;
_public = false;
String[] parts = header.split(","); // No regex
for (String part : parts) {
part = part.trim().toLowerCase();
switch (part) {
case "public":
_public = true;
break;
default:
if (part.startsWith("max-age=")) {
maxAge = Integer.parseInt(part.substring(8));
}
break;
}
}
return this;
}
public int maxAge() {
return maxAge;
}
public boolean isPublic() {
return _public;
}
}

View File

@ -0,0 +1,23 @@
package com.sf.vertx.httpproxy.impl;
import io.vertx.httpproxy.cache.CacheOptions;
import io.vertx.httpproxy.spi.cache.Cache;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* Simplistic implementation.
*/
public class CacheImpl<K, V> extends LinkedHashMap<K, V> implements Cache<K, V> {
private final int maxSize;
public CacheImpl(CacheOptions options) {
this.maxSize = options.getMaxSize();
}
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > maxSize;
}
}

View File

@ -0,0 +1,150 @@
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.Future;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyRequest;
import io.vertx.httpproxy.ProxyResponse;
import io.vertx.httpproxy.spi.cache.Cache;
import java.time.Instant;
import java.util.function.BiFunction;
class CachingFilter implements ProxyInterceptor {
private static final BiFunction<String, Resource, Resource> CACHE_GET_AND_VALIDATE = (key, resource) -> {
long now = System.currentTimeMillis();
long val = resource.timestamp + resource.maxAge;
return val < now ? null : resource;
};
private final Cache<String, Resource> cache;
public CachingFilter(Cache<String, Resource> cache) {
this.cache = cache;
}
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
Future<ProxyResponse> future = tryHandleProxyRequestFromCache(context);
if (future != null) {
return future;
}
return context.sendRequest();
}
@Override
public Future<Void> handleProxyResponse(ProxyContext context) {
return sendAndTryCacheProxyResponse(context);
}
private Future<Void> sendAndTryCacheProxyResponse(ProxyContext context) {
ProxyResponse response = context.response();
Resource cached = context.get("cached_resource", Resource.class);
if (cached != null && response.getStatusCode() == 304) {
// Warning: this relies on the fact that HttpServerRequest will not send a body for HEAD
response.release();
cached.init(response);
return context.sendResponse();
}
ProxyRequest request = response.request();
if (response.publicCacheControl() && response.maxAge() > 0) {
if (request.getMethod() == HttpMethod.GET) {
String absoluteUri = request.absoluteURI();
Resource res = new Resource(
absoluteUri,
response.getStatusCode(),
response.getStatusMessage(),
response.headers(),
System.currentTimeMillis(),
response.maxAge());
Body body = response.getBody();
response.setBody(Body.body(new BufferingReadStream(body.stream(), res.content), body.length()));
Future<Void> fut = context.sendResponse();
fut.onSuccess(v -> {
cache.put(absoluteUri, res);
});
return fut;
} else {
if (request.getMethod() == HttpMethod.HEAD) {
Resource resource = cache.get(request.absoluteURI());
if (resource != null) {
if (!revalidateResource(response, resource)) {
// Invalidate cache
cache.remove(request.absoluteURI());
}
}
}
return context.sendResponse();
}
} else {
return context.sendResponse();
}
}
private static boolean revalidateResource(ProxyResponse response, Resource resource) {
if (resource.etag != null && response.etag() != null) {
return resource.etag.equals(response.etag());
}
return true;
}
private Future<ProxyResponse> tryHandleProxyRequestFromCache(ProxyContext context) {
ProxyRequest proxyRequest = context.request();
HttpServerRequest response = proxyRequest.proxiedRequest();
Resource resource;
HttpMethod method = response.method();
if (method == HttpMethod.GET || method == HttpMethod.HEAD) {
String cacheKey = proxyRequest.absoluteURI();
resource = cache.computeIfPresent(cacheKey, CACHE_GET_AND_VALIDATE);
if (resource == null) {
return null;
}
} else {
return null;
}
String cacheControlHeader = response.getHeader(HttpHeaders.CACHE_CONTROL);
if (cacheControlHeader != null) {
CacheControl cacheControl = new CacheControl().parse(cacheControlHeader);
if (cacheControl.maxAge() >= 0) {
long now = System.currentTimeMillis();
long currentAge = now - resource.timestamp;
if (currentAge > cacheControl.maxAge() * 1000) {
String etag = resource.headers.get(HttpHeaders.ETAG);
if (etag != null) {
proxyRequest.headers().set(HttpHeaders.IF_NONE_MATCH, resource.etag);
context.set("cached_resource", resource);
return context.sendRequest();
} else {
return null;
}
}
}
}
//
String ifModifiedSinceHeader = response.getHeader(HttpHeaders.IF_MODIFIED_SINCE);
if ((response.method() == HttpMethod.GET || response.method() == HttpMethod.HEAD) && ifModifiedSinceHeader != null && resource.lastModified != null) {
Instant ifModifiedSince = ParseUtils.parseHeaderDate(ifModifiedSinceHeader);
if (!ifModifiedSince.isAfter(resource.lastModified)) {
response.response().setStatusCode(304).end();
return Future.succeededFuture();
}
}
proxyRequest.release();
ProxyResponse proxyResponse = proxyRequest.response();
resource.init(proxyResponse);
return Future.succeededFuture(proxyResponse);
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpHeaders;
import java.time.Instant;
import java.util.List;
class HttpUtils {
static Boolean isChunked(MultiMap headers) {
List<String> te = headers.getAll("transfer-encoding");
if (te != null) {
boolean chunked = false;
for (String val : te) {
if (val.equals("chunked")) {
chunked = true;
} else {
return null;
}
}
return chunked;
} else {
return false;
}
}
static Instant dateHeader(MultiMap headers) {
String dateHeader = headers.get(HttpHeaders.DATE);
if (dateHeader == null) {
List<String> warningHeaders = headers.getAll("warning");
if (warningHeaders.size() > 0) {
for (String warningHeader : warningHeaders) {
Instant date = ParseUtils.parseWarningHeaderDate(warningHeader);
if (date != null) {
return date;
}
}
}
return null;
} else {
return ParseUtils.parseHeaderDate(dateHeader);
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.*;
/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class ParseUtils {
public static final DateTimeFormatter RFC_850_DATE_TIME = new DateTimeFormatterBuilder()
.appendPattern("EEEE, dd-MMM-yy HH:mm:ss")
.parseLenient()
.appendLiteral(" GMT")
.toFormatter(Locale.US)
.withZone(ZoneId.of("UTC"));
public static final DateTimeFormatter ASC_TIME = new DateTimeFormatterBuilder()
.appendPattern("EEE MMM d HH:mm:ss yyyy")
.parseLenient()
.toFormatter(Locale.US)
.withZone(ZoneId.of("UTC"));
public static Instant parseHeaderDate(String value) {
try {
return parseHttpDate(value);
} catch (Exception e) {
return null;
}
}
public static Instant parseWarningHeaderDate(String value) {
// warn-code
int index = value.indexOf(' ');
if (index > 0) {
// warn-agent
index = value.indexOf(' ', index + 1);
if (index > 0) {
// warn-text
index = value.indexOf(' ', index + 1);
if (index > 0) {
// warn-date
int len = value.length();
if (index + 2 < len && value.charAt(index + 1) == '"' && value.charAt(len - 1) == '"') {
// Space for 2 double quotes
return parseHeaderDate(value.substring(index + 2, len - 1));
}
}
}
}
return null;
}
public static String formatHttpDate(Instant date) {
return DateTimeFormatter.RFC_1123_DATE_TIME.format(OffsetDateTime.ofInstant(date, ZoneOffset.UTC));
}
// https://www.rfc-editor.org/rfc/rfc9110#http.date
public static Instant parseHttpDate(String value) throws Exception {
int pos = value.indexOf(',');
if (pos == 3) { // e.g. Sun, 06 Nov 1994 08:49:37 GMT
return DateTimeFormatter.RFC_1123_DATE_TIME.parse(value, Instant::from);
}
if (pos == -1) { // e.g. Sun Nov 6 08:49:37 1994
return ASC_TIME.parse(value, Instant::from);
}
return RFC_850_DATE_TIME.parse(value, Instant::from); // e.g. Sunday, 06-Nov-94 08:49:37 GMT
}
}

View File

@ -0,0 +1,230 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.streams.Pipe;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.ProxyRequest;
import io.vertx.httpproxy.ProxyResponse;
import java.util.Map;
import java.util.Objects;
public class ProxiedRequest implements ProxyRequest {
private static final CharSequence X_FORWARDED_HOST = HttpHeaders.createOptimized("x-forwarded-host");
private static final MultiMap HOP_BY_HOP_HEADERS = MultiMap.caseInsensitiveMultiMap()
.add(HttpHeaders.CONNECTION, "whatever")
.add(HttpHeaders.KEEP_ALIVE, "whatever")
.add(HttpHeaders.PROXY_AUTHENTICATE, "whatever")
.add(HttpHeaders.PROXY_AUTHORIZATION, "whatever")
.add("te", "whatever")
.add("trailer", "whatever")
.add(HttpHeaders.TRANSFER_ENCODING, "whatever")
.add(HttpHeaders.UPGRADE, "whatever");
final ContextInternal context;
private HttpMethod method;
private final HttpVersion version;
private String uri;
private final String absoluteURI;
private Body body;
private HostAndPort authority;
private final MultiMap headers;
HttpClientRequest request;
private final HttpServerRequest proxiedRequest;
public ProxiedRequest(HttpServerRequest proxiedRequest) {
// Determine content length
long contentLength = -1L;
String contentLengthHeader = proxiedRequest.getHeader(HttpHeaders.CONTENT_LENGTH);
if (contentLengthHeader != null) {
try {
contentLength = Long.parseLong(contentLengthHeader);
} catch (NumberFormatException e) {
// Ignore ???
}
}
this.method = proxiedRequest.method();
this.version = proxiedRequest.version();
this.body = Body.body(proxiedRequest, contentLength);
this.uri = proxiedRequest.uri();
this.headers = MultiMap.caseInsensitiveMultiMap().addAll(proxiedRequest.headers());
this.absoluteURI = proxiedRequest.absoluteURI();
this.proxiedRequest = proxiedRequest;
this.context = (ContextInternal) ((HttpServerRequestInternal) proxiedRequest).context();
this.authority = proxiedRequest.authority();
}
@Override
public HttpVersion version() {
return version;
}
@Override
public String getURI() {
return uri;
}
@Override
public ProxyRequest setURI(String uri) {
this.uri = uri;
return this;
}
@Override
public Body getBody() {
return body;
}
@Override
public ProxyRequest setBody(Body body) {
this.body = body;
return this;
}
@Override
public ProxyRequest setAuthority(HostAndPort authority) {
Objects.requireNonNull(authority);
this.authority= authority;
return this;
}
@Override
public HostAndPort getAuthority() {
return authority;
}
@Override
public String absoluteURI() {
return absoluteURI;
}
@Override
public HttpMethod getMethod() {
return method;
}
@Override
public ProxyRequest setMethod(HttpMethod method) {
this.method = method;
return this;
}
@Override
public HttpServerRequest proxiedRequest() {
return proxiedRequest;
}
@Override
public ProxyRequest release() {
body.stream().resume();
headers.clear();
body = null;
return this;
}
@Override
public ProxyResponse response() {
return new ProxiedResponse(this, proxiedRequest.response());
}
void sendRequest(Handler<AsyncResult<ProxyResponse>> responseHandler) {
request.response().<ProxyResponse>map(r -> {
r.pause(); // Pause it
return new ProxiedResponse(this, proxiedRequest.response(), r);
}).onComplete(responseHandler);
request.setMethod(method);
request.setURI(uri);
// Add all headers
for (Map.Entry<String, String> header : headers) {
String name = header.getKey();
String value = header.getValue();
if (!HOP_BY_HOP_HEADERS.contains(name) && !name.equals("host")) {
request.headers().add(name, value);
}
}
//
if (authority != null) {
request.authority(authority);
HostAndPort proxiedAuthority = proxiedRequest.authority();
if (!equals(authority, proxiedAuthority)) {
// Should cope with existing forwarded host headers
request.putHeader(X_FORWARDED_HOST, proxiedAuthority.toString());
}
}
long len = body.length();
if (len >= 0) {
request.putHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(len));
} else {
Boolean isChunked = HttpUtils.isChunked(proxiedRequest.headers());
request.setChunked(len == -1 && Boolean.TRUE == isChunked);
}
Pipe<Buffer> pipe = body.stream().pipe();
pipe.endOnComplete(true);
pipe.endOnFailure(false);
pipe.to(request, ar -> {
if (ar.failed()) {
request.reset();
}
});
}
private static boolean equals(HostAndPort hp1, HostAndPort hp2) {
if (hp1 == null || hp2 == null) {
return false;
}
return hp1.host().equals(hp2.host()) && hp1.port() == hp2.port();
}
@Override
public ProxyRequest putHeader(CharSequence name, CharSequence value) {
headers.set(name, value);
return this;
}
@Override
public MultiMap headers() {
return headers;
}
@Override
public Future<ProxyResponse> send(HttpClientRequest request) {
Promise<ProxyResponse> promise = context.promise();
this.request = request;
sendRequest(promise);
return promise.future();
}
}

View File

@ -0,0 +1,271 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.ProxyRequest;
import io.vertx.httpproxy.ProxyResponse;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
class ProxiedResponse implements ProxyResponse {
private final ProxiedRequest request;
private final HttpServerResponse proxiedResponse;
private int statusCode;
private String statusMessage;
private Body body;
private final MultiMap headers;
private HttpClientResponse response;
private long maxAge;
private String etag;
private boolean publicCacheControl;
ProxiedResponse(ProxiedRequest request, HttpServerResponse proxiedResponse) {
this.response = null;
this.statusCode = 200;
this.headers = MultiMap.caseInsensitiveMultiMap();
this.request = request;
this.proxiedResponse = proxiedResponse;
}
ProxiedResponse(ProxiedRequest request, HttpServerResponse proxiedResponse, HttpClientResponse response) {
// Determine content length
long contentLength = -1L;
String contentLengthHeader = response.getHeader(HttpHeaders.CONTENT_LENGTH);
if (contentLengthHeader != null) {
try {
contentLength = Long.parseLong(contentLengthHeader);
} catch (NumberFormatException e) {
// Ignore ???
}
}
this.request = request;
this.response = response;
this.proxiedResponse = proxiedResponse;
this.statusCode = response.statusCode();
this.statusMessage = response.statusMessage();
this.body = Body.body(response, contentLength);
long maxAge = -1;
boolean publicCacheControl = false;
String cacheControlHeader = response.getHeader(HttpHeaders.CACHE_CONTROL);
if (cacheControlHeader != null) {
CacheControl cacheControl = new CacheControl().parse(cacheControlHeader);
if (cacheControl.isPublic()) {
publicCacheControl = true;
if (cacheControl.maxAge() > 0) {
maxAge = (long)cacheControl.maxAge() * 1000;
} else {
String dateHeader = response.getHeader(HttpHeaders.DATE);
String expiresHeader = response.getHeader(HttpHeaders.EXPIRES);
if (dateHeader != null && expiresHeader != null) {
maxAge = ParseUtils.parseHeaderDate(expiresHeader).toEpochMilli() - ParseUtils.parseHeaderDate(dateHeader).toEpochMilli();
}
}
}
}
this.maxAge = maxAge;
this.publicCacheControl = publicCacheControl;
this.etag = response.getHeader(HttpHeaders.ETAG);
this.headers = MultiMap.caseInsensitiveMultiMap().addAll(response.headers());
}
@Override
public ProxyRequest request() {
return request;
}
@Override
public int getStatusCode() {
return statusCode;
}
@Override
public ProxyResponse setStatusCode(int sc) {
statusCode = sc;
return this;
}
@Override
public String getStatusMessage() {
return statusMessage;
}
@Override
public ProxyResponse setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
return this;
}
@Override
public Body getBody() {
return body;
}
@Override
public ProxyResponse setBody(Body body) {
this.body = body;
return this;
}
@Override
public boolean publicCacheControl() {
return publicCacheControl;
}
@Override
public long maxAge() {
return maxAge;
}
@Override
public String etag() {
return etag;
}
@Override
public MultiMap headers() {
return headers;
}
@Override
public ProxyResponse putHeader(CharSequence name, CharSequence value) {
headers.set(name, value);
return this;
}
@Override
public Future<Void> send() {
Promise<Void> promise = request.context.promise();
send(promise);
return promise.future();
}
public void send(Handler<AsyncResult<Void>> completionHandler) {
// Set stuff
proxiedResponse.setStatusCode(statusCode);
if(statusMessage != null) {
proxiedResponse.setStatusMessage(statusMessage);
}
// Date header
Instant date = HttpUtils.dateHeader(headers);
if (date == null) {
date = Instant.now();
}
try {
proxiedResponse.putHeader("date", ParseUtils.formatHttpDate(date));
} catch (Exception e) {
e.printStackTrace();
}
// Warning header
List<String> warningHeaders = headers.getAll("warning");
if (warningHeaders.size() > 0) {
warningHeaders = new ArrayList<>(warningHeaders);
String dateHeader = headers.get("date");
Instant dateInstant = dateHeader != null ? ParseUtils.parseHeaderDate(dateHeader) : null;
Iterator<String> i = warningHeaders.iterator();
// Suppress incorrect warning header
while (i.hasNext()) {
String warningHeader = i.next();
Instant warningInstant = ParseUtils.parseWarningHeaderDate(warningHeader);
if (warningInstant != null && dateInstant != null && !warningInstant.equals(dateInstant)) {
i.remove();
}
}
}
proxiedResponse.putHeader("warning", warningHeaders);
// Handle other headers
headers.forEach(header -> {
String name = header.getKey();
String value = header.getValue();
if (name.equalsIgnoreCase("date") || name.equalsIgnoreCase("warning") || name.equalsIgnoreCase("transfer-encoding")) {
// Skip
} else {
proxiedResponse.headers().add(name, value);
}
});
//
if (body == null) {
proxiedResponse.end();
return;
}
long len = body.length();
if (len >= 0) {
proxiedResponse.putHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(len));
} else {
if (request.proxiedRequest().version() == HttpVersion.HTTP_1_0) {
// Special handling for HTTP 1.0 clients that cannot handle chunked encoding
// we need to buffer the content
BufferingWriteStream buffer = new BufferingWriteStream();
body.stream().pipeTo(buffer, ar -> {
if (ar.succeeded()) {
Buffer content = buffer.content();
proxiedResponse.end(content, completionHandler);
} else {
System.out.println("Not implemented");
}
});
return;
}
proxiedResponse.setChunked(true);
}
ReadStream<Buffer> bodyStream = body.stream();
sendResponse(bodyStream, completionHandler);
}
@Override
public ProxyResponse release() {
if (response != null) {
response.resume();
response = null;
body = null;
headers.clear();
}
return this;
}
private void sendResponse(ReadStream<Buffer> body, Handler<AsyncResult<Void>> completionHandler) {
Pipe<Buffer> pipe = body.pipe();
pipe.endOnSuccess(true);
pipe.endOnFailure(false);
pipe.to(proxiedResponse, ar -> {
if (ar.failed()) {
request.request.reset();
proxiedResponse.reset();
}
completionHandler.handle(ar);
});
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.ProxyResponse;
import java.time.Instant;
class Resource {
final String absoluteUri;
final int statusCode;
final String statusMessage;
final MultiMap headers;
final long timestamp;
final long maxAge;
final Instant lastModified;
final String etag;
final Buffer content = Buffer.buffer();
Resource(String absoluteUri, int statusCode, String statusMessage, MultiMap headers, long timestamp, long maxAge) {
String lastModifiedHeader = headers.get(HttpHeaders.LAST_MODIFIED);
this.absoluteUri = absoluteUri;
this.statusCode = statusCode;
this.statusMessage = statusMessage;
this.headers = headers;
this.timestamp = timestamp;
this.maxAge = maxAge;
this.lastModified = lastModifiedHeader != null ? ParseUtils.parseHeaderDate(lastModifiedHeader) : null;
this.etag = headers.get(HttpHeaders.ETAG);
}
void init(ProxyResponse proxyResponse) {
proxyResponse.setStatusCode(200);
proxyResponse.setStatusMessage(statusMessage);
proxyResponse.headers().addAll(headers);
proxyResponse.setBody(Body.body(content));
}
}

View File

@ -0,0 +1,259 @@
/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package com.sf.vertx.httpproxy.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.function.BiFunction;
import com.sf.vertx.handle.AppConfigHandler;
import com.sf.vertx.httpproxy.HttpProxy;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.net.NetSocket;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyOptions;
import io.vertx.httpproxy.ProxyRequest;
import io.vertx.httpproxy.ProxyResponse;
import io.vertx.httpproxy.cache.CacheOptions;
import io.vertx.httpproxy.spi.cache.Cache;
public class ReverseProxy implements HttpProxy {
private final HttpClient client;
private final boolean supportWebSocket;
private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector = (req, client) -> Future.failedFuture("No origin available");
private final List<ProxyInterceptor> interceptors = new ArrayList<>();
public ReverseProxy(ProxyOptions options, HttpClient client) {
CacheOptions cacheOptions = options.getCacheOptions();
if (cacheOptions != null) {
Cache<String, Resource> cache = cacheOptions.newCache();
addInterceptor(new CachingFilter(cache));
}
this.client = client;
this.supportWebSocket = options.getSupportWebSocket();
}
@Override
public HttpProxy originRequestProvider(BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
selector = provider;
return this;
}
@Override
public HttpProxy addInterceptor(ProxyInterceptor interceptor) {
interceptors.add(interceptor);
return this;
}
@Override
public void handle(HttpServerRequest request) {
ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);
// Encoding sanity check
Boolean chunked = HttpUtils.isChunked(request.headers());
if (chunked == null) {
end(proxyRequest, 400);
return;
}
// WebSocket upgrade tunneling
if (supportWebSocket && io.vertx.core.http.impl.HttpUtils.canUpgradeToWebSocket(request)) {
handleWebSocketUpgrade(proxyRequest);
return;
}
Proxy proxy = new Proxy(proxyRequest);
proxy.filters = interceptors.listIterator();
proxy.sendRequest().compose(proxy::sendProxyResponse);
}
private void handleWebSocketUpgrade(ProxyRequest proxyRequest) {
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
resolveOrigin(proxiedRequest).onComplete(ar -> {
if (ar.succeeded()) {
HttpClientRequest request = ar.result();
request.setMethod(HttpMethod.GET);
request.setURI(proxiedRequest.uri());
request.headers().addAll(proxiedRequest.headers());
Future<HttpClientResponse> fut2 = request.connect();
proxiedRequest.handler(request::write);
proxiedRequest.endHandler(v -> request.end());
proxiedRequest.resume();
fut2.onComplete(ar2 -> {
if (ar2.succeeded()) {
HttpClientResponse proxiedResponse = ar2.result();
if (proxiedResponse.statusCode() == 101) {
HttpServerResponse response = proxiedRequest.response();
response.setStatusCode(101);
response.headers().addAll(proxiedResponse.headers());
Future<NetSocket> otherso = proxiedRequest.toNetSocket();
otherso.onComplete(ar3 -> {
if (ar3.succeeded()) {
NetSocket responseSocket = ar3.result();
NetSocket proxyResponseSocket = proxiedResponse.netSocket();
responseSocket.handler(proxyResponseSocket::write);
proxyResponseSocket.handler(responseSocket::write);
responseSocket.closeHandler(v -> proxyResponseSocket.close());
proxyResponseSocket.closeHandler(v -> responseSocket.close());
} else {
// Find reproducer
System.err.println("Handle this case");
ar3.cause().printStackTrace();
}
});
} else {
// Rejection
proxiedRequest.resume();
end(proxyRequest, proxiedResponse.statusCode());
}
} else {
proxiedRequest.resume();
end(proxyRequest, 502);
}
});
} else {
proxiedRequest.resume();
end(proxyRequest, 502);
}
});
}
private void end(ProxyRequest proxyRequest, int sc) {
proxyRequest
.response()
.release()
.setStatusCode(sc)
.putHeader(HttpHeaders.CONTENT_LENGTH, "0")
.setBody(null)
.send();
}
private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
return selector.apply(proxiedRequest, client);
}
private class Proxy implements ProxyContext {
private final ProxyRequest request;
private ProxyResponse response;
private final Map<String, Object> attachments = new HashMap<>();
private ListIterator<ProxyInterceptor> filters;
private Proxy(ProxyRequest request) {
this.request = request;
}
@Override
public void set(String name, Object value) {
attachments.put(name, value);
}
@Override
public <T> T get(String name, Class<T> type) {
Object o = attachments.get(name);
return type.isInstance(o) ? type.cast(o) : null;
}
@Override
public ProxyRequest request() {
return request;
}
@Override
public Future<ProxyResponse> sendRequest() {
if (filters.hasNext()) {
ProxyInterceptor next = filters.next();
return next.handleProxyRequest(this);
} else {
return sendProxyRequest(request);
}
}
@Override
public ProxyResponse response() {
return response;
}
@Override
public Future<Void> sendResponse() {
if (filters.hasPrevious()) {
ProxyInterceptor filter = filters.previous();
return filter.handleProxyResponse(this);
} else {
return response.send();
}
}
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
f.onFailure(err -> {
// Should this be done here ? I don't think so
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
proxiedRequest.resume();
Promise<Void> promise = Promise.promise();
proxiedRequest.exceptionHandler(promise::tryFail);
proxiedRequest.endHandler(promise::tryComplete);
promise.future().onComplete(ar2 -> {
end(proxyRequest, 502);
});
});
return f.compose(a -> sendProxyRequest(proxyRequest, a));
}
private void setConnectionTimeout(ProxyRequest proxyRequest, HttpClientRequest request) {
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
request.idleTimeout(AppConfigHandler.getApicodeConfigTimeOut(key));
}
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest, HttpClientRequest request) {
Future<ProxyResponse> fut = proxyRequest.send(request);
// 超时
setConnectionTimeout(proxyRequest, request);
fut.onFailure(err -> {
proxyRequest.proxiedRequest().response().setStatusCode(502).end();
});
return fut;
}
private Future<Void> sendProxyResponse(ProxyResponse response) {
this.response = response;
// Check validity
Boolean chunked = HttpUtils.isChunked(response.headers());
if (chunked == null) {
// response.request().release(); // Is it needed ???
end(response.request(), 501);
return Future.succeededFuture(); // should use END future here ???
}
return sendResponse();
}
}
}