sac支持hazelcast集群, 消息订阅发布
This commit is contained in:
parent
0b3bc2fd0e
commit
e812d6dbf8
@ -78,12 +78,11 @@
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-core</artifactId>
|
||||
</dependency>
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-hazelcast</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!--
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-zookeeper</artifactId>
|
||||
@ -215,14 +214,18 @@
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-ratelimiter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-bulkhead</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-ratelimiter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.github.resilience4j</groupId>
|
||||
<artifactId>resilience4j-bulkhead</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.vertx</groupId>
|
||||
<artifactId>vertx-consul-client</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -12,9 +12,6 @@ import com.sf.vertx.api.pojo.AppConfig;
|
||||
import com.sf.vertx.api.pojo.VertxConfig;
|
||||
import com.sf.vertx.service.AppConfigService;
|
||||
|
||||
import io.vertx.core.json.JsonObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/***
|
||||
* 测试redis
|
||||
*
|
||||
@ -26,35 +23,50 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class AppConfigController {
|
||||
@Autowired
|
||||
private AppConfigService appConfigService;
|
||||
|
||||
|
||||
@PostMapping("/app/config")
|
||||
public JSONObject addAppConfig(@RequestBody AppConfig appConfig) {
|
||||
appConfigService.saveAppConfig(appConfig);
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
try {
|
||||
appConfigService.saveAppConfig(appConfig);
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
json.put("code", 500);
|
||||
json.put("msg", e.getMessage());
|
||||
}
|
||||
return json;
|
||||
}
|
||||
|
||||
|
||||
@DeleteMapping("/app/config")
|
||||
public JSONObject deleteAppConfig(@RequestBody AppConfig appConfig) {
|
||||
appConfigService.deleteAppConfig(appConfig);
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
try {
|
||||
appConfigService.deleteAppConfig(appConfig);
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
json.put("code", 500);
|
||||
json.put("msg", e.getMessage());
|
||||
}
|
||||
return json;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@PostMapping("/vertx/config")
|
||||
public JSONObject saveVertxConfig(@RequestBody VertxConfig vertxConfig) {
|
||||
appConfigService.saveVertxConfig(vertxConfig);
|
||||
JSONObject json = new JSONObject();
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
try {
|
||||
appConfigService.saveVertxConfig(vertxConfig);
|
||||
json.put("code", 200);
|
||||
json.put("msg", "success");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
json.put("code", 500);
|
||||
json.put("msg", e.getMessage());
|
||||
}
|
||||
return json;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import io.github.resilience4j.core.functions.CheckedRunnable;
|
||||
import io.github.resilience4j.ratelimiter.RateLimiter;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.handler.HttpException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/***
|
||||
* 内存存储
|
||||
@ -14,6 +15,7 @@ import io.vertx.ext.web.handler.HttpException;
|
||||
* @author xy
|
||||
*
|
||||
*/
|
||||
@Slf4j
|
||||
public class ApiRateLimitHandlerImpl implements ApiRateLimitHandler {
|
||||
|
||||
@Override
|
||||
@ -35,7 +37,8 @@ public class ApiRateLimitHandlerImpl implements ApiRateLimitHandler {
|
||||
try {
|
||||
restrictedCall.run();
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
//t.printStackTrace();
|
||||
log.info("api ratelimit:{}", key);
|
||||
rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse()));
|
||||
return;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package com.sf.vertx.handle;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -12,6 +13,10 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.alibaba.fastjson2.TypeReference;
|
||||
import com.hazelcast.config.Config;
|
||||
import com.hazelcast.config.JoinConfig;
|
||||
import com.hazelcast.config.NetworkConfig;
|
||||
import com.hazelcast.config.TcpIpConfig;
|
||||
import com.sf.vertx.api.pojo.ApiConfig;
|
||||
import com.sf.vertx.api.pojo.AppConfig;
|
||||
import com.sf.vertx.api.pojo.Node;
|
||||
@ -21,6 +26,8 @@ import com.sf.vertx.api.pojo.Strategy;
|
||||
import com.sf.vertx.api.pojo.VertxConfig;
|
||||
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
|
||||
import com.sf.vertx.constans.RedisKeyConfig;
|
||||
import com.sf.vertx.init.SacVertxConfig;
|
||||
import com.sf.vertx.pojo.ClusterEventMsg;
|
||||
import com.sf.vertx.pojo.SacCurrentLimiting;
|
||||
import com.sf.vertx.utils.ProxyTool;
|
||||
|
||||
@ -29,8 +36,20 @@ import io.github.resilience4j.ratelimiter.RateLimiterConfig;
|
||||
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;
|
||||
import io.vertx.circuitbreaker.CircuitBreaker;
|
||||
import io.vertx.circuitbreaker.CircuitBreakerOptions;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.VertxOptions;
|
||||
import io.vertx.core.http.HttpClient;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.spi.cluster.ClusterManager;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
import io.vertx.httpproxy.HttpProxy;
|
||||
import io.vertx.httpproxy.ProxyContext;
|
||||
import io.vertx.httpproxy.ProxyInterceptor;
|
||||
import io.vertx.httpproxy.ProxyResponse;
|
||||
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/***
|
||||
@ -95,6 +114,7 @@ public class AppConfigHandler {
|
||||
|
||||
/***
|
||||
* 是否解析, 走独立请求
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static boolean isAnalysisBody(String appCode, String apiCode, String contentType) {
|
||||
@ -102,8 +122,9 @@ public class AppConfigHandler {
|
||||
CircuitBreaker circuitBreaker = AppConfigHandler.getApiCodeCircuitBreaker(keyCircuitBreaker);
|
||||
boolean isDataSecurity = AppConfigHandler.isDataSecurity(appCode);
|
||||
// 文件上传不走加解密
|
||||
return (isDataSecurity || circuitBreaker != null) && StringUtils.startsWith(contentType, "multipart") == false;
|
||||
return (isDataSecurity || circuitBreaker != null) && StringUtils.startsWith(contentType, "multipart") == false;
|
||||
}
|
||||
|
||||
/***
|
||||
* 优先apicode配置限流、无法找到匹配全局限流
|
||||
*
|
||||
@ -143,9 +164,10 @@ public class AppConfigHandler {
|
||||
}
|
||||
|
||||
public static boolean isApicodeUri(String key, String uri, String httpMethod) {
|
||||
return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri) && StringUtils.equals(httpMethod, APICODE_CONFIG_MAP.get(key).getMethod());
|
||||
return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri)
|
||||
&& StringUtils.equals(httpMethod, APICODE_CONFIG_MAP.get(key).getMethod());
|
||||
}
|
||||
|
||||
|
||||
public static String mock(String key) {
|
||||
return APICODE_CONFIG_MAP.get(key).getMockResponse();
|
||||
}
|
||||
@ -172,7 +194,7 @@ public class AppConfigHandler {
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void initAllAppConfig(RedisTemplate<String, String> redisTemplate) throws Exception {
|
||||
public static void initAllAppConfig(RedisTemplate<String, String> redisTemplate) {
|
||||
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
|
||||
for (String appCode : set) {
|
||||
AppConfigHandler.initAppConfig(redisTemplate, appCode, false);
|
||||
@ -325,16 +347,157 @@ public class AppConfigHandler {
|
||||
}
|
||||
}
|
||||
|
||||
public static Vertx createVertx() {
|
||||
public static void createVertx(RedisTemplate<String, String> redisTemplate, SacVertxConfig sacVertxConfig) {
|
||||
// TODO 编解码线程池,后面优化协程等方式
|
||||
VertxOptions vertxOptions = new VertxOptions();
|
||||
loadVertxOptions(vertxOptions);
|
||||
VERTX = Vertx.vertx(vertxOptions);
|
||||
|
||||
initConnectionCircuitBreaker();
|
||||
createVertxRouter(VERTX, redisTemplate, sacVertxConfig.getPort());
|
||||
}
|
||||
|
||||
private static Config hazelcastConfig(SacVertxConfig sacVertxConfig) {
|
||||
// 集群
|
||||
Config hazelcastConfig = new Config();
|
||||
hazelcastConfig.setClusterName(sacVertxConfig.getClusterName()); // 集群名字
|
||||
NetworkConfig networkConfig = new NetworkConfig();
|
||||
networkConfig.setPort(5701);
|
||||
networkConfig.setPortAutoIncrement(true);
|
||||
|
||||
JoinConfig join = new JoinConfig();
|
||||
TcpIpConfig tcpIpConfig = new TcpIpConfig();
|
||||
tcpIpConfig.setEnabled(true);
|
||||
String[] clusterIps = sacVertxConfig.getClusterIp().split(",");
|
||||
List<String> members = Arrays.asList(clusterIps);
|
||||
tcpIpConfig.setMembers(members);
|
||||
join.setTcpIpConfig(tcpIpConfig);
|
||||
networkConfig.setJoin(join);
|
||||
hazelcastConfig.setNetworkConfig(networkConfig);
|
||||
|
||||
// TODO 还有问题,不会使用
|
||||
// ManagementCenterConfig managementCenterConfig = new ManagementCenterConfig();
|
||||
// Set<String> interfaces = new HashSet<>();
|
||||
// interfaces.add("http://192.168.1.68:8080/mancenter");
|
||||
// managementCenterConfig.setTrustedInterfaces(interfaces);
|
||||
// hazelcastConfig.setManagementCenterConfig(managementCenterConfig);
|
||||
return hazelcastConfig;
|
||||
}
|
||||
|
||||
public static Vertx createHazelcastClusterVertx(RedisTemplate<String, String> redisTemplate,
|
||||
SacVertxConfig sacVertxConfig) {
|
||||
Config hazelcastConfig = hazelcastConfig(sacVertxConfig);
|
||||
ClusterManager hazelcastClusterManager = new HazelcastClusterManager(hazelcastConfig);
|
||||
// TODO 编解码线程池,后面优化协程等方式
|
||||
VertxOptions vertxOptions = new VertxOptions();
|
||||
loadVertxOptions(vertxOptions);
|
||||
vertxOptions.setClusterManager(hazelcastClusterManager);
|
||||
Vertx.clusteredVertx(vertxOptions, res -> {
|
||||
if (res.succeeded()) {
|
||||
VERTX = res.result();
|
||||
log.info("hazelcastClusterManager create success");
|
||||
initConnectionCircuitBreaker();
|
||||
createVertxRouter(VERTX, redisTemplate, sacVertxConfig.getPort());
|
||||
// 订阅消息
|
||||
VERTX.eventBus().consumer("sac_cluster_event", message -> {
|
||||
if (message.body() != null) {
|
||||
ClusterEventMsg msg = JSONObject.parseObject(message.body().toString(), ClusterEventMsg.class);
|
||||
log.info("Received message: {}", msg);
|
||||
// message.reply("我是返回数据===" + message.body());
|
||||
if (msg.getType() == 1) {
|
||||
if (msg.getOperation() == 1) {
|
||||
// 初始化AppConfig本地缓存
|
||||
AppConfigHandler.initAppConfig(redisTemplate, msg.getAppCode(), true);
|
||||
} else if (msg.getOperation() == 3) {
|
||||
// 禁用本地缓存
|
||||
AppConfigHandler.addDisabledAppcode(msg.getAppCode());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
res.cause().printStackTrace();
|
||||
log.info("hazelcastClusterManager create failure");
|
||||
}
|
||||
});
|
||||
return VERTX;
|
||||
}
|
||||
|
||||
/***
|
||||
* 发布消息,订阅消息
|
||||
*
|
||||
* @param msg
|
||||
*/
|
||||
public static void publishClusterEventMsg(ClusterEventMsg msg) {
|
||||
VERTX.eventBus().publish("sac_cluster_event", JSONObject.toJSONString(msg));
|
||||
}
|
||||
|
||||
private static void createVertxRouter(Vertx vertx, RedisTemplate<String, String> redisTemplate,
|
||||
Integer serverDefaultPort) {
|
||||
// consul初始化
|
||||
// ConsulHandler.init(vertx);
|
||||
// ConsulHandler.init1(vertx);
|
||||
|
||||
// 从redis同步app配置
|
||||
initAllAppConfig(redisTemplate);
|
||||
|
||||
VertxConfig vertxConfig = AppConfigHandler.getVertxConfig();
|
||||
// 创建HTTP监听
|
||||
// 所有ip都能访问
|
||||
HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0");
|
||||
HttpServer server = vertx.createHttpServer(httpServerOptions);
|
||||
Router mainHttpRouter = Router.router(vertx);
|
||||
Integer serverPort = vertxConfig.getPort() == null ? serverDefaultPort : vertxConfig.getPort();
|
||||
log.info("serverPort:{}", serverPort);
|
||||
server.requestHandler(mainHttpRouter).listen(serverPort, h -> {
|
||||
if (h.succeeded()) {
|
||||
log.info("HTTP端口监听成功:{}", serverPort);
|
||||
} else {
|
||||
log.error("HTTP端口监听失败:{}", serverPort);
|
||||
}
|
||||
});
|
||||
|
||||
// HttpClientOptions clientOptions = new HttpClientOptions();
|
||||
// clientOptions.setMaxPoolSize(20); // 最大连接池大小
|
||||
// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒
|
||||
// clientOptions.setHttp2KeepAliveTimeout(1);
|
||||
// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒
|
||||
// HttpClient proxyClient = VERTX.createHttpClient(clientOptions);
|
||||
HttpClient proxyClient = vertx.createHttpClient();
|
||||
HttpProxy proxy = HttpProxy.reverseProxy(proxyClient);
|
||||
proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request)));
|
||||
proxy.addInterceptor(new ProxyInterceptor() {
|
||||
@Override
|
||||
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
|
||||
// if(StringUtils.equals(sacAppHeaderKey, "dsafdsfadafhappC")) {
|
||||
// // 会跳转到 RestfulFailureHandlerImpl
|
||||
// throw new HttpException(10003);
|
||||
// }
|
||||
return context.sendRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> handleProxyResponse(ProxyContext context) {
|
||||
// 调试代码,获取reponse body
|
||||
// Filter filter = new Filter();
|
||||
// ProxyResponse proxyResponse = context.response();
|
||||
// Body body = proxyResponse.getBody();
|
||||
// proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false)));
|
||||
// 继续拦截链
|
||||
return context.sendResponse();
|
||||
}
|
||||
});
|
||||
WebClient mainWebClient = WebClient.create(vertx);
|
||||
|
||||
String rateLimitModel = vertxConfig.getRateLimitModel();
|
||||
rateLimitModel = "local";
|
||||
mainHttpRouter.route().handler(ParameterCheckHandler.create())
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel)).handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient, proxy))
|
||||
.failureHandler(RestfulFailureHandler.create());
|
||||
// mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy));
|
||||
}
|
||||
|
||||
/***
|
||||
* 初始化connection Breaker
|
||||
*/
|
||||
|
@ -7,6 +7,7 @@ import io.github.resilience4j.core.functions.CheckedRunnable;
|
||||
import io.github.resilience4j.ratelimiter.RateLimiter;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import io.vertx.ext.web.handler.HttpException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/***
|
||||
* 内存存储
|
||||
@ -14,6 +15,7 @@ import io.vertx.ext.web.handler.HttpException;
|
||||
* @author xy
|
||||
*
|
||||
*/
|
||||
@Slf4j
|
||||
public class AppRateLimitHandlerImpl implements AppRateLimitHandler {
|
||||
|
||||
@Override
|
||||
@ -31,7 +33,8 @@ public class AppRateLimitHandlerImpl implements AppRateLimitHandler {
|
||||
try {
|
||||
restrictedCall.run();
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
//t.printStackTrace();
|
||||
log.info("app ratelimit:{}", key);
|
||||
rc.fail(new HttpException(10015, currentLimiting.getStrategy().getDefaultResponse()));
|
||||
return;
|
||||
}
|
||||
|
109
sf-vertx/src/main/java/com/sf/vertx/handle/ConsulHandler.java
Normal file
109
sf-vertx/src/main/java/com/sf/vertx/handle/ConsulHandler.java
Normal file
@ -0,0 +1,109 @@
|
||||
package com.sf.vertx.handle;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.ext.consul.ConsulClientOptions;
|
||||
import io.vertx.ext.consul.KeyValue;
|
||||
import io.vertx.ext.consul.KeyValueList;
|
||||
import io.vertx.ext.consul.Watch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class ConsulHandler {
|
||||
|
||||
public static void init(Vertx vertx) {
|
||||
ConsulClientOptions options = new ConsulClientOptions().setHost("127.0.0.1").setPort(8500);
|
||||
|
||||
Watch.keyPrefix("apiCode_", vertx, options)
|
||||
.setHandler(res -> {
|
||||
if (res.succeeded()) {
|
||||
//
|
||||
// 删除
|
||||
boolean isDel = (res.nextResult() == null || res.nextResult().getList() == null) && (res.prevResult() != null && res.prevResult().getList() != null);
|
||||
// if(isDel) {
|
||||
// log.info("isDel");
|
||||
// KeyValueList keyValueList = res.prevResult();
|
||||
// if(keyValueList != null && keyValueList.getList() != null) {
|
||||
// for(KeyValue keyValue : keyValueList.getList()) {
|
||||
// log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// // 新增
|
||||
// boolean isAdd = (res.nextResult() != null && res.nextResult().getList() != null) && (res.prevResult() == null || res.prevResult().getList() == null);
|
||||
// if(isAdd) {
|
||||
// log.info("isAdd");
|
||||
// KeyValueList keyValueList = res.nextResult();
|
||||
// if(keyValueList != null && keyValueList.getList() != null) {
|
||||
// for(KeyValue keyValue : keyValueList.getList()) {
|
||||
// log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // 修改
|
||||
// boolean isModify = (res.nextResult() != null && res.nextResult().getList() != null) && (res.prevResult() != null && res.prevResult().getList() != null);
|
||||
// if(isModify) {
|
||||
// log.info("isModify");
|
||||
// KeyValueList keyValueList = res.nextResult();
|
||||
// if(keyValueList != null && keyValueList.getList() != null) {
|
||||
// for(KeyValue keyValue : keyValueList.getList()) {
|
||||
// log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
} else {
|
||||
res.cause().printStackTrace();
|
||||
}
|
||||
})
|
||||
.start();
|
||||
}
|
||||
|
||||
public static void init1(Vertx vertx) {
|
||||
ConsulClientOptions options = new ConsulClientOptions().setHost("127.0.0.1").setPort(8500);
|
||||
|
||||
Watch.keyPrefix("apiCode_", vertx, options)
|
||||
.setHandler(res -> {
|
||||
if (res.succeeded()) {
|
||||
// 删除
|
||||
boolean isDel = (res.nextResult() == null || res.nextResult().getList() == null) && (res.prevResult() != null && res.prevResult().getList() != null);
|
||||
if(isDel) {
|
||||
log.info("isDel");
|
||||
KeyValueList keyValueList = res.prevResult();
|
||||
if(keyValueList != null && keyValueList.getList() != null) {
|
||||
for(KeyValue keyValue : keyValueList.getList()) {
|
||||
log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
// 新增
|
||||
boolean isAdd = (res.nextResult() != null && res.nextResult().getList() != null) && (res.prevResult() == null || res.prevResult().getList() == null);
|
||||
if(isAdd) {
|
||||
log.info("isAdd");
|
||||
KeyValueList keyValueList = res.nextResult();
|
||||
if(keyValueList != null && keyValueList.getList() != null) {
|
||||
for(KeyValue keyValue : keyValueList.getList()) {
|
||||
log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 修改
|
||||
boolean isModify = (res.nextResult() != null && res.nextResult().getList() != null) && (res.prevResult() != null && res.prevResult().getList() != null);
|
||||
if(isModify) {
|
||||
log.info("isModify");
|
||||
KeyValueList keyValueList = res.nextResult();
|
||||
if(keyValueList != null && keyValueList.getList() != null) {
|
||||
for(KeyValue keyValue : keyValueList.getList()) {
|
||||
log.info("keyValue:{}", JSONObject.toJSONString(keyValue));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
res.cause().printStackTrace();
|
||||
}
|
||||
})
|
||||
.start();
|
||||
}
|
||||
}
|
@ -1,35 +1,15 @@
|
||||
package com.sf.vertx.init;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.sf.vertx.api.pojo.VertxConfig;
|
||||
import com.sf.vertx.constans.RedisKeyConfig;
|
||||
import com.sf.vertx.handle.ApiRateLimitHandler;
|
||||
import com.sf.vertx.handle.AppConfigHandler;
|
||||
import com.sf.vertx.handle.AppRateLimitHandler;
|
||||
import com.sf.vertx.handle.BodyHandler;
|
||||
import com.sf.vertx.handle.ParameterCheckHandler;
|
||||
import com.sf.vertx.handle.ProxyHandler;
|
||||
import com.sf.vertx.handle.RestfulFailureHandler;
|
||||
import com.sf.vertx.utils.ProxyTool;
|
||||
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Vertx;
|
||||
import io.vertx.core.http.HttpClient;
|
||||
import io.vertx.core.http.HttpServer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.ext.web.Router;
|
||||
import io.vertx.ext.web.client.WebClient;
|
||||
import io.vertx.httpproxy.HttpProxy;
|
||||
import io.vertx.httpproxy.ProxyContext;
|
||||
import io.vertx.httpproxy.ProxyInterceptor;
|
||||
import io.vertx.httpproxy.ProxyResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/***
|
||||
@ -43,8 +23,9 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Component
|
||||
public class DynamicBuildServer implements ApplicationRunner {
|
||||
// TODO 后面可以和app挂钩
|
||||
@Value("${server.vertx.server.default.port}")
|
||||
private Integer serverDefaultPort;
|
||||
@Autowired
|
||||
private SacVertxConfig sacVertxConfig;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
|
||||
@ -59,75 +40,19 @@ public class DynamicBuildServer implements ApplicationRunner {
|
||||
AppConfigHandler.initVertxConfig(redisTemplate);
|
||||
|
||||
// 加载vertx、应用配置
|
||||
appStartLoadData();
|
||||
startVertxService();
|
||||
}
|
||||
|
||||
|
||||
/***
|
||||
* 应用启动, 从redis读取配置,初始化vertx服务
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
private void appStartLoadData() throws Exception {
|
||||
Vertx vertx = AppConfigHandler.createVertx();
|
||||
// 从redis同步app配置
|
||||
AppConfigHandler.initAllAppConfig(redisTemplate);
|
||||
|
||||
VertxConfig vertxConfig = AppConfigHandler.getVertxConfig();
|
||||
// 创建HTTP监听
|
||||
// 所有ip都能访问
|
||||
HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0");
|
||||
HttpServer server = vertx.createHttpServer(httpServerOptions);
|
||||
Router mainHttpRouter = Router.router(vertx);
|
||||
Integer serverPort = vertxConfig.getPort() == null ? serverDefaultPort : vertxConfig.getPort();
|
||||
log.info("serverPort:{}", serverPort);
|
||||
server.requestHandler(mainHttpRouter).listen(serverPort, h -> {
|
||||
if (h.succeeded()) {
|
||||
log.info("HTTP端口监听成功:{}", serverPort);
|
||||
} else {
|
||||
log.error("HTTP端口监听失败:{}", serverPort);
|
||||
}
|
||||
});
|
||||
|
||||
// HttpClientOptions clientOptions = new HttpClientOptions();
|
||||
// clientOptions.setMaxPoolSize(20); // 最大连接池大小
|
||||
// clientOptions.setConnectTimeout(2000); // 连接超时 毫秒
|
||||
// clientOptions.setHttp2KeepAliveTimeout(1);
|
||||
// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒
|
||||
// HttpClient proxyClient = VERTX.createHttpClient(clientOptions);
|
||||
HttpClient proxyClient = vertx.createHttpClient();
|
||||
HttpProxy proxy = HttpProxy.reverseProxy(proxyClient);
|
||||
proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request)));
|
||||
proxy.addInterceptor(new ProxyInterceptor() {
|
||||
@Override
|
||||
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
|
||||
// if(StringUtils.equals(sacAppHeaderKey, "dsafdsfadafhappC")) {
|
||||
// // 会跳转到 RestfulFailureHandlerImpl
|
||||
// throw new HttpException(10003);
|
||||
// }
|
||||
return context.sendRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> handleProxyResponse(ProxyContext context) {
|
||||
// 调试代码,获取reponse body
|
||||
// Filter filter = new Filter();
|
||||
// ProxyResponse proxyResponse = context.response();
|
||||
// Body body = proxyResponse.getBody();
|
||||
// proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false)));
|
||||
// 继续拦截链
|
||||
return context.sendResponse();
|
||||
}
|
||||
});
|
||||
WebClient mainWebClient = WebClient.create(vertx);
|
||||
|
||||
String rateLimitModel = vertxConfig.getRateLimitModel();
|
||||
rateLimitModel = "local";
|
||||
mainHttpRouter.route().handler(ParameterCheckHandler.create())
|
||||
.handler(AppRateLimitHandler.create(rateLimitModel))
|
||||
.handler(ApiRateLimitHandler.create(rateLimitModel))
|
||||
.handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient, proxy))
|
||||
.failureHandler(RestfulFailureHandler.create());
|
||||
// mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy));
|
||||
private void startVertxService() throws Exception {
|
||||
// 单机
|
||||
//AppConfigHandler.createVertx(redisTemplate, sacVertxConfig);
|
||||
// 集群
|
||||
AppConfigHandler.createHazelcastClusterVertx(redisTemplate, sacVertxConfig);
|
||||
}
|
||||
|
||||
}
|
||||
|
24
sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java
Normal file
24
sf-vertx/src/main/java/com/sf/vertx/init/SacVertxConfig.java
Normal file
@ -0,0 +1,24 @@
|
||||
package com.sf.vertx.init;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/***
|
||||
* 配置文件
|
||||
* @author xy
|
||||
*
|
||||
*/
|
||||
@Component
|
||||
@Data
|
||||
public class SacVertxConfig {
|
||||
@Value("${server.vertx.server.default.port}")
|
||||
private Integer port;
|
||||
|
||||
@Value("${server.vertx.cluster.ip}")
|
||||
private String clusterIp;
|
||||
|
||||
@Value("${server.vertx.cluster.clusterName}")
|
||||
private String clusterName;
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
package com.sf.vertx.pojo;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ClusterEventMsg implements Serializable{
|
||||
private static final long serialVersionUID = -3380175299815557039L;
|
||||
private int type; // 1: app
|
||||
private int operation; // 1:新增,修改,3:删除
|
||||
private String appCode;
|
||||
}
|
@ -10,6 +10,7 @@ import com.sf.vertx.api.pojo.AppConfig;
|
||||
import com.sf.vertx.api.pojo.VertxConfig;
|
||||
import com.sf.vertx.constans.RedisKeyConfig;
|
||||
import com.sf.vertx.handle.AppConfigHandler;
|
||||
import com.sf.vertx.pojo.ClusterEventMsg;
|
||||
import com.sf.vertx.service.AppConfigService;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -19,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class AppConfigServiceImpl implements AppConfigService {
|
||||
@Value("${server.vertx.environment}")
|
||||
private String vertxEnvironment;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
|
||||
@ -32,9 +34,12 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
|
||||
redisTemplate.opsForValue().set(appCodeKey, JSONObject.toJSONString(appConfig));
|
||||
|
||||
// 初始化AppConfig本地缓存
|
||||
AppConfigHandler.initAppConfig(redisTemplate, appConfig.getAppCode(), true);
|
||||
|
||||
// 发布集群消息
|
||||
ClusterEventMsg msg = new ClusterEventMsg();
|
||||
msg.setType(1);
|
||||
msg.setOperation(1);
|
||||
msg.setAppCode(appConfig.getAppCode());
|
||||
AppConfigHandler.publishClusterEventMsg(msg);
|
||||
}
|
||||
|
||||
/***
|
||||
@ -46,9 +51,13 @@ public class AppConfigServiceImpl implements AppConfigService {
|
||||
redisTemplate.opsForZSet().remove(RedisKeyConfig.APP_CONFIG_SET_KEY, appConfig.getAppCode());
|
||||
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
|
||||
redisTemplate.delete(appCodeKey);
|
||||
|
||||
// 禁用本地缓存
|
||||
AppConfigHandler.addDisabledAppcode(appConfig.getAppCode());
|
||||
|
||||
// 发送集群消息
|
||||
ClusterEventMsg msg = new ClusterEventMsg();
|
||||
msg.setType(1);
|
||||
msg.setOperation(3);
|
||||
msg.setAppCode(appConfig.getAppCode());
|
||||
AppConfigHandler.publishClusterEventMsg(msg);
|
||||
}
|
||||
|
||||
/***
|
||||
|
30
sf-vertx/src/main/resources/cluster.xml
Normal file
30
sf-vertx/src/main/resources/cluster.xml
Normal file
@ -0,0 +1,30 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<hazelcast
|
||||
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-4.1.xsd"
|
||||
xmlns="http://www.hazelcast.com/schema/config"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
|
||||
<!-- 配置集群网络 -->
|
||||
<network>
|
||||
<!-- 配置Hazelcast节点之间通信的端口 -->
|
||||
<port auto-increment="true">5701</port>
|
||||
|
||||
<!-- 配置节点发现方式为TCP/IP -->
|
||||
<join>
|
||||
<tcp-ip enabled="true">
|
||||
<interface>127.0.0.1</interface>
|
||||
<!-- 添加集群中其他节点的IP地址 -->
|
||||
<member>127.0.0.1</member>
|
||||
|
||||
<!-- 添加更多节点的IP地址 -->
|
||||
</tcp-ip>
|
||||
</join>
|
||||
</network>
|
||||
|
||||
<!-- 管理中心的配置 -->
|
||||
<management-center enabled="true">
|
||||
<!-- 指定管理中心的 URL -->
|
||||
<url>http://127.0.0.1:8080/mancenter</url>
|
||||
</management-center>
|
||||
|
||||
</hazelcast>
|
Loading…
x
Reference in New Issue
Block a user