Merge #1 into sac_dev from deployment_gateway_20240416

vertx错误 response header key

* deployment_gateway_20240416: (1 commits)
  vertx错误 response header key

Signed-off-by: dingtalk_rdvlmt <accounts_62d9f618aaa5896ceea4d185@mail.teambition.com>
Reviewed-by: dingtalk_rdvlmt <accounts_62d9f618aaa5896ceea4d185@mail.teambition.com>
Merged-by: dingtalk_rdvlmt <accounts_62d9f618aaa5896ceea4d185@mail.teambition.com>

CR-link: https://codeup.aliyun.com/zsmarter/sac/server/smarterFramework/change/1
This commit is contained in:
谢云 2024-05-13 16:51:32 +08:00
commit ba14fe4856
7 changed files with 129 additions and 57 deletions

View File

@ -35,6 +35,7 @@ public class SacErrorCode {
_ERROR.put(10018, "apiCode与uri不匹配");
_ERROR.put(10019, "请求不支持conetnt-type类型");
_ERROR.put(10020, "uri返回mock数据");
_ERROR.put(10021, "无法找到负载均衡路由节点");
};
public static JsonObject returnErrorMsg(Integer errorCode) {

View File

@ -3,7 +3,6 @@ package com.sf.vertx.handle;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -16,7 +15,6 @@ import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.config.ManagementCenterConfig;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.config.TcpIpConfig;
import com.sf.vertx.api.pojo.ApiConfig;
@ -82,16 +80,38 @@ public class AppConfigHandler {
// apiCode限流配置 appCode:apiCode - RateLimiterRegistry
private static ConcurrentHashMap<String, SacCurrentLimiting> APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
// 负载均衡路由类型 appCode:apiCode - routerType
// 执行流程 routerType= <br/>
// 1serviceNodel="NORMAL", serviceNodel="ROUTE" and RouteType = "WEIGHT_ROUTE" <br/>
// return LOADBALANCING_MAP
// 2serviceNodel="ROUTE", RouteType = "HEADER_ROUTE" <br/>
// return APICODE_CONFIG_ROUTERCONENT_MAP
private static ConcurrentHashMap<String, Integer> APICODE_CONFIG_ROUTERTYPE_MAP = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, List<RouteContent>> APICODE_CONFIG_ROUTERCONENT_MAP = new ConcurrentHashMap<>();
// apiCode熔断配置 appCode:apiCode - CircuitBreaker
private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
// 禁用appCode
private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>();
public static Integer routerType(String key) {
return APICODE_CONFIG_ROUTERTYPE_MAP.get(key) != null ? APICODE_CONFIG_ROUTERTYPE_MAP.get(key) : 1;
}
public static List<RouteContent> routerConentList(String key) {
return APICODE_CONFIG_ROUTERCONENT_MAP.get(key);
}
public static Integer requestModel() {
return sacVertxConfig.getRequestModel();
}
public static String sacResponseHeaderKey() {
return sacVertxConfig.getSacResponseHeaderKey();
}
public static String rpcUri() {
return sacVertxConfig.getRpcUri();
}
@ -281,6 +301,8 @@ public class AppConfigHandler {
}
// app router负载均衡
int routerType = 1;
List<RouteContent> routeContentList = null;
for (SacService sacService : appConfig.getService()) {
List<Node> nodeList = new ArrayList<>();
// 获取service模式
@ -300,10 +322,14 @@ public class AppConfigHandler {
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
: 1);
node.setProtocol(routeContent.getServerAddress().getProtocol());
nodeList.add(node);
}
} else if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "HEADER_ROUTE")) {
routerType = 2;
routeContentList = sacService.getRouteConfig().getRouteContent();
}
}
@ -312,10 +338,21 @@ public class AppConfigHandler {
for (ApiConfig apiConfig : sacService.getApiConfig()) {
String key = appCode + ":" + apiConfig.getApiCode();
APICODE_CONFIG_MAP.put(key, apiConfig);
if (nodeList.size() > 0) {
// 初始化负载均衡算法
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
LOADBALANCING_MAP.put(key, sacLoadBalancing);
// 负载均衡模式
APICODE_CONFIG_ROUTERTYPE_MAP.put(key, routerType);
switch (routerType) {
case 1:
if (nodeList.size() > 0) {
// 初始化负载均衡算法
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
break;
case 2:
APICODE_CONFIG_ROUTERCONENT_MAP.put(key, routeContentList);
default:
break;
}
if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) {
@ -345,7 +382,7 @@ public class AppConfigHandler {
.setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒
// .setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下尝试重试之前所需时间
.setResetTimeout(strategy.getRecovery_interval() * 1000) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info(keyCircuitBreaker + " Circuit open");
}).halfOpenHandler(v -> {

View File

@ -17,10 +17,12 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
public void handle(RoutingContext frc) {
int statusCode = 500;
JsonObject errorJson = null;
Integer sc = SacErrorCode.DEFAULT_ERROR_CODE;
try {
Throwable failure = frc.failure();
if (failure instanceof HttpException) {
HttpException httpException = (HttpException) failure;
sc = httpException.getStatusCode();
if (StringUtils.isNoneBlank(httpException.getPayload())) {
errorJson = new JsonObject(httpException.getPayload());
} else {
@ -28,6 +30,7 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
}
} else if (failure instanceof MockException) {
MockException httpException = (MockException) failure;
sc = httpException.getStatusCode();
if (StringUtils.isNoneBlank(httpException.getPayload())) {
statusCode = 200;
errorJson = new JsonObject(httpException.getPayload());
@ -44,6 +47,7 @@ public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
}
frc.response().setChunked(true).setStatusCode(statusCode).putHeader("Content-Type", "application/json")
.putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(sc))
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(errorJson.size())).end(errorJson.toBuffer());
return;
}

View File

@ -25,6 +25,9 @@ public class SacVertxConfig {
@Value("${server.vertx.cluster.portAutoIncrement:false}")
private boolean portAutoIncrement;
@Value("${server.vertx.sacResponseHeaderKey:sacErrorCode}")
private String sacResponseHeaderKey;
@Value("${server.vertx.requestModel:2}")
private Integer requestModel;

View File

@ -3,13 +3,17 @@ package com.sf.vertx.utils;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin;
import com.sf.vertx.handle.AppConfigHandler;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j;
/***
@ -25,12 +29,36 @@ public class ProxyTool {
String appCode = request.getHeader(AppConfigHandler.getAppCodeHeaderKey());
String apiCode = request.getHeader(AppConfigHandler.getApiCodeHeaderKey());
log.info("uri:{}, header appCode:{},apiCode:{}", request.uri(), appCode, apiCode);
SacLoadBalancing sacLoadBalancing = AppConfigHandler.getLoadBalancing(appCode + ":" + apiCode);
// TODO 区分httpshttp
Node node = sacLoadBalancing.selectNode();
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
return socketAddress;
// 判断 "routeType": "WEIGHT_ROUTE", // 路由类型 WEIGHT_ROUTE HEADER_ROUTE
String key = appCode + ":" + apiCode;
Integer routerType = AppConfigHandler.routerType(key);
SocketAddress socketAddress = null;
switch (routerType) {
case 1:
SacLoadBalancing sacLoadBalancing = AppConfigHandler.getLoadBalancing(key);
Node node = sacLoadBalancing.selectNode();
socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
return socketAddress;
case 2:
List<RouteContent> routeContentList = AppConfigHandler.routerConentList(key);
if(routeContentList != null && routeContentList.size() > 0) {
for (RouteContent routeContent : routeContentList) {
String headerValue = request.getHeader(routeContent.getHeaderKey());
List<String> headerValues = routeContent.getHeaderValues();
// String matchType = routeContent.getMatchType();
if(headerValues.contains(headerValue)) {
socketAddress = SocketAddress.inetSocketAddress(routeContent.getServerAddress().getPort(), routeContent.getServerAddress().getHost());
log.info("sacLoadBalancing address:{},port:{}", socketAddress.host(), socketAddress.port());
return socketAddress;
}
}
}
break;
}
// 抛出异常,无法找到负载均衡node节点
throw new HttpException(10021);
}
public static boolean regexMatch(String pattern, String target) {

View File

@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.constans.SacErrorCode;
import com.sf.vertx.handle.AppConfigHandler;
import com.sf.vertx.init.SacVertxConfig;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.utils.ProxyTool;
@ -181,6 +182,7 @@ public class ReverseProxy implements HttpProxy {
private void end(ProxyRequest proxyRequest, int sc) {
JsonObject json = SacErrorCode.returnErrorMsg(sc);
proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json")
.putHeader(AppConfigHandler.sacResponseHeaderKey(), String.valueOf(sc))
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(json.size())).setBody(Body.body(json.toBuffer()))
.send();
}
@ -254,28 +256,27 @@ public class ReverseProxy implements HttpProxy {
if (h.result().statusCode() == 200) {
// promise.complete();
promise.complete("1");
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = bodyEncrypt(responseData.toString(), appCode);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
// Throwable throwable = new Throwable("error port");
// promise.fail(throwable);
promise.fail("2");
}
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = bodyEncrypt(responseData.toString(), appCode);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
// end(proxyRequest, 502);
// Throwable throwable = new Throwable("error port");
// promise.fail(throwable);
promise.fail("2");
promise.fail("3");
}
});
}, v -> {
@ -283,23 +284,19 @@ public class ReverseProxy implements HttpProxy {
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(circuitBreaker.name() + " half open circuit");
return v.getMessage();
} else if (v instanceof OpenCircuitException) {
log.info(circuitBreaker.name() + " open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(circuitBreaker.name() + " close circuit");
return v.getMessage();
}
return "3";
}, ar -> {
// Do something with the result
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
// String
if (StringUtils.equals(ar.result(), "1") == false) {
if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断
end(proxyRequest, 10016);
}
// Throwable
// if(ar.result() != null) {
// end(proxyRequest, 502);
// }
});
});
}
@ -309,25 +306,25 @@ public class ReverseProxy implements HttpProxy {
circuitBreaker.executeWithFallback(promise -> {
HttpRequest<Buffer> requestBuffer = methodGetRequestBuffer(proxyRequest);
requestBuffer.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> {
log.info("==========uri:{},response http code:{}, succeeded:{}", proxyRequest.getURI(),
h.result().statusCode(), h.succeeded());
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
if (h.result().statusCode() == 200) {
// promise.complete();
promise.complete("1");
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(responseData.toBuffer()));
p.complete(proxyResponse);
} else {
promise.fail("2");
}
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(responseData.toBuffer()));
p.complete(proxyResponse);
} else {
promise.fail("2");
promise.fail("3");
}
});
}, v -> {
@ -335,15 +332,17 @@ public class ReverseProxy implements HttpProxy {
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(circuitBreaker.name() + " half open circuit");
return v.getMessage();
} else if (v instanceof OpenCircuitException) {
log.info(circuitBreaker.name() + " open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(circuitBreaker.name() + " close circuit");
return v.getMessage();
}
return "3";
}, ar -> {
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
if (StringUtils.equals(ar.result(), "1") == false) {
if (StringUtils.equals(ar.result(), "3")) { // 全开,熔断
end(proxyRequest, 10016);
}
});
@ -380,25 +379,24 @@ public class ReverseProxy implements HttpProxy {
});
}
private HttpRequest<Buffer> methodGetRequestBuffer(ProxyRequest proxyRequest){
private HttpRequest<Buffer> methodGetRequestBuffer(ProxyRequest proxyRequest) {
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
HttpRequest<Buffer> requestBuffer = null;
switch (proxyRequest.getMethod().name()) {
case "PUT":
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(),
proxyRequest.getURI());
requestBuffer = mainWebClient.put(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
break;
case "DELETE":
requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(),
proxyRequest.getURI());
requestBuffer = mainWebClient.delete(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
break;
case "HEAD":
requestBuffer = mainWebClient.head(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
break;
case "GET":
requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(),
proxyRequest.getURI());
requestBuffer = mainWebClient.get(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
break;
default:
requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(),
proxyRequest.getURI());
requestBuffer = mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI());
break;
}
return requestBuffer;

View File

@ -4,6 +4,7 @@ server:
deploymentMode: 1 # 1:单机 2:集群
requestModel: 2 # 1: 客户端传递uri. 2: uri vertx代理,不对客户端暴露uri
rpcUri: /rpc
sacResponseHeaderKey: sacErrorCode
environment: dev
server:
default: