From 17da90448e20fa0f71cbb82162372e0a0e0eed62 Mon Sep 17 00:00:00 2001 From: ztzh_xieyun Date: Fri, 31 May 2024 16:15:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=8D=E5=90=91=E4=BB=A3=E7=90=86-=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E8=BF=9E=E6=8E=A5=E8=B6=85=E6=97=B6=20"apiConfig":=20?= =?UTF-8?q?[=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20{=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20"apiCode"?= =?UTF-8?q?:=20"sac6872cb7daf0249e998f11f4b7686f548",=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20"method":=20"POST",?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?"strategy":=20[],=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20"timeout":=20300,=20=20=20=20=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20"uri":=20"*"=20=20=20=20=20?= =?UTF-8?q?=20=20=20=20=20=20=20=20=20=20=20=20},?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ReverseProxy setConnectionTimeout方法 --- .../com/sf/vertx/constans/SACConstants.java | 8 +- .../com/sf/vertx/handle/AppConfigHandler.java | 136 +-------- .../handle/BaseRoutingContextDataHandler.java | 14 - .../handle/BodyPostCheckHandlerImpl.java | 23 +- .../vertx/handle/BodyPreCheckHandlerImpl.java | 21 +- .../handle/OpenParameterCheckHandlerImpl.java | 4 + .../com/sf/vertx/handle/ProxyHandler.java | 34 +++ .../com/sf/vertx/handle/ProxyHandlerImpl.java | 27 ++ .../handle/SacRouteRequestHandlerImpl.java | 15 +- .../com/sf/vertx/httpproxy/HttpProxy.java | 121 ++++++++ .../httpproxy/impl/BufferedReadStream.java | 82 ++++++ .../httpproxy/impl/BufferingReadStream.java | 76 +++++ .../httpproxy/impl/BufferingWriteStream.java | 69 +++++ .../sf/vertx/httpproxy/impl/CacheControl.java | 50 ++++ .../sf/vertx/httpproxy/impl/CacheImpl.java | 23 ++ .../vertx/httpproxy/impl/CachingFilter.java | 150 ++++++++++ .../sf/vertx/httpproxy/impl/HttpUtils.java | 55 ++++ .../sf/vertx/httpproxy/impl/ParseUtils.java | 81 ++++++ .../vertx/httpproxy/impl/ProxiedRequest.java | 230 +++++++++++++++ .../vertx/httpproxy/impl/ProxiedResponse.java | 271 ++++++++++++++++++ .../com/sf/vertx/httpproxy/impl/Resource.java | 51 ++++ .../sf/vertx/httpproxy/impl/ReverseProxy.java | 259 +++++++++++++++++ 22 files changed, 1618 insertions(+), 182 deletions(-) create mode 100644 sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/HttpProxy.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferedReadStream.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingReadStream.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingWriteStream.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheControl.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheImpl.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CachingFilter.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/HttpUtils.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ParseUtils.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedRequest.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedResponse.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/Resource.java create mode 100644 sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ReverseProxy.java diff --git a/sf-vertx/src/main/java/com/sf/vertx/constans/SACConstants.java b/sf-vertx/src/main/java/com/sf/vertx/constans/SACConstants.java index b894382..237e8cb 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/constans/SACConstants.java +++ b/sf-vertx/src/main/java/com/sf/vertx/constans/SACConstants.java @@ -6,13 +6,13 @@ public class SACConstants { public static final String APP_CONFIG = "appConfig"; - public static final String MOCK_PRE = "mockPre"; - - public static final String MOCK_POST = "mockPost"; - public static final String API_CONFIG = "apiConfig"; public static final String API_SERVICE_TYPE = "apiServiceType"; + + public static final String API_SERVICE_TYPE_OPEN = "OPEN"; + + public static final String API_SERVICE_TYPE_SAC = "SAC"; public static final String CIRCUIT_BREAKER = "CIRCUIT_BREAKER"; diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java index ddd94a2..8ecd781 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/AppConfigHandler.java @@ -1,9 +1,5 @@ package com.sf.vertx.handle; -import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR; -import static com.sf.vertx.constans.SACConstants.MOCK_POST; -import static com.sf.vertx.constans.SACConstants.MOCK_PRE; - import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -11,7 +7,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.data.redis.core.RedisTemplate; @@ -25,8 +20,6 @@ import com.hazelcast.config.TcpIpConfig; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.AppConfig; import com.sf.vertx.api.pojo.DataSecurity; -import com.sf.vertx.api.pojo.MockExpectation; -import com.sf.vertx.api.pojo.MockMatchCondition; import com.sf.vertx.api.pojo.Node; import com.sf.vertx.api.pojo.RouteContent; import com.sf.vertx.api.pojo.SacService; @@ -35,6 +28,7 @@ import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing; import com.sf.vertx.constans.RedisKeyConfig; import com.sf.vertx.enums.GatewayServiceType; +import com.sf.vertx.httpproxy.HttpProxy; import com.sf.vertx.init.SacVertxConfig; import com.sf.vertx.pojo.ClusterEventMsg; import com.sf.vertx.pojo.SacCurrentLimiting; @@ -50,7 +44,6 @@ import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.http.HttpClient; -import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.net.JksOptions; @@ -60,8 +53,6 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.client.WebClient; import io.vertx.ext.web.handler.CorsHandler; import io.vertx.ext.web.handler.HttpException; -import io.vertx.ext.web.proxy.handler.ProxyHandler; -import io.vertx.httpproxy.HttpProxy; import io.vertx.httpproxy.ProxyContext; import io.vertx.httpproxy.ProxyInterceptor; import io.vertx.httpproxy.ProxyResponse; @@ -80,8 +71,6 @@ public class AppConfigHandler { public static Vertx VERTX; private static SacVertxConfig sacVertxConfig; private static RedisTemplate redisTemplate; - public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER; - // global cache app config private static final ConcurrentHashMap CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>(); // global api config appCode - RateLimiterRegistry private static final ConcurrentHashMap GLOBAL_API_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>(); @@ -112,16 +101,9 @@ public class AppConfigHandler { // apicode uri = * - appConfig private static ConcurrentHashMap APICODE_APPCONFIG_MAP = new ConcurrentHashMap<>(); - // apiCode:appCode:preMock/postMock - private static ConcurrentHashMap MOCK_EXPECTATION_MAP = new ConcurrentHashMap<>(); - // 禁用appCode private static ConcurrentHashSet DISABLED_APPCODE = new ConcurrentHashSet(); - public static ApiConfig getMockApiConfig(String key) { - return MOCK_EXPECTATION_MAP.get(key); - } - public static AppConfig getAppConfigByDomain(String domain) { return APICODE_APPCONFIG_MAP.get(domain); } @@ -143,7 +125,7 @@ public class AppConfigHandler { return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key) != null && StringUtils.equals(APICODE_CONFIG_SERVICE_TYPE_MAP.get(key), "SAC"); } - + public static String getServiceType(String key) { return APICODE_CONFIG_SERVICE_TYPE_MAP.get(key); } @@ -290,8 +272,6 @@ public class AppConfigHandler { APICODE_CONFIG_SERVICE_TYPE_MAP.remove(key); APICODE_CONFIG_CURRENT_LIMITING_MAP.remove(key); APICODE_APPCONFIG_MAP.remove(apiConfig.getApiCode()); - MOCK_EXPECTATION_MAP.remove(key + CACHE_KEY_CONNECTOR + MOCK_PRE); - MOCK_EXPECTATION_MAP.remove(key + CACHE_KEY_CONNECTOR + MOCK_POST); String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER"; CircuitBreaker circuitBreaker = APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(keyCircuitBreaker); if (circuitBreaker != null) { @@ -430,25 +410,6 @@ public class AppConfigHandler { } } } - - // mock - if (apiConfig.getMockDefaultHttpStatus() != null) { - try { - ApiConfig apiConfigMockPre = new ApiConfig(); - ApiConfig apiConfigMockPost = new ApiConfig(); - BeanUtils.copyProperties(apiConfigMockPre, apiConfig); - BeanUtils.copyProperties(apiConfigMockPost, apiConfig); - // pre - buidMockCacheData(apiConfigMockPre, key, true); - // post - buidMockCacheData(apiConfigMockPost, key, false); - // local cache - MOCK_EXPECTATION_MAP.put(key + CACHE_KEY_CONNECTOR + MOCK_PRE, apiConfigMockPre); - MOCK_EXPECTATION_MAP.put(key + CACHE_KEY_CONNECTOR + MOCK_POST, apiConfigMockPost); - } catch (Exception e) { - e.printStackTrace(); - } - } } } @@ -456,46 +417,11 @@ public class AppConfigHandler { } } - private static void buidMockCacheData(ApiConfig apiConfig, String key, boolean isPre) { - List mockExpectations = apiConfig.getMockExpectations(); - if (mockExpectations != null && !mockExpectations.isEmpty()) { - for (MockExpectation mockExpectation : mockExpectations) { - List matchConditions = new ArrayList<>(); - for (MockMatchCondition matchCondition : mockExpectation.getMatchConditions()) { - switch (matchCondition.getParameterPosition()) { - case "query": - if (isServiceTypeSac(key) && isPre == false) { - // 后置 - matchConditions.add(matchCondition); - } - // 前置 - if (isPre) { - matchConditions.add(matchCondition); - } - case "header": - // 前置 - matchConditions.add(matchCondition); - case "body": - // 后置 - if (isPre == false) { - matchConditions.add(matchCondition); - } - default: - break; - } - } - mockExpectation.setMatchConditions(matchConditions); - } - } - } - public static void createVertx() { // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); loadVertxOptions(vertxOptions); VERTX = Vertx.vertx(vertxOptions); - - initConnectionCircuitBreaker(); createVertxRouter(); consumerClusterEventMsg(); } @@ -538,7 +464,6 @@ public class AppConfigHandler { if (res.succeeded()) { VERTX = res.result(); log.info("hazelcastClusterManager create success"); - initConnectionCircuitBreaker(); createVertxRouter(); consumerClusterEventMsg(); } else { @@ -610,13 +535,6 @@ public class AppConfigHandler { } }); - // open反向代理 -// HttpClientOptions clientOptions = new HttpClientOptions(); -// clientOptions.setMaxPoolSize(20); // 最大连接池大小 -// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒 -// clientOptions.setHttp2KeepAliveTimeout(1); -// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒 -// HttpClient proxyClient = VERTX.createHttpClient(clientOptions); HttpClient proxyClient = VERTX.createHttpClient(); HttpProxy proxy = HttpProxy.reverseProxy(proxyClient); proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request))); @@ -627,14 +545,6 @@ public class AppConfigHandler { // // 会跳转到 RestfulFailureHandlerImpl // throw new HttpException(10003); // } - String appCode = context.request().headers().get(getAppCodeHeaderKey()); - String apiCode = context.request().headers().get(getApiCodeHeaderKey()); - String key = appCode + ":" + apiCode; - if (isServiceTypeSac(key)) { - String uri = APICODE_CONFIG_MAP.get(key).getUri(); - String method = APICODE_CONFIG_MAP.get(key).getMethod(); - context.request().setURI(uri).setMethod(HttpMethod.valueOf(method)); - } return context.sendRequest(); } @@ -655,45 +565,19 @@ public class AppConfigHandler { Route routeSac = mainHttpRouter.post(rpcUri()); routeSac.handler(CorsHandler.create().addRelativeOrigin(".*")) .handler(ParameterCheckHandler.create(GatewayServiceType.SAC)) - .handler(AppRateLimitHandler.create(rateLimitModel)) - .handler(ApiRateLimitHandler.create(rateLimitModel)) - .handler(BodyPreCheckHandler.create()) - .handler(BodyHandler.create().setHandleFileUploads(false)) - .handler(BodyPostAnalysisHandler.create()) - .handler(BodyPostCheckHandler.create()) - .handler(SacRouteRequestHandler.create(mainWebClient)) - .handler(ProxyHandler.create(proxy)) + .handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel)) + .handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false)) + .handler(BodyPostAnalysisHandler.create()).handler(BodyPostCheckHandler.create()) + .handler(SacRouteRequestHandler.create(mainWebClient)).handler(ProxyHandler.create(proxy)) .failureHandler(RestfulFailureHandler.create()); Route routeOpen = mainHttpRouter.route(); routeOpen.handler(CorsHandler.create().addRelativeOrigin(".*")) .handler(ParameterCheckHandler.create(GatewayServiceType.OPEN)) - .handler(AppRateLimitHandler.create(rateLimitModel)) - .handler(ApiRateLimitHandler.create(rateLimitModel)) - .handler(BodyPreCheckHandler.create()) - .handler(BodyHandler.create().setHandleFileUploads(false)) - .handler(BodyPostCheckHandler.create()) - .handler(SacRouteRequestHandler.create(mainWebClient)) - .handler(ProxyHandler.create(proxy)) - .failureHandler(RestfulFailureHandler.create()); - } - - /*** - * 初始化connection Breaker - */ - private static void initConnectionCircuitBreaker() { - CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX, - new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数 - .setTimeout(2000) // 超时时间 - .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) - .setResetTimeout(10000) // 在开启状态下,尝试重试之前所需时间 - ).openHandler(v -> { - log.info("connectionCircuitBreaker Circuit open"); - }).halfOpenHandler(v -> { - log.info("connectionCircuitBreaker Circuit halfOpen"); - }).closeHandler(v -> { - log.info("connectionCircuitBreaker Circuit close"); - }); + .handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel)) + .handler(BodyPreCheckHandler.create()).handler(BodyHandler.create().setHandleFileUploads(false)) + .handler(BodyPostCheckHandler.create()).handler(SacRouteRequestHandler.create(mainWebClient)) + .handler(ProxyHandler.create(proxy)).failureHandler(RestfulFailureHandler.create()); } private static void loadVertxOptions(VertxOptions vertxOptions) { diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/BaseRoutingContextDataHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/BaseRoutingContextDataHandler.java index d8b6e3e..88a80d7 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/BaseRoutingContextDataHandler.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/BaseRoutingContextDataHandler.java @@ -1,9 +1,6 @@ package com.sf.vertx.handle; import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE; -import static com.sf.vertx.constans.SACConstants.CACHE_KEY_CONNECTOR; -import static com.sf.vertx.constans.SACConstants.MOCK_POST; -import static com.sf.vertx.constans.SACConstants.MOCK_PRE; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.AppConfig; @@ -18,16 +15,5 @@ public abstract class BaseRoutingContextDataHandler { AppUtils.setAppConfigToRoutingContext(appConfig, rc); AppUtils.setApiConfigIntoRoutingContext(apiConfig, rc); rc.put(API_SERVICE_TYPE, AppConfigHandler.getServiceType(apiCacheKey)); - - // mock - ApiConfig mockPre = AppConfigHandler.getMockApiConfig(apiCacheKey + CACHE_KEY_CONNECTOR + MOCK_PRE); - if (mockPre != null) { - rc.put(MOCK_PRE, mockPre); - } - - ApiConfig mockPost = AppConfigHandler.getMockApiConfig(apiCacheKey + CACHE_KEY_CONNECTOR + MOCK_POST); - if (mockPre != null) { - rc.put(MOCK_POST, mockPost); - } } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPostCheckHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPostCheckHandlerImpl.java index 60982f1..2dddef8 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPostCheckHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPostCheckHandlerImpl.java @@ -1,6 +1,5 @@ package com.sf.vertx.handle; -import static com.sf.vertx.constans.SACConstants.MOCK_POST; import com.sf.vertx.api.pojo.ApiConfig; import com.sf.vertx.api.pojo.MockResponse; @@ -15,22 +14,20 @@ import lombok.extern.slf4j.Slf4j; public class BodyPostCheckHandlerImpl implements BodyPostCheckHandler { @Override - public void handle(RoutingContext rc) { + public void handle(RoutingContext ctx) { try { + ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx); // mock - ApiConfig mock = rc.get(MOCK_POST); - if(mock != null) { - MockResponse mockResponse = AppUtils.mock(rc, mock); - if (mockResponse != null) { - rc.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse())); - return; - } - } + MockResponse mockResponse = AppUtils.mock(ctx, apiConfig); + if (mockResponse != null) { + ctx.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse())); + return; + } } catch (Exception e) { - log.error("BodyPreCheckHandlerImpl:",e); - rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); + log.error("BodyPostCheckHandlerImpl:",e); + ctx.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); return; } - rc.next(); + ctx.next(); } } diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPreCheckHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPreCheckHandlerImpl.java index 24baad5..f03dca3 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPreCheckHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/BodyPreCheckHandlerImpl.java @@ -1,13 +1,7 @@ package com.sf.vertx.handle; -import static com.sf.vertx.constans.SACConstants.MOCK_PRE; - -import com.sf.vertx.api.pojo.ApiConfig; -import com.sf.vertx.api.pojo.MockResponse; import com.sf.vertx.enums.GatewayError; -import com.sf.vertx.exception.MockException; import com.sf.vertx.exception.ServiceException; -import com.sf.vertx.utils.AppUtils; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; @@ -16,22 +10,13 @@ import lombok.extern.slf4j.Slf4j; public class BodyPreCheckHandlerImpl implements BodyPreCheckHandler { @Override - public void handle(RoutingContext rc) { + public void handle(RoutingContext ctx) { try { - // mock - ApiConfig mock = rc.get(MOCK_PRE); - if(mock != null) { - MockResponse mockResponse = AppUtils.mock(rc, mock); - if (mockResponse != null) { - rc.fail(new MockException(mockResponse.getHttpStatus(), mockResponse.getMockResponse())); - return; - } - } } catch (Exception e) { log.error("BodyPreCheckHandlerImpl:",e); - rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); + ctx.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); return; } - rc.next(); + ctx.next(); } } \ No newline at end of file diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/OpenParameterCheckHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/OpenParameterCheckHandlerImpl.java index 6902225..fbcde44 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/OpenParameterCheckHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/OpenParameterCheckHandlerImpl.java @@ -34,6 +34,10 @@ public class OpenParameterCheckHandlerImpl extends BaseRoutingContextDataHandler // 设置上下午数据 this.ctxData(rc, appConfig, apiConfig, apiCacheKey); + + // 反向代理负载均衡,需要从header拿到appCode,apiCode + rc.request().headers().add(AppConfigHandler.getAppCodeHeaderKey(), appConfig.getAppCode()); + rc.request().headers().add(AppConfigHandler.getApiCodeHeaderKey(), domain); } catch (Exception e) { log.error("OpenParameterCheckHandlerImpl Error:", e); rc.fail(new ServiceException(GatewayError.DEFAULT_SERVICE_ERROR)); diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java new file mode 100644 index 0000000..11d9a7e --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package com.sf.vertx.handle; + +import com.sf.vertx.httpproxy.HttpProxy; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; + +/** + * @author Emad Alblueshi + */ + +@VertxGen +public interface ProxyHandler extends Handler { + + static ProxyHandler create(HttpProxy httpProxy) { + return new ProxyHandlerImpl(httpProxy); + } + + static ProxyHandler create(HttpProxy httpProxy, int port, String host) { + return new ProxyHandlerImpl(httpProxy, port, host); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java new file mode 100644 index 0000000..9a751c1 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/ProxyHandlerImpl.java @@ -0,0 +1,27 @@ +package com.sf.vertx.handle; + +import com.sf.vertx.httpproxy.HttpProxy; + +import io.vertx.ext.web.RoutingContext; + +/** + * @author Emad Alblueshi + */ + +public class ProxyHandlerImpl implements ProxyHandler { + + private final HttpProxy httpProxy; + + public ProxyHandlerImpl(HttpProxy httpProxy) { + this.httpProxy = httpProxy; + } + + public ProxyHandlerImpl(HttpProxy httpProxy, int port, String host) { + this.httpProxy = httpProxy.origin(port, host); + } + + @Override + public void handle(RoutingContext ctx) { + httpProxy.handle(ctx.request()); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/handle/SacRouteRequestHandlerImpl.java b/sf-vertx/src/main/java/com/sf/vertx/handle/SacRouteRequestHandlerImpl.java index 2859630..580bbbb 100644 --- a/sf-vertx/src/main/java/com/sf/vertx/handle/SacRouteRequestHandlerImpl.java +++ b/sf-vertx/src/main/java/com/sf/vertx/handle/SacRouteRequestHandlerImpl.java @@ -2,7 +2,7 @@ package com.sf.vertx.handle; import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE; import static org.apache.commons.lang3.StringUtils.EMPTY; - +import static com.sf.vertx.constans.SACConstants.API_SERVICE_TYPE_SAC; import java.util.HashMap; import java.util.Map; @@ -19,6 +19,7 @@ import com.sf.vertx.utils.ProxyTool; import cn.hutool.core.util.StrUtil; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; import io.vertx.core.json.JsonObject; import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; @@ -110,8 +111,8 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler { private HttpRequest buildCallRequestBuffer(RoutingContext ctx, String body, ApiConfig apiConfig) { SocketAddress socketAddress = ProxyTool.resolveOriginAddress(ctx); HttpRequest requestBuffer; - String requestURI = ctx.request().uri(); - switch (ctx.request().method().name()) { + String requestURI = apiConfig.getUri(); + switch (apiConfig.getMethod()) { case "PUT": requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), requestURI); break; @@ -153,10 +154,10 @@ public class SacRouteRequestHandlerImpl implements SacRouteRequestHandler { // 判断body是否解析 AppConfig appConfig = AppUtils.getAppConfigFromRoutingContext(ctx); ApiConfig apiConfig = AppUtils.getApiConfigFromRoutingContext(ctx); - String apiServiceType = ctx.get(API_SERVICE_TYPE); - String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER"; - CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker); - if (AppUtils.isAnalysisBody(appConfig, apiConfig, apiServiceType)) { + String contentType = ctx.request().headers().get(HttpHeaders.CONTENT_TYPE); + if (AppUtils.isAnalysisBody(appConfig, apiConfig, contentType)) { + String keyCircuitBreaker = appConfig.getAppCode() + ":" + apiConfig.getApiCode() + ":" + "CIRCUIT_BREAKER"; + CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker); String body = AppUtils.isEnableDataSecurity(appConfig) ? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appConfig.getAppCode()) : ctx.body().asString(); diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/HttpProxy.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/HttpProxy.java new file mode 100644 index 0000000..1979174 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/HttpProxy.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy; + +import io.vertx.codegen.annotations.Fluent; +import io.vertx.codegen.annotations.GenIgnore; +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.RequestOptions; +import io.vertx.core.net.SocketAddress; +import io.vertx.httpproxy.ProxyInterceptor; +import io.vertx.httpproxy.ProxyOptions; + +import com.sf.vertx.httpproxy.impl.ReverseProxy; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Handles the HTTP reverse proxy logic between the user agent and the origin. + *

+ * @author Julien Viet + */ +@VertxGen +public interface HttpProxy extends Handler { + + /** + * Create a new {@code HttpProxy} instance. + * + * @param client the {@code HttpClient} that forwards outbound requests to the origin. + * @return a reference to this, so the API can be used fluently. + */ + static HttpProxy reverseProxy(HttpClient client) { + return new ReverseProxy(new ProxyOptions(), client); + } + + /** + * Create a new {@code HttpProxy} instance. + * + * @param client the {@code HttpClient} that forwards outbound requests to the origin. + * @return a reference to this, so the API can be used fluently. + */ + static HttpProxy reverseProxy(ProxyOptions options, HttpClient client) { + return new ReverseProxy(options, client); + } + + /** + * Set the {@code SocketAddress} of the origin. + * + * @param address the {@code SocketAddress} of the origin + * @return a reference to this, so the API can be used fluently + */ + @Fluent + default HttpProxy origin(SocketAddress address) { + return originSelector(req -> Future.succeededFuture(address)); + } + + /** + * Set the host name and port number of the origin. + * + * @param port the port number of the origin server + * @param host the host name of the origin server + * @return a reference to this, so the API can be used fluently + */ + @Fluent + default HttpProxy origin(int port, String host) { + return origin(SocketAddress.inetSocketAddress(port, host)); + } + + /** + * Set a selector that resolves the origin address based on the incoming HTTP request. + * + * @param selector the selector + * @return a reference to this, so the API can be used fluently + */ + @Fluent + default HttpProxy originSelector(Function> selector) { + return originRequestProvider((req, client) -> selector + .apply(req) + .flatMap(server -> client.request(new RequestOptions().setServer(server)))); + } + + /** + * Set a provider that creates the request to the origin server based the incoming HTTP request. + * Setting a provider overrides any origin selector previously set. + * + * @param provider the provider + * @return a reference to this, so the API can be used fluently + */ + @GenIgnore() + @Fluent + HttpProxy originRequestProvider(BiFunction> provider); + + /** + * Add an interceptor to the interceptor chain. + * + * @param interceptor + * @return a reference to this, so the API can be used fluently + */ + @Fluent + HttpProxy addInterceptor(ProxyInterceptor interceptor); + + /** + * Handle the outbound {@code HttpServerRequest}. + * + * @param request the outbound {@code HttpServerRequest} + */ + void handle(HttpServerRequest request); + +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferedReadStream.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferedReadStream.java new file mode 100644 index 0000000..19439e8 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferedReadStream.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; + +public class BufferedReadStream implements ReadStream { + + private long demand = 0L; + private Handler endHandler; + private Handler handler; + private boolean ended = false; + private final Buffer content; + + public BufferedReadStream() { + this.content = Buffer.buffer(); + } + + public BufferedReadStream(Buffer content) { + this.content = content; + } + + @Override + public ReadStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public ReadStream handler(Handler handler) { + this.handler = handler; + return this; + } + + @Override + public ReadStream pause() { + demand = 0L; + return this; + } + + @Override + public ReadStream resume() { + fetch(Long.MAX_VALUE); + return this; + } + + @Override + public ReadStream fetch(long amount) { + if (!ended && amount > 0) { + ended = true; + demand += amount; + if (demand < 0L) { + demand = Long.MAX_VALUE; + } + if (demand != Long.MAX_VALUE) { + demand--; + } + if (handler != null && content.length() > 0) { + handler.handle(content); + } + if (endHandler != null) { + endHandler.handle(null); + } + } + return this; + } + + @Override + public ReadStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingReadStream.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingReadStream.java new file mode 100644 index 0000000..95e915d --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingReadStream.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; + +class BufferingReadStream implements ReadStream { + + private final ReadStream stream; + private final Buffer content; + private Handler endHandler; + + public BufferingReadStream(ReadStream stream, Buffer content) { + this.stream = stream; + this.content = content; + } + + @Override + public ReadStream exceptionHandler(Handler handler) { + stream.exceptionHandler(handler); + return this; + } + + @Override + public ReadStream handler(Handler handler) { + if (handler != null) { + stream.handler(buff -> { + content.appendBuffer(buff); + handler.handle(buff); + }); + } else { + stream.handler(null); + } + return this; + } + + @Override + public ReadStream pause() { + stream.pause(); + return this; + } + + @Override + public ReadStream resume() { + stream.resume(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + stream.fetch(amount); + return this; + } + + @Override + public ReadStream endHandler(Handler endHandler) { + if (endHandler != null) { + stream.endHandler(v -> { + endHandler.handle(null); + }); + } else { + stream.endHandler(null); + } + return this; + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingWriteStream.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingWriteStream.java new file mode 100644 index 0000000..2176dd4 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/BufferingWriteStream.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +class BufferingWriteStream implements WriteStream { + + private final Buffer content; + + public BufferingWriteStream() { + this.content = Buffer.buffer(); + } + + public Buffer content() { + return content; + } + + @Override + public WriteStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public Future write(Buffer data) { + content.appendBuffer(data); + return Future.succeededFuture(); + } + + @Override + public void write(Buffer data, Handler> handler) { + content.appendBuffer(data); + handler.handle(Future.succeededFuture()); + } + + @Override + public void end(Handler> handler) { + handler.handle(Future.succeededFuture()); + } + + @Override + public WriteStream setWriteQueueMaxSize(int maxSize) { + return this; + } + + @Override + public boolean writeQueueFull() { + return false; + } + + @Override + public WriteStream drainHandler(@Nullable Handler handler) { + return this; + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheControl.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheControl.java new file mode 100644 index 0000000..7aac807 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheControl.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +/** + * @author Julien Viet + */ +public class CacheControl { + + private int maxAge; + private boolean _public; + + public CacheControl parse(String header) { + maxAge = -1; + _public = false; + String[] parts = header.split(","); // No regex + for (String part : parts) { + part = part.trim().toLowerCase(); + switch (part) { + case "public": + _public = true; + break; + default: + if (part.startsWith("max-age=")) { + maxAge = Integer.parseInt(part.substring(8)); + + } + break; + } + } + return this; + } + + public int maxAge() { + return maxAge; + } + + public boolean isPublic() { + return _public; + } + +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheImpl.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheImpl.java new file mode 100644 index 0000000..57eb7f3 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CacheImpl.java @@ -0,0 +1,23 @@ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.httpproxy.cache.CacheOptions; +import io.vertx.httpproxy.spi.cache.Cache; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Simplistic implementation. + */ +public class CacheImpl extends LinkedHashMap implements Cache { + + private final int maxSize; + + public CacheImpl(CacheOptions options) { + this.maxSize = options.getMaxSize(); + } + + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxSize; + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CachingFilter.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CachingFilter.java new file mode 100644 index 0000000..fa7ad06 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/CachingFilter.java @@ -0,0 +1,150 @@ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.Future; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.httpproxy.Body; +import io.vertx.httpproxy.ProxyContext; +import io.vertx.httpproxy.ProxyInterceptor; +import io.vertx.httpproxy.ProxyRequest; +import io.vertx.httpproxy.ProxyResponse; +import io.vertx.httpproxy.spi.cache.Cache; + +import java.time.Instant; +import java.util.function.BiFunction; + +class CachingFilter implements ProxyInterceptor { + + private static final BiFunction CACHE_GET_AND_VALIDATE = (key, resource) -> { + long now = System.currentTimeMillis(); + long val = resource.timestamp + resource.maxAge; + return val < now ? null : resource; + }; + + private final Cache cache; + + public CachingFilter(Cache cache) { + this.cache = cache; + } + + @Override + public Future handleProxyRequest(ProxyContext context) { + Future future = tryHandleProxyRequestFromCache(context); + if (future != null) { + return future; + } + return context.sendRequest(); + } + + @Override + public Future handleProxyResponse(ProxyContext context) { + return sendAndTryCacheProxyResponse(context); + } + + private Future sendAndTryCacheProxyResponse(ProxyContext context) { + + ProxyResponse response = context.response(); + Resource cached = context.get("cached_resource", Resource.class); + + if (cached != null && response.getStatusCode() == 304) { + // Warning: this relies on the fact that HttpServerRequest will not send a body for HEAD + response.release(); + cached.init(response); + return context.sendResponse(); + } + + ProxyRequest request = response.request(); + if (response.publicCacheControl() && response.maxAge() > 0) { + if (request.getMethod() == HttpMethod.GET) { + String absoluteUri = request.absoluteURI(); + Resource res = new Resource( + absoluteUri, + response.getStatusCode(), + response.getStatusMessage(), + response.headers(), + System.currentTimeMillis(), + response.maxAge()); + Body body = response.getBody(); + response.setBody(Body.body(new BufferingReadStream(body.stream(), res.content), body.length())); + Future fut = context.sendResponse(); + fut.onSuccess(v -> { + cache.put(absoluteUri, res); + }); + return fut; + } else { + if (request.getMethod() == HttpMethod.HEAD) { + Resource resource = cache.get(request.absoluteURI()); + if (resource != null) { + if (!revalidateResource(response, resource)) { + // Invalidate cache + cache.remove(request.absoluteURI()); + } + } + } + return context.sendResponse(); + } + } else { + return context.sendResponse(); + } + } + + private static boolean revalidateResource(ProxyResponse response, Resource resource) { + if (resource.etag != null && response.etag() != null) { + return resource.etag.equals(response.etag()); + } + return true; + } + + private Future tryHandleProxyRequestFromCache(ProxyContext context) { + + ProxyRequest proxyRequest = context.request(); + + HttpServerRequest response = proxyRequest.proxiedRequest(); + + Resource resource; + HttpMethod method = response.method(); + if (method == HttpMethod.GET || method == HttpMethod.HEAD) { + String cacheKey = proxyRequest.absoluteURI(); + resource = cache.computeIfPresent(cacheKey, CACHE_GET_AND_VALIDATE); + if (resource == null) { + return null; + } + } else { + return null; + } + + String cacheControlHeader = response.getHeader(HttpHeaders.CACHE_CONTROL); + if (cacheControlHeader != null) { + CacheControl cacheControl = new CacheControl().parse(cacheControlHeader); + if (cacheControl.maxAge() >= 0) { + long now = System.currentTimeMillis(); + long currentAge = now - resource.timestamp; + if (currentAge > cacheControl.maxAge() * 1000) { + String etag = resource.headers.get(HttpHeaders.ETAG); + if (etag != null) { + proxyRequest.headers().set(HttpHeaders.IF_NONE_MATCH, resource.etag); + context.set("cached_resource", resource); + return context.sendRequest(); + } else { + return null; + } + } + } + } + + // + String ifModifiedSinceHeader = response.getHeader(HttpHeaders.IF_MODIFIED_SINCE); + if ((response.method() == HttpMethod.GET || response.method() == HttpMethod.HEAD) && ifModifiedSinceHeader != null && resource.lastModified != null) { + Instant ifModifiedSince = ParseUtils.parseHeaderDate(ifModifiedSinceHeader); + if (!ifModifiedSince.isAfter(resource.lastModified)) { + response.response().setStatusCode(304).end(); + return Future.succeededFuture(); + } + } + proxyRequest.release(); + ProxyResponse proxyResponse = proxyRequest.response(); + resource.init(proxyResponse); + return Future.succeededFuture(proxyResponse); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/HttpUtils.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/HttpUtils.java new file mode 100644 index 0000000..936223b --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/HttpUtils.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.MultiMap; +import io.vertx.core.http.HttpHeaders; + +import java.time.Instant; +import java.util.List; + +class HttpUtils { + + static Boolean isChunked(MultiMap headers) { + List te = headers.getAll("transfer-encoding"); + if (te != null) { + boolean chunked = false; + for (String val : te) { + if (val.equals("chunked")) { + chunked = true; + } else { + return null; + } + } + return chunked; + } else { + return false; + } + } + + static Instant dateHeader(MultiMap headers) { + String dateHeader = headers.get(HttpHeaders.DATE); + if (dateHeader == null) { + List warningHeaders = headers.getAll("warning"); + if (warningHeaders.size() > 0) { + for (String warningHeader : warningHeaders) { + Instant date = ParseUtils.parseWarningHeaderDate(warningHeader); + if (date != null) { + return date; + } + } + } + return null; + } else { + return ParseUtils.parseHeaderDate(dateHeader); + } + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ParseUtils.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ParseUtils.java new file mode 100644 index 0000000..91e7d0a --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ParseUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import java.time.*; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.util.*; + +/** + * @author Julien Viet + */ +public class ParseUtils { + + public static final DateTimeFormatter RFC_850_DATE_TIME = new DateTimeFormatterBuilder() + .appendPattern("EEEE, dd-MMM-yy HH:mm:ss") + .parseLenient() + .appendLiteral(" GMT") + .toFormatter(Locale.US) + .withZone(ZoneId.of("UTC")); + + public static final DateTimeFormatter ASC_TIME = new DateTimeFormatterBuilder() + .appendPattern("EEE MMM d HH:mm:ss yyyy") + .parseLenient() + .toFormatter(Locale.US) + .withZone(ZoneId.of("UTC")); + + public static Instant parseHeaderDate(String value) { + try { + return parseHttpDate(value); + } catch (Exception e) { + return null; + } + } + + public static Instant parseWarningHeaderDate(String value) { + // warn-code + int index = value.indexOf(' '); + if (index > 0) { + // warn-agent + index = value.indexOf(' ', index + 1); + if (index > 0) { + // warn-text + index = value.indexOf(' ', index + 1); + if (index > 0) { + // warn-date + int len = value.length(); + if (index + 2 < len && value.charAt(index + 1) == '"' && value.charAt(len - 1) == '"') { + // Space for 2 double quotes + return parseHeaderDate(value.substring(index + 2, len - 1)); + } + } + } + } + return null; + } + + public static String formatHttpDate(Instant date) { + return DateTimeFormatter.RFC_1123_DATE_TIME.format(OffsetDateTime.ofInstant(date, ZoneOffset.UTC)); + } + + // https://www.rfc-editor.org/rfc/rfc9110#http.date + public static Instant parseHttpDate(String value) throws Exception { + int pos = value.indexOf(','); + if (pos == 3) { // e.g. Sun, 06 Nov 1994 08:49:37 GMT + return DateTimeFormatter.RFC_1123_DATE_TIME.parse(value, Instant::from); + } + if (pos == -1) { // e.g. Sun Nov 6 08:49:37 1994 + return ASC_TIME.parse(value, Instant::from); + } + return RFC_850_DATE_TIME.parse(value, Instant::from); // e.g. Sunday, 06-Nov-94 08:49:37 GMT + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedRequest.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedRequest.java new file mode 100644 index 0000000..cba573a --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedRequest.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.http.impl.HttpServerRequestInternal; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.net.HostAndPort; +import io.vertx.core.streams.Pipe; +import io.vertx.httpproxy.Body; +import io.vertx.httpproxy.ProxyRequest; +import io.vertx.httpproxy.ProxyResponse; + +import java.util.Map; +import java.util.Objects; + +public class ProxiedRequest implements ProxyRequest { + + private static final CharSequence X_FORWARDED_HOST = HttpHeaders.createOptimized("x-forwarded-host"); + + private static final MultiMap HOP_BY_HOP_HEADERS = MultiMap.caseInsensitiveMultiMap() + .add(HttpHeaders.CONNECTION, "whatever") + .add(HttpHeaders.KEEP_ALIVE, "whatever") + .add(HttpHeaders.PROXY_AUTHENTICATE, "whatever") + .add(HttpHeaders.PROXY_AUTHORIZATION, "whatever") + .add("te", "whatever") + .add("trailer", "whatever") + .add(HttpHeaders.TRANSFER_ENCODING, "whatever") + .add(HttpHeaders.UPGRADE, "whatever"); + + final ContextInternal context; + private HttpMethod method; + private final HttpVersion version; + private String uri; + private final String absoluteURI; + private Body body; + private HostAndPort authority; + private final MultiMap headers; + HttpClientRequest request; + private final HttpServerRequest proxiedRequest; + + public ProxiedRequest(HttpServerRequest proxiedRequest) { + + // Determine content length + long contentLength = -1L; + String contentLengthHeader = proxiedRequest.getHeader(HttpHeaders.CONTENT_LENGTH); + if (contentLengthHeader != null) { + try { + contentLength = Long.parseLong(contentLengthHeader); + } catch (NumberFormatException e) { + // Ignore ??? + } + } + + this.method = proxiedRequest.method(); + this.version = proxiedRequest.version(); + this.body = Body.body(proxiedRequest, contentLength); + this.uri = proxiedRequest.uri(); + this.headers = MultiMap.caseInsensitiveMultiMap().addAll(proxiedRequest.headers()); + this.absoluteURI = proxiedRequest.absoluteURI(); + this.proxiedRequest = proxiedRequest; + this.context = (ContextInternal) ((HttpServerRequestInternal) proxiedRequest).context(); + this.authority = proxiedRequest.authority(); + } + + @Override + public HttpVersion version() { + return version; + } + + @Override + public String getURI() { + return uri; + } + + @Override + public ProxyRequest setURI(String uri) { + this.uri = uri; + return this; + } + + @Override + public Body getBody() { + return body; + } + + @Override + public ProxyRequest setBody(Body body) { + this.body = body; + return this; + } + + @Override + public ProxyRequest setAuthority(HostAndPort authority) { + Objects.requireNonNull(authority); + this.authority= authority; + return this; + } + + @Override + public HostAndPort getAuthority() { + return authority; + } + + @Override + public String absoluteURI() { + return absoluteURI; + } + + @Override + public HttpMethod getMethod() { + return method; + } + + @Override + public ProxyRequest setMethod(HttpMethod method) { + this.method = method; + return this; + } + + @Override + public HttpServerRequest proxiedRequest() { + return proxiedRequest; + } + + @Override + public ProxyRequest release() { + body.stream().resume(); + headers.clear(); + body = null; + return this; + } + + @Override + public ProxyResponse response() { + return new ProxiedResponse(this, proxiedRequest.response()); + } + + void sendRequest(Handler> responseHandler) { + + request.response().map(r -> { + r.pause(); // Pause it + return new ProxiedResponse(this, proxiedRequest.response(), r); + }).onComplete(responseHandler); + + + request.setMethod(method); + request.setURI(uri); + + // Add all headers + for (Map.Entry header : headers) { + String name = header.getKey(); + String value = header.getValue(); + if (!HOP_BY_HOP_HEADERS.contains(name) && !name.equals("host")) { + request.headers().add(name, value); + } + } + + // + if (authority != null) { + request.authority(authority); + HostAndPort proxiedAuthority = proxiedRequest.authority(); + if (!equals(authority, proxiedAuthority)) { + // Should cope with existing forwarded host headers + request.putHeader(X_FORWARDED_HOST, proxiedAuthority.toString()); + } + } + + long len = body.length(); + if (len >= 0) { + request.putHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(len)); + } else { + Boolean isChunked = HttpUtils.isChunked(proxiedRequest.headers()); + request.setChunked(len == -1 && Boolean.TRUE == isChunked); + } + + Pipe pipe = body.stream().pipe(); + pipe.endOnComplete(true); + pipe.endOnFailure(false); + pipe.to(request, ar -> { + if (ar.failed()) { + request.reset(); + } + }); + } + + private static boolean equals(HostAndPort hp1, HostAndPort hp2) { + if (hp1 == null || hp2 == null) { + return false; + } + return hp1.host().equals(hp2.host()) && hp1.port() == hp2.port(); + } + + @Override + public ProxyRequest putHeader(CharSequence name, CharSequence value) { + headers.set(name, value); + return this; + } + + @Override + public MultiMap headers() { + return headers; + } + + @Override + public Future send(HttpClientRequest request) { + Promise promise = context.promise(); + this.request = request; + sendRequest(promise); + return promise.future(); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedResponse.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedResponse.java new file mode 100644 index 0000000..a6236c9 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ProxiedResponse.java @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.http.HttpVersion; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.httpproxy.Body; +import io.vertx.httpproxy.ProxyRequest; +import io.vertx.httpproxy.ProxyResponse; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +class ProxiedResponse implements ProxyResponse { + + private final ProxiedRequest request; + private final HttpServerResponse proxiedResponse; + private int statusCode; + private String statusMessage; + private Body body; + private final MultiMap headers; + private HttpClientResponse response; + private long maxAge; + private String etag; + private boolean publicCacheControl; + + ProxiedResponse(ProxiedRequest request, HttpServerResponse proxiedResponse) { + this.response = null; + this.statusCode = 200; + this.headers = MultiMap.caseInsensitiveMultiMap(); + this.request = request; + this.proxiedResponse = proxiedResponse; + } + + ProxiedResponse(ProxiedRequest request, HttpServerResponse proxiedResponse, HttpClientResponse response) { + + // Determine content length + long contentLength = -1L; + String contentLengthHeader = response.getHeader(HttpHeaders.CONTENT_LENGTH); + if (contentLengthHeader != null) { + try { + contentLength = Long.parseLong(contentLengthHeader); + } catch (NumberFormatException e) { + // Ignore ??? + } + } + + this.request = request; + this.response = response; + this.proxiedResponse = proxiedResponse; + this.statusCode = response.statusCode(); + this.statusMessage = response.statusMessage(); + this.body = Body.body(response, contentLength); + + long maxAge = -1; + boolean publicCacheControl = false; + String cacheControlHeader = response.getHeader(HttpHeaders.CACHE_CONTROL); + if (cacheControlHeader != null) { + CacheControl cacheControl = new CacheControl().parse(cacheControlHeader); + if (cacheControl.isPublic()) { + publicCacheControl = true; + if (cacheControl.maxAge() > 0) { + maxAge = (long)cacheControl.maxAge() * 1000; + } else { + String dateHeader = response.getHeader(HttpHeaders.DATE); + String expiresHeader = response.getHeader(HttpHeaders.EXPIRES); + if (dateHeader != null && expiresHeader != null) { + maxAge = ParseUtils.parseHeaderDate(expiresHeader).toEpochMilli() - ParseUtils.parseHeaderDate(dateHeader).toEpochMilli(); + } + } + } + } + this.maxAge = maxAge; + this.publicCacheControl = publicCacheControl; + this.etag = response.getHeader(HttpHeaders.ETAG); + this.headers = MultiMap.caseInsensitiveMultiMap().addAll(response.headers()); + } + + @Override + public ProxyRequest request() { + return request; + } + + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public ProxyResponse setStatusCode(int sc) { + statusCode = sc; + return this; + } + + @Override + public String getStatusMessage() { + return statusMessage; + } + + @Override + public ProxyResponse setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + return this; + } + + @Override + public Body getBody() { + return body; + } + + @Override + public ProxyResponse setBody(Body body) { + this.body = body; + return this; + } + + @Override + public boolean publicCacheControl() { + return publicCacheControl; + } + + @Override + public long maxAge() { + return maxAge; + } + + @Override + public String etag() { + return etag; + } + + @Override + public MultiMap headers() { + return headers; + } + + @Override + public ProxyResponse putHeader(CharSequence name, CharSequence value) { + headers.set(name, value); + return this; + } + + @Override + public Future send() { + Promise promise = request.context.promise(); + send(promise); + return promise.future(); + } + + public void send(Handler> completionHandler) { + // Set stuff + proxiedResponse.setStatusCode(statusCode); + + if(statusMessage != null) { + proxiedResponse.setStatusMessage(statusMessage); + } + + // Date header + Instant date = HttpUtils.dateHeader(headers); + if (date == null) { + date = Instant.now(); + } + try { + proxiedResponse.putHeader("date", ParseUtils.formatHttpDate(date)); + } catch (Exception e) { + e.printStackTrace(); + } + + // Warning header + List warningHeaders = headers.getAll("warning"); + if (warningHeaders.size() > 0) { + warningHeaders = new ArrayList<>(warningHeaders); + String dateHeader = headers.get("date"); + Instant dateInstant = dateHeader != null ? ParseUtils.parseHeaderDate(dateHeader) : null; + Iterator i = warningHeaders.iterator(); + // Suppress incorrect warning header + while (i.hasNext()) { + String warningHeader = i.next(); + Instant warningInstant = ParseUtils.parseWarningHeaderDate(warningHeader); + if (warningInstant != null && dateInstant != null && !warningInstant.equals(dateInstant)) { + i.remove(); + } + } + } + proxiedResponse.putHeader("warning", warningHeaders); + + // Handle other headers + headers.forEach(header -> { + String name = header.getKey(); + String value = header.getValue(); + if (name.equalsIgnoreCase("date") || name.equalsIgnoreCase("warning") || name.equalsIgnoreCase("transfer-encoding")) { + // Skip + } else { + proxiedResponse.headers().add(name, value); + } + }); + + // + if (body == null) { + proxiedResponse.end(); + return; + } + + long len = body.length(); + if (len >= 0) { + proxiedResponse.putHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(len)); + } else { + if (request.proxiedRequest().version() == HttpVersion.HTTP_1_0) { + // Special handling for HTTP 1.0 clients that cannot handle chunked encoding + // we need to buffer the content + BufferingWriteStream buffer = new BufferingWriteStream(); + body.stream().pipeTo(buffer, ar -> { + if (ar.succeeded()) { + Buffer content = buffer.content(); + proxiedResponse.end(content, completionHandler); + } else { + System.out.println("Not implemented"); + } + }); + return; + } + proxiedResponse.setChunked(true); + } + ReadStream bodyStream = body.stream(); + sendResponse(bodyStream, completionHandler); + } + + @Override + public ProxyResponse release() { + if (response != null) { + response.resume(); + response = null; + body = null; + headers.clear(); + } + return this; + } + + private void sendResponse(ReadStream body, Handler> completionHandler) { + Pipe pipe = body.pipe(); + pipe.endOnSuccess(true); + pipe.endOnFailure(false); + pipe.to(proxiedResponse, ar -> { + if (ar.failed()) { + request.request.reset(); + proxiedResponse.reset(); + } + completionHandler.handle(ar); + }); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/Resource.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/Resource.java new file mode 100644 index 0000000..6899882 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/Resource.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpHeaders; +import io.vertx.httpproxy.Body; +import io.vertx.httpproxy.ProxyResponse; + +import java.time.Instant; + +class Resource { + + final String absoluteUri; + final int statusCode; + final String statusMessage; + final MultiMap headers; + final long timestamp; + final long maxAge; + final Instant lastModified; + final String etag; + final Buffer content = Buffer.buffer(); + + Resource(String absoluteUri, int statusCode, String statusMessage, MultiMap headers, long timestamp, long maxAge) { + String lastModifiedHeader = headers.get(HttpHeaders.LAST_MODIFIED); + this.absoluteUri = absoluteUri; + this.statusCode = statusCode; + this.statusMessage = statusMessage; + this.headers = headers; + this.timestamp = timestamp; + this.maxAge = maxAge; + this.lastModified = lastModifiedHeader != null ? ParseUtils.parseHeaderDate(lastModifiedHeader) : null; + this.etag = headers.get(HttpHeaders.ETAG); + } + + void init(ProxyResponse proxyResponse) { + proxyResponse.setStatusCode(200); + proxyResponse.setStatusMessage(statusMessage); + proxyResponse.headers().addAll(headers); + proxyResponse.setBody(Body.body(content)); + } +} diff --git a/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ReverseProxy.java b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ReverseProxy.java new file mode 100644 index 0000000..4cfa5e5 --- /dev/null +++ b/sf-vertx/src/main/java/com/sf/vertx/httpproxy/impl/ReverseProxy.java @@ -0,0 +1,259 @@ +/* + * Copyright (c) 2011-2020 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package com.sf.vertx.httpproxy.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.function.BiFunction; + +import com.sf.vertx.handle.AppConfigHandler; +import com.sf.vertx.httpproxy.HttpProxy; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.net.NetSocket; +import io.vertx.httpproxy.ProxyContext; +import io.vertx.httpproxy.ProxyInterceptor; +import io.vertx.httpproxy.ProxyOptions; +import io.vertx.httpproxy.ProxyRequest; +import io.vertx.httpproxy.ProxyResponse; +import io.vertx.httpproxy.cache.CacheOptions; +import io.vertx.httpproxy.spi.cache.Cache; + +public class ReverseProxy implements HttpProxy { + + private final HttpClient client; + private final boolean supportWebSocket; + private BiFunction> selector = (req, client) -> Future.failedFuture("No origin available"); + private final List interceptors = new ArrayList<>(); + + public ReverseProxy(ProxyOptions options, HttpClient client) { + CacheOptions cacheOptions = options.getCacheOptions(); + if (cacheOptions != null) { + Cache cache = cacheOptions.newCache(); + addInterceptor(new CachingFilter(cache)); + } + this.client = client; + this.supportWebSocket = options.getSupportWebSocket(); + } + + @Override + public HttpProxy originRequestProvider(BiFunction> provider) { + selector = provider; + return this; + } + + @Override + public HttpProxy addInterceptor(ProxyInterceptor interceptor) { + interceptors.add(interceptor); + return this; + } + + + @Override + public void handle(HttpServerRequest request) { + ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request); + + // Encoding sanity check + Boolean chunked = HttpUtils.isChunked(request.headers()); + if (chunked == null) { + end(proxyRequest, 400); + return; + } + + // WebSocket upgrade tunneling + if (supportWebSocket && io.vertx.core.http.impl.HttpUtils.canUpgradeToWebSocket(request)) { + handleWebSocketUpgrade(proxyRequest); + return; + } + + Proxy proxy = new Proxy(proxyRequest); + proxy.filters = interceptors.listIterator(); + proxy.sendRequest().compose(proxy::sendProxyResponse); + } + + private void handleWebSocketUpgrade(ProxyRequest proxyRequest) { + HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest(); + resolveOrigin(proxiedRequest).onComplete(ar -> { + if (ar.succeeded()) { + HttpClientRequest request = ar.result(); + request.setMethod(HttpMethod.GET); + request.setURI(proxiedRequest.uri()); + request.headers().addAll(proxiedRequest.headers()); + Future fut2 = request.connect(); + proxiedRequest.handler(request::write); + proxiedRequest.endHandler(v -> request.end()); + proxiedRequest.resume(); + fut2.onComplete(ar2 -> { + if (ar2.succeeded()) { + HttpClientResponse proxiedResponse = ar2.result(); + if (proxiedResponse.statusCode() == 101) { + HttpServerResponse response = proxiedRequest.response(); + response.setStatusCode(101); + response.headers().addAll(proxiedResponse.headers()); + Future otherso = proxiedRequest.toNetSocket(); + otherso.onComplete(ar3 -> { + if (ar3.succeeded()) { + NetSocket responseSocket = ar3.result(); + NetSocket proxyResponseSocket = proxiedResponse.netSocket(); + responseSocket.handler(proxyResponseSocket::write); + proxyResponseSocket.handler(responseSocket::write); + responseSocket.closeHandler(v -> proxyResponseSocket.close()); + proxyResponseSocket.closeHandler(v -> responseSocket.close()); + } else { + // Find reproducer + System.err.println("Handle this case"); + ar3.cause().printStackTrace(); + } + }); + } else { + // Rejection + proxiedRequest.resume(); + end(proxyRequest, proxiedResponse.statusCode()); + } + } else { + proxiedRequest.resume(); + end(proxyRequest, 502); + } + }); + } else { + proxiedRequest.resume(); + end(proxyRequest, 502); + } + }); + } + + private void end(ProxyRequest proxyRequest, int sc) { + proxyRequest + .response() + .release() + .setStatusCode(sc) + .putHeader(HttpHeaders.CONTENT_LENGTH, "0") + .setBody(null) + .send(); + } + + private Future resolveOrigin(HttpServerRequest proxiedRequest) { + return selector.apply(proxiedRequest, client); + } + + private class Proxy implements ProxyContext { + + private final ProxyRequest request; + private ProxyResponse response; + private final Map attachments = new HashMap<>(); + private ListIterator filters; + + private Proxy(ProxyRequest request) { + this.request = request; + } + + @Override + public void set(String name, Object value) { + attachments.put(name, value); + } + + @Override + public T get(String name, Class type) { + Object o = attachments.get(name); + return type.isInstance(o) ? type.cast(o) : null; + } + + @Override + public ProxyRequest request() { + return request; + } + + @Override + public Future sendRequest() { + if (filters.hasNext()) { + ProxyInterceptor next = filters.next(); + return next.handleProxyRequest(this); + } else { + return sendProxyRequest(request); + } + } + + @Override + public ProxyResponse response() { + return response; + } + + @Override + public Future sendResponse() { + if (filters.hasPrevious()) { + ProxyInterceptor filter = filters.previous(); + return filter.handleProxyResponse(this); + } else { + return response.send(); + } + } + + private Future sendProxyRequest(ProxyRequest proxyRequest) { + Future f = resolveOrigin(proxyRequest.proxiedRequest()); + f.onFailure(err -> { + // Should this be done here ? I don't think so + HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest(); + proxiedRequest.resume(); + Promise promise = Promise.promise(); + proxiedRequest.exceptionHandler(promise::tryFail); + proxiedRequest.endHandler(promise::tryComplete); + promise.future().onComplete(ar2 -> { + end(proxyRequest, 502); + }); + }); + return f.compose(a -> sendProxyRequest(proxyRequest, a)); + } + + private void setConnectionTimeout(ProxyRequest proxyRequest, HttpClientRequest request) { + String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey()); + String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey()); + String key = appCode + ":" + apiCode; + request.idleTimeout(AppConfigHandler.getApicodeConfigTimeOut(key)); + } + + private Future sendProxyRequest(ProxyRequest proxyRequest, HttpClientRequest request) { + Future fut = proxyRequest.send(request); + // 超时 + setConnectionTimeout(proxyRequest, request); + fut.onFailure(err -> { + proxyRequest.proxiedRequest().response().setStatusCode(502).end(); + }); + return fut; + } + + private Future sendProxyResponse(ProxyResponse response) { + + this.response = response; + + // Check validity + Boolean chunked = HttpUtils.isChunked(response.headers()); + if (chunked == null) { + // response.request().release(); // Is it needed ??? + end(response.request(), 501); + return Future.succeededFuture(); // should use END future here ??? + } + + return sendResponse(); + } + } + +}