源码调整限流熔断、各种场景测试

This commit is contained in:
ztzh_xieyun 2024-05-08 19:10:15 +08:00
parent 48fc40334c
commit f884269e05
46 changed files with 3492 additions and 504 deletions

View File

@ -6,11 +6,10 @@ import java.util.List;
import lombok.Data;
@Data
public class GatewayInterface implements Serializable {
private static final long serialVersionUID = -313734935498432381L;
public class ApiConfig implements Serializable {
private static final long serialVersionUID = 5774283776114726263L;
private String apiCode;
private String uri;
private boolean uriRegular; // uri 正则
private String method; // 大写
private List<Strategy> strategy; // 策略
}

View File

@ -10,8 +10,8 @@ public class AppConfig implements Serializable {
private static final long serialVersionUID = 1518165296680157119L;
private String appCode; // 应用唯一码, app访问uri添加前缀,用于网关区分多应用
private boolean exclusiveService; // 预留字段, 独立端口
private Integer exclusiveGatewayConfigId; // 预留字段, 独享网关配置编号
private EnvironmentConfig environmentConfig; // 环境配置
private Integer exclusiveGatewayCode; // 预留字段, 独享网关配置编号
//private EnvironmentConfig environmentConfig; // 环境配置
private List<SacService> service; // 服务
private DataSecurity dataSecurity; // 数据加解密
private Strategy apiCurrentLimitingConfig; // 接口限流配置

View File

@ -11,6 +11,6 @@ public class SacService implements Serializable {
private String serviceName; // 服务名
private String serviceModel; // 模式, NORMAL, ROUTE
private ServerAddress serverAddress; // NORMAL模式的服务地址
private List<GatewayInterface> uriList; // uri列表
private List<ApiConfig> apiConfig; // request set header sacApiCode
private Router routeConfig; // 路由
}

View File

@ -8,9 +8,9 @@ import lombok.Data;
public class VertxConfig implements Serializable {
private static final long serialVersionUID = -1706421732809219829L;
private Integer port; // 启动端口
private String appHeaderKey;
private String appHeaderServiceName;
private String rateLimitModel = "redis"; // 负载均衡模式
private String appCodeHeaderKey = "sacAppCode";
private String apiCodeHeaderKey = "sacApiCode";
private String rateLimitModel = "redis"; // local,redis 负载均衡模式
private VertxOptionsConfig vertxOptionsConfig;
private HttpClientOptionsConfig httpClientOptionsConfig; // 配置Vert端口连接池
private AddressRetryStrategy addressRetryStrategy;

View File

@ -1,8 +1,16 @@
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.8
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enablePreviewFeatures=disabled
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=ignore
org.eclipse.jdt.core.compiler.problem.reportPreviewFeatures=warning
org.eclipse.jdt.core.compiler.release=disabled
org.eclipse.jdt.core.compiler.source=1.8

View File

@ -14,6 +14,7 @@
</description>
<properties>
<vertx.version>4.5.7</vertx.version>
<hystrix.version>1.5.2</hystrix.version>
</properties>
<dependencyManagement>
@ -112,10 +113,10 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
<dependency>
<!--<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
@ -174,7 +175,39 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<!-- for metrics -->
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.12</version>
<optional>true</optional>
</dependency>
<!-- For tests and examples -->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>${hystrix.version}</version>
<scope>provided</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.jayway.restassured</groupId>
<artifactId>rest-assured</artifactId>
<version>2.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,101 @@
package io.vertx.circuitbreaker;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
/**
* Converter and mapper for {@link io.vertx.circuitbreaker.CircuitBreakerOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.circuitbreaker.CircuitBreakerOptions} original class using Vert.x codegen.
*/
public class CircuitBreakerOptionsConverter {
private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, CircuitBreakerOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "failuresRollingWindow":
if (member.getValue() instanceof Number) {
obj.setFailuresRollingWindow(((Number)member.getValue()).longValue());
}
break;
case "fallbackOnFailure":
if (member.getValue() instanceof Boolean) {
obj.setFallbackOnFailure((Boolean)member.getValue());
}
break;
case "maxFailures":
if (member.getValue() instanceof Number) {
obj.setMaxFailures(((Number)member.getValue()).intValue());
}
break;
case "maxRetries":
if (member.getValue() instanceof Number) {
obj.setMaxRetries(((Number)member.getValue()).intValue());
}
break;
case "metricsRollingBuckets":
if (member.getValue() instanceof Number) {
obj.setMetricsRollingBuckets(((Number)member.getValue()).intValue());
}
break;
case "metricsRollingWindow":
if (member.getValue() instanceof Number) {
obj.setMetricsRollingWindow(((Number)member.getValue()).longValue());
}
break;
case "notificationAddress":
if (member.getValue() instanceof String) {
obj.setNotificationAddress((String)member.getValue());
}
break;
case "notificationLocalOnly":
if (member.getValue() instanceof Boolean) {
obj.setNotificationLocalOnly((Boolean)member.getValue());
}
break;
case "notificationPeriod":
if (member.getValue() instanceof Number) {
obj.setNotificationPeriod(((Number)member.getValue()).longValue());
}
break;
case "resetTimeout":
if (member.getValue() instanceof Number) {
obj.setResetTimeout(((Number)member.getValue()).longValue());
}
break;
case "timeout":
if (member.getValue() instanceof Number) {
obj.setTimeout(((Number)member.getValue()).longValue());
}
break;
}
}
}
static void toJson(CircuitBreakerOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}
static void toJson(CircuitBreakerOptions obj, java.util.Map<String, Object> json) {
json.put("failuresRollingWindow", obj.getFailuresRollingWindow());
json.put("fallbackOnFailure", obj.isFallbackOnFailure());
json.put("maxFailures", obj.getMaxFailures());
json.put("maxRetries", obj.getMaxRetries());
json.put("metricsRollingBuckets", obj.getMetricsRollingBuckets());
json.put("metricsRollingWindow", obj.getMetricsRollingWindow());
if (obj.getNotificationAddress() != null) {
json.put("notificationAddress", obj.getNotificationAddress());
}
json.put("notificationLocalOnly", obj.isNotificationLocalOnly());
json.put("notificationPeriod", obj.getNotificationPeriod());
json.put("resetTimeout", obj.getResetTimeout());
json.put("timeout", obj.getTimeout());
}
}

View File

@ -10,9 +10,9 @@ public class RedisKeyConfig {
private String vertxEnvironment;
public static final String BASE_REDIS_KEY = "vertx:config:";
public static String APP_CONFIG_PREFIX_KEY = null;
public static String APP_CONFIG_SET_KEY = null;
public static String APP_CURRENT_LIMITING_CONFIG_KEY = null;
public static String APP_CONFIG_PREFIX_KEY = null;
public static String VERTX_CONFIG_STRING_KEY = null;
public static String VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = null;
public static String VERTX_ADDRESS_RETRY_STRATEGY_KEY = null;
@ -21,7 +21,7 @@ public class RedisKeyConfig {
APP_CONFIG_PREFIX_KEY = BASE_REDIS_KEY + vertxEnvironment;
APP_CONFIG_SET_KEY = APP_CONFIG_PREFIX_KEY + ":set";
APP_CURRENT_LIMITING_CONFIG_KEY = APP_CONFIG_PREFIX_KEY + ":app:limiting";
VERTX_CONFIG_STRING_KEY = BASE_REDIS_KEY + vertxEnvironment + ":vertx";
VERTX_CONFIG_STRING_KEY = APP_CONFIG_PREFIX_KEY + ":vertx";
VERTX_ADDRESS_RETRY_STRATEGY_KEY = APP_CONFIG_PREFIX_KEY + ":addAddressRetryStrategy";
VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY = VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":set";
}

View File

@ -0,0 +1,7 @@
package com.sf.vertx.constans;
public class SacConstans {
public static final String CACHE_KEY_CONNECTOR = ":";
public static final String CIRCUIT_BREAKER = "CIRCUIT_BREAKER";
}

View File

@ -0,0 +1,51 @@
package com.sf.vertx.constans;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import io.vertx.core.json.JsonObject;
public class SacErrorCode {
public static final Integer DEFAULT_ERROR_CODE = 10000;
public final static Map<Integer, String> _ERROR = new HashMap<>();
static {
_ERROR.put(400, "Bad Request");
_ERROR.put(401, "Unauthorized");
_ERROR.put(403, "Forbidden");
_ERROR.put(404, "Not Found");
_ERROR.put(413, "Request Entity Too Large");
_ERROR.put(415, "Unsupported Media Type");
_ERROR.put(500, "Internal Server Error");
_ERROR.put(502, "Bad Gateway");
_ERROR.put(503, "Service Unavailable");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(10000, "服务请求失败,请稍后再试"); // 默认错误提示
_ERROR.put(10001, "应用禁止访问,请联系管理员");
_ERROR.put(10010, "无法找到路由地址");
_ERROR.put(10011, "无法找到匹配的加解密算法");
_ERROR.put(10012, "参数传递错误");
_ERROR.put(10013, "无法匹配加解密、熔断代理模式");
_ERROR.put(10014, "反向代理执行错误");
_ERROR.put(10015, "请求url被限流");
_ERROR.put(10016, "请求url被熔断");
_ERROR.put(10017, "应用请求url被限流");
_ERROR.put(10018, "apiCode与uri不匹配");
};
public static JsonObject returnErrorMsg(Integer errorCode) {
JsonObject json = new JsonObject();
String msg = _ERROR.get(errorCode);
if(StringUtils.isBlank(msg)) {
// default
json.put("code", DEFAULT_ERROR_CODE);
json.put("msg", _ERROR.get(DEFAULT_ERROR_CODE));
} else {
json.put("code", errorCode);
json.put("msg", msg);
}
return json;
}
}

View File

@ -1,15 +1,18 @@
package com.sf.vertx.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson2.JSONObject;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.service.AppConfigService;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
/***
@ -19,22 +22,37 @@ import lombok.extern.slf4j.Slf4j;
*
*/
@RestController
@RequestMapping("/vertx/config")
@Slf4j
@RequestMapping("/vertx")
public class AppConfigController {
@Autowired
private AppConfigService appConfigService;
@PostMapping("/saveAppConfig")
public String addAppConfig(@RequestBody String appConfig) {
@PostMapping("/app/config")
public JSONObject addAppConfig(@RequestBody AppConfig appConfig) {
appConfigService.saveAppConfig(appConfig);
return "success";
JSONObject json = new JSONObject();
json.put("code", 200);
json.put("msg", "success");
return json;
}
@PostMapping("/saveVertxConfig")
public String saveVertxConfig(@RequestBody VertxConfig vertxConfig) {
@DeleteMapping("/app/config")
public JSONObject deleteAppConfig(@RequestBody AppConfig appConfig) {
appConfigService.deleteAppConfig(appConfig);
JSONObject json = new JSONObject();
json.put("code", 200);
json.put("msg", "success");
return json;
}
@PostMapping("/vertx/config")
public JSONObject saveVertxConfig(@RequestBody VertxConfig vertxConfig) {
appConfigService.saveVertxConfig(vertxConfig);
return "success";
JSONObject json = new JSONObject();
json.put("code", 200);
json.put("msg", "success");
return json;
}

View File

@ -0,0 +1,293 @@
package com.sf.vertx.handle;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.sf.vertx.api.pojo.ApiConfig;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.utils.ProxyTool;
import cn.hutool.core.collection.ConcurrentHashSet;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import lombok.extern.slf4j.Slf4j;
/***
* vertx配置维护
*
* @author xy
*
*/
@Slf4j
public class AppConfigHandle {
private static VertxConfig VERTX_CONFIG = new VertxConfig();
public static Vertx VERTX;
public static CircuitBreaker CONNECTION_CIRCUIT_BREAKER;
// global cache app config
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG_MAP = new ConcurrentHashMap<>();
// global app config appCode - Strategy
private static final ConcurrentHashMap<String, Strategy> GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>();
// global api config appCode - Strategy
private static final ConcurrentHashMap<String, Strategy> GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP = new ConcurrentHashMap<>();
// appCode:apiCode:SacLoadBalancing
private static ConcurrentHashMap<String, SacLoadBalancing> LOADBALANCING_MAP = new ConcurrentHashMap<>();
// appCode:apiCode - ApiConfig
private static ConcurrentHashMap<String, ApiConfig> APICODE_CONFIG_MAP = new ConcurrentHashMap<>();
// apiCode限流配置 appCode:apiCode - Strategy
private static ConcurrentHashMap<String, Strategy> APICODE_CONFIG_CURRENT_LIMITING_MAP = new ConcurrentHashMap<>();
// apiCode熔断配置 appCode:apiCode - CircuitBreaker
private static ConcurrentHashMap<String, CircuitBreaker> APICODE_CONFIG_CIRCUIT_BREAKER_MAP = new ConcurrentHashMap<>();
// 禁用appCode
private static ConcurrentHashSet<String> DISABLED_APPCODE = new ConcurrentHashSet<String>();
public static void addDisabledAppcode(String appCode) {
DISABLED_APPCODE.add(appCode);
}
public static boolean isDisabledAppcode(String appCode) {
return DISABLED_APPCODE.contains(appCode);
}
public static boolean appDataSecurity(String appCode) {
return CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false;
}
public static Strategy getGlobalAppCurrentLimitingConfig(String appCode) {
return GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.get(appCode);
}
public static Strategy getGlobalApiCurrentLimitingConfig(String appCode) {
return GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.get(appCode);
}
public static AppConfig getAppConfig(String appCode) {
return CACHE_APP_CONFIG_MAP.get(appCode);
}
public static boolean isDataSecurity(String appCode) {
return CACHE_APP_CONFIG_MAP.get(appCode).getDataSecurity() != null ? true : false;
}
public static boolean isApiCodeCircuitBreaker(String key) {
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key) != null ? true : false;
}
public static CircuitBreaker getApiCodeCircuitBreaker(String key) {
return APICODE_CONFIG_CIRCUIT_BREAKER_MAP.get(key);
}
public static Strategy getApiCurrentLimiting(String key) {
return APICODE_CONFIG_CURRENT_LIMITING_MAP.get(key);
}
public static VertxConfig getVertxConfig() {
return VERTX_CONFIG;
}
public static String getAppCodeHeaderKey() {
return VERTX_CONFIG.getAppCodeHeaderKey();
}
public static String getApiCodeHeaderKey() {
return VERTX_CONFIG.getApiCodeHeaderKey();
}
public static SacLoadBalancing getLoadBalancing(String key) {
return LOADBALANCING_MAP.get(key);
}
public static ApiConfig getApicodeConfigMap(String key) {
return APICODE_CONFIG_MAP.get(key);
}
public static boolean isApicodeUri(String key, String uri) {
return StringUtils.equals(APICODE_CONFIG_MAP.get(key).getUri(), uri);
}
/***
* 从redis加载数据
*
* @throws Exception
*/
public static void initAllAppConfig(RedisTemplate<String, String> redisTemplate) throws Exception {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
AppConfigHandle.initAppConfig(redisTemplate, appCode);
}
}
/***
* 加载vertx配置
*/
public static void initVertxConfig(RedisTemplate<String, String> redisTemplate) {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
if (StringUtils.isNotBlank(vertxConfigValue)) {
VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class);
}
}
public static void initAppConfig(RedisTemplate<String, String> redisTemplate, String appCode) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) {
AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference<AppConfig>() {
});
CACHE_APP_CONFIG_MAP.put(appCode, appConfig);
// appapi默认限流
if (appConfig.getApiCurrentLimitingConfig() != null) {
GLOBAL_API_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getApiCurrentLimitingConfig());
}
if (appConfig.getAppCurrentLimitingConfig() != null) {
GLOBAL_APP_CURRENT_LIMITING_CONFIG_MAP.put(appCode, appConfig.getAppCurrentLimitingConfig());
}
// app router负载均衡
for (SacService sacService : appConfig.getService()) {
List<Node> nodeList = new ArrayList<>();
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
Node node = new Node();
node.setIp(sacService.getServerAddress().getHost());
node.setPort(sacService.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
node.setProtocol(routeContent.getServerAddress().getProtocol());
nodeList.add(node);
}
}
}
// 初始化apiConfig
if (sacService.getApiConfig() != null && sacService.getApiConfig().size() > 0) {
for (ApiConfig apiConfig : sacService.getApiConfig()) {
String key = appCode + ":" + apiConfig.getApiCode();
APICODE_CONFIG_MAP.put(key, apiConfig);
if (nodeList.size() > 0) {
// 初始化负载均衡算法
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
if (apiConfig.getStrategy() != null && apiConfig.getStrategy().size() > 0) {
for (Strategy strategy : apiConfig.getStrategy()) {
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
APICODE_CONFIG_CURRENT_LIMITING_MAP.put(key, strategy);
} else if (StringUtils.equals(strategy.getType(), "CIRCUIT_BREAKER")) {
String keyCircuitBreaker = key + ":" + "CIRCUIT_BREAKER";
// interfaceBreaker = CircuitBreaker.create("interfaceBreaker", VERTX,
// new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000)
// .setFallbackOnFailure(true)
// ).openHandler(v -> {
// log.info("Circuit opened");
// }).closeHandler(v -> {
// log.info("Circuit closed");
// });//.retryPolicy(retryCount -> retryCount * 100L);
// apiCode熔断
CircuitBreaker circuitBreaker = CircuitBreaker.create(keyCircuitBreaker + "-circuit-breaker",
VERTX, new CircuitBreakerOptions().setMaxFailures(strategy.getThreshold()) // 最大失败数
.setFailuresRollingWindow(strategy.getTimeWindow() * 1000) // 毫秒
// .setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(strategy.getRecovery_interval()) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info(keyCircuitBreaker + " Circuit open");
}).halfOpenHandler(v -> {
log.info(keyCircuitBreaker + "Circuit halfOpen");
}).closeHandler(v -> {
log.info(keyCircuitBreaker + "Circuit close");
});
APICODE_CONFIG_CIRCUIT_BREAKER_MAP.put(keyCircuitBreaker, circuitBreaker);
}
}
}
}
}
}
}
}
public static Vertx createVertx() {
// TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions();
loadVertxOptions(vertxOptions);
VERTX = Vertx.vertx(vertxOptions);
initConnectionCircuitBreaker();
return VERTX;
}
/***
* 初始化connection Breaker
*/
private static void initConnectionCircuitBreaker() {
CONNECTION_CIRCUIT_BREAKER = CircuitBreaker.create("connectionCircuitBreaker-circuit-breaker", VERTX,
new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(10000) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info("connectionCircuitBreaker Circuit open");
}).halfOpenHandler(v -> {
log.info("connectionCircuitBreaker Circuit halfOpen");
}).closeHandler(v -> {
log.info("connectionCircuitBreaker Circuit close");
});
}
private static void loadVertxOptions(VertxOptions vertxOptions) {
long blockedThreadCheckInterval = VERTX_CONFIG.getVertxOptionsConfig() == null ? -1
: VERTX_CONFIG.getVertxOptionsConfig().getBlockedThreadCheckInterval();
int workerPoolSize = VERTX_CONFIG == null || VERTX_CONFIG.getVertxOptionsConfig() == null ? -1
: VERTX_CONFIG.getVertxOptionsConfig().getWorkerPoolSize();
if (workerPoolSize != -1) {
vertxOptions.setWorkerPoolSize(workerPoolSize);
}
// TODO
blockedThreadCheckInterval = 1000000L;
if (blockedThreadCheckInterval != -1) {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
}
}
}

View File

@ -5,11 +5,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
/*
* Copyright 2014 Red Hat, Inc.
*
@ -80,9 +75,14 @@ public class BodyHandlerImpl implements BodyHandler {
// TODO 改造了这个地方
final HttpServerRequest request = context.request();
final HttpServerResponse response = context.response();
String sacAppHeaderKey = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
// 加解密在proxy拦截器解析跳转
String appCode = context.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCode = context.request().headers().get(AppConfigHandle.getApiCodeHeaderKey());
// 判断<br/>
// 1是否配置全局加解密.<br/>
// 2apiCode 配置熔断
String keyCircuitBreaker = appCode + ":" + apiCode + ":" + "CIRCUIT_BREAKER";
if (AppConfigHandle.isDataSecurity(appCode)
|| AppConfigHandle.isApiCodeCircuitBreaker(keyCircuitBreaker)) {
// =======源码流程
// final HttpServerRequest request = context.request();
// final HttpServerResponse response = context.response();

View File

@ -0,0 +1,19 @@
package com.sf.vertx.handle;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
/***
* 入口参数校验
*
* @author xy
*
*/
@VertxGen
public interface ParameterCheckHandler extends Handler<RoutingContext> {
static ParameterCheckHandler create() {
return new ParameterCheckHandlerImpl();
}
}

View File

@ -0,0 +1,36 @@
package com.sf.vertx.handle;
import org.apache.commons.lang3.StringUtils;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
public class ParameterCheckHandlerImpl implements ParameterCheckHandler {
@Override
public void handle(RoutingContext rc) {
String appCode = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCode = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey());
String key = appCode + ":" + apiCode;
if (StringUtils.isBlank(appCode) || StringUtils.isBlank(apiCode)
|| AppConfigHandle.getAppConfig(appCode) == null
|| AppConfigHandle.getApicodeConfigMap(key) == null) {
rc.fail(new HttpException(10012));
return;
}
if(AppConfigHandle.isDisabledAppcode(apiCode)) {
rc.fail(new HttpException(10001));
return;
}
String uri = rc.request().uri();
if(AppConfigHandle.isApicodeUri(key, uri) == false) {
rc.fail(new HttpException(10018));
return;
}
rc.next();
return;
}
}

View File

@ -1,6 +1,5 @@
package com.sf.vertx.handle;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

View File

@ -1,6 +1,5 @@
package com.sf.vertx.handle;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.client.WebClient;
import io.vertx.httpproxy.HttpProxy;

View File

@ -0,0 +1,135 @@
//package com.sf.vertx.handle;
//
//import java.util.HashMap;
//import java.util.Map;
//
//import org.apache.commons.lang3.StringUtils;
//
//import com.sf.vertx.api.pojo.GatewayInterface;
//import com.sf.vertx.api.pojo.SacService;
//import com.sf.vertx.api.pojo.Strategy;
//import com.sf.vertx.constans.SacConstans;
//import com.sf.vertx.service.impl.AppConfigServiceImpl;
//import com.sf.vertx.utils.ProxyTool;
//
//import io.vertx.circuitbreaker.CircuitBreaker;
//import io.vertx.httpproxy.ProxyContext;
//import lombok.extern.slf4j.Slf4j;
//
//@Slf4j
//public class ProxyModelUrlFuse {
//
// private static Map<String, Object> match(ProxyContext context) {
// Map<String, Object> match = new HashMap<>();
// Strategy strategyFuse = null; // 接口
// String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// String serviceName = context.request().headers().get(AppConfigServiceImpl.getAppHeaderServiceName());
//
// SacService sacService = AppConfigServiceImpl.getAppService(sacAppHeaderKey, serviceName);
// boolean matchUri = false;
// GatewayInterface _gatewayInterface = null;
// if (sacService.getUriList() != null && sacService.getUriList().size() > 0) {
// for (GatewayInterface gatewayInterface : sacService.getUriList()) {
// // uri匹配
// if (gatewayInterface.isUriRegular()) {
// if (ProxyTool.regexMatch(gatewayInterface.getUri(), context.request().getURI())) {
// matchUri = true;
// }
// } else {
// if (StringUtils.equals(gatewayInterface.getUri(), context.request().getURI())) {
// matchUri = true;
// }
// }
//
// if (matchUri) {
// if (gatewayInterface.getStrategy() != null && gatewayInterface.getStrategy().size() > 0) {
// for (Strategy strategy : gatewayInterface.getStrategy()) {
// if (StringUtils.equals(strategy.getType(), SacConstans.CIRCUIT_BREAKER)) {
// strategyFuse = strategy;
// _gatewayInterface = gatewayInterface;
// break;
// }
// }
// break;
// }
// }
// }
// if (strategyFuse != null) {
// match.put("strategyFuse", strategyFuse);
// }
//
// if (_gatewayInterface != null) {
// match.put("gatewayInterface", _gatewayInterface);
// }
// }
// return match;
// }
//
// private static String getBreakerName(ProxyContext context) {
// Map<String, Object> match = match(context);
// if (match.get("strategyFuse") != null) {
// GatewayInterface gatewayInterface = (GatewayInterface)match.get("gatewayInterface");
// String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// String serviceName = context.request().headers().get(AppConfigServiceImpl.getAppHeaderServiceName());
// String breakerName = sacAppHeaderKey + SacConstans.CACHE_KEY_CONNECTOR + serviceName
// + SacConstans.CACHE_KEY_CONNECTOR + gatewayInterface.getUri() + SacConstans.CACHE_KEY_CONNECTOR
// + gatewayInterface.getMethod();
//
// }
// return null;
// }
//
// /***
// * 接口熔断校验
// *
// * @param context
// */
// public static String validateFuse(ProxyContext context) {
// Map<String, Object> match = match(context);
// if (match.get("strategyFuse") != null) {
// GatewayInterface _gatewayInterface = (GatewayInterface)match.get("gatewayInterface");
// String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
// String serviceName = context.request().headers().get(AppConfigServiceImpl.getAppHeaderServiceName());
// GatewayInterface gatewayInterface = (GatewayInterface)match.get("gatewayInterface");
// String breakerName = sacAppHeaderKey + SacConstans.CACHE_KEY_CONNECTOR + serviceName
// + SacConstans.CACHE_KEY_CONNECTOR + _gatewayInterface.getUri() + SacConstans.CACHE_KEY_CONNECTOR
// + _gatewayInterface.getMethod();
// CircuitBreaker circuitBreaker = AppConfigServiceImpl.getCacheApiBreakerConfig(breakerName);
// circuitBreaker.executeWithFallback(promise -> {
// if (context.response().getStatusCode() == 200) {
// promise.complete("1");
// } else {
// promise.fail("2");
// }
// }, v -> {
// // Executed when the circuit is opened
// log.info(breakerName + " executed when the circuit is opened:{}", v);
// return "3";
// }, ar -> {
// // Do something with the result
// log.info(breakerName + " interface result.{} ", ar);
// });
// }
// return match.get("strategyFuse") == null ? null : ((Strategy) match.get("strategyFuse")).getDefaultResponse();
// }
//
//// private void add(String rateLimitModel, RedisTemplate<String, Integer> redisTemplate, String key) {
////
//// // 新增
////
//// String limitKey = sacAppHeaderKey + ";" + serviceName + ";" + context.request().getMethod().name() + ";"
//// + context.request().getURI();
//// ThreadUtil.execAsync(() -> {
//// add(rateLimitModel, limitKey);
//// });
//// Buffer buffer = Buffer.buffer("Gateway Interface return error!");
//// context.response().setBody(Body.body(buffer));
////
//// // log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
//// Long incr = redisTemplate.opsForValue().increment(key);
//// if (incr == 1) { // 创建,才设置时间窗口
//// redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
//// }
//// }
//
//}

View File

@ -2,12 +2,7 @@ package com.sf.vertx.handle;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.GatewayInterface;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
@ -30,67 +25,30 @@ public class RateLimitHandlerRedisImpl implements RateLimitHandler {
@Override
public void handle(RoutingContext rc) {
String sacAppHeaderKey = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
String appHeaderServiceName = rc.request().getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
if (StringUtils.isBlank(sacAppHeaderKey) || StringUtils.isBlank(appHeaderServiceName)) { // 参数传递错误
rc.fail(new HttpException(10003));
return;
}
// 查询模式
SacService sacService = AppConfigServiceImpl.getSacService(sacAppHeaderKey, appHeaderServiceName);
if (sacService == null) { // 参数传递错误
rc.fail(new HttpException(10003));
return;
}
boolean matchUri = false;
Strategy strategyInterface = null; // 接口
if (sacService.getUriList() != null) {
for (GatewayInterface uri : sacService.getUriList()) {
if (uri.isUriRegular()) {
if (ProxyTool.regexMatch(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
} else {
if (StringUtils.equals(uri.getUri(), rc.request().uri())) {
matchUri = true;
}
}
if (matchUri) {
for (Strategy strategy : uri.getStrategy()) {
if (StringUtils.equals(strategy.getType(), "CURRENT_LIMITING")) {
strategyInterface = strategy;
}
}
break;
}
}
}
String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getApiCodeHeaderKey());
Strategy apiCodeStrategy = AppConfigHandle.getApiCurrentLimiting(appCodeHeaderKey + ":" + apiCodeHeaderKey);
strategyInterface = strategyInterface != null ? strategyInterface
: (AppConfigServiceImpl.getAppConfig(sacAppHeaderKey) != null
&& AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getApiCurrentLimitingConfig() != null
? AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getApiCurrentLimitingConfig()
: null);
Strategy globalApiStrategy = AppConfigHandle.getGlobalApiCurrentLimitingConfig(appCodeHeaderKey);
Strategy globalAppStrategy = AppConfigHandle.getGlobalAppCurrentLimitingConfig(appCodeHeaderKey);
Strategy strategyApp = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey) != null
&& AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig() != null
? AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getAppCurrentLimitingConfig()
: null;
if (strategyInterface != null || strategyApp != null) {
Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, strategyInterface, strategyApp);
Strategy apiStrategy = apiCodeStrategy != null ? apiCodeStrategy
: globalApiStrategy != null ? globalApiStrategy : null;
Strategy appStrategy = globalAppStrategy != null ? globalAppStrategy : null;
if (apiStrategy != null || appStrategy != null) {
Map<Integer, Boolean> retMap = rateLimiter.acquire(rc, apiStrategy, appStrategy);
if (strategyInterface != null && retMap.get(1) == false) {
rc.fail(new HttpException(10101, strategyInterface.getDefaultResponse()));
if (apiStrategy != null && retMap.get(1) == false) {
rc.fail(new HttpException(10015, apiStrategy.getDefaultResponse()));
return;
}
if (strategyApp != null && retMap.get(2) == false) {
rc.fail(new HttpException(10102, strategyApp.getDefaultResponse()));
if (appStrategy != null && retMap.get(2) == false) {
rc.fail(new HttpException(10017, appStrategy.getDefaultResponse()));
return;
}
}
log.info("rateLimiter.acquire true");
rc.next();
return;
}

View File

@ -8,7 +8,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.SpringUtils;
import cn.hutool.core.thread.ThreadUtil;
@ -17,20 +16,20 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RedisRateLimiter {
public Map<Integer, Boolean> acquire(RoutingContext rc, Strategy strategyInterface, Strategy strategyApp) {
String appCode = rc.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
public Map<Integer, Boolean> acquire(RoutingContext rc, Strategy apiStrategy, Strategy appStrategy) {
String appCodeHeaderKey = rc.request().headers().get(AppConfigHandle.getAppCodeHeaderKey());
String key = null;
Map<Integer, Boolean> retMap = new HashMap<>();
if (strategyInterface != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode + ":" + rc.request().uri() + ":"
+ rc.request().method();
Boolean ret = rateLimiter(key, appCode, strategyInterface.getThreshold(), strategyInterface.getTimeWindow());
if (apiStrategy != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey + ":" + rc.request().uri()
+ ":" + rc.request().method();
Boolean ret = rateLimiter(key, appCodeHeaderKey, apiStrategy.getThreshold(), apiStrategy.getTimeWindow());
retMap.put(1, ret);
}
if (strategyApp != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCode;
Boolean ret = rateLimiter(key, appCode, strategyApp.getThreshold(), strategyApp.getTimeWindow());
if (appStrategy != null) {
key = RedisKeyConfig.APP_CURRENT_LIMITING_CONFIG_KEY + ":" + appCodeHeaderKey;
Boolean ret = rateLimiter(key, appCodeHeaderKey, appStrategy.getThreshold(), appStrategy.getTimeWindow());
retMap.put(2, ret);
}
return retMap;
@ -39,18 +38,17 @@ public class RedisRateLimiter {
@SuppressWarnings("unchecked")
private Boolean rateLimiter(String key, String appCode, Integer threshold, Integer timeWindow) {
RedisTemplate<String, Integer> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
Object count = redisTemplate.opsForValue().get(key);
Integer count = redisTemplate.opsForValue().get(key);
// redisTemplate.delete(key);
log.info("count:{}", count);
log.info("redis limiter. key:{}, count:{}", key, count);
// 初始化,设置过期时间
ThreadUtil.execAsync(() -> {
add(timeWindow, redisTemplate, key);
increment(timeWindow, redisTemplate, key);
});
return count != null && Integer.valueOf(count.toString()) <= threshold ? true : false;
return (count == null || count <= threshold) ? true : false;
}
private void add(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate, String key) {
// log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
private void increment(Integer timeWindow, RedisTemplate<String, Integer> redisTemplate, String key) {
Long incr = redisTemplate.opsForValue().increment(key);
if (incr == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);

View File

@ -1,5 +1,8 @@
package com.sf.vertx.handle;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.constans.SacErrorCode;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.core.http.HttpHeaders;
@ -7,28 +10,32 @@ import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RestfulFailureHandlerImpl implements RestfulFailureHandler {
@SuppressWarnings("unlikely-arg-type")
@Override
public void handle(RoutingContext frc) {
Throwable failure = frc.failure();
log.info("Throwable error:{}", failure);
if (failure instanceof HttpException) {
HttpException httpException = (HttpException) failure;
// frc.response().setStatusCode(404).end();
if(ProxyTool._ERROR.containsKey(httpException.getPayload())) {
JsonObject dataJson = new JsonObject(httpException.getPayload());
frc.response().setChunked(true).setStatusCode(httpException.getStatusCode())
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(dataJson.size()))
.putHeader("Content-Type", "application/json").end(dataJson.toBuffer());
return;
JsonObject errorJson = null;
try {
Throwable failure = frc.failure();
if (failure instanceof HttpException) {
HttpException httpException = (HttpException) failure;
if (StringUtils.isNoneBlank(httpException.getPayload())) {
errorJson = new JsonObject(httpException.getPayload());
} else {
errorJson = SacErrorCode.returnErrorMsg(httpException.getStatusCode());
}
} else {
errorJson = SacErrorCode.returnErrorMsg(SacErrorCode.DEFAULT_ERROR_CODE);
}
} catch (Exception e) {
e.printStackTrace();
errorJson = SacErrorCode.returnErrorMsg(SacErrorCode.DEFAULT_ERROR_CODE);
}
frc.response().setChunked(true).setStatusCode(400).putHeader("Content-Type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(ProxyTool.DEFAULT_ERROR_MSG.length()))
.end(ProxyTool.DEFAULT_ERROR_MSG);
frc.response().setChunked(true).setStatusCode(500).putHeader("Content-Type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(errorJson.size())).end(errorJson.toBuffer());
return;
}

View File

@ -1,6 +1,5 @@
package com.sf.vertx.init;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
@ -11,31 +10,21 @@ import org.springframework.stereotype.Component;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.handle.AppConfigHandle;
import com.sf.vertx.handle.BodyHandler;
import com.sf.vertx.handle.ParameterCheckHandler;
import com.sf.vertx.handle.ProxyHandler;
import com.sf.vertx.handle.RateLimitHandler;
import com.sf.vertx.handle.RestfulFailureHandler;
import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.Filter;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.client.WebClient;
import io.vertx.httpproxy.Body;
import io.vertx.httpproxy.HttpProxy;
import io.vertx.httpproxy.ProxyContext;
import io.vertx.httpproxy.ProxyInterceptor;
@ -52,15 +41,12 @@ import lombok.extern.slf4j.Slf4j;
@Order(value = 10)
@Component
public class DynamicBuildServer implements ApplicationRunner {
// TODO 后面可以和app挂钩
@Value("${server.vertx.server.default.port}")
private Integer serverDefaultPort;
public static CircuitBreaker appCircuitBreaker;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private AppConfigService appConfigService;
@Autowired
private RedisKeyConfig redisKeyConfig;
@ -68,65 +54,30 @@ public class DynamicBuildServer implements ApplicationRunner {
public void run(ApplicationArguments args) throws Exception {
// 初始化redis key
redisKeyConfig.init();
// 从redis同步app配置
appConfigService.initAllAppConfig();
// 从redis同步vertx配置
appConfigService.initVertxConfig();
AppConfigHandle.initVertxConfig(redisTemplate);
// 加载vertx应用配置
appStartLoadData();
}
/***
* 应用启动, 从redis读取配置,初始化vertx服务
* @throws Exception
*/
private void appStartLoadData() {
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
// TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions();
loadVertxOptions(vertxConfig, vertxOptions);
Vertx VERTX = Vertx.vertx(vertxOptions);
// CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX,
// new CircuitBreakerOptions().setMaxFailures(3).setMaxRetries(5).setTimeout(2000)
// .setFallbackOnFailure(true)
// ).openHandler(v -> {
// log.info("Circuit opened");
// }).closeHandler(v -> {
// log.info("Circuit closed");
// });//.retryPolicy(retryCount -> retryCount * 100L);
appCircuitBreaker = CircuitBreaker.create("my-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(10000) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info("Circuit open");
}).halfOpenHandler(v -> {
log.info("Circuit halfOpen");
}).closeHandler(v -> {
log.info("Circuit close");
});
private void appStartLoadData() throws Exception {
Vertx vertx = AppConfigHandle.createVertx();
// 从redis同步app配置
AppConfigHandle.initAllAppConfig(redisTemplate);
CircuitBreaker interfaceBreaker = CircuitBreaker.create("my-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(3) // 最大失败数
.setTimeout(2000) // 超时时间
.setFallbackOnFailure(true) // 失败后是否调用回退函数fallback
.setResetTimeout(10000) // 在开启状态下尝试重试之前所需时间
).openHandler(v -> {
log.info("Circuit open");
}).halfOpenHandler(v -> {
log.info("Circuit halfOpen");
}).closeHandler(v -> {
log.info("Circuit close");
});
VertxConfig vertxConfig = AppConfigHandle.getVertxConfig();
// 创建HTTP监听
// 所有ip都能访问
HttpServerOptions httpServerOptions = new HttpServerOptions().setHost("0.0.0.0");
HttpServer server = VERTX.createHttpServer(httpServerOptions);
Router mainHttpRouter = Router.router(VERTX);
Integer serverPort = (vertxConfig == null || vertxConfig.getPort() == null) ? serverDefaultPort
: vertxConfig.getPort();
HttpServer server = vertx.createHttpServer(httpServerOptions);
Router mainHttpRouter = Router.router(vertx);
Integer serverPort = vertxConfig.getPort() == null ? serverDefaultPort : vertxConfig.getPort();
log.info("serverPort:{}", serverPort);
server.requestHandler(mainHttpRouter).listen(serverPort, h -> {
if (h.succeeded()) {
@ -142,72 +93,36 @@ public class DynamicBuildServer implements ApplicationRunner {
// clientOptions.setHttp2KeepAliveTimeout(1);
// clientOptions.setIdleTimeout(1000); // 连接空闲超时 毫秒
// HttpClient proxyClient = VERTX.createHttpClient(clientOptions);
HttpClient proxyClient = VERTX.createHttpClient();
HttpClient proxyClient = vertx.createHttpClient();
HttpProxy proxy = HttpProxy.reverseProxy(proxyClient);
proxy.originSelector(request -> Future.succeededFuture(ProxyTool.resolveOriginAddress(request)));
proxy.addInterceptor(new ProxyInterceptor() {
@Override
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
// 修改uri context.request().setURI();
String sacAppHeaderKey = context.request().headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
log.info("addInterceptor uri appCode:{}", sacAppHeaderKey);
// if(StringUtils.equals(sacAppHeaderKey, "dsafdsfadafhappC")) {
// // 会跳转到 RestfulFailureHandlerImpl
// throw new HttpException(10003);
// }
return context.sendRequest();
}
@Override
public Future<Void> handleProxyResponse(ProxyContext context) {
// 调试代码,获取reponse body
Filter filter = new Filter();
ProxyResponse proxyResponse = context.response();
Body body = proxyResponse.getBody();
proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false)));
// Filter filter = new Filter();
// ProxyResponse proxyResponse = context.response();
// Body body = proxyResponse.getBody();
// proxyResponse.setBody(Body.body(filter.init(context.request().getURI(), body.stream(), false)));
// 继续拦截链
return context.sendResponse();
}
});
WebClient mainWebClient = WebClient.create(VERTX);
// mainHttpRouter.route().handler(ProxyHandler.create(proxy));
WebClient mainWebClient = WebClient.create(vertx);
// TODO 实例化方式 VertxConfig 读取
String rateLimitModel = vertxConfig.getRateLimitModel() != null
&& StringUtils.equals(vertxConfig.getRateLimitModel(), "redis") ? "redis" : "local";
mainHttpRouter.route().handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create())
.handler(ProxyHandler.create(mainWebClient, proxy))
.failureHandler(RestfulFailureHandler.create());
// mainHttpRouter.route().handler(BodyHandler.create()).handler(ProxyHandler.create(mainWebClient,proxy));
// 服务健康检测重试
// Integer periodicTime = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
// && AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime() > 0
// ? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getPeriodicTime()
// : 3;
//
// // TODO 是否开启健康检测
// long timerID = VERTX.setPeriodic(periodicTime, id -> {
// Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
// for (String appCode : set) {
// Set<String> setAddressRetryStrategy = redisTemplate.opsForZSet()
// .range(RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode, 0, -1);
// for (String address : setAddressRetryStrategy) {
// // 发起请求,测试服务是否可用
// // TODO 调用后端配置的健康检测地址
// }
//
// }
// });
String rateLimitModel = vertxConfig.getRateLimitModel();
mainHttpRouter.route().handler(ParameterCheckHandler.create()).handler(RateLimitHandler.create(rateLimitModel)).handler(BodyHandler.create())
.handler(ProxyHandler.create(mainWebClient, proxy)).failureHandler(RestfulFailureHandler.create());
// mainHttpRouter.route().handler(ProxyHandler.create(mainWebClient, proxy));
}
private void loadVertxOptions(VertxConfig vertxConfig, VertxOptions vertxOptions) {
long blockedThreadCheckInterval = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getBlockedThreadCheckInterval();
int workerPoolSize = vertxConfig == null || vertxConfig.getVertxOptionsConfig() == null ? -1
: vertxConfig.getVertxOptionsConfig().getWorkerPoolSize();
if (workerPoolSize != -1) {
vertxOptions.setWorkerPoolSize(workerPoolSize);
}
blockedThreadCheckInterval = 1000000L;
if (blockedThreadCheckInterval != -1) {
vertxOptions.setBlockedThreadCheckInterval(blockedThreadCheckInterval); // 不打印Thread blocked 阻塞日志
}
}
}

View File

@ -2,17 +2,12 @@ package com.sf.vertx.service;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
public interface AppConfigService {
void initAllAppConfig() throws Exception;
void saveAppConfig(String appConfig);
void saveAppConfig(AppConfig appConfig);
void deleteAppConfig(AppConfig appConfig);
void initVertxConfig();
void saveVertxConfig(VertxConfig vertxConfig);
}

View File

@ -1,33 +1,17 @@
package com.sf.vertx.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.TypeReference;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.api.pojo.Strategy;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.constans.RedisKeyConfig;
import com.sf.vertx.handle.AppConfigHandle;
import com.sf.vertx.service.AppConfigService;
import com.sf.vertx.utils.ProxyTool;
import com.sf.vertx.utils.SpringUtils;
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -37,172 +21,20 @@ public class AppConfigServiceImpl implements AppConfigService {
private String vertxEnvironment;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static VertxConfig VERTX_CONFIG = new VertxConfig();
private static final ConcurrentHashMap<String, AppConfig> CACHE_APP_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Strategy> CACHE_APP_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Strategy> CACHE_API_CURRENT_LIMITING_CONFIG = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, SacService> CACHE_APP_SERVICE = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SacLoadBalancing> SAC_LOADBALANCING_MAP = new ConcurrentHashMap<String, SacLoadBalancing>();
private static ConcurrentHashSet<String> CACHE_DISABLED_APP = new ConcurrentHashSet<String>();
@SuppressWarnings("unchecked")
public static void addAddressRetryStrategy(String address, String appCode) {
String setKey = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_SET_KEY + ":" + appCode;
String key = RedisKeyConfig.VERTX_ADDRESS_RETRY_STRATEGY_KEY + ":" + appCode + ":" + address;
RedisTemplate<String, String> redisTemplate = SpringUtils.getBean("redisTemplate", RedisTemplate.class);
Long thresholdCount = redisTemplate.opsForValue().increment(key);
// log.info("redis app threshold: {}", redisTemplate.opsForValue().get(key));
Integer timeWindow = VERTX_CONFIG.getAddressRetryStrategy() != null
&& VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow() > 0
? VERTX_CONFIG.getAddressRetryStrategy().getTimeWindow()
: 20;
if (thresholdCount == 1) { // 创建,才设置时间窗口
redisTemplate.expire(key, timeWindow, TimeUnit.SECONDS);
}
Integer threshold = AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy() != null
&& AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold() > 0
? AppConfigServiceImpl.getVertxConfig().getAddressRetryStrategy().getThreshold()
: 3;
if (thresholdCount > threshold) {
// 设置服务不可用
redisTemplate.opsForZSet().add(setKey, appCode + ":" + address, 0);
}
}
public static boolean appDataSecurity(String appCode) {
return CACHE_APP_CONFIG.get(appCode) != null && CACHE_APP_CONFIG.get(appCode).getDataSecurity() != null ? true
: false;
}
public static Strategy getAppCurrentLimitingConfig(String appCode) {
return CACHE_APP_CURRENT_LIMITING_CONFIG.get(appCode);
}
public static Strategy getApiCurrentLimitingConfig(String appCode) {
return CACHE_API_CURRENT_LIMITING_CONFIG.get(appCode);
}
public static AppConfig getAppConfig(String appCode) {
return CACHE_APP_CONFIG.get(appCode);
}
public static VertxConfig getVertxConfig() {
return VERTX_CONFIG;
}
public static String getSacAppHeaderKey() {
return VERTX_CONFIG.getAppHeaderKey() != null ? VERTX_CONFIG.getAppHeaderKey() : "sacAppCode";
}
public static String getAppHeaderServiceName() {
return VERTX_CONFIG.getAppHeaderServiceName() != null ? VERTX_CONFIG.getAppHeaderServiceName()
: "sacAppServiceName";
}
/***
* 加载vertx配置
*/
public void initVertxConfig() {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
String vertxConfigValue = redisTemplate.opsForValue().get(vertxConfigKey);
if (StringUtils.isNotBlank(vertxConfigValue)) {
VERTX_CONFIG = JSONObject.parseObject(vertxConfigValue, VertxConfig.class);
}
}
private void initAppConfig(String appCode) {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appCode;
String appCodeValue = redisTemplate.opsForValue().get(appCodeKey);
if (StringUtils.isNotBlank(appCodeValue)) {
AppConfig appConfig = JSON.parseObject(appCodeValue, new TypeReference<AppConfig>() {
});
CACHE_APP_CONFIG.put(appCode, appConfig);
// appapi默认限流
if(appConfig.getApiCurrentLimitingConfig() != null) {
CACHE_API_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getApiCurrentLimitingConfig());
}
if(appConfig.getAppCurrentLimitingConfig() != null) {
CACHE_APP_CURRENT_LIMITING_CONFIG.put(appCode, appConfig.getAppCurrentLimitingConfig());
}
// app router负载均衡
for (SacService sacService : appConfig.getService()) {
CACHE_APP_SERVICE.put(appCode + ";" + sacService.getServiceName(), sacService);
List<Node> nodeList = new ArrayList<>();
// 获取service模式
if (StringUtils.equals(sacService.getServiceModel(), "NORMAL")) {
Node node = new Node();
node.setIp(sacService.getServerAddress().getHost());
node.setPort(sacService.getServerAddress().getPort());
node.setWeight(0);
node.setProtocol(sacService.getServerAddress().getProtocol());
nodeList.add(node);
} else if (StringUtils.equals(sacService.getServiceModel(), "ROUTE")) {
if (sacService.getRouteConfig() != null
&& StringUtils.equals(sacService.getRouteConfig().getRouteType(), "WEIGHT_ROUTE")) {
for (RouteContent routeContent : sacService.getRouteConfig().getRouteContent()) {
Node node = new Node();
node.setIp(routeContent.getServerAddress().getHost());
node.setPort(routeContent.getServerAddress().getPort());
node.setWeight(routeContent.getWeight() != null && routeContent.getWeight() > 0
? routeContent.getWeight()
: 0);
node.setProtocol(routeContent.getServerAddress().getProtocol());
nodeList.add(node);
}
}
}
if (nodeList.size() > 0) {
// 初始化负载均衡算法
String key = appCode + ";" + sacService.getServiceName();
SacLoadBalancing sacLoadBalancing = ProxyTool.roundRobin(nodeList);
SAC_LOADBALANCING_MAP.put(key, sacLoadBalancing);
}
}
}
}
/***
* 从redis加载数据
*
* @throws Exception
*/
public void initAllAppConfig() throws Exception {
Set<String> set = redisTemplate.opsForZSet().range(RedisKeyConfig.APP_CONFIG_SET_KEY, 0, -1);
for (String appCode : set) {
initAppConfig(appCode);
}
}
public static SacLoadBalancing getSacLoadBalancing(String appCode, String serviceName) {
return SAC_LOADBALANCING_MAP.get(appCode + ";" + serviceName);
}
public static SacService getSacService(String appCode, String serviceName) {
return CACHE_APP_SERVICE.get(appCode + ";" + serviceName);
}
/***
* 新增修改
*
* @param appConfig
*/
public void saveAppConfig(String appConfigStr) {
AppConfig appConfig = JSON.parseObject(appConfigStr, AppConfig.class);
public void saveAppConfig(AppConfig appConfig) {
redisTemplate.opsForZSet().add(RedisKeyConfig.APP_CONFIG_SET_KEY, appConfig.getAppCode(), 0);
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
redisTemplate.opsForValue().set(appCodeKey, appConfigStr);
redisTemplate.opsForValue().set(appCodeKey, JSONObject.toJSONString(appConfig));
// 初始化AppConfig本地缓存
initAppConfig(appConfig.getAppCode());
// 删除禁用app列表
CACHE_DISABLED_APP.remove(appConfig.getAppCode());
AppConfigHandle.initAppConfig(redisTemplate, appConfig.getAppCode());
}
/***
@ -215,8 +47,8 @@ public class AppConfigServiceImpl implements AppConfigService {
String appCodeKey = RedisKeyConfig.APP_CONFIG_PREFIX_KEY + ":" + appConfig.getAppCode();
redisTemplate.delete(appCodeKey);
// 不动本地缓存, 入口控制app被禁用, 项目启动会加载最新配置
CACHE_DISABLED_APP.add(appConfig.getAppCode());
// 禁用本地缓存
AppConfigHandle.addDisabledAppcode(appConfig.getAppCode());
}
/***
@ -227,6 +59,7 @@ public class AppConfigServiceImpl implements AppConfigService {
public void saveVertxConfig(VertxConfig vertxConfig) {
String vertxConfigKey = RedisKeyConfig.VERTX_CONFIG_STRING_KEY;
redisTemplate.opsForValue().set(vertxConfigKey, JSONObject.toJSONString(vertxConfig));
initVertxConfig();
AppConfigHandle.initVertxConfig(redisTemplate);
}
}

View File

@ -1,24 +1,15 @@
package com.sf.vertx.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.AppConfig;
import com.sf.vertx.api.pojo.Node;
import com.sf.vertx.api.pojo.RouteContent;
import com.sf.vertx.api.pojo.SacService;
import com.sf.vertx.arithmetic.roundRobin.SacLoadBalancing;
import com.sf.vertx.arithmetic.roundRobin.WeightedRoundRobin;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.handle.AppConfigHandle;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.handler.HttpException;
import lombok.extern.slf4j.Slf4j;
/***
@ -29,38 +20,12 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class ProxyTool {
public final static Map<Integer, String> _ERROR = new HashMap<>();
public static final String DEFAULT_ERROR_MSG = "{\n" + " \"msg\": \"服务请求失败,请稍后再试\",\n" + " \"code\": 501,\n" + " \"data\": \"服务请求失败,请稍后再试\"\n" + "}";
static {
_ERROR.put(400, "Bad Request");
_ERROR.put(401, "Unauthorized");
_ERROR.put(403, "Forbidden");
_ERROR.put(404, "Not Found");
_ERROR.put(413, "Request Entity Too Large");
_ERROR.put(415, "Unsupported Media Type");
_ERROR.put(500, "Internal Server Error");
_ERROR.put(502, "Bad Gateway");
_ERROR.put(503, "Service Unavailable");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(504, "Gateway Timeout");
_ERROR.put(10000, "无法找到路由地址");
_ERROR.put(10001, "加解密算法传递错误");
_ERROR.put(10003, "参数传递错误");
_ERROR.put(10101, "接口限流错误");
_ERROR.put(10102, "应用限流错误");
_ERROR.put(10103, "{\n" + " \"msg\": \"接口连接失败\",\n" + " \"code\": 10103,\n" + " \"data\": \"接口连接失败\"\n" + "}"); // 接口连接失败
_ERROR.put(10104, "{\n" + " \"msg\": \"应用连接失败\",\n" + " \"code\": 10103,\n" + " \"data\": \"应用连接失败\"\n" + "}"); // 应用连接失败
};
public static SocketAddress resolveOriginAddress(HttpServerRequest request) {
String appCode = request.getHeader(AppConfigServiceImpl.getSacAppHeaderKey());
String appHeaderServiceName = request.getHeader(AppConfigServiceImpl.getAppHeaderServiceName());
log.info("uri:{}, header appCode:{},appHeaderServiceName:{}", request.uri(), appCode, appHeaderServiceName);
SacLoadBalancing sacLoadBalancing = AppConfigServiceImpl.getSacLoadBalancing(appCode, appHeaderServiceName);
String appCode = request.getHeader(AppConfigHandle.getAppCodeHeaderKey());
String apiCode = request.getHeader(AppConfigHandle.getApiCodeHeaderKey());
log.info("uri:{}, header appCode:{},apiCode:{}", request.uri(), appCode, apiCode);
SacLoadBalancing sacLoadBalancing = AppConfigHandle.getLoadBalancing(appCode + ":" + apiCode);
// TODO 区分httpshttp
Node node = sacLoadBalancing.selectNode();
SocketAddress socketAddress = SocketAddress.inetSocketAddress(node.getPort(), node.getIp());

View File

@ -0,0 +1,257 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package examples;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.circuitbreaker.HystrixMetricHandler;
import io.vertx.circuitbreaker.RetryPolicy;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.Router;
/**
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class CircuitBreakerExamples {
public void example1(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions()
.setMaxFailures(5) // number of failure before opening the circuit
.setTimeout(2000) // consider a failure if the operation does not succeed in time
.setFallbackOnFailure(true) // do we call the fallback on failure
.setResetTimeout(10000) // time spent in open state before attempting to re-try
);
// ---
// Store the circuit breaker in a field and access it as follows
// ---
breaker.execute(promise -> {
// some code executing with the breaker
// the code reports failures or success on the given promise.
// if this promise is marked as failed, the breaker increased the
// number of failures
}).onComplete(ar -> {
// Get the operation result.
});
}
public void example2(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// Store the circuit breaker in a field and access it as follows
// ---
breaker.<String>execute(promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}).onComplete(ar -> {
// Do something with the result
});
}
public void example3(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
// ---
// Store the circuit breaker in a field and access it as follows
// ---
breaker.executeWithFallback(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}, v -> {
// Executed when the circuit is opened
return "Hello";
})
.onComplete(ar -> {
// Do something with the result
});
}
public void example4(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).fallback(v -> {
// Executed when the circuit is opened.
return "hello";
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
}
public void example5(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
});
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
}
public void example6(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
Promise<String> userPromise = Promise.promise();
userPromise.future().onComplete(ar -> {
// Do something with the result
});
breaker.executeAndReportWithFallback(
userPromise,
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
}, v -> {
// Executed when the circuit is opened
return "Hello";
});
}
public void example7(Vertx vertx) {
// Enable notifications
CircuitBreakerOptions options = new CircuitBreakerOptions()
.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx, new CircuitBreakerOptions(options));
CircuitBreaker breaker2 = CircuitBreaker.create("my-second-circuit-breaker", vertx, new CircuitBreakerOptions(options));
// Create a Vert.x Web router
Router router = Router.router(vertx);
// Register the metric handler
router.get("/hystrix-metrics").handler(HystrixMetricHandler.create(vertx));
// Create the HTTP server using the router to dispatch the requests
vertx.createHttpServer()
.requestHandler(router)
.listen(8080);
}
public void example8(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setMaxRetries(5).setTimeout(2000)
).openHandler(v -> {
System.out.println("Circuit opened");
}).closeHandler(v -> {
System.out.println("Circuit closed");
}).retryPolicy(RetryPolicy.exponentialDelayWithJitter(50, 500));
breaker.<String>execute(
promise -> {
vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/")
.compose(req -> req
.send()
.compose(resp -> {
if (resp.statusCode() != 200) {
return Future.failedFuture("HTTP error");
} else {
return resp.body().map(Buffer::toString);
}
})).onComplete(promise);
});
}
public void example9(Vertx vertx) {
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx);
breaker.<HttpClientResponse>failurePolicy(ar -> {
// A failure will be either a failed operation or a response with a status code other than 200
if (ar.failed()) {
return true;
}
HttpClientResponse resp = ar.result();
return resp.statusCode() != 200;
});
Future<HttpClientResponse> future = breaker.execute(promise -> {
vertx.createHttpClient()
.request(HttpMethod.GET, 8080, "localhost", "/")
.compose(request -> request.send()
// Complete when the body is fully received
.compose(response -> response.body().map(response)))
.onComplete(promise);
});
}
public void enableNotifications(CircuitBreakerOptions options) {
options.setNotificationAddress(CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package examples.hystrix;
import com.netflix.hystrix.HystrixCommand;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
/**
* Examples for Hystrix
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class HystrixExamples {
public void exampleHystrix1() {
HystrixCommand<String> someCommand = getSomeCommandInstance();
String result = someCommand.execute();
}
public void exampleHystrix2(Vertx vertx) {
HystrixCommand<String> someCommand = getSomeCommandInstance();
vertx.<String>executeBlocking(
future -> future.complete(someCommand.execute())).onComplete(ar -> {
// back on the event loop
String result = ar.result();
}
);
}
public void exampleHystrix3(Vertx vertx) {
vertx.runOnContext(v -> {
Context context = vertx.getOrCreateContext();
HystrixCommand<String> command = getSomeCommandInstance();
command.observe().subscribe(result -> {
context.runOnContext(v2 -> {
// Back on context (event loop or worker)
String r = result;
});
});
});
}
private HystrixCommand<String> getSomeCommandInstance() {
return null;
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
@Source(translate = false)
package examples.hystrix;
import io.vertx.docgen.Source;

View File

@ -0,0 +1,257 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.circuitbreaker;
import io.vertx.circuitbreaker.impl.CircuitBreakerImpl;
import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.*;
import java.util.function.Function;
/**
* An implementation of the circuit breaker pattern for Vert.x
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@VertxGen
public interface CircuitBreaker {
/**
* Creates a new instance of {@link CircuitBreaker}.
*
* @param name the name
* @param vertx the Vert.x instance
* @param options the configuration option
* @return the created instance
*/
static CircuitBreaker create(String name, Vertx vertx, CircuitBreakerOptions options) {
return new CircuitBreakerImpl(name, vertx, options == null ? new CircuitBreakerOptions() : options);
}
/**
* Creates a new instance of {@link CircuitBreaker}, with default options.
*
* @param name the name
* @param vertx the Vert.x instance
* @return the created instance
*/
static CircuitBreaker create(String name, Vertx vertx) {
return new CircuitBreakerImpl(name, vertx, new CircuitBreakerOptions());
}
/**
* Closes the circuit breaker. It stops sending events on its state on the event bus.
* This method is not related to the {@code close} state of the circuit breaker. To set the circuit breaker in the
* {@code close} state, use {@link #reset()}.
*/
@Fluent
CircuitBreaker close();
/**
* Sets a {@link Handler} invoked when the circuit breaker state switches to open.
*
* @param handler the handler, must not be {@code null}
* @return the current {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker openHandler(Handler<Void> handler);
/**
* Sets a {@link Handler} invoked when the circuit breaker state switches to half-open.
*
* @param handler the handler, must not be {@code null}
* @return the current {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker halfOpenHandler(Handler<Void> handler);
/**
* Sets a {@link Handler} invoked when the circuit breaker state switches to close.
*
* @param handler the handler, must not be {@code null}
* @return the current {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker closeHandler(Handler<Void> handler);
/**
* Executes the given operation with the circuit breaker control. The operation is generally calling an
* <em>external</em> system. The operation receives a {@link Promise} object as parameter and <strong>must</strong>
* call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also
* call {@link Promise#fail(Throwable)} in case of failure.
* <p>
* The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The
* circuit breaker also monitor the completion of the operation before a configure timeout. The operation is
* considered as failed if it does not terminate in time.
* <p>
* This method returns a {@link Future} object to retrieve the status and result of the operation, with the status
* being a success or a failure. If the fallback is called, the returned future is successfully completed with the
* value returned from the fallback. If the fallback throws an exception, the returned future is marked as failed.
*
* @param command the operation
* @param fallback the fallback function. It gets an exception as parameter and returns the <em>fallback</em> result
* @param <T> the type of result
* @return a future object completed when the operation or its fallback completes
*/
<T> Future<T> executeWithFallback(Handler<Promise<T>> command, Function<Throwable, T> fallback);
/**
* Same as {@link #executeWithFallback(Handler, Function)} but using a callback.
*
* @param command the operation
* @param fallback the fallback
* @param handler the completion handler receiving either the operation result or the fallback result. The
* parameter is an {@link AsyncResult} because if the fallback is not called, the error is passed
* to the handler.
* @param <T> the type of result
*/
default <T> void executeWithFallback(Handler<Promise<T>> command, Function<Throwable, T> fallback,
Handler<AsyncResult<T>> handler) {
Future<T> fut = executeWithFallback(command, fallback);
fut.onComplete(handler);
}
/**
* Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker default fallback.
*
* @param command the operation
* @param <T> the type of result
* @return a future object completed when the operation or its fallback completes
*/
<T> Future<T> execute(Handler<Promise<T>> command);
/**
* Same as {@link #executeWithFallback(Handler, Function)} but using the circuit breaker default fallback.
*
* @param command the operation
* @param handler the completion handler receiving either the operation result or the fallback result. The
* parameter is an {@link AsyncResult} because if the fallback is not called, the error is passed
* to the handler.
* @param <T> the type of result
*/
default <T> void execute(Handler<Promise<T>> command, Handler<AsyncResult<T>> handler) {
Future<T> fut = execute(command);
fut.onComplete(handler);
}
/**
* Same as {@link #executeAndReportWithFallback(Promise, Handler, Function)} but using the circuit breaker default
* fallback.
*
* @param resultPromise the promise on which the operation result is reported
* @param command the operation
* @param <T> the type of result
* @return the current {@link CircuitBreaker}
*/
@Fluent
<T> CircuitBreaker executeAndReport(Promise<T> resultPromise, Handler<Promise<T>> command);
/**
* Executes the given operation with the circuit breaker control. The operation is generally calling an
* <em>external</em> system. The operation receives a {@link Promise} object as parameter and <strong>must</strong>
* call {@link Promise#complete(Object)} when the operation has terminated successfully. The operation must also
* call {@link Promise#fail(Throwable)} in case of failure.
* <p>
* The operation is not invoked if the circuit breaker is open, and the given fallback is called immediately. The
* circuit breaker also monitor the completion of the operation before a configure timeout. The operation is
* considered as failed if it does not terminate in time.
* <p>
* Unlike {@link #executeWithFallback(Handler, Function)}, this method does return a {@link Future} object, but
* let the caller pass a {@link Future} object on which the result is reported. If the fallback is called, the future
* is successfully completed with the value returned by the fallback function. If the fallback throws an exception,
* the future is marked as failed.
*
* @param resultPromise the promise on which the operation result is reported
* @param command the operation
* @param fallback the fallback function. It gets an exception as parameter and returns the <em>fallback</em> result
* @param <T> the type of result
* @return the current {@link CircuitBreaker}
*/
@Fluent
<T> CircuitBreaker executeAndReportWithFallback(Promise<T> resultPromise, Handler<Promise<T>> command,
Function<Throwable, T> fallback);
/**
* Sets a <em>default</em> {@link Function} invoked when the bridge is open to handle the "request", or on failure
* if {@link CircuitBreakerOptions#isFallbackOnFailure()} is enabled.
* <p>
* The function gets the exception as parameter and returns the <em>fallback</em> result.
*
* @param handler the handler
* @return the current {@link CircuitBreaker}
*/
@Fluent
<T> CircuitBreaker fallback(Function<Throwable, T> handler);
/**
* Configures the failure policy for this circuit-breaker.
*
* @return the current {@link CircuitBreaker}
* @see FailurePolicy
*/
@Fluent
default <T> CircuitBreaker failurePolicy(FailurePolicy<T> failurePolicy) {
return this;
}
/**
* Resets the circuit breaker state (number of failure set to 0 and state set to closed).
*
* @return the current {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker reset();
/**
* Explicitly opens the circuit.
*
* @return the current {@link CircuitBreaker}
*/
@Fluent
CircuitBreaker open();
/**
* @return the current state.
*/
CircuitBreakerState state();
/**
* @return the current number of failures.
*/
long failureCount();
/**
* @return the name of the circuit breaker.
*/
@CacheReturn
String name();
/**
* @deprecated use {@link #retryPolicy(RetryPolicy)} instead
*/
@Fluent
@Deprecated
CircuitBreaker retryPolicy(Function<Integer, Long> retryPolicy);
/**
* Set a {@link RetryPolicy} which computes a delay before retry execution.
*/
@Fluent
CircuitBreaker retryPolicy(RetryPolicy retryPolicy);
}

View File

@ -0,0 +1,392 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.circuitbreaker;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.json.annotations.JsonGen;
import io.vertx.core.json.JsonObject;
/**
* Circuit breaker configuration options. All time are given in milliseconds.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@DataObject
@JsonGen(publicConverter = false)
public class CircuitBreakerOptions {
/**
* Default timeout in milliseconds.
*/
public static final long DEFAULT_TIMEOUT = 10000L;
/**
* Default number of failures.
*/
public static final int DEFAULT_MAX_FAILURES = 5;
/**
* Default value of the fallback on failure property.
*/
public static final boolean DEFAULT_FALLBACK_ON_FAILURE = false;
/**
* Default time before it attempts to re-close the circuit (half-open state) in milliseconds.
*/
public static final long DEFAULT_RESET_TIMEOUT = 30000;
/**
* Whether circuit breaker state should be delivered only to local consumers by default = {@code true}.
*/
public static final boolean DEFAULT_NOTIFICATION_LOCAL_ONLY = true;
/**
* A default address on which the circuit breakers can send their updates.
*/
public static final String DEFAULT_NOTIFICATION_ADDRESS = "vertx.circuit-breaker";
/**
* Default notification period in milliseconds.
*/
public static final long DEFAULT_NOTIFICATION_PERIOD = 2000;
/**
* Default rolling window for metrics in milliseconds.
*/
public static final long DEFAULT_METRICS_ROLLING_WINDOW = 10000;
/**
* Default number of buckets used for the rolling window.
*/
public static final int DEFAULT_METRICS_ROLLING_BUCKETS = 10;
/**
* Default number of retries.
*/
private static final int DEFAULT_MAX_RETRIES = 0;
/**
* The default rolling window span in milliseconds.
*/
private static final int DEFAULT_FAILURES_ROLLING_WINDOW = 10000;
/**
* The operation timeout.
*/
private long timeout = DEFAULT_TIMEOUT;
/**
* The max failures.
*/
private int maxFailures = DEFAULT_MAX_FAILURES;
/**
* Whether or not the fallback should be called upon failures.
*/
private boolean fallbackOnFailure = DEFAULT_FALLBACK_ON_FAILURE;
/**
* The reset timeout.
*/
private long resetTimeout = DEFAULT_RESET_TIMEOUT;
/**
* Whether circuit breaker state should be delivered only to local consumers.
*/
private boolean notificationLocalOnly = DEFAULT_NOTIFICATION_LOCAL_ONLY;
/**
* The event bus address on which the circuit breaker state is published.
*/
private String notificationAddress = null;
/**
* The state publication period in ms.
*/
private long notificationPeriod = DEFAULT_NOTIFICATION_PERIOD;
/**
* The number of retries
*/
private int maxRetries = DEFAULT_MAX_RETRIES;
/**
* The metric rolling window in ms.
*/
private long metricsRollingWindow = DEFAULT_METRICS_ROLLING_WINDOW;
/**
* The number of buckets used for the metric rolling window.
*/
private int metricsRollingBuckets = DEFAULT_METRICS_ROLLING_BUCKETS;
/**
* The failure rolling window in ms.
*/
private long failuresRollingWindow = DEFAULT_FAILURES_ROLLING_WINDOW;
/**
* Creates a new instance of {@link CircuitBreakerOptions} using the default values.
*/
public CircuitBreakerOptions() {
// Empty constructor
}
/**
* Creates a new instance of {@link CircuitBreakerOptions} by copying the other instance.
*
* @param other the instance fo copy
*/
public CircuitBreakerOptions(CircuitBreakerOptions other) {
this.timeout = other.timeout;
this.maxFailures = other.maxFailures;
this.fallbackOnFailure = other.fallbackOnFailure;
this.notificationLocalOnly = other.notificationLocalOnly;
this.notificationAddress = other.notificationAddress;
this.notificationPeriod = other.notificationPeriod;
this.resetTimeout = other.resetTimeout;
this.maxRetries = other.maxRetries;
this.metricsRollingBuckets = other.metricsRollingBuckets;
this.metricsRollingWindow = other.metricsRollingWindow;
this.failuresRollingWindow = other.failuresRollingWindow;
}
/**
* Creates a new instance of {@link CircuitBreakerOptions} from the given json object.
*
* @param json the json object
*/
public CircuitBreakerOptions(JsonObject json) {
this();
CircuitBreakerOptionsConverter.fromJson(json, this);
}
/**
* @return a json object representing the current configuration.
*/
public JsonObject toJson() {
JsonObject json = new JsonObject();
CircuitBreakerOptionsConverter.toJson(this, json);
return json;
}
/**
* @return the maximum number of failures before opening the circuit.
*/
public int getMaxFailures() {
return maxFailures;
}
/**
* Sets the maximum number of failures before opening the circuit.
*
* @param maxFailures the number of failures.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setMaxFailures(int maxFailures) {
this.maxFailures = maxFailures;
return this;
}
/**
* @return the configured timeout in milliseconds.
*/
public long getTimeout() {
return timeout;
}
/**
* Sets the timeout in milliseconds. If an action is not completed before this timeout, the action is considered as
* a failure.
*
* @param timeoutInMs the timeout, -1 to disable the timeout
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setTimeout(long timeoutInMs) {
this.timeout = timeoutInMs;
return this;
}
/**
* @return whether or not the fallback is executed on failures, even when the circuit is closed.
*/
public boolean isFallbackOnFailure() {
return fallbackOnFailure;
}
/**
* Sets whether or not the fallback is executed on failure, even when the circuit is closed.
*
* @param fallbackOnFailure {@code true} to enable it.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setFallbackOnFailure(boolean fallbackOnFailure) {
this.fallbackOnFailure = fallbackOnFailure;
return this;
}
/**
* @return the time in milliseconds before it attempts to re-close the circuit (by going to the half-open state).
*/
public long getResetTimeout() {
return resetTimeout;
}
/**
* Sets the time in ms before it attempts to re-close the circuit (by going to the half-open state). If the circuit
* is closed when the timeout is reached, nothing happens. {@code -1} disables this feature.
*
* @param resetTimeout the time in ms
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setResetTimeout(long resetTimeout) {
this.resetTimeout = resetTimeout;
return this;
}
/**
* @return {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false}
*/
public boolean isNotificationLocalOnly() {
return notificationLocalOnly;
}
/**
* Whether circuit breaker state should be delivered only to local consumers.
*
* @param notificationLocalOnly {@code true} if circuit breaker state should be delivered only to local consumers, otherwise {@code false}
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setNotificationLocalOnly(boolean notificationLocalOnly) {
this.notificationLocalOnly = notificationLocalOnly;
return this;
}
/**
* @return the eventbus address on which the circuit breaker events are published. {@code null} if this feature has
* been disabled.
*/
public String getNotificationAddress() {
return notificationAddress;
}
/**
* Sets the event bus address on which the circuit breaker publish its state change.
*
* @param notificationAddress the address, {@code null} to disable this feature.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setNotificationAddress(String notificationAddress) {
this.notificationAddress = notificationAddress;
return this;
}
/**
* @return the the period in milliseconds where the circuit breaker send a notification about its state.
*/
public long getNotificationPeriod() {
return notificationPeriod;
}
/**
* Configures the period in milliseconds where the circuit breaker send a notification on the event bus with its
* current state.
*
* @param notificationPeriod the period, 0 to disable this feature.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setNotificationPeriod(long notificationPeriod) {
this.notificationPeriod = notificationPeriod;
return this;
}
/**
* @return the configured rolling window for metrics.
*/
public long getMetricsRollingWindow() {
return metricsRollingWindow;
}
/**
* Sets the rolling window used for metrics.
*
* @param metricsRollingWindow the period in milliseconds.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setMetricsRollingWindow(long metricsRollingWindow) {
this.metricsRollingWindow = metricsRollingWindow;
return this;
}
/**
* @return the configured rolling window for failures.
*/
public long getFailuresRollingWindow() {
return failuresRollingWindow;
}
/**
* Sets the rolling window used for metrics.
*
* @param metricsRollingWindow the period in milliseconds.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setFailuresRollingWindow(long failureRollingWindow) {
this.failuresRollingWindow = failureRollingWindow;
return this;
}
/**
* @return the configured number of buckets the rolling window is divided into.
*/
public int getMetricsRollingBuckets() {
return metricsRollingBuckets;
}
/**
* Sets the configured number of buckets the rolling window is divided into.
*
* The following must be true - metrics.rollingStats.timeInMilliseconds % metrics.rollingStats.numBuckets == 0 - otherwise it will throw an exception.
*
* In other words, 10000/10 is okay, so is 10000/20 but 10000/7 is not.
*
* @param metricsRollingBuckets the number of rolling buckets.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setMetricsRollingBuckets(int metricsRollingBuckets) {
this.metricsRollingBuckets = metricsRollingBuckets;
return this;
}
/**
* @return the number of times the circuit breaker tries to redo the operation before failing
*/
public int getMaxRetries() {
return maxRetries;
}
/**
* Configures the number of times the circuit breaker tries to redo the operation before failing.
*
* @param maxRetries the number of retries, 0 to disable this feature.
* @return the current {@link CircuitBreakerOptions} instance
*/
public CircuitBreakerOptions setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
}
}

View File

@ -0,0 +1,101 @@
package io.vertx.circuitbreaker;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.impl.JsonUtil;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
/**
* Converter and mapper for {@link io.vertx.circuitbreaker.CircuitBreakerOptions}.
* NOTE: This class has been automatically generated from the {@link io.vertx.circuitbreaker.CircuitBreakerOptions} original class using Vert.x codegen.
*/
public class CircuitBreakerOptionsConverter {
private static final Base64.Decoder BASE64_DECODER = JsonUtil.BASE64_DECODER;
private static final Base64.Encoder BASE64_ENCODER = JsonUtil.BASE64_ENCODER;
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, CircuitBreakerOptions obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "failuresRollingWindow":
if (member.getValue() instanceof Number) {
obj.setFailuresRollingWindow(((Number)member.getValue()).longValue());
}
break;
case "fallbackOnFailure":
if (member.getValue() instanceof Boolean) {
obj.setFallbackOnFailure((Boolean)member.getValue());
}
break;
case "maxFailures":
if (member.getValue() instanceof Number) {
obj.setMaxFailures(((Number)member.getValue()).intValue());
}
break;
case "maxRetries":
if (member.getValue() instanceof Number) {
obj.setMaxRetries(((Number)member.getValue()).intValue());
}
break;
case "metricsRollingBuckets":
if (member.getValue() instanceof Number) {
obj.setMetricsRollingBuckets(((Number)member.getValue()).intValue());
}
break;
case "metricsRollingWindow":
if (member.getValue() instanceof Number) {
obj.setMetricsRollingWindow(((Number)member.getValue()).longValue());
}
break;
case "notificationAddress":
if (member.getValue() instanceof String) {
obj.setNotificationAddress((String)member.getValue());
}
break;
case "notificationLocalOnly":
if (member.getValue() instanceof Boolean) {
obj.setNotificationLocalOnly((Boolean)member.getValue());
}
break;
case "notificationPeriod":
if (member.getValue() instanceof Number) {
obj.setNotificationPeriod(((Number)member.getValue()).longValue());
}
break;
case "resetTimeout":
if (member.getValue() instanceof Number) {
obj.setResetTimeout(((Number)member.getValue()).longValue());
}
break;
case "timeout":
if (member.getValue() instanceof Number) {
obj.setTimeout(((Number)member.getValue()).longValue());
}
break;
}
}
}
static void toJson(CircuitBreakerOptions obj, JsonObject json) {
toJson(obj, json.getMap());
}
static void toJson(CircuitBreakerOptions obj, java.util.Map<String, Object> json) {
json.put("failuresRollingWindow", obj.getFailuresRollingWindow());
json.put("fallbackOnFailure", obj.isFallbackOnFailure());
json.put("maxFailures", obj.getMaxFailures());
json.put("maxRetries", obj.getMaxRetries());
json.put("metricsRollingBuckets", obj.getMetricsRollingBuckets());
json.put("metricsRollingWindow", obj.getMetricsRollingWindow());
if (obj.getNotificationAddress() != null) {
json.put("notificationAddress", obj.getNotificationAddress());
}
json.put("notificationLocalOnly", obj.isNotificationLocalOnly());
json.put("notificationPeriod", obj.getNotificationPeriod());
json.put("resetTimeout", obj.getResetTimeout());
json.put("timeout", obj.getTimeout());
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.circuitbreaker;
import io.vertx.codegen.annotations.VertxGen;
/**
* Circuit breaker states.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@VertxGen
public enum CircuitBreakerState {
/**
* The {@code OPEN} state. The circuit breaker is executing the fallback, and switches to the {@link #HALF_OPEN}
* state after the specified time.
*/
OPEN,
/**
* The {@code CLOSED} state. The circuit breaker lets invocations pass and collects the failures. IF the number of
* failures reach the specified threshold, the cricuit breaker switches to the {@link #OPEN} state.
*/
CLOSED,
/**
* The {@code HALF_OPEN} state. The circuit breaker has been opened, and is now checking the current situation. It
* lets pass the next invocation and determines from the result (failure or success) if the circuit breaker can
* be switched to the {@link #CLOSED} state again.
*/
HALF_OPEN
}

View File

@ -0,0 +1,35 @@
package io.vertx.circuitbreaker;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import java.util.function.Predicate;
/**
* A failure policy for the {@link CircuitBreaker}.
* <p>
* The default policy is to consider an asynchronous result as a failure if {@link AsyncResult#failed()} returns {@code true}.
* Nevertheless, sometimes this is not good enough. For example, an HTTP Client could return a response, but with an unexpected status code.
* <p>
* In this case, a custom failure policy can be configured with {@link CircuitBreaker#failurePolicy(FailurePolicy)}.
*/
@VertxGen
public interface FailurePolicy<T> extends Predicate<Future<T>> {
/**
* The default policy, which considers an asynchronous result as a failure if {@link AsyncResult#failed()} returns {@code true}.
*/
static <U> FailurePolicy<U> defaultPolicy() {
return AsyncResult::failed;
}
/**
* Invoked by the {@link CircuitBreaker} when an operation completes.
*
* @param future a completed future
* @return {@code true} if the asynchronous result should be considered as a failure, {@code false} otherwise
*/
@Override
boolean test(Future<T> future);
}

View File

@ -0,0 +1,27 @@
package io.vertx.circuitbreaker;
/**
* Exception reported when the circuit breaker is open.
* <p>
* For performance reason, this exception does not carry a stack trace. You are not allowed to set a stack trace or a
* cause to this exception. This <em>immutability</em> allows using a singleton instance.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class HalfOpenCircuitException extends RuntimeException {
public HalfOpenCircuitException(String msg) {
super(msg, null, false, false);
}
@Override
public void setStackTrace(StackTraceElement[] stackTrace) {
throw new UnsupportedOperationException();
}
@Override
public synchronized Throwable initCause(Throwable cause) {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,50 @@
package io.vertx.circuitbreaker;
import io.vertx.circuitbreaker.impl.HystrixMetricEventStream;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;
/**
* A Vert.x web handler to expose the circuit breaker to the Hystrix dasbboard. The handler listens to the circuit
* breaker notifications sent on the event bus.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
@VertxGen
public interface HystrixMetricHandler extends Handler<RoutingContext> {
/**
* Creates the handler, using the default notification address and listening to local messages only.
*
* @param vertx the Vert.x instance
* @return the handler
*/
static HystrixMetricHandler create(Vertx vertx) {
return create(vertx, CircuitBreakerOptions.DEFAULT_NOTIFICATION_ADDRESS);
}
/**
* Creates the handler, listening only to local messages.
*
* @param vertx the Vert.x instance
* @param address the address to listen on the event bus
* @return the handler
*/
static HystrixMetricHandler create(Vertx vertx, String address) {
return create(vertx, address, CircuitBreakerOptions.DEFAULT_NOTIFICATION_LOCAL_ONLY);
}
/**
* Creates the handler.
*
* @param vertx the Vert.x instance
* @param address the address to listen on the event bus
* @param localOnly whether the consumer should only receive messages sent from this Vert.x instance
* @return the handler
*/
static HystrixMetricHandler create(Vertx vertx, String address, boolean localOnly) {
return new HystrixMetricEventStream(vertx, address, localOnly);
}
}

View File

@ -0,0 +1,29 @@
package io.vertx.circuitbreaker;
/**
* Exception reported when the circuit breaker is open.
* <p>
* For performance reason, this exception does not carry a stack trace. You are not allowed to set a stack trace or a
* cause to this exception. This <em>immutability</em> allows using a singleton instance.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class OpenCircuitException extends RuntimeException {
public static OpenCircuitException INSTANCE = new OpenCircuitException();
private OpenCircuitException() {
super("open circuit", null, false, false);
}
@Override
public void setStackTrace(StackTraceElement[] stackTrace) {
throw new UnsupportedOperationException();
}
@Override
public synchronized Throwable initCause(Throwable cause) {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.circuitbreaker;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.Arguments;
import java.util.concurrent.ThreadLocalRandom;
import static java.lang.Math.*;
/**
* A policy for retry execution.
*/
@VertxGen
@FunctionalInterface
public interface RetryPolicy {
/**
* Create a constant delay retry policy.
*
* @param delay the constant delay in milliseconds
*/
static RetryPolicy constantDelay(long delay) {
Arguments.require(delay > 0, "delay must be strictly positive");
return (failure, retryCount) -> delay;
}
/**
* Create a linear delay retry policy.
*
* @param initialDelay the initial delay in milliseconds
* @param maxDelay maximum delay in milliseconds
*/
static RetryPolicy linearDelay(long initialDelay, long maxDelay) {
Arguments.require(initialDelay > 0, "initialDelay must be strictly positive");
Arguments.require(maxDelay >= initialDelay, "maxDelay must be greater than initialDelay");
return (failure, retryCount) -> min(maxDelay, initialDelay * retryCount);
}
/**
* Create an exponential delay with jitter retry policy.
* <p>
* Based on <em>Full Jitter</em> in <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">Exponential Backoff And Jitter</a>.
*
* @param initialDelay the initial delay in milliseconds
* @param maxDelay maximum delay in milliseconds
*/
static RetryPolicy exponentialDelayWithJitter(long initialDelay, long maxDelay) {
Arguments.require(initialDelay > 0, "initialDelay must be strictly positive");
Arguments.require(maxDelay >= initialDelay, "maxDelay must be greater than initialDelay");
return (failure, retryCount) -> {
ThreadLocalRandom random = ThreadLocalRandom.current();
long delay = initialDelay * (1L << retryCount);
return random.nextLong(0, delay < 0 ? maxDelay : min(maxDelay, delay));
};
}
/**
* Compute a delay in milliseconds before retry is executed.
*
* @param failure the failure passed to the operation {@link io.vertx.core.Promise}
* @param retryCount the number of times operation has been retried already
* @return a delay in milliseconds before retry is executed
*/
long delay(Throwable failure, int retryCount);
}

View File

@ -0,0 +1,29 @@
package io.vertx.circuitbreaker;
/**
* Exception reported when the monitored operation timed out.
* <p>
* For performance reason, this exception does not carry a stack trace. You are not allowed to set a stack trace or a
* cause to this exception. This <em>immutability</em> allows using a singleton instance.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class TimeoutException extends RuntimeException {
public static TimeoutException INSTANCE = new TimeoutException();
private TimeoutException() {
super("operation timeout", null, false, false);
}
@Override
public void setStackTrace(StackTraceElement[] stackTrace) {
throw new UnsupportedOperationException();
}
@Override
public synchronized Throwable initCause(Throwable cause) {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,563 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.circuitbreaker.impl;
import io.vertx.circuitbreaker.*;
import io.vertx.core.*;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class CircuitBreakerImpl implements CircuitBreaker {
private static final Handler<Void> NOOP = (v) -> {
// Nothing...
};
private final Vertx vertx;
private final CircuitBreakerOptions options;
private final String name;
private final long periodicUpdateTask;
private Handler<Void> openHandler = NOOP;
private Handler<Void> halfOpenHandler = NOOP;
private Handler<Void> closeHandler = NOOP;
private Function fallback = null;
private FailurePolicy failurePolicy = FailurePolicy.defaultPolicy();
private CircuitBreakerState state = CircuitBreakerState.CLOSED;
private RollingCounter rollingFailures;
private final AtomicInteger passed = new AtomicInteger();
private final CircuitBreakerMetrics metrics;
private RetryPolicy retryPolicy = (failure, retryCount) -> 0L;
public CircuitBreakerImpl(String name, Vertx vertx, CircuitBreakerOptions options) {
Objects.requireNonNull(name);
Objects.requireNonNull(vertx);
this.vertx = vertx;
this.name = name;
if (options == null) {
this.options = new CircuitBreakerOptions();
} else {
this.options = new CircuitBreakerOptions(options);
}
this.rollingFailures = new RollingCounter(this.options.getFailuresRollingWindow() / 1000, TimeUnit.SECONDS);
if (this.options.getNotificationAddress() != null) {
this.metrics = new CircuitBreakerMetrics(vertx, this, this.options);
sendUpdateOnEventBus();
if (this.options.getNotificationPeriod() > 0) {
this.periodicUpdateTask = vertx.setPeriodic(this.options.getNotificationPeriod(), l -> sendUpdateOnEventBus());
} else {
this.periodicUpdateTask = -1;
}
} else {
this.metrics = null;
this.periodicUpdateTask = -1;
}
}
@Override
public CircuitBreaker close() {
if (metrics != null) {
if (periodicUpdateTask != -1) {
vertx.cancelTimer(periodicUpdateTask);
}
metrics.close();
}
return this;
}
@Override
public synchronized CircuitBreaker openHandler(Handler<Void> handler) {
Objects.requireNonNull(handler);
openHandler = handler;
return this;
}
@Override
public synchronized CircuitBreaker halfOpenHandler(Handler<Void> handler) {
Objects.requireNonNull(handler);
halfOpenHandler = handler;
return this;
}
@Override
public synchronized CircuitBreaker closeHandler(Handler<Void> handler) {
Objects.requireNonNull(handler);
closeHandler = handler;
return this;
}
@Override
public <T> CircuitBreaker fallback(Function<Throwable, T> handler) {
Objects.requireNonNull(handler);
fallback = handler;
return this;
}
@Override
public <T> CircuitBreaker failurePolicy(FailurePolicy<T> failurePolicy) {
Objects.requireNonNull(failurePolicy);
this.failurePolicy = failurePolicy;
return this;
}
/**
* A version of reset that can force the the state to `close` even if the circuit breaker is open. This is an
* internal API.
*
* @param force whether or not we force the state and allow an illegal transition
* @return the current circuit breaker.
*/
public synchronized CircuitBreaker reset(boolean force) {
rollingFailures.reset();
if (state == CircuitBreakerState.CLOSED) {
// Do nothing else.
return this;
}
if (!force && state == CircuitBreakerState.OPEN) {
// Resetting the circuit breaker while we are in the open state is an illegal transition
return this;
}
state = CircuitBreakerState.CLOSED;
closeHandler.handle(null);
sendUpdateOnEventBus();
return this;
}
@Override
public synchronized CircuitBreaker reset() {
return reset(false);
}
private synchronized void sendUpdateOnEventBus() {
if (metrics != null) {
DeliveryOptions deliveryOptions = new DeliveryOptions()
.setLocalOnly(options.isNotificationLocalOnly());
vertx.eventBus().publish(options.getNotificationAddress(), metrics.toJson(), deliveryOptions);
}
}
@Override
public synchronized CircuitBreaker open() {
state = CircuitBreakerState.OPEN;
openHandler.handle(null);
sendUpdateOnEventBus();
// Set up the attempt reset timer
long period = options.getResetTimeout();
if (period != -1) {
vertx.setTimer(period, l -> attemptReset());
}
return this;
}
@Override
public synchronized long failureCount() {
return rollingFailures.count();
}
@Override
public synchronized CircuitBreakerState state() {
return state;
}
private synchronized CircuitBreaker attemptReset() {
if (state == CircuitBreakerState.OPEN) {
passed.set(0);
state = CircuitBreakerState.HALF_OPEN;
halfOpenHandler.handle(null);
sendUpdateOnEventBus();
}
return this;
}
@Override
public <T> CircuitBreaker executeAndReportWithFallback(
Promise<T> userFuture,
Handler<Promise<T>> command,
Function<Throwable, T> fallback) {
Context context = vertx.getOrCreateContext();
CircuitBreakerState currentState;
synchronized (this) {
currentState = state;
}
CircuitBreakerMetrics.Operation call = metrics != null ? metrics.enqueue() : null;
// this future object tracks the completion of the operation
// This future is marked as failed on operation failures and timeout.
Promise<T> operationResult = Promise.promise();
if (currentState == CircuitBreakerState.CLOSED) {
Future<T> opFuture = operationResult.future();
opFuture.onComplete(new ClosedCircuitCompletion<>(context, userFuture, fallback, call));
if (options.getMaxRetries() > 0) {
executeOperation(context, command, retryFuture(context, 0, command, operationResult, call), call);
} else {
executeOperation(context, command, operationResult, call);
}
} else if (currentState == CircuitBreakerState.OPEN) {
// Fallback immediately
if (call != null) {
call.shortCircuited();
}
invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call);
} else if (currentState == CircuitBreakerState.HALF_OPEN) {
if (passed.incrementAndGet() == 1) {
Future<T> opFuture = operationResult.future();
opFuture.onComplete(new HalfOpenedCircuitCompletion<>(context, userFuture, fallback, call));
// Execute the operation
executeOperation(context, command, operationResult, call);
} else {
// Not selected, fallback.
if (call != null) {
call.shortCircuited();
}
invokeFallback(OpenCircuitException.INSTANCE, userFuture, fallback, call);
}
}
return this;
}
private <T> Promise<T> retryFuture(Context context, int retryCount, Handler<Promise<T>> command, Promise<T>
operationResult, CircuitBreakerMetrics.Operation call) {
Promise<T> retry = Promise.promise();
retry.future().onComplete(event -> {
if (event.succeeded()) {
reset();
context.runOnContext(v -> {
operationResult.complete(event.result());
});
return;
}
CircuitBreakerState currentState;
synchronized (this) {
currentState = state;
}
if (currentState == CircuitBreakerState.CLOSED) {
if (retryCount < options.getMaxRetries() - 1) {
executeRetryWithTimeout(event.cause(), retryCount, l -> {
context.runOnContext(v -> {
// Don't report timeout or error in the retry attempt, only the last one.
executeOperation(context, command, retryFuture(context, retryCount + 1, command, operationResult, null),
call);
});
});
} else {
executeRetryWithTimeout(event.cause(), retryCount, (l) -> {
context.runOnContext(v -> {
executeOperation(context, command, operationResult, call);
});
});
}
} else {
context.runOnContext(v -> operationResult.fail(OpenCircuitException.INSTANCE));
}
});
return retry;
}
private void executeRetryWithTimeout(Throwable failure, int retryCount, Handler<Void> action) {
long retryTimeout = retryPolicy.delay(failure, retryCount + 1);
if (retryTimeout > 0) {
vertx.setTimer(retryTimeout, (l) -> {
action.handle(null);
});
} else {
action.handle(null);
}
}
private <T> void invokeFallback(Throwable reason, Promise<T> userFuture,
Function<Throwable, T> fallback, CircuitBreakerMetrics.Operation operation) {
if (fallback == null) {
// No fallback, mark the user future as failed.
userFuture.fail(reason);
return;
}
try {
T apply = fallback.apply(reason);
if (operation != null) {
operation.fallbackSucceed();
}
userFuture.complete(apply);
} catch (Exception e) {
userFuture.fail(e);
if (operation != null) {
operation.fallbackFailed();
}
}
}
private <T> void executeOperation(Context context, Handler<Promise<T>> operation, Promise<T> operationResult,
CircuitBreakerMetrics.Operation call) {
// We use an intermediate future to avoid the passed future to complete or fail after a timeout.
Promise<T> passedFuture = Promise.promise();
// Execute the operation
if (options.getTimeout() != -1) {
long timerId = vertx.setTimer(options.getTimeout(), (l) -> {
context.runOnContext(v -> {
// Check if the operation has not already been completed
if (!operationResult.future().isComplete()) {
if (call != null) {
call.timeout();
}
operationResult.fail(TimeoutException.INSTANCE);
}
// Else Operation has completed
});
});
passedFuture.future().onComplete(v -> vertx.cancelTimer(timerId));
}
try {
passedFuture.future().onComplete(ar -> {
context.runOnContext(v -> {
if (ar.failed()) {
if (!operationResult.future().isComplete()) {
operationResult.fail(ar.cause());
}
} else {
if (!operationResult.future().isComplete()) {
operationResult.complete(ar.result());
}
}
});
});
operation.handle(passedFuture);
} catch (Throwable e) {
context.runOnContext(v -> {
if (!operationResult.future().isComplete()) {
if (call != null) {
call.error();
}
operationResult.fail(e);
}
});
}
}
@Override
public <T> Future<T> executeWithFallback(Handler<Promise<T>> operation, Function<Throwable, T> fallback) {
Promise<T> future = Promise.promise();
executeAndReportWithFallback(future, operation, fallback);
return future.future();
}
public <T> Future<T> execute(Handler<Promise<T>> operation) {
return executeWithFallback(operation, fallback);
}
@Override
public <T> CircuitBreaker executeAndReport(Promise<T> resultFuture, Handler<Promise<T>> operation) {
return executeAndReportWithFallback(resultFuture, operation, fallback);
}
@Override
public String name() {
return name;
}
private synchronized void incrementFailures() {
rollingFailures.increment();
if (rollingFailures.count() >= options.getMaxFailures()) {
if (state != CircuitBreakerState.OPEN) {
open();
} else {
// No need to do it in the previous case, open() do it.
// If open has been called, no need to send update, it will be done by the `open` method.
sendUpdateOnEventBus();
}
} else {
// Number of failure has changed, send update.
sendUpdateOnEventBus();
}
}
/**
* For testing purpose only.
*
* @return retrieve the metrics.
*/
public JsonObject getMetrics() {
return metrics.toJson();
}
public CircuitBreakerOptions options() {
return options;
}
@Override
public CircuitBreaker retryPolicy(Function<Integer, Long> retryPolicy) {
this.retryPolicy = (failure, retryCount) -> retryPolicy.apply(retryCount);
return this;
}
@Override
public CircuitBreaker retryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
public static class RollingCounter {
private Map<Long, Long> window;
private long timeUnitsInWindow;
private TimeUnit windowTimeUnit;
public RollingCounter(long timeUnitsInWindow, TimeUnit windowTimeUnit) {
this.windowTimeUnit = windowTimeUnit;
this.window = new LinkedHashMap<>((int) timeUnitsInWindow + 1);
this.timeUnitsInWindow = timeUnitsInWindow;
}
public void increment() {
long timeSlot = windowTimeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Long current = window.getOrDefault(timeSlot, 0L);
window.put(timeSlot, ++current);
if (window.size() > timeUnitsInWindow) {
Iterator<Long> iterator = window.keySet().iterator();
if (iterator.hasNext()) {
window.remove(iterator.next());
}
}
}
public long count() {
long windowStartTime = windowTimeUnit.convert(System.currentTimeMillis() - windowTimeUnit.toMillis(timeUnitsInWindow), TimeUnit.MILLISECONDS);
return window.entrySet().stream().filter(entry -> entry.getKey() >= windowStartTime).mapToLong(entry -> entry.getValue()).sum();
}
public void reset() {
window.clear();
}
}
@SuppressWarnings("unchecked")
private abstract class Completion<T> implements Handler<AsyncResult<T>> {
final Context context;
final Promise<T> userFuture;
final Function<Throwable, T> fallback;
final CircuitBreakerMetrics.Operation call;
protected Completion(Context context, Promise<T> userFuture, Function<Throwable, T> fallback, CircuitBreakerMetrics.Operation call) {
this.context = context;
this.userFuture = userFuture;
this.fallback = fallback;
this.call = call;
}
@Override
public void handle(AsyncResult<T> ar) {
context.runOnContext(v -> {
if (failurePolicy.test(asFuture(ar))) {
failureAction();
if (call != null) {
call.failed();
}
if (options.isFallbackOnFailure()) {
Throwable throwable = ar.cause();
if(ar.cause().getCause() == null) {
if(this instanceof CircuitBreakerImpl.HalfOpenedCircuitCompletion) {
throwable = new HalfOpenCircuitException(ar.cause().getMessage());
}
}
//ar.cause();
invokeFallback(throwable, userFuture, fallback, call);
} else {
userFuture.fail(ar.cause());
}
} else {
if (call != null) {
call.complete();
}
reset();
//The event may pass due to a user given predicate. We still want to push up the failure for the user
//to do any work
userFuture.handle(ar);
}
});
}
private Future<T> asFuture(AsyncResult<T> ar) {
Future<T> result;
if (ar instanceof Future) {
result = (Future<T>) ar;
} else if (ar.succeeded()) {
result = Future.succeededFuture(ar.result());
} else {
result = Future.failedFuture(ar.cause());
}
return result;
}
protected abstract void failureAction();
}
private class ClosedCircuitCompletion<T> extends Completion<T> {
ClosedCircuitCompletion(Context context, Promise<T> userFuture, Function<Throwable, T> fallback, CircuitBreakerMetrics.Operation call) {
super(context, userFuture, fallback, call);
}
@Override
protected void failureAction() {
incrementFailures();
}
}
private class HalfOpenedCircuitCompletion<T> extends Completion<T> {
HalfOpenedCircuitCompletion(Context context, Promise<T> userFuture, Function<Throwable, T> fallback, CircuitBreakerMetrics.Operation call) {
super(context, userFuture, fallback, call);
}
@Override
protected void failureAction() {
open();
}
}
}

View File

@ -0,0 +1,354 @@
package io.vertx.circuitbreaker.impl;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import org.HdrHistogram.Histogram;
/**
* Circuit breaker metrics.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class CircuitBreakerMetrics {
private final CircuitBreakerImpl circuitBreaker;
private final String node;
private final long circuitBreakerResetTimeout;
private final long circuitBreakerTimeout;
// Global statistics
private final RollingWindow rollingWindow;
CircuitBreakerMetrics(Vertx vertx, CircuitBreakerImpl circuitBreaker, CircuitBreakerOptions options) {
this.circuitBreaker = circuitBreaker;
this.circuitBreakerTimeout = circuitBreaker.options().getTimeout();
this.circuitBreakerResetTimeout = circuitBreaker.options().getResetTimeout();
this.node = vertx.isClustered() ? ((VertxInternal) vertx).getClusterManager().getNodeId() : "local";
this.rollingWindow = new RollingWindow(options.getMetricsRollingWindow(), options.getMetricsRollingBuckets());
}
private synchronized void evictOutdatedOperations() {
rollingWindow.updateTime();
}
public void close() {
// do nothing by default.
}
class Operation {
final long begin;
private volatile long end;
private boolean complete;
private boolean failed;
private boolean timeout;
private boolean exception;
private boolean fallbackFailed;
private boolean fallbackSucceed;
private boolean shortCircuited;
Operation() {
begin = System.nanoTime();
}
synchronized void complete() {
end = System.nanoTime();
complete = true;
CircuitBreakerMetrics.this.complete(this);
}
synchronized void failed() {
if (timeout || exception) {
// Already completed.
return;
}
end = System.nanoTime();
failed = true;
CircuitBreakerMetrics.this.complete(this);
}
synchronized void timeout() {
end = System.nanoTime();
failed = false;
timeout = true;
CircuitBreakerMetrics.this.complete(this);
}
synchronized void error() {
end = System.nanoTime();
failed = false;
exception = true;
CircuitBreakerMetrics.this.complete(this);
}
synchronized void fallbackFailed() {
fallbackFailed = true;
}
synchronized void fallbackSucceed() {
fallbackSucceed = true;
}
synchronized void shortCircuited() {
end = System.nanoTime();
shortCircuited = true;
CircuitBreakerMetrics.this.complete(this);
}
long durationInMs() {
return (end - begin) / 1_000_000;
}
}
Operation enqueue() {
return new Operation();
}
public synchronized void complete(Operation operation) {
rollingWindow.add(operation);
}
public synchronized JsonObject toJson() {
JsonObject json = new JsonObject();
// Configuration
json.put("resetTimeout", circuitBreakerResetTimeout);
json.put("timeout", circuitBreakerTimeout);
json.put("metricRollingWindow", rollingWindow.getMetricRollingWindowSizeInMs());
json.put("name", circuitBreaker.name());
json.put("node", node);
// Current state
json.put("state", circuitBreaker.state());
json.put("failures", circuitBreaker.failureCount());
// Global metrics
addSummary(json, rollingWindow.totalSummary(), MetricNames.TOTAL);
// Window metrics
evictOutdatedOperations();
addSummary(json, rollingWindow.windowSummary(), MetricNames.ROLLING);
return json;
}
private void addSummary(JsonObject json, RollingWindow.Summary summary, MetricNames names) {
long calls = summary.count();
int errorCount = summary.failures + summary.exceptions + summary.timeouts;
json.put(names.operationCountName, calls - summary.shortCircuited);
json.put(names.errorCountName, errorCount);
json.put(names.successCountName, summary.successes);
json.put(names.timeoutCountName, summary.timeouts);
json.put(names.exceptionCountName, summary.exceptions);
json.put(names.failureCountName, summary.failures);
if (calls == 0) {
json.put(names.successPercentageName, 0);
json.put(names.errorPercentageName, 0);
} else {
json.put(names.successPercentageName, ((double) summary.successes / calls) * 100);
json.put(names.errorPercentageName, ((double) (errorCount) / calls) * 100);
}
json.put(names.fallbackSuccessCountName, summary.fallbackSuccess);
json.put(names.fallbackFailureCountName, summary.fallbackFailure);
json.put(names.shortCircuitedCountName, summary.shortCircuited);
addLatency(json, summary.statistics, names);
}
private void addLatency(JsonObject json, Histogram histogram, MetricNames names) {
json.put(names.latencyMeanName, histogram.getMean());
json.put(names.latencyName, new JsonObject()
.put("0", histogram.getValueAtPercentile(0))
.put("25", histogram.getValueAtPercentile(25))
.put("50", histogram.getValueAtPercentile(50))
.put("75", histogram.getValueAtPercentile(75))
.put("90", histogram.getValueAtPercentile(90))
.put("95", histogram.getValueAtPercentile(95))
.put("99", histogram.getValueAtPercentile(99))
.put("99.5", histogram.getValueAtPercentile(99.5))
.put("100", histogram.getValueAtPercentile(100)));
}
private enum MetricNames {
ROLLING("rolling"), TOTAL("total");
private final String operationCountName;
private final String errorCountName;
private final String successCountName;
private final String timeoutCountName;
private final String exceptionCountName;
private final String failureCountName;
private final String successPercentageName;
private final String errorPercentageName;
private final String fallbackSuccessCountName;
private final String fallbackFailureCountName;
private final String shortCircuitedCountName;
private final String latencyMeanName;
private final String latencyName;
MetricNames(String prefix){
operationCountName = prefix + "OperationCount";
errorCountName = prefix + "ErrorCount";
successCountName = prefix + "SuccessCount";
timeoutCountName = prefix + "TimeoutCount";
exceptionCountName = prefix + "ExceptionCount";
failureCountName = prefix + "FailureCount";
successPercentageName = prefix + "SuccessPercentage";
errorPercentageName = prefix + "ErrorPercentage";
fallbackSuccessCountName = prefix + "FallbackSuccessCount";
fallbackFailureCountName = prefix + "FallbackFailureCount";
shortCircuitedCountName = prefix + "ShortCircuitedCount";
latencyName = prefix + "Latency";
latencyMeanName = prefix + "LatencyMean";
}
}
private static class RollingWindow {
private final Summary history;
private final Summary[] buckets;
private final long bucketSizeInNs;
RollingWindow(long windowSizeInMs, int numberOfBuckets) {
if (windowSizeInMs % numberOfBuckets != 0) {
throw new IllegalArgumentException("Window size should be divisible by number of buckets.");
}
this.buckets = new Summary[numberOfBuckets];
for (int i = 0; i < buckets.length; i++) {
this.buckets[i] = new Summary();
}
this.bucketSizeInNs = 1_000_000 * windowSizeInMs / numberOfBuckets;
this.history = new Summary();
}
public void add(Operation operation) {
getBucket(operation.end).add(operation);
}
public Summary totalSummary() {
Summary total = new Summary();
total.add(history);
total.add(windowSummary());
return total;
}
public Summary windowSummary() {
Summary window = new Summary(buckets[0].bucketIndex);
for (Summary bucket : buckets) {
window.add(bucket);
}
return window;
}
public void updateTime() {
getBucket(System.nanoTime());
}
private Summary getBucket(long timeInNs) {
long bucketIndex = timeInNs / bucketSizeInNs;
//sample too old:
if (bucketIndex < buckets[0].bucketIndex) {
return history;
}
shiftIfNecessary(bucketIndex);
return buckets[(int) (bucketIndex - buckets[0].bucketIndex)];
}
private void shiftIfNecessary(long bucketIndex) {
long shiftUnlimited = bucketIndex - buckets[buckets.length - 1].bucketIndex;
if (shiftUnlimited <= 0) {
return;
}
int shift = (int) Long.min(buckets.length, shiftUnlimited);
// Add old buckets to history
for(int i = 0; i < shift; i++) {
history.add(buckets[i]);
}
System.arraycopy(buckets, shift, buckets, 0, buckets.length - shift);
for(int i = buckets.length - shift; i < buckets.length; i++) {
buckets[i] = new Summary(bucketIndex + i + 1 - buckets.length);
}
}
public long getMetricRollingWindowSizeInMs() {
return bucketSizeInNs * buckets.length / 1_000_000;
}
private static class Summary {
final long bucketIndex;
final Histogram statistics;
private int successes;
private int failures;
private int exceptions;
private int timeouts;
private int fallbackSuccess;
private int fallbackFailure;
private int shortCircuited;
private Summary() {
this(-1);
}
private Summary(long bucketIndex) {
this.bucketIndex = bucketIndex;
statistics = new Histogram(2);
}
public void add(Summary other) {
statistics.add(other.statistics);
successes += other.successes;
failures += other.failures;
exceptions += other.exceptions;
timeouts += other.timeouts;
fallbackSuccess += other.fallbackSuccess ;
fallbackFailure += other.fallbackFailure ;
shortCircuited += other.shortCircuited ;
}
public void add(Operation operation) {
statistics.recordValue(operation.durationInMs());
if (operation.complete) {
successes++;
} else if (operation.failed) {
failures++;
} else if (operation.exception) {
exceptions++;
} else if (operation.timeout) {
timeouts++;
}
if (operation.fallbackSucceed) {
fallbackSuccess++;
} else if (operation.fallbackFailed) {
fallbackFailure++;
}
if (operation.shortCircuited) {
shortCircuited++;
}
}
public long count() {
return statistics.getTotalCount();
}
}
}
}

View File

@ -0,0 +1,139 @@
package io.vertx.circuitbreaker.impl;
import io.vertx.circuitbreaker.CircuitBreakerState;
import io.vertx.circuitbreaker.HystrixMetricHandler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Implements a handler to serve the Vert.x circuit breaker metrics as a Hystrix circuit
* breaker.
*
* @author <a href="http://escoffier.me">Clement Escoffier</a>
*/
public class HystrixMetricEventStream implements HystrixMetricHandler {
private final List<HttpServerResponse> connections = Collections.synchronizedList(new LinkedList<>());
private AtomicInteger counter = new AtomicInteger();
public HystrixMetricEventStream(Vertx vertx, String address, boolean localOnly) {
Objects.requireNonNull(vertx);
Objects.requireNonNull(address);
EventBus eventBus = vertx.eventBus();
MessageConsumer<JsonObject> consumer = localOnly ? eventBus.localConsumer(address) : eventBus.consumer(address);
consumer
.handler(message -> {
JsonObject json = build(message.body());
int id = counter.incrementAndGet();
String chunk = json.encode() + "\n\n";
connections.forEach(resp -> {
try {
resp.write("id" + ": " + id + "\n");
resp.write("data:" + chunk);
} catch (IllegalStateException e) {
// Connection close.
}
});
});
}
private JsonObject build(JsonObject body) {
String state = body.getString("state");
JsonObject json = new JsonObject();
json.put("type", "HystrixCommand");
json.put("name", body.getString("name"));
json.put("group", body.getString("node"));
json.put("currentTime", System.currentTimeMillis());
json.put("isCircuitBreakerOpen", state.equalsIgnoreCase(CircuitBreakerState.OPEN.toString()));
json.put("errorPercentage", body.getInteger("rollingErrorPercentage", 0));
json.put("errorCount", body.getInteger("rollingErrorCount", 0));
json.put("requestCount", body.getInteger("rollingOperationCount", 0));
json.put("rollingCountCollapsedRequests", 0);
json.put("rollingCountExceptionsThrown", body.getInteger("rollingExceptionCount", 0));
json.put("rollingCountFailure", body.getInteger("rollingFailureCount", 0));
json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0));
json.put("rollingCountFallbackFailure", body.getInteger("rollingFallbackFailureCount", 0));
json.put("rollingCountFallbackRejection", body.getInteger("fallbackRejection", 0));
json.put("rollingCountFallbackSuccess", body.getInteger("rollingFallbackSuccessCount", 0));
json.put("rollingCountResponsesFromCache", 0);
json.put("rollingCountSemaphoreRejected", 0);
json.put("rollingCountShortCircuited", body.getInteger("rollingShortCircuitedCount", 0));
json.put("rollingCountSuccess", body.getInteger("rollingSuccessCount", 0));
json.put("rollingCountThreadPoolRejected", 0);
json.put("rollingCountTimeout", body.getInteger("rollingTimeoutCount", 0));
json.put("rollingCountBadRequests", 0);
json.put("rollingCountEmit", 0);
json.put("rollingCountFallbackEmit", 0);
json.put("rollingCountFallbackMissing", 0);
json.put("rollingMaxConcurrentExecutionCount", 0);
json.put("currentConcurrentExecutionCount", 0);
json.put("latencyExecute_mean", body.getInteger("rollingLatencyMean", 0));
json.put("latencyExecute", body.getJsonObject("rollingLatency", new JsonObject()));
json.put("latencyTotal_mean", body.getInteger("totalLatencyMean", 0));
json.put("latencyTotal", body.getJsonObject("totalLatency", new JsonObject()));
json.put("propertyValue_circuitBreakerRequestVolumeThreshold", 0);
json.put("propertyValue_circuitBreakerSleepWindowInMilliseconds", body.getLong("resetTimeout", 0L));
json.put("propertyValue_circuitBreakerErrorThresholdPercentage", 0);
json.put("propertyValue_circuitBreakerForceOpen", false);
json.put("propertyValue_circuitBreakerForceClosed", false);
json.put("propertyValue_circuitBreakerEnabled", true);
json.put("propertyValue_executionIsolationStrategy", "THREAD");
json.put("propertyValue_executionIsolationThreadTimeoutInMilliseconds", body.getLong("timeout", 0L));
json.put("propertyValue_executionIsolationThreadInterruptOnTimeout", true);
json.put("propertyValue_executionIsolationThreadPoolKeyOverride", "");
json.put("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", 0);
json.put("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", 0);
json.put("propertyValue_metricsRollingStatisticalWindowInMilliseconds", body.getLong("metricRollingWindow", 0L));
json.put("propertyValue_requestCacheEnabled", false);
json.put("propertyValue_requestLogEnabled", false);
json.put("reportingHosts", 1);
return json;
}
@Override
public void handle(RoutingContext rc) {
HttpServerResponse response = rc.response();
response
.setChunked(true)
.putHeader(HttpHeaders.CONTENT_TYPE, "text/event-stream")
.putHeader(HttpHeaders.CACHE_CONTROL, "no-cache")
.putHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
rc.request().connection()
.closeHandler(v -> {
connections.remove(response);
endQuietly(response);
})
.exceptionHandler(t -> {
connections.remove(response);
rc.fail(t);
});
connections.add(response);
}
private static void endQuietly(HttpServerResponse response) {
if (response.ended()) {
return;
}
try {
response.end();
} catch (IllegalStateException e) {
// Ignore it.
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright (c) 2011-2016 The original author or authors
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
@ModuleGen(name = "vertx-circuit-breaker", groupPackage = "io.vertx")
package io.vertx.circuitbreaker;
import io.vertx.codegen.annotations.ModuleGen;

View File

@ -14,9 +14,7 @@ import java.net.ConnectException;
import java.util.List;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.init.DynamicBuildServer;
import com.sf.vertx.handle.AppConfigHandle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
@ -85,7 +83,7 @@ class SharedClientHttpStreamEndpoint extends ClientHttpEndpointBase<Lease<HttpCl
@Override
public void connect(ContextInternal context, Listener listener,
Handler<AsyncResult<ConnectResult<HttpClientConnection>>> handler) {
DynamicBuildServer.appCircuitBreaker.executeWithFallback(promise -> {
AppConfigHandle.CONNECTION_CIRCUIT_BREAKER.executeWithFallback(promise -> {
connector.httpConnect(context, ar -> {
if (ar.succeeded()) {
incRefCount();

View File

@ -10,7 +10,6 @@
*/
package io.vertx.httpproxy.impl;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -21,11 +20,14 @@ import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import com.sf.vertx.api.pojo.DataSecurity;
import com.sf.vertx.constans.SacErrorCode;
import com.sf.vertx.handle.AppConfigHandle;
import com.sf.vertx.security.MainSecurity;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.utils.ProxyTool;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.HalfOpenCircuitException;
import io.vertx.circuitbreaker.OpenCircuitException;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
@ -36,6 +38,7 @@ import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
@ -175,20 +178,10 @@ public class ReverseProxy implements HttpProxy {
* @param sc
*/
private void end(ProxyRequest proxyRequest, int sc) {
// TODO 处理反向代理返回结果
if (ProxyTool._ERROR.containsKey(sc)) {
Buffer buffer = Buffer.buffer(ProxyTool._ERROR.get(sc));
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length())).setBody(Body.body(buffer))
.send();
} else {
// proxyRequest.response().release().setStatusCode(sc).putHeader(HttpHeaders.CONTENT_LENGTH, "0").setBody(null)
// .send();
Buffer buffer = Buffer.buffer(ProxyTool.DEFAULT_ERROR_MSG);
proxyRequest.response().release().setStatusCode(sc).putHeader("content-type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(ProxyTool.DEFAULT_ERROR_MSG.length()))
.setBody(Body.body(buffer)).send();
}
JsonObject json = SacErrorCode.returnErrorMsg(sc);
proxyRequest.response().release().setStatusCode(500).putHeader("content-type", "application/json")
.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(json.size())).setBody(Body.body(json.toBuffer()))
.send();
}
private Future<HttpClientRequest> resolveOrigin(HttpServerRequest proxiedRequest) {
@ -247,36 +240,178 @@ public class ReverseProxy implements HttpProxy {
}
}
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
// TODO 服务熔断策略, 如果已经熔断,将剔除负载均衡策略
// 发起一个请求
String sacAppHeaderKey = proxyRequest.headers().get(AppConfigServiceImpl.getSacAppHeaderKey());
if (StringUtils.isNotBlank(sacAppHeaderKey) && AppConfigServiceImpl.appDataSecurity(sacAppHeaderKey)) {
return Future.future(p -> {
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
private Future<ProxyResponse> breakerAndSecurity(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
String appCode = proxyRequest.headers().get(AppConfigHandle.getAppCodeHeaderKey());
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
return Future.future(p -> {
circuitBreaker.executeWithFallback(promise -> {
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
.putHeaders(proxyRequest.headers())
.sendJson(bodyDecrypt(ctx.getBodyAsString(), sacAppHeaderKey), h -> {
.sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
if (h.result().statusCode() == 200) {
// promise.complete();
promise.complete("1");
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = bodyEncrypt(responseData.toString(), appCode);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
// Throwable throwable = new Throwable("error port");
// promise.fail(throwable);
promise.fail("2");
}
} else {
// end(proxyRequest, 502);
// Throwable throwable = new Throwable("error port");
// promise.fail(throwable);
promise.fail("2");
}
});
}, v -> {
// 需要传递当前状态half-open , close, 还是统计失败次数
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(circuitBreaker.name() + " half open circuit");
} else if (v instanceof OpenCircuitException) {
log.info(circuitBreaker.name() + " open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(circuitBreaker.name() + " close circuit");
}
return "3";
}, ar -> {
// Do something with the result
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
// String
if (StringUtils.equals(ar.result(), "1") == false) {
end(proxyRequest, 10016);
}
// Throwable
// if(ar.result() != null) {
// end(proxyRequest, 502);
// }
});
});
}
private Future<ProxyResponse> breaker(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
return Future.future(p -> {
circuitBreaker.executeWithFallback(promise -> {
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
.putHeaders(proxyRequest.headers()).sendJson(ctx.getBodyAsString(), h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
if (h.result().statusCode() == 200) {
// promise.complete();
promise.complete("1");
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json")
.setBody(Body.body(responseData.toBuffer()));
p.complete(proxyResponse);
} else {
promise.fail("2");
}
} else {
promise.fail("2");
}
});
}, v -> {
// 需要传递当前状态half-open , close, 还是统计失败次数
log.info(circuitBreaker.name() + " executed when the circuit is opened:{}", v.getMessage());
if (v instanceof HalfOpenCircuitException) {
log.info(circuitBreaker.name() + " half open circuit");
} else if (v instanceof OpenCircuitException) {
log.info(circuitBreaker.name() + " open circuit");
} else if (v instanceof NoStackTraceThrowable) {
log.info(circuitBreaker.name() + " close circuit");
}
return "3";
}, ar -> {
log.info(circuitBreaker.name() + " interface failed result.{} ", ar);
if (StringUtils.equals(ar.result(), "1") == false) {
end(proxyRequest, 10016);
}
});
});
}
private Future<ProxyResponse> security(CircuitBreaker circuitBreaker, ProxyRequest proxyRequest) {
String appCode = proxyRequest.headers().get(AppConfigHandle.getAppCodeHeaderKey());
SocketAddress socketAddress = ProxyTool.resolveOriginAddress(proxyRequest.proxiedRequest());
return Future.future(p -> {
mainWebClient.post(socketAddress.port(), socketAddress.host(), proxyRequest.getURI())
.putHeaders(proxyRequest.headers()).sendJson(bodyDecrypt(ctx.getBodyAsString(), appCode), h -> {
if (h.succeeded()) {
log.info("==========uri:{},response http code:{}", proxyRequest.getURI(),
h.result().statusCode());
if (h.result().statusCode() == 200) {
// 释放资源
proxyRequest.release();
JsonObject responseData = h.result().bodyAsJsonObject();
log.info("responseData:{}", responseData);
// 加密
String dataStr = bodyEncrypt(responseData.toString(), sacAppHeaderKey);
String dataStr = bodyEncrypt(responseData.toString(), appCode);
log.info("aesEncrypt dataStr:{}", dataStr);
Buffer buffer = Buffer.buffer(dataStr);
ProxyResponse proxyResponse = proxyRequest.response().setStatusCode(200)
.putHeader("content-type", "application/json").setBody(Body.body(buffer));
p.complete(proxyResponse);
} else {
end(proxyRequest, 502);
}
});
});
} else {
log.info("interface retrun error.{}", proxyRequest.getURI());
end(proxyRequest, 502);
}
});
});
}
private Future<ProxyResponse> sendProxyRequest(ProxyRequest proxyRequest) {
// 判断<br/>
// 1是否配置全局加解密.<br/>
// 2apiCode 配置熔断
String appCode = proxyRequest.headers().get(AppConfigHandle.getAppCodeHeaderKey());
String apiCode = proxyRequest.headers().get(AppConfigHandle.getApiCodeHeaderKey());
String keyCircuitBreaker = appCode + ":" + apiCode + ":" + "CIRCUIT_BREAKER";
// 熔断
CircuitBreaker circuitBreaker = AppConfigHandle.getApiCodeCircuitBreaker(keyCircuitBreaker);
boolean isDataSecurity = AppConfigHandle.isDataSecurity(appCode);
if (isDataSecurity || circuitBreaker != null) {
try {
if (isDataSecurity && circuitBreaker != null) {
return breakerAndSecurity(circuitBreaker, proxyRequest);
} else if (isDataSecurity) {
return security(circuitBreaker, proxyRequest);
} else if (circuitBreaker != null) {
return breaker(circuitBreaker, proxyRequest);
} else {
log.info("not match any condition.appCode:{},apiCode:{}", appCode, apiCode);
throw new HttpException(10013);
}
} catch (Exception e) {
e.printStackTrace();
throw new HttpException(10014);
}
} else {
Future<HttpClientRequest> f = resolveOrigin(proxyRequest.proxiedRequest());
f.onFailure(err -> {
log.info("error:{}", err);
// Should this be done here ? I don't think so
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
proxiedRequest.resume();
@ -287,7 +422,6 @@ public class ReverseProxy implements HttpProxy {
end(proxyRequest, 502);
});
});
return f.compose(a -> sendProxyRequest(proxyRequest, a));
}
@ -316,28 +450,28 @@ public class ReverseProxy implements HttpProxy {
return sendResponse();
}
private String bodyEncrypt(String body, String sacAppHeaderKey) {
DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
private String bodyEncrypt(String body, String appCode) {
DataSecurity dataSecurity = AppConfigHandle.getAppConfig(appCode).getDataSecurity();
switch (dataSecurity.getAlgorithm()) {
case "AES":
return MainSecurity.aesEncrypt(body, dataSecurity.getPrivateKey());
default:
break;
}
log.info(" appcode:{}, encrypt key config is error.", sacAppHeaderKey);
throw new HttpException(10001);
log.info(" appCode:{}, encrypt key config is error.", appCode);
throw new HttpException(10011);
}
private String bodyDecrypt(String body, String sacAppHeaderKey) {
DataSecurity dataSecurity = AppConfigServiceImpl.getAppConfig(sacAppHeaderKey).getDataSecurity();
private String bodyDecrypt(String body, String appCode) {
DataSecurity dataSecurity = AppConfigHandle.getAppConfig(appCode).getDataSecurity();
switch (dataSecurity.getAlgorithm()) {
case "AES":
return MainSecurity.aesDecrypt(body, dataSecurity.getPrivateKey());
default:
break;
}
log.info(" appcode:{}, decrypt key config is error.", sacAppHeaderKey);
throw new HttpException(10001);
log.info(" appCode:{}, decrypt key config is error.", appCode);
throw new HttpException(10011);
}
}
}

View File

@ -0,0 +1,33 @@
package com.sf.vertx;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestCaffeine {
@Test
public void init() {
Cache<String, Object> cache = Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
//.expireAfterAccess(1, TimeUnit.SECONDS)
//.maximumSize(10)
.build();
cache.put("hello","1");
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
}
Object ifPresent = cache.getIfPresent("hello");
log.info("data:{}", ifPresent);
ifPresent = cache.getIfPresent("hello");
log.info("data:{}", ifPresent);
}
}

View File

@ -1,7 +1,7 @@
package com.sf.vertx;
import com.sf.vertx.api.pojo.VertxConfig;
import com.sf.vertx.service.impl.AppConfigServiceImpl;
import com.sf.vertx.handle.AppConfigHandle;
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
@ -17,7 +17,7 @@ public class TestCircuitBreaker {
private static int port;
public static void main(String[] args) {
VertxConfig vertxConfig = AppConfigServiceImpl.getVertxConfig();
VertxConfig vertxConfig = AppConfigHandle.getVertxConfig();
// TODO 编解码线程池,后面优化协程等方式
VertxOptions vertxOptions = new VertxOptions();