加载中......
输入验证码,即可复制
微信扫码下载好向圈APP, 登陆后即可进入消息页面查看验证码
只需要3秒时间
目录


  • Spring的Async注解线程池扩展方案

    • 目录
    • 1. 扩展目的
    • 2. 扩展实现

      • 2.1 扩展Async注解的执行拦截器AnnotationAsyncExecutionInterceptor
      • 2.2 扩展Async注解的Spring代理顾问AsyncAnnotationAdvisor
      • 2.3 扩展Async注解的 Spring Bean 后置处理器AsyncAnnotationBeanPostProcessor
      • 2.4 扩展代理异步配置类ProxyAsyncConfiguration
      • 2.5 扩展异步代理配置选择器AsyncConfigurationSelector
      • 2.6 扩展异步启动注解@EnableAsync

    • 3. 额外扩展:给@Async注解代理指定线程池

1. 扩展目的


  • 异步调用,改用Spring提供的@Aysnc注解实现,代替手写线程池执行。
  • 在实际场景中,可能会遇到需要将主线程的一些个性化参数、变量、数据传递到子线程中使用的需求。
  • InheritableThreadLocal可以解决子线程继承父线程值的需求,但是它存在一些问题。

    • SessionUser.SESSION_USER是中台提供,无法修改。
    • InheritableThreadLocal在线程池机制应用中并不友好,不及时在子线程中清除的话,会造成线程安全问题。

实现思路有两种:

  • 针对ThreadLocal进行扩展,并说服中台统一改用扩展后的ThreadLocal
  • 针对@EnableAsync@Async注解进行扩展,将手动copy的代码写入到Spring代理类中。
第一种要跟中台打交道,就很烦,能够天平自己独立解决,就自己解决。第二种会是一个不错的选择,扩展实现也并不困难。
2. 扩展实现

2.1 扩展Async注解的执行拦截器AnnotationAsyncExecutionInterceptor

类全名:org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor
从调试记录可以分析得出AnnotationAsyncExecutionInterceptor#invoke方法,正是创建异步任务并且执行异步任务的核心代码所在,我们要做的就是重写这个方法,将父线程的运行参数手动copy到子线程任务体中。

Java教程之Spring的Async注解线程池扩展方案-1.jpg

2.2 扩展Async注解的Spring代理顾问AsyncAnnotationAdvisor

我们依靠追踪AnnotationAsyncExecutionInterceptor的构造方法调用,定位到了它。
全类名:org.springframework.scheduling.annotation.AsyncAnnotationAdvisor
补充说明:代理顾问(Advisor)、建议(Advice)以及Spring代理实现原理
Spring @EnableAsync默认的代理模式是 JDK 代理,代理机制如下:
Spring 一个 Bean 会在 BeanPostProcessor#postProcessAfterInitialization()这个生命周期环节,遍历所有的BeanPostProcessor实例,判断Bean是否符合代理条件,如果符合代理条件,就给 Bean 代理对象中追加建议(Advice)对象,这样就完成了代理。
而建议(Advice)对象是由顾问(Advisor)对象创建和提供。
上一小节提到的异步执行拦截器AnnotationAsyncExecutionInterceptor就是实现了Advice接口的类。
@Async注解的代理过程中,异步执行拦截器AnnotationAsyncExecutionInterceptor就是通过AsyncAnnotationAdvisor#buildAdvice方法创建的。
所以,当我们想要将扩展的新的异步执行拦截器LibraAnnotationAsyncExecutionInterceptor用起来,则需要相应的,还要把AsyncAnnotationAdvisor#buildAdvice方法重写。
2.3 扩展Async注解的 Spring Bean 后置处理器AsyncAnnotationBeanPostProcessor

我们依靠追踪AsyncAnnotationAdvisor的构造方法调用,定位到了它。
类全名:org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor
这个没什么好说的,Spring Bean 的生命周期其中一环。是 Spring Bean 实现代理的起点。
开发人员可以自定义一个BeanPostProcessor类,把它注册到 Bean 容器中,它就会自动生效,并将后续的每一个 Bean 实例进行条件判断以及进行代理。
我们要重写的方法是:AsyncAnnotationBeanPostProcessor#setBeanFactory。这个方法构造了异步代理顾问AsyncAnnotationAdvisor对象。
2.4 扩展代理异步配置类ProxyAsyncConfiguration

AsyncAnnotationBeanPostProcessor不是一般的 Spring Bean。它有几个限制,导致它不能直接通过@Component或者@Configuration来创建实例。

  • AsyncAnnotationBeanPostProcessor仅仅是实现了基于 JDK 代理,如果开发决定另外一种(基于ASPECTJ编织),那么它就应该受到某种条件判断来进行 Bean 实例化。
  • AsyncAnnotationBeanPostProcessor还需要配置指定的线程池、排序等等属性,所以无法直接使用@Component注解注册为 Bean。
我们阅读一下@EnableAsync注解源码:
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(AsyncConfigurationSelector.class)public @interface EnableAsync {    Class<? extends Annotation> annotation() default Annotation.class;    boolean proxyTargetClass() default false;    AdviceMode mode() default AdviceMode.PROXY;    int order() default Ordered.LOWEST_PRECEDENCE;}进一步阅读AsyncConfigurationSelector的源码:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";    /**   * 分别为EnableAsync.mode()的PROXY和ASPECTJ值返回{@link ProxyAsyncConfiguration}或{@code AspectJAsyncConfiguration} 。     */    @Override    @Nullable    public String[] selectImports(AdviceMode adviceMode) {        switch (adviceMode) {            case PROXY:                return new String[] {ProxyAsyncConfiguration.class.getName()};            case ASPECTJ:                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};            default:                return null;        }    }}谜底揭晓,ProxyAsyncConfiguration原来是在这里开始注册到 Spring 容器中的。
Spring Boot 启动后,会根据@EnableAsync注解的mode()方法的具体值,来决定整个Spring的 Bean 代理机制。
既然 Spring 代理机制只会有一种,所以,也就只会在两种机制的配置类中选择其中一个来进行实例化。
而默认EnableAsync$mode()默认值是AdviceMode.PROXY,所以默认采用 JDK 代理机制。
2.5 扩展异步代理配置选择器AsyncConfigurationSelector

类全名:org.springframework.scheduling.annotation.AsyncConfigurationSelector
2.6 扩展异步启动注解@EnableAsync

类全名:org.springframework.scheduling.annotation.EnableAsync
3. 额外扩展:给@Async注解代理指定线程池

@Async会自动根据类型TaskExecutor.class从 Spring Bean 容器中找一个已经实例化的异步任务执行器(线程池)。如果找不到,则另寻他路,尝试从 Spring Bean 容器中查找名称为taskExecutorExecutor.class实例。最后都还是未找到呢,就默认自动new一个SimpleAsyncTaskExecutor来用。
补充说明:TaskExecutor.class是Spring定义的,而Executor.classJDK定义的。
场景:其他小伙伴、或者旧代码已经实现过了一个线程池,但是这个线程池,是个Executor.class类型,且 Bean 实例名称不是taskExecutor(假设是libraThreadPool),正常情况下@Async根本无法找到它。
需求:通过配置,将@Async的默认线程池,指定为名为libraThreadPoolExecutor.class类型线程池。
我们只需要注册一个实现AsyncConfigurer接口的配置类
org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers
  /**     * Collect any {@link AsyncConfigurer} beans through autowiring.     */    @Autowired(required = false)    void setConfigurers(Collection<AsyncConfigurer> configurers) {        if (CollectionUtils.isEmpty(configurers)) {            return;        }        if (configurers.size() > 1) {            throw new IllegalStateException("Only one AsyncConfigurer may exist");        }        AsyncConfigurer configurer = configurers.iterator().next();        this.executor = configurer::getAsyncExecutor;        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;    }
广告圈
9758 查看 1 0 反对

说说我的看法高级模式

您需要登录后才可以回帖 登录|立即注册

  • konyan150

    2022-9-2 07:39:34 使用道具

    来自: 北京来自: 北京来自: 北京来自: 北京
    转发了

相关阅读