您的当前位置:首页正文

Spring Cloud Open Feign系列【10】Feign 、 Ribbon 重试机制源码分析

2024-11-07 来源:个人技术集锦

前言

重试机制: 在发生异常时,重新尝试请求,多次还是失败时,才会抛出异常。

应用场景: 可能由于网络抖动出现第一次调用失败,尝试几次就可以恢复正常。

比如Spring提供的声明式的重试类库Spring-Retry

Feign 重试机制

Feign Core 也提供了自己的重试机制,基于Retryer接口。

Retryer接口

Retryer接口位于 feign-core包中,声明了一个连续重试方法,及一个从不重试的实例对象。

public interface Retryer extends Cloneable {
	// 不重试Retryer 示例对象,直接抛出异常
    Retryer NEVER_RETRY = new Retryer() {
        public void continueOrPropagate(RetryableException e) {
            throw e;
        }

        public Retryer clone() {
            return this;
        }
    };
	// 连续重试
    void continueOrPropagate(RetryableException var1);
}

Default 类

Default 是Retryer 接口的默认实现类,也就是Feign 的默认重试策略。

 public static class Default implements Retryer {
 		// 最大访问次数(包含了第一次和重试的次数)
        private final int maxAttempts;
        // 重试的间隔时间
        private final long period;
        // 最大重试间隔
        private final long maxPeriod;
        // 当前访问次数
        int attempt;
        // 总的重试间隔
        long sleptForMillis;
		// 默认配置(重试间隔100毫秒、最大重试间隔1S、最多访问5次)
        public Default() {
            this(100L, TimeUnit.SECONDS.toMillis(1L), 5);
        }

        public Default(long period, long maxPeriod, int maxAttempts) {
            this.period = period;
            this.maxPeriod = maxPeriod;
            this.maxAttempts = maxAttempts;
            this.attempt = 1;
        }

        protected long currentTimeMillis() {
            return System.currentTimeMillis();
        }
		// 默认重试算法
        public void continueOrPropagate(RetryableException e) {
        	 // 如果重试的次数attempt大于最大重试次数,直接抛出异常
        	 // attempt的初始值为1 ,i++是先进行运算,所以就表示当前重试次数,比较结束了之后再+1,
        	 // 当第attempt 为5时,也就是第五次进入这里,条件为true,则会直接抛出异常,不再进行请求发送。
        	 // 所以最终是,重试了四次,总共请求了5次。
            if (this.attempt++ >= this.maxAttempts) {
                throw e;
            } else {
                long interval;
                // 响应数据是否包含了 Retry-After 头,这个头用来告诉用户代理需要等待多长时间之后才能继续发送请求
                if (e.retryAfter() != null) {
                    interval = e.retryAfter().getTime() - this.currentTimeMillis();
                    if (interval > this.maxPeriod) {
                        interval = this.maxPeriod;
                    }

                    if (interval < 0L) {
                        return;
                    }
                } else {
                	// 计算重试时间间隔
                    interval = this.nextMaxInterval();
                }

                try {
                	// 线程睡眠时间间隔 第一次为 150ms
                    Thread.sleep(interval);
                } catch (InterruptedException var5) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
				// 记录时间间隔总数
                this.sleptForMillis += interval;
            }
        }
		// 计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)
		// 接着判断 当前时间时间,是否超出了最大限制,超过了则返回最大时间间隔。
        long nextMaxInterval() {
            long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
            return interval > this.maxPeriod ? this.maxPeriod : interval;
        }

        public Retryer clone() {
            return new Retryer.Default(this.period, this.maxPeriod, this.maxAttempts);
        }
    }

Feign 重试机制源码分析

默认是不进行重试,所以需要配置。

    @Bean
    Retryer feignRetryer() {
        return new Retryer.Default();
    }

在订单服务配置一个超长的线程睡眠,我们手动触发一个读取超时,然后由这个超时异常,由最里层往上进行分析,因为之前分析过很多次执行流程源码了,这里只着重看下异常。

