背景

使用OpenFeign时,通常会实现RequestInterceptor接口来自定义FeignConfigurationOpenFeign暴露了feign.RequestTemplate信息,给到我们在发送请求前自定义参数信息的扩展点。

在分布式系统中,通常会将本服务的信息(UserInfoRequestId)透传至下游服务,从而实现分布式链路追踪等功能,对于像用户信息等,在Web系统中通常使用 ThreadLocal 来存储信息,在自定义的FeignConfiguration中获取ThreadLocal再塞入到feign.RequestTemplate中,实现向下游服务的传递,示例:

public class FeignConfiguration implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        String userId = SubjectContext.get().getUserId();
        if (null != attributes) {
            HttpServletRequest request = attributes.getRequest();
            template.header("token", request.getHeader("TOKEN"));
            template.header("userId", userId);
        }
    }
}

简单的Context示例:

public class SubjectContext {
    protected static ThreadLocal<UserInfo> subjectContext = new ThreadLocal();
    public static void remove() {
        subjectContext.remove();
    }
    public static void set(UserInfo uerInfo) {
        subjectContext.set(uerInfo);
    }
    public static UserInfo get() {
        return (UserInfo)subjectContext.get();
    }
}

出现错误

上述代码在常规情况下,是能够按照预期执行的。

但是最近项目引入了CircuitBreaker作为服务熔断的断路器之后,上述代码在执行到SubjectContext.get()时,会抛出空指针,拿不到用户信息。

通过分析CircuitBreaker的源码,最终定位到代码出现在Resilience4JCircuitBreaker内部,在Resilience4JCircuitBreaker中有一个public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback)方法,方法入参的toRun就是封装过的我们定义的Feign接口,其包装过程在FeignCircuitBreakerInvocationHandler#asSupplier代码中,如下:

private Supplier<Object> asSupplier(final Method method, final Object[] args) {
    final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
    return () -> {
        try {
            RequestContextHolder.setRequestAttributes(requestAttributes);
            // 执行我们的真正方法
            return dispatch.get(method).invoke(args);
        }
        catch (RuntimeException throwable) {
            throw throwable;
        }
        catch (Throwable throwable) {
            throw new RuntimeException(throwable);
        }
    };
}

Spring Cloud CircuitBreaker Resilience4j 提供了两种实现:

  1. 使用 Semaphores 的 SemaphoreBulkhead。
  2. 一个 FixedThreadPoolBulkhead,它使用一个有界队列和一个固定的线程池。

默认情况下,Spring Cloud CircuitBreaker Resilience4j 使用 FixedThreadPoolBulkhead。要修改默认行为以使用 SemaphoreBulkhead,请将属性 spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead 设为 true

正是由于上述原因,默认将我们的FeignConfiguration提交给了线程池,由于我们使用的是ThreadLocal导致线程本地变量没有向子线程传递,在执行FeignConfiguration时子线程无法拿到Context信息,最终导致程序的报错。

解决办法

通过分析源码我们发现,执行任务的线程池Resilience4JCircuitBreaker#executorService是由外部传递过来进行初始化的,调用方在Resilience4JCircuitBreakerFactory#create(java.lang.String, java.lang.String, java.util.concurrent.ExecutorService)

Resilience4JCircuitBreakerFactory中发现,是由本实例在create方法被调用时传入的本类的成员变量,即:

  • private ExecutorService executorService = Executors.newCachedThreadPool();
  • private ConcurrentHashMap<String, ExecutorService> executorServices = new ConcurrentHashMap<>();

而我们在没有定义自定义Feign Group时,默认使用的就是executorService,在本类中有一个Resilience4JCircuitBreakerFactory#configureExecutorService方法专门保留了外部传入自定义线程池的扩展,我们可以自己实现创建一个支持传递Context到子线程的线程池,即可将参数向下传递,比如像这样:

@Configurable
@AllArgsConstructor
public class CircuitBreakerConfiguration implements ApplicationRunner {

    private final Resilience4JCircuitBreakerFactory factory;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {

        ContextThreadPoolExecutor contextThreadPoolExecutor = 
                new ContextThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024));

        // **change ThreadPoolExecutor**
        factory.configureExecutorService(contextThreadPoolExecutor);
    }
    
    public static class ContextThreadPoolExecutor extends ThreadPoolExecutor {

        public ContextThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public void execute(Runnable command) {
            super.execute(wrap(command));
        }

        private static Runnable wrap(Runnable runnable) {
            **SubjectContext context = SubjectContext.getContext();**
            return () -> {
                // 将参数向下传递
                **SubjectContext.setContext(context);**
                try {
                    runnable.run();
                } finally {
                    **SubjectContext.clear();**
                }
            };
        }
    }
}

后记

上述的方案只解决了没有自定义Group的情况,官方在自定义Group的情况下是没有保留扩展位的,所以给官方提了一个MR并且已成功合并到主分支,如下:
Customizable groupExecutorService #180