// Source: sac/sf-vertx/src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java
// Last modified: 2024-04-30 09:25:34 +08:00
//
// 369 lines
// 12 KiB
// Java

/*
* Copyright (c) 2011-2020 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.httpproxy.impl;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyOptions;
import io.vertx.httpproxy.ProxyRequest;
import io.vertx.httpproxy.ProxyResponse;
import io.vertx.httpproxy.cache.CacheOptions;
import io.vertx.httpproxy.spi.cache.Cache;
import lombok.extern.slf4j.Slf4j;
/**
 * Reverse-proxy implementation of {@link HttpProxy}.
 *
 * <p>An incoming request is either tunnelled as a WebSocket upgrade or run through the registered
 * {@link ProxyInterceptor} chain and forwarded to an origin. Two forwarding modes exist:
 * <ul>
 * <li>plain: the origin request is obtained from the configured origin provider and the request
 * is streamed through {@link ProxyRequest#send};</li>
 * <li>data-security: when the application header is present and
 * {@code AppConfigServiceImpl.appDataSecurity} returns true for it, the buffered request body is
 * decrypted, posted with the {@link WebClient} inside the {@link CircuitBreaker}, and the JSON
 * answer is encrypted before being returned to the caller.</li>
 * </ul>
 */
@Slf4j
public class ReverseProxy implements HttpProxy {

  /** Circuit-breaker operation result: the origin answered and the response was handed over. */
  private static final String BREAKER_SUCCESS = "1";
  /** Circuit-breaker operation failure message: the origin call failed. */
  private static final String BREAKER_FAILURE = "2";
  /** Value produced by the circuit-breaker fallback. */
  private static final String BREAKER_FALLBACK = "3";

  private final HttpClient client;
  private final boolean supportWebSocket;
  private BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> selector =
      (req, httpClient) -> Future.failedFuture("No origin available");
  private final List<ProxyInterceptor> interceptors = new ArrayList<>();

  // Last values passed to handle(WebClient, RoutingContext, CircuitBreaker). They are only read by
  // handle(HttpServerRequest); requests entering through the three-argument handle carry their
  // own values so concurrent requests cannot observe each other's state.
  private RoutingContext ctx;
  private CircuitBreaker breaker;
  private WebClient mainWebClient;

  /**
   * Creates the proxy.
   *
   * @param options proxy options; when cache options are present a caching interceptor is added
   * @param client  HTTP client handed to the origin provider
   */
  public ReverseProxy(ProxyOptions options, HttpClient client) {
    CacheOptions cacheOptions = options.getCacheOptions();
    if (cacheOptions != null) {
      Cache<String, Resource> cache = cacheOptions.newCache();
      addInterceptor(new CachingFilter(cache));
    }
    this.client = client;
    this.supportWebSocket = options.getSupportWebSocket();
  }

  /**
   * Replaces the function used to obtain the origin request for an incoming request.
   *
   * @param provider the new origin request provider
   * @return this proxy
   */
  @Override
  public HttpProxy originRequestProvider(
      BiFunction<HttpServerRequest, HttpClient, Future<HttpClientRequest>> provider) {
    selector = provider;
    return this;
  }

  /**
   * Appends an interceptor to the chain.
   *
   * @param interceptor the interceptor to add
   * @return this proxy
   */
  @Override
  public HttpProxy addInterceptor(ProxyInterceptor interceptor) {
    interceptors.add(interceptor);
    return this;
  }

  /**
   * Proxies the request of the given routing context.
   *
   * @param mainWebClient web client used for the data-security forwarding mode
   * @param ctx           routing context of the incoming request (its buffered body is read in the
   *                      data-security mode)
   * @param breaker       circuit breaker guarding the data-security origin call
   */
  @Override
  public void handle(WebClient mainWebClient, RoutingContext ctx, CircuitBreaker breaker) {
    // TODO: this entry point was customised in this fork.
    // Still remembered so that a later direct handle(HttpServerRequest) call sees the same values
    // it saw before this change.
    this.ctx = ctx;
    this.breaker = breaker;
    this.mainWebClient = mainWebClient;
    dispatch(ctx.request(), mainWebClient, ctx, breaker);
  }

  /**
   * Proxies the given request using the web client, routing context and circuit breaker most
   * recently passed to {@link #handle(WebClient, RoutingContext, CircuitBreaker)}.
   *
   * @param request the incoming request
   */
  @Override
  public void handle(HttpServerRequest request) {
    dispatch(request, mainWebClient, ctx, breaker);
  }

  /** Validates the request, then either tunnels a WebSocket upgrade or runs the interceptor chain. */
  private void dispatch(HttpServerRequest request, WebClient webClient, RoutingContext routingContext,
      CircuitBreaker circuitBreaker) {
    ProxyRequest proxyRequest = ProxyRequest.reverseProxy(request);

    // Encoding sanity check
    Boolean chunked = HttpUtils.isChunked(request.headers());
    if (chunked == null) {
      end(proxyRequest, 400);
      return;
    }

    // WebSocket upgrade tunneling
    if (supportWebSocket && io.vertx.core.http.impl.HttpUtils.canUpgradeToWebSocket(request)) {
      handleWebSocketUpgrade(proxyRequest);
      return;
    }

    Proxy proxy = new Proxy(proxyRequest, webClient, routingContext, circuitBreaker);
    proxy.filters = interceptors.listIterator();
    proxy.sendRequest().compose(proxy::sendProxyResponse);
  }

  /**
   * Forwards a WebSocket upgrade to the origin and, when the origin answers 101, pipes the two
   * sockets into each other. Any other outcome is reported to the caller through {@link #end}.
   */
  private void handleWebSocketUpgrade(ProxyRequest proxyRequest) {
    HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
    resolveOrigin(proxiedRequest).onComplete(ar -> {
      if (ar.failed()) {
        proxiedRequest.resume();
        end(proxyRequest, 502);
        return;
      }
      HttpClientRequest request = ar.result();
      request.setMethod(HttpMethod.GET);
      request.setURI(proxiedRequest.uri());
      request.headers().addAll(proxiedRequest.headers());
      Future<HttpClientResponse> upstream = request.connect();
      proxiedRequest.handler(request::write);
      proxiedRequest.endHandler(v -> request.end());
      proxiedRequest.resume();
      upstream.onComplete(ar2 -> {
        if (ar2.failed()) {
          proxiedRequest.resume();
          end(proxyRequest, 502);
          return;
        }
        HttpClientResponse proxiedResponse = ar2.result();
        if (proxiedResponse.statusCode() != 101) {
          // Rejection
          proxiedRequest.resume();
          end(proxyRequest, proxiedResponse.statusCode());
          return;
        }
        HttpServerResponse response = proxiedRequest.response();
        response.setStatusCode(101);
        response.headers().addAll(proxiedResponse.headers());
        proxiedRequest.toNetSocket().onComplete(ar3 -> {
          if (ar3.failed()) {
            // Find reproducer
            log.error("Handle this case", ar3.cause());
            return;
          }
          NetSocket responseSocket = ar3.result();
          NetSocket proxyResponseSocket = proxiedResponse.netSocket();
          responseSocket.handler(proxyResponseSocket::write);
          proxyResponseSocket.handler(responseSocket::write);
          responseSocket.closeHandler(v -> proxyResponseSocket.close());
          proxyResponseSocket.closeHandler(v -> responseSocket.close());
        });
      });
    });
  }

  /**
   * Sends a JSON error response.
   *
   * @param proxyRequest the request being answered
   * @param sc           HTTP status code; the body is the message registered for it in
   *                     {@code ProxyTool._ERROR}, or {@code ProxyTool.DEFAULT_ERROR_MSG} otherwise
   */
  private void end(ProxyRequest proxyRequest, int sc) {
    // TODO: handle the reverse-proxy response result
    Buffer buffer = ProxyTool._ERROR.containsKey(sc)
        ? Buffer.buffer(ProxyTool._ERROR.get(sc))
        : Buffer.buffer(ProxyTool.DEFAULT_ERROR_MSG);
    // Content-Length must be the encoded byte count, not the character count of the message.
    proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
        .putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length())).setBody(Body.body(buffer))
        .send();
  }

  /** Asks the configured origin provider for the request to send to the origin. */
  private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
    return selector.apply(proxiedRequest, client);
  }

  /**
   * Per-request proxy context: walks the interceptor chain forwards for the request and backwards
   * for the response.
   */
  private class Proxy implements ProxyContext {

    private final ProxyRequest request;
    private final WebClient webClient;
    private final RoutingContext routingContext;
    private final CircuitBreaker circuitBreaker;
    private ProxyResponse response;
    private final Map<String, Object> attachments = new HashMap<>();
    private ListIterator<ProxyInterceptor> filters;

    private Proxy(ProxyRequest request, WebClient webClient, RoutingContext routingContext,
        CircuitBreaker circuitBreaker) {
      this.request = request;
      this.webClient = webClient;
      this.routingContext = routingContext;
      this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void set(String name, Object value) {
      attachments.put(name, value);
    }

    /** Returns the attachment stored under {@code name}, or null when absent or of another type. */
    @Override
    public <T> T get(String name, Class<T> type) {
      Object o = attachments.get(name);
      return type.isInstance(o) ? type.cast(o) : null;
    }

    @Override
    public ProxyRequest request() {
      return request;
    }

    /** Invokes the next interceptor, or sends the request to the origin when none is left. */
    @Override
    public Future<ProxyResponse> sendRequest() {
      if (filters.hasNext()) {
        ProxyInterceptor next = filters.next();
        return next.handleProxyRequest(this);
      } else {
        return sendProxyRequest(request);
      }
    }

    @Override
    public ProxyResponse response() {
      return response;
    }

    /** Invokes the previous interceptor, or sends the response to the caller when none is left. */
    @Override
    public Future<Void> sendResponse() {
      if (filters.hasPrevious()) {
        ProxyInterceptor filter = filters.previous();
        return filter.handleProxyResponse(this);
      } else {
        return response.send();
      }
    }

    /** Chooses the forwarding mode from the application header of the request. */
    private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
      // TODO: circuit-breaking policy: once an origin's circuit is open it should be removed from
      // the load-balancing strategy.
      String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
      if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
        return sendSecuredRequest(proxyRequest, sacAppHeaderKey);
      }
      return sendPlainRequest(proxyRequest);
    }

    /**
     * Data-security mode: decrypts the buffered body, posts it to the origin inside the circuit
     * breaker and answers with the encrypted origin JSON. On any failure a 502 is sent and the
     * returned future is failed.
     */
    private Future<ProxyResponse> sendSecuredRequest(ProxyRequest proxyRequest, String sacAppHeaderKey) {
      return Future.future(p -> {
        circuitBreaker.executeWithFallback(operation -> {
          SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
          webClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
              .putHeaders(proxyRequest.headers())
              .sendJson(bodyDecrypt(routingContext.getBodyAsString(), sacAppHeaderKey), h -> {
                if (h.failed()) {
                  log.error("error:", h.cause());
                  operation.fail(BREAKER_FAILURE);
                  return;
                }
                ProxyResponse proxyResponse;
                try {
                  log.info("begin date:{}", System.currentTimeMillis());
                  // Release resources
                  proxyRequest.release();
                  JsonObject responseData = h.result().bodyAsJsonObject();
                  log.info("responseData:{}", responseData);
                  // Encrypt
                  String dataStr = bodyEncrypt(responseData.toString(), sacAppHeaderKey);
                  log.info("aesEncrypt dataStr:{}", dataStr);
                  Buffer buffer = Buffer.buffer(dataStr);
                  // NOTE(review): the origin status code is not propagated; 200 is always returned.
                  proxyResponse = proxyRequest.response().setStatusCode(200)
                      .putHeader("content-type", "application/json")
                      .setBody(Body.body(buffer));
                } catch (RuntimeException e) {
                  // Empty or non-JSON body, or an encryption failure: fail the operation now
                  // instead of leaving it pending.
                  log.error("error:", e);
                  operation.fail(e);
                  return;
                }
                p.complete(proxyResponse);
                operation.complete(BREAKER_SUCCESS);
                log.info("end date:{}", System.currentTimeMillis());
              });
        }, v -> {
          // Executed when the circuit is opened
          log.info("Executed when the circuit is opened:{}", v);
          return BREAKER_FALLBACK;
        }, ar -> {
          // Do something with the result
          log.info("failed:{}, Result:{}", ar.failed(), ar.result());
          if (!BREAKER_SUCCESS.equals(ar.result())) {
            end(proxyRequest, 502);
            // Settle the outer future so callers composing on it are not left waiting forever.
            p.tryFail("Origin request failed");
          }
        });
      });
    }

    /**
     * Plain mode: resolves the origin request and streams the proxied request to it. When the
     * origin cannot be resolved, the incoming request is drained and answered with 502.
     */
    private Future<ProxyResponse> sendPlainRequest(ProxyRequest proxyRequest) {
      Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
      f.onFailure(err -> {
        log.error("error:", err);
        if (err instanceof ConnectException) {
          // TODO: configure a retry policy
          // Connection refused: /127.0.0.1:9199
          log.info("connection url is error:{}", err.getMessage());
        }
        // Should this be done here ? I don't think so
        HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
        proxiedRequest.resume();
        Promise<Void> drained = Promise.promise();
        proxiedRequest.exceptionHandler(drained::tryFail);
        proxiedRequest.endHandler(drained::tryComplete);
        drained.future().onComplete(ar2 -> end(proxyRequest, 502));
      });
      return f.compose(a -> sendProxyRequest(proxyRequest, a));
    }

    /** Sends the proxied request to the origin; answers 502 with an empty body on failure. */
    private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest, HttpClientRequest request) {
      Future<ProxyResponse> fut = proxyRequest.send(request);
      fut.onFailure(err -> {
        proxyRequest.proxiedRequest().response().setStatusCode(502).end();
      });
      return fut;
    }

    /** Records the origin response, validates its transfer encoding and starts the response chain. */
    private Future<Void> sendProxyResponse(ProxyResponse response) {
      this.response = response;

      // Check validity
      Boolean chunked = HttpUtils.isChunked(response.headers());
      if (chunked == null) {
        // response.request().release(); // Is it needed ???
        end(response.request(), 501);
        return Future.succeededFuture(); // should use END future here ???
      }

      return sendResponse();
    }

    /**
     * Encrypts {@code body} with the data-security settings of the given application.
     *
     * @throws HttpException with code 10001 when no supported algorithm is configured
     */
    private String bodyEncrypt(String body, String sacAppHeaderKey) {
      DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
      // Null-safe comparison: a missing configuration or algorithm is reported as a configuration
      // error rather than a NullPointerException.
      if (dataSecurity != null && "AES".equals(dataSecurity.getAlgorithm())) {
        return MainSecurity.aesEncrypt(body, dataSecurity.getPrivateKey());
      }
      log.info(" appcode:{}, encrypt key config is error.", sacAppHeaderKey);
      throw new HttpException(10001);
    }

    /**
     * Decrypts {@code body} with the data-security settings of the given application.
     *
     * @throws HttpException with code 10001 when no supported algorithm is configured
     */
    private String bodyDecrypt(String body, String sacAppHeaderKey) {
      DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
      if (dataSecurity != null && "AES".equals(dataSecurity.getAlgorithm())) {
        return MainSecurity.aesDecrypt(body, dataSecurity.getPrivateKey());
      }
      log.info(" appcode:{}, decrypt key config is error.", sacAppHeaderKey);
      throw new HttpException(10001);
    }
  }
}