package com.sf.vertx; import org.junit.Test; import com.sf.vertx.api.pojo.VertxConfig; import com.sf.vertx.handle.AppConfigHandler; import io.vertx.circuitbreaker.CircuitBreaker; import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.circuitbreaker.HalfOpenCircuitException; import io.vertx.circuitbreaker.OpenCircuitException; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; import io.vertx.core.impl.NoStackTraceThrowable; import lombok.extern.slf4j.Slf4j; @Slf4j public class TestCircuitBreaker { private static int port; CircuitBreaker rateLimiter; @Test public void rateLimiter() { VertxOptions vertxOptions = new VertxOptions(); Vertx VERTX = Vertx.vertx(vertxOptions); // apiCode熔断 rateLimiter = CircuitBreaker.create("rateLimiter-circuit-breaker", VERTX, new CircuitBreakerOptions().setMaxFailures(2) // 最大失败数 .setFailuresRollingWindow(5 * 1000) // 毫秒 .setTimeout(-1) // 超时时间 .setFallbackOnFailure(true) // 失败后是否调用回退函数(fallback) .setResetTimeout(-1) // 在开启状态下,尝试重试之前所需时间 ).openHandler(v -> { log.info(" Circuit open"); }).halfOpenHandler(v -> { log.info("Circuit halfOpen"); }).closeHandler(v -> { log.info("Circuit close"); }); for (int i = 0; i < 20; i++) { try { Thread.sleep(2000L); } catch (InterruptedException e) { } rateLimiter.executeWithFallback(promise -> { promise.fail("1"); }, v -> { // 需要传递当前状态half-open , close, 还是统计失败次数 //log.info(" executed when the circuit is opened:{}", v.getMessage()); if (v instanceof HalfOpenCircuitException) { log.info(" half open circuit"); } else if (v instanceof OpenCircuitException) { log.info(" open circuit"); } else if (v instanceof NoStackTraceThrowable) { log.info(" close circuit"); } return "3"; }, ar -> { // Do something with the result log.info(" interface failed result.{} ", ar); }); } try { Thread.sleep(5000L); } catch (InterruptedException e) { } } public static void main(String[] args) { VertxConfig vertxConfig = AppConfigHandler.getVertxConfig(); // TODO 编解码线程池,后面优化协程等方式 VertxOptions vertxOptions = new VertxOptions(); Vertx VERTX = Vertx.vertx(vertxOptions); CircuitBreakerOptions options = new CircuitBreakerOptions().setMaxFailures(3).setTimeout(5000) .setFallbackOnFailure(true); CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", VERTX, options).openHandler(v -> { System.out.println("Circuit opened"); }).closeHandler(v -> { System.out.println("Circuit closed"); }); for (int i = 0; i < 20; i++) { port = 9199; if (i % 2 == 0) { port = 9198; log.info("i:{},port:{}", i, port); } breaker.executeWithFallback(promise -> { VERTX.createHttpClient().request(HttpMethod.POST, port, "localhost", "/vertx/body").compose(req -> { return req.send("body").compose(resp -> { if (resp.statusCode() != 200) { return Future.failedFuture("HTTP error"); } else { log.info("success req:{}", req.getPort()); return resp.body().map(Buffer::toString); } }); }).onComplete(promise); }, v -> { // Executed when the circuit is opened return "Hello (fallback)"; }, ar -> { // Do something with the result System.out.println("Result: " + ar.result()); }); } } }