SAC规范RPC调用支持

This commit is contained in:
akun 2024-05-22 18:09:35 +08:00
parent 4b4d4cd2b7
commit a7d7679047
2 changed files with 87 additions and 105 deletions

View File

@ -20,6 +20,7 @@ import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.httpproxy.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
@ -50,10 +51,6 @@ import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.client.WebClient;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
import io.vertx.httpproxy.ProxyResponse;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import lombok.extern.slf4j.Slf4j;
@ -185,23 +182,28 @@ public class AppConfigHandler {
return true;
}
ApiConfig apicodeConfig = AppConfigHandler.getApicodeConfig(apiCodeCacheKey);
// GET,DELETE,HEAD请求不解析body
RequestMethod requestMethod = RequestMethod.getByCode(apicodeConfig.getMethod());
if (requestMethod == null || RequestMethod.GET.equals(requestMethod)
|| RequestMethod.DELETE.equals(requestMethod)
|| RequestMethod.HEAD.equals(requestMethod)){
return false;
}
// 模糊匹配不解析body
String uri = apicodeConfig.getUri();
if (StringUtils.equals(uri, "*")) {
return false;
}
// SAC请求方式GET,DELETE,HEAD请求需要解析body,SAC请求方式参数统一使用body传递可能需要二次处理
if (isServiceTypeSac(apiCodeCacheKey)){
RequestMethod requestMethod = RequestMethod.getByCode(apicodeConfig.getMethod());
if (RequestMethod.GET.equals(requestMethod)
|| RequestMethod.DELETE.equals(requestMethod)
|| RequestMethod.HEAD.equals(requestMethod)){
return true;
}
}
if (StringUtils.startsWith(contentType, "multipart")){
return false;
}
String keyCircuitBreaker = apiCodeCacheKey + ":" + "CIRCUIT_BREAKER";
CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
boolean isDataSecurity = AppConfigHandler.isDataSecurity(appCode);
// 文件上传不走加解密
return (isDataSecurity || circuitBreaker != null) && StringUtils.startsWith(contentType, "multipart") == false;
return (isDataSecurity || circuitBreaker != null);
}
/***
@ -257,13 +259,17 @@ public class AppConfigHandler {
public static MockResponse mock(String key, RoutingContext rc) {
if (APICODE_CONFIG_MAP.get(key) != null && APICODE_CONFIG_MAP.get(key).getMockDefaultHttpStatus() != null){
String appCode = rc.request().headers().get(AppConfigHandler.getAppCodeHeaderKey());
// 如果是sac服务query和body参数都从body中取
Integer httpStatus = APICODE_CONFIG_MAP.get(key).getMockDefaultHttpStatus();
String mockResponse = APICODE_CONFIG_MAP.get(key).getMockDefaultResponse();
List<MockExpectation> mockExpectations = APICODE_CONFIG_MAP.get(key).getMockExpectations();
if (mockExpectations != null && !mockExpectations.isEmpty()){
RequestBody body = rc.body();
JsonObject jsonBody = body.asJsonObject();
String bodyStr = rc.body().asString();
if (AppConfigHandler.isDataSecurity(appCode)) {
bodyStr = AppConfigHandler.bodyDecrypt(bodyStr, appCode);
}
JsonObject jsonBody = new JsonObject(bodyStr);
for (MockExpectation mockExpectation : mockExpectations) {
// 匹配条件需要全部匹配成功
boolean matchSuccess = true;

View File

@ -17,12 +17,12 @@ import java.util.ListIterator;
import java.util.Map;
import java.util.function.BiFunction;
import cn.hutool.core.util.StrUtil;
import com.sf.vertx.enums.RequestMethod;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.constans.SacErrorCode;
import com.sf.vertx.handle.AppConfigHandler;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.circuitbreaker.CircuitBreaker;
@ -57,6 +57,8 @@ import io.vertx.httpproxy.cache.CacheOptions;
import io.vertx.httpproxy.spi.cache.Cache;
import lombok.extern.slf4j.Slf4j;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@Slf4j
public class ReverseProxy implements HttpProxy {
@ -242,16 +244,15 @@ public class ReverseProxy implements HttpProxy {
}
}
private Future<ProxyResponse> breakerAndSecurity(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
private Future<ProxyResponse> breakerCall(CircuitBreaker circuitBreaker
, ProxyRequest proxyRequest
, HttpRequest<Buffer> requestBuffer
, String body
) {
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
return Future.future(p -> {
circuitBreaker.executeWithFallback(promise -> {
HttpRequest<Buffer> requestBuffer = methodGetRequestBuffer(proxyRequest);
requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key));
requestBuffer.putHeaders(proxyRequest.headers())
.sendJson(AppConfigHandler.bodyDecrypt(ctx.getBodyAsString(), appCode), h -> {
requestBuffer.sendJson(body, h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
@ -267,12 +268,19 @@ public class ReverseProxy implements HttpProxy {
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
Buffer buffer;
// 加密
if (AppConfigHandler.isDataSecurity(appCode)) {
String dataStr = AppConfigHandler.bodyEncrypt(responseData.toString(), appCode);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
log.info("encrypt dataStr:{}", dataStr);
buffer = Buffer.buffer(dataStr);
}else {
buffer = responseData.toBuffer();
}
ProxyResponse proxyResponse = proxyRequest.response()
.setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
// end(proxyRequest, 502);
@ -303,66 +311,11 @@ public class ReverseProxy implements HttpProxy {
});
}
private Future<ProxyResponse> breaker(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
return Future.future(p -> {
circuitBreaker.executeWithFallback(promise -> {
HttpRequest<Buffer> requestBuffer = methodGetRequestBuffer(proxyRequest);
requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key));
requestBuffer.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
if (h.result().statusCode() == 200) {
// promise.complete();
promise.complete("1");
} else {
promise.fail("2");
}
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(responseData.toBuffer()));
p.complete(proxyResponse);
} else {
promise.fail("3");
}
});
}, v -> {
// 需要传递当前状态half-open , close, 还是统计失败次数
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(circuitBreaker.name() + " half open circuit");
return v.getMessage();
} else if (v instanceof OpenCircuitException) {
log.info(circuitBreaker.name() + " open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(circuitBreaker.name() + " close circuit");
return v.getMessage();
}
return "3";
}, ar -> {
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断
end(proxyRequest, 10016);
}
});
});
}
private Future<ProxyResponse> security(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
private Future<ProxyResponse> normalCall(ProxyRequest proxyRequest, HttpRequest<Buffer> requestBuffer, String body) {
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
return Future.future(p -> {
HttpRequest<Buffer> requestBuffer = methodGetRequestBuffer(proxyRequest);
requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key));
requestBuffer.putHeaders(proxyRequest.headers()).sendJson(AppConfigHandler.bodyDecrypt(ctx.getBodyAsString(), appCode),
requestBuffer.sendJson(body,
h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
@ -372,12 +325,15 @@ public class ReverseProxy implements HttpProxy {
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
Buffer bodyBuffer = responseData.toBuffer();
// 加密
if (AppConfigHandler.isDataSecurity(appCode)) {
String dataStr = AppConfigHandler.bodyEncrypt(responseData.toString(), appCode);
log.info("encrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
bodyBuffer = Buffer.buffer(dataStr);
}
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
.putHeader("content-type", "application/json").setBody(Body.body(bodyBuffer));
p.complete(proxyResponse);
}
} else {
@ -388,26 +344,48 @@ public class ReverseProxy implements HttpProxy {
});
}
private HttpRequest<Buffer> methodGetRequestBuffer(ProxyRequest proxyRequest) {
private HttpRequest<Buffer> buildCallRequestBuffer(ProxyRequest proxyRequest,String body) {
String appCode = proxyRequest.headers().get(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandler.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
HttpRequest<Buffer> requestBuffer = null;
HttpRequest<Buffer> requestBuffer;
String requestURI = proxyRequest.getURI();
switch (proxyRequest.getMethod().name()) {
case "PUT":
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), requestURI);
break;
case "DELETE":
requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), requestURI);
break;
case "HEAD":
requestBuffer = mainWebClient.head(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
requestBuffer = mainWebClient.head(socketAddress.port(), socketAddress.host(), requestURI);
break;
case "GET":
requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), requestURI);
break;
default:
requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), requestURI);
break;
}
requestBuffer.timeout(AppConfigHandler.getApicodeConfigTimeOut(key));
requestBuffer.putHeaders(proxyRequest.headers());
// 处理sac请求方式
RequestMethod requestMethod = RequestMethod.getByCode(proxyRequest.getMethod().name());
if (RequestMethod.GET.equals(requestMethod)
|| RequestMethod.DELETE.equals(requestMethod)
|| RequestMethod.HEAD.equals(requestMethod)){
JsonObject jsonBody = new JsonObject(body);
Map<String, String> queryParams = new HashMap<>();
for (Map.Entry<String, Object> entry : jsonBody) {
String val = entry.getValue() != null ? entry.getValue().toString() : EMPTY;
queryParams.put(entry.getKey(),val);
requestURI = StrUtil.replace(requestURI,"{" + entry.getKey() +"}",val);
}
requestBuffer.uri(requestURI);
requestBuffer.queryParams().addAll(queryParams);
}
return requestBuffer;
}
@ -421,16 +399,13 @@ public class ReverseProxy implements HttpProxy {
String keyCircuitBreaker = appCode + ":" + apiCode + ":" + "CIRCUIT_BREAKER";
CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
if (AppConfigHandler.isAnalysisBody(appCode, apiCode, contentType)) {
String body = AppConfigHandler.isDataSecurity(appCode) ? AppConfigHandler.bodyDecrypt(ctx.body().asString(), appCode):ctx.body().asString();
HttpRequest<Buffer> requestBuffer = buildCallRequestBuffer(proxyRequest,body);
try {
if (AppConfigHandler.isDataSecurity(appCode) && circuitBreaker != null) {
return breakerAndSecurity(circuitBreaker, proxyRequest);
} else if (AppConfigHandler.isDataSecurity(appCode)) {
return security(circuitBreaker, proxyRequest);
} else if (circuitBreaker != null) {
return breaker(circuitBreaker, proxyRequest);
if (circuitBreaker != null) {
return breakerCall(circuitBreaker, proxyRequest,requestBuffer,body);
} else {
log.info("not match any condition.appCode:{},apiCode:{}", appCode, apiCode);
throw new HttpException(10013);
return normalCall(proxyRequest,requestBuffer,body);
}
} catch (Exception e) {
e.printStackTrace();
@ -443,7 +418,7 @@ public class ReverseProxy implements HttpProxy {
} else {
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
f.onFailure(err -> {
log.info("error:{}", err);
log.info("error:", err);
// Should this be done here ? I don't think so
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
proxiedRequest.resume();
@ -493,4 +468,5 @@ public class ReverseProxy implements HttpProxy {
}
}