这个异常是有底层HTTP 框架抛出的,会被负载均衡客户端捕获到,这里catch 了Exception,所以异常都会被捕获,捕获了之后,会转化为ClientException

    static FeignException errorExecuting(Request request, IOException cause) {
        return new RetryableException(-1, String.format("%s executing %s %s", cause.getMessage(), request.httpMethod(), request.url()), request.httpMethod(), cause, (Date)null, request);
    }

最终,RetryableException还是会被方法处理器的invoke方法所捕获到,进行重试,直到重试到最大次数时,还是失败,就会抛出异常,结束循环,执行失败。

可以看到方法执行器,每次执行的时候,都会创建一个重试器,而且方法执行的时候,是一个死循环,只有抛出异常时或者正常返回时,才会结束。

    public Object invoke(Object[] argv) throws Throwable {
        RequestTemplate template = this.buildTemplateFromArgs.create(argv);
        Options options = this.findOptions(argv);
        // 每次请求都会创建一个重试处理器
        Retryer retryer = this.retryer.clone();
        while(true) {
            try {
                return this.executeAndDecode(template, options);
            } catch (RetryableException var9) {
                RetryableException e = var9;
				// 捕获到RetryableException ,则会调用重试处理器,主要是进行执行时间计算,没有抛出异常,则继续循环 
                try {
                    retryer.continueOrPropagate(e);
                } catch (RetryableException var8) {
                    Throwable cause = var8.getCause();
                    if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
                        throw cause;
                    }

                    throw var8;
                }

                if (this.logLevel != Level.NONE) {
                    this.logger.logRetry(this.metadata.configKey(), this.logLevel);
                }
            }
        }
    }

某些疑问

多次重试请求,发出请求的策略?

默认的重试算法,会有一个间隔时间,线程会休眠,然后再重新执行请求(在Default 源码中已经分析过了)。

计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)。

long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
            return interval > this.maxPeriod ? this.maxPeriod : interval;

这里可以举个例子,默认最大重试次数为5次,最大间隔时间为1秒,第一次休眠时间为100ms,由计算公式我们可以推倒出来如下结果:

1. 第一次重试,100*(1.51次方)间隔150 毫秒后再次请求
2. 第二次重试,100*(1.52次方)间隔225 毫秒后再次请求
3. 第三次重试,100*(1.53次方)间隔337 毫秒后再次请求
4. 第四次重试,100*(1.54次方)间隔506 毫秒后再次请求,
5. 第五次,这个时候,访问次数attempt 达到了最大值5次,不会再重试了,而是直接抛出异常,结束请求

其他异常,会被重试吗?

通过以上源码分析,可知,只有IO 异常时,才会解析为可重试异常,进行重试操作。

Ribbon 重试机制

和Feign 一样,Ribbon 也是有重试机制,接下来按照上面的套路分析下Ribbon 。

RxJava

RxJava - JVM响应式扩展Reactive Extensions 用于使用Java VM的可观察序列编写异步和基于事件的程序的库。

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:ObservablesObserver。Observables作为被观察者,是一个值或事件的流集合;而Observer则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。

RetryHandler 接口

RetryHandler接口是 Ribbon 的重试处理器,用来处理重试逻辑。

public interface RetryHandler {
    RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
	// 判断当前异常是否应该重试
    boolean isRetriableException(Throwable var1, boolean var2);
	// 是否是熔断类型异常;
    boolean isCircuitTrippingException(Throwable var1);
	// 获取当前节点最大重试次数
    int getMaxRetriesOnSameServer();
	// 获取调用不同节点的最大重试次数
    int getMaxRetriesOnNextServer();
}

DefaultLoadBalancerRetryHandler

DefaultLoadBalancerRetryHandlerRetryHandler接口的默认实现类,重点是看它的属性和构造函数:

	// 定义了可以重试的异常
    private List<Class<? extends Throwable>> retriable = Lists.newArrayList(new Class[]{ConnectException.class, SocketTimeoutException.class});
    // 定义了可以重试的异常
    private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(new Class[]{SocketException.class, SocketTimeoutException.class});
    // 对应 MaxAutoRetries 配置
    protected final int retrySameServer;
    // 对应 MaxAutoRetriesNextServer 配置
    protected final int retryNextServer;
    //  是否开启重试
    protected final boolean retryEnabled;
	// 没有参数时,表示不重试
    public DefaultLoadBalancerRetryHandler() {
        this.retrySameServer = 0;
        this.retryNextServer = 0;
        this.retryEnabled = false;
    }

RequestSpecificRetryHandler

RequestSpecificRetryHandler 也实现了RetryHandler接口,从名字上看,是具有请求特征的重试处理器,每次请求时,Ribbon 都会创建单独的一个RequestSpecificRetryHandler (这是实际使用的处理器) ,也就是会和当前客户端的配置(IClientConfig对象)绑定,实现不同请求,不同配置策略。

可以看到,RequestSpecificRetryHandler代理了一个RetryHandler,默认是DefaultLoadBalancerRetryHandler,看他们的参数,可以知道,默认Ribbon 重试机制也是关闭的。

LoadBalancerCommand

FeignLoadBalancerexecuteWithLoadBalancer方法中调用buildLoadBalancerCommand方法构造LoadBalancerCommand对象。这个类是将Ribbon将请求转为RxJava API调用的实现。

该类的selectServer 方法,会在注册中心中,根据负载均衡算法获取到一个健康的可用服务,然后返回一个Observable对象,这是一个观察者对象,创建的时候传入了一个Subscriber 订阅者对象,当Observable对象被订阅时,Subscriber 中的call 方法会执行。

	// 定义一个事件源
	// Create操作符是用来创建一个Observable
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
        	// Observable 包含了一个OnSubscribe 对象
        	// 当Observable被订阅(subscribe)时,OnSubscribe接口的call方法会被执行
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                	// 事件源被订阅时,执行call
                	// 负载均衡查询可用服务
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    // 
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

submit 方法中,会创建一个Observable 对象,用来观察请求执行状态,如果失败,则会重试,达到最大次数,抛出异常,结束重试。

 public Observable<T> submit(final ServerOperation<T> operation) {
        // 省略.....
        // 获取重试器中 最大重试次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
		
        // (server == null ? selectServer() : Observable.just(server)) 
        // 获取可用服务的Observable 对象
        Observable<T> o = (server == null ? selectServer() : Observable.just(server))
        		// concatMap发射的数据集是有序的
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // 为选中的每台服务器执行调用
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        // 获取当前服务的监控记录
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
                        // 为每次尝试和重试调用
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                    	// 重试次数计数
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
                                        
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
                                        
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        // 
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
                                            
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        // 针对同一实例的重试回调
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
         // 重试下一个实例的回调
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        // 重试超过次数则终止调用并设置对应异常的回调
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        	// 封装异常信息并返回
            @Override
            public Observable<T> call(Throwable e) {
                // 省略......
                return Observable.error(e);
            }
        });
    }

使用案例

以下为全局配置,也可以添加名称前缀,指定客户端。

# Ribbon 配置 
ribbon:
  MaxAutoRetries: 2
  MaxAutoRetriesNextServer: 3
  OkToRetryOnAllOperations: false

各参数说明如下:

  • MaxAutoRetries:单节点最大重试次数,达到最大值时,切换到下一个示例
  • MaxAutoRetriesNextServer:更换下一个重试节点的最大次数,可以设置为服务提供者副本数,也是就每个副本都查询一次。
  • OkToRetryOnAllOperations: 是否对所有请求进行重试,默认fasle,则只会对GET请求进行重试,建议配置为false,不然添加数据接口,会造成多条重复,也就是幂等性问题。

重试总次数计算公式:

MaxAutoRetries+MaxAutoRetries+1*MaxAutoRetriesNextServer

第一次请求时异常,重试2次,查询下一个节点,发送一次请求失败,进入重试,再重试2次失败,继续查询三次节点重试,所以以上配置会重试(2+3*3=11)次

总结:

显示全文