观察者模式

观察者模式是一种对象行为型模式。它表示的是一种对象与对象之间具有依赖关系,当一个对象发生改变的时候,这个对象所依赖的对象也会做出反应。Spring 事件驱动模型就是观察者模式很经典的一个应用。Spring 事件驱动模型非常有用,在很多场景都可以解耦我们的代码。比如我们每次添加商品的时候都需要重新更新商品索引,这个时候就可以利用观察者模式来解决这个问题。
image.png

自定义观察者模式

观察者模式请参考这篇文章 设计模式-观察者

Spring 事件驱动模型中的三种角色

事件角色

ApplicationEvent (org.springframework.context包下)充当事件的角色,这是一个抽象类,它继承了java.util.EventObject并实现了 java.io.Serializable接口。

EventObject并不提供默认构造器,它需要外部传递一个名为source的构造器参数用于记录并跟踪事件的来源,如Spring事件ContextRefreshedEvent,其事件源为当前ApplicationContext。

Spring 中默认存在以下事件,他们都是对ApplicationContextEvent的实现(继承自ApplicationContextEvent):

ContextStartedEvent:ApplicationContext 启动后触发的事件;
ContextStoppedEvent:ApplicationContext 停止后触发的事件;
ContextRefreshedEvent:ApplicationContext 初始化或刷新完成后触发的事件;
ContextClosedEvent:ApplicationContext 关闭后触发的事件。
image.png

事件监听者角色

ApplicationListener充当了事件监听者角色,它是一个接口,里面只定义了一个 onApplicationEvent()方法来处理ApplicationEvent。ApplicationListener接口类源码如下,可以看出接口定义看出接口中的事件只要实现了 ApplicationEvent就可以了。所以,在 Spring中我们只要实现 ApplicationListener 接口的 onApplicationEvent() 方法即可完成监听事件。

package org.springframework.context;
import java.util.EventListener;

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}

事件发布者角色

ApplicationEventPublisher充当了事件的发布者,它也是一个接口。

@FunctionalInterface
public interface ApplicationEventPublisher {
    default void publishEvent(ApplicationEvent event) {
        this.publishEvent((Object)event);
    }

    void publishEvent(Object var1);
}

ApplicationEventPublisher接口的publishEvent()这个方法在AbstractApplicationContext类中被实现,阅读这个方法的实现,你会发现实际上事件真正是通过ApplicationEventMulticaster来广播出去的。

Spring 的事件流程总结

  1. 定义一个事件: 实现一个继承自ApplicationEvent,并且写相应的构造函数;
  2. 定义一个事件监听者:实现ApplicationListener接口,重写onApplicationEvent()方法;
  3. 使用事件发布者发布消息: 可以通过ApplicationEventPublisherpublishEvent()方法发布消息。

例子:

// 定义一个事件,继承自ApplicationEvent并且写相应的构造函数
public class DemoEvent extends ApplicationEvent{
    private static final long serialVersionUID = 1L;

    private String message;

    public DemoEvent(Object source,String message){
        super(source);
        this.message = message;
    }

    public String getMessage() {
         return message;
          }

    
// 定义一个事件监听者,实现ApplicationListener接口,重写 onApplicationEvent() 方法;
@Component
public class DemoListener implements ApplicationListener<DemoEvent>{

    //使用onApplicationEvent接收消息
    @Override
    public void onApplicationEvent(DemoEvent event) {
        String msg = event.getMessage();
        System.out.println("接收到的信息是:"+msg);
    }

}
// 发布事件,可以通过ApplicationEventPublisher  的 publishEvent() 方法发布消息。
@Component
public class DemoPublisher {

    @Autowired
    ApplicationContext applicationContext;

    public void publish(String message){
        //发布事件
        applicationContext.publishEvent(new DemoEvent(this, message));
    }
}

如果你想使用顺序性的listener,那么只需要使用 @Order注解就可以了。

注解驱动 @EventListener

事件监听不在用ApplicationListener接口,而是基于注解@EventListener驱动

@Component
public class OrderCreateEventListenerAnnotation {
    
    @EventListener
    public void orderCreateEvent(OrderCreateEvent event) {
        System.out.println("订单创建事件,@EventListener 注解驱动实现");
    }
}

如果在方法内没有使用事件对象,方法上也可以去掉它,但是要指定这个方法监听的是哪个事件类:

@Component
public class OrderCreateEventListenerAnnotation {
    @EventListener(OrderCreateEvent.class)
    public void orderCreateEvent() {
        System.out.println("订单创建事件,@EventListener 注解驱动实现");
    }
}

条件事件

在一些时候可能只要满足某些条件才进行对事件监听,这时就可以用@EventListener#condition属性来指定条件,条件是一个SpEL表达式,关于SpEL请参考baeldung SpEL 或官网SpEL

@EventListener(condition = "#event.order.orderStatus eq '待付款'")
public void orderCreateEventCondition(OrderCreateEvent event) {
    System.out.println("订单创建事件,@EventListener 注解驱动实现");
}

全局异步事件的支持

Spring 事件机制默认是同步阻塞的,如果 ApplicationEventPublisher 发布事件之后他会一直阻塞等待listener 响应,多个 listener 的情况下前面的没有执行完后面的一直被阻塞。

覆盖默认ApplicationEventMulticaster,通过源码就可以知道它的原理。
AbstractApplicationContext#initApplicationEventMulticaster初始化方法。

/**
 * Initialize the ApplicationEventMulticaster.
 * Uses SimpleApplicationEventMulticaster if none defined in the context.
 * @see org.springframework.context.event.SimpleApplicationEventMulticaster
 */
protected void initApplicationEventMulticaster() {
   ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    //IOC容器中存在
    //beanName=APPLICATION_EVENT_MULTICASTER_BEAN_NAME("applicationEventMulticaster")
    //使用容器中的对象
   if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
      this.applicationEventMulticaster =
            beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
      if (logger.isTraceEnabled()) {
         logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
      }
   }
   else {
       //否则使用 SimpleApplicationEventMulticaster
      this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
      beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
      if (logger.isTraceEnabled()) {
         logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
               "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
      }
   }
}

通过上面一段源码,我们就可以自定义实现ApplicationEventMulticaster接口,只需要向IOC容器中注册一个beanName="applicationEventMulticaster"实现ApplicationEventMulticaster接口的对象就可以覆盖默认事件发布规则了。

@Configuration
public class AsynchronousSpringEventsConfig {
    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
        //设置任务执行器
        eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return eventMulticaster;
    }
}

上面的代码就实现了异步发布事件,这个是全局定义所有Spring 事件都会变成异步。

单个异步事件支持@Async

结合@Async注解实现异步事件,使用很简单,只需要在相关方法或类上添加@Async注解,使用在方法上只针对这一个方法异步调用,使用在类上所有的方法都是异步调用;必须使用注解@EnableAsync开启异步功能。线程池支持配置模式,如果你不想使用默认的线程池配置,可以手动指定。具体可以参考这篇文章Async 注解

开启异步:

@EnableAsync
@SpringBootApplication
public class SpringEventExampleApplication {

   public static void main(String[] args) {
      SpringApplication.run(SpringEventExampleApplication.class, args);
   }
}

异步监听事件

@Async
@EventListener
public void orderCreateEventAsync(OrderCreateEvent event) {
    System.out.println("订单创建事件,@EventListener 注解驱动实现");
}

泛型支持

也可以自定义泛型类实现事件调度。让我们创建一个泛型事件类,没有继承任何父类或接口

//泛型事件类
public class GenericEvent<T> {
    private final T data;
    public GenericEvent(T data) {
        this.data = data;
    }
    public T getData() {
        return data;
    }
}

//泛型事件用注解驱动监听 @EventListener
@EventListener
public void orderListener(GenericEvent<Order> event) {
    System.out.println("通用泛型事件监听,订单");
}

事件对象可以是任意对象,也可以不是泛型类型,比如我们直接发布一个Order监听一个Order

//发布
applicationEventPublisher.publishEvent(order);

//监听
@EventListener
public void order(Order order) {
    System.out.println("监听一个订单");
}

事物事件@TransactionalEventListener

从Spring4.2开始提供了一个新的事件监听注解@TransactionalEventListener,它是@EventListener的扩展,允许将事件的监听绑定到事物的一个阶段。有以下四个阶段:

  • AFTER_COMMIT(默认值)事务成功提交时触发事件
  • AFTER_ROLLBACK - 事务回滚
  • AFTER_COMPLETION - 事务已完成(AFTER_COMMIT 或 AFTER_ROLLBACK)
  • BEFORE_COMMIT 在事务提交之前触发事件
@TransactionalEventListener(phase = BEFORE_COMMIT)
public void txEvent(Order order) {
    System.out.println("事物监听");
}

仅当存在事件生成器正在运行且即将提交的事务时,才会调用此侦听器

如果标记的方法没有事务,则不会发送事件。我们可以通过将fallbackExecution 属性设置为true来实现事件发送,即@TransactionalEventListener(fallbackExecution = true)

    /**
     * Whether the event should be processed if no transaction is running.
     */
    boolean fallbackExecution() default false;

这个注解的功能和使用TransactionSynchronizationManager可以实现一样的效果,具体请参看此类的方法。

新事件继续传播

当我们监听一个事件处理完成时,还需要发布另一个事件,一般我们想到的是调用ApplicationEventPublisher#publishEvent发布事件方法,但Spring提供了另一种更加灵活的新的事件继续传播机制,监听方法返回一个事件,也就是方法的返回值就是一个事件对象。

//监听方法方法返回一个新的事件
@EventListener
public OrderCreateEvent orderReturnEvent(Order order) {
    System.out.println("监听一个订单,返回一个新的事件 OrderCreateEvent");
    return new OrderCreateEvent(this,order);
}

//监听方法方法返回多个事件-集合
@EventListener
public Collection<OrderCreateEvent> orderReturnListEvent(Order order) {
	System.out.println("监听一个订单,返回集合的事件 OrderCreateEvent");
    return Collections.singleton(new OrderCreateEvent(this, order));
}

//监听方法方法返回多个事件-数组
@EventListener
public Object[] orderReturnArrayEvent(Order order) {
    System.out.println("监听一个订单,返回数组的事件 OrderCreateEvent");
    return new OrderCreateEvent[]{new OrderCreateEvent(this, order), new OrderCreateEvent(this, order)};
}

注意返回集合事件时,集合内事件类型可以不同

事件机制源码分析

在Spring中提供了Event的基类:ApplicationEvent,如果事件要想被Spring监听那么就必须继承该类,同样该类也继承了Java中的事件基类:EventObject

有了事件源,我们要定义事件监听者用于处理事件,所有的事件监听者都要继承 org.springframework.context.ApplicationListener接口。

/**
 * Interface to be implemented by application event listeners.
 * Based on the standard {@code java.util.EventListener} interface
 * for the Observer design pattern.
 *
 * <p>As of Spring 3.0, an ApplicationListener can generically declare the event type
 * that it is interested in. When registered with a Spring ApplicationContext, events
 * will be filtered accordingly, with the listener getting invoked for matching event
 * objects only.
 *
 * @author Rod Johnson
 * @author Juergen Hoeller
 * @param <E> the specific ApplicationEvent subclass to listen to
 * @see org.springframework.context.event.ApplicationEventMulticaster
 */
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

}

ApplicationListener提供了 一个基于ApplicationEvent的泛型,所以你指定了某个类的监听者只会处理该类型的event。

Spring基于ApplicationEventPublisher来发布事件,那么监听器是如何获取到事件呢?

注意到 ApplicationListener 上面的注释写到:@param <E> the specific ApplicationEvent subclass to listen to ApplicationEventMulticaster,从名称上看这个类的作用应该是用于事件广播。

ApplicationEventMulticaster是一个接口,提供了如下方法:

  • addApplicationListener(ApplicationListener<?> listener):新增一个listener;
  • addApplicationListenerBean(String listenerBeanName):新增一个listener,参数为bean name;
  • removeApplicationListener(ApplicationListener<?> listener):删除listener;
  • removeApplicationListenerBean(String listenerBeanName):根据bean name 删除listener;
  • multicastEvent(ApplicationEvent event):广播事件;
  • multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType):广播事件,指定事件的source类型。

从接口的方法看,该类的作用就是添加监听器然后对所有监听器或者指定监听器发送事件进行处理。

ApplicationEventMulticaster有两个实现类:

  • SimpleApplicationEventMulticaster
  • AbstractApplicationEventMulticaster

因为AbstractApplicationEventMulticaster是一个抽象类,并且 SimpleApplicationEventMulticaster也继承了SimpleApplicationEventMulticaster,所以我们直接看SimpleApplicationEventMulticaster

public abstract class AbstractApplicationEventMulticaster
        implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

    private final DefaultListenerRetriever defaultRetriever = new DefaultListenerRetriever();

    final Map<ListenerCacheKey, CachedListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);

    @Override
    public void addApplicationListener(ApplicationListener<?> listener) {
        synchronized (this.defaultRetriever) {
            // Explicitly remove target for a proxy, if registered already,
            // in order to avoid double invocations of the same listener.
            Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
            if (singletonTarget instanceof ApplicationListener) {
                this.defaultRetriever.applicationListeners.remove(singletonTarget);
            }
            this.defaultRetriever.applicationListeners.add(listener);
            this.retrieverCache.clear();
        }
    }
}

addApplicationListener 方法用于新增监听器,新增的逻辑主要在这一句:

defaultRetriever.applicationListeners.add(listener);

继续看DefaultListenerRetriever的实现:

private class DefaultListenerRetriever {
    public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
    public final Set<String> applicationListenerBeans = new LinkedHashSet<>();

    public Collection<ApplicationListener<?>> getApplicationListeners() {
        List<ApplicationListener<?>> allListeners = new ArrayList<>(
                this.applicationListeners.size() + this.applicationListenerBeans.size());
        allListeners.addAll(this.applicationListeners);
        if (!this.applicationListenerBeans.isEmpty()) {
            BeanFactory beanFactory = getBeanFactory();
            for (String listenerBeanName : this.applicationListenerBeans) {
                try {
                    ApplicationListener<?> listener =
                            beanFactory.getBean(listenerBeanName, ApplicationListener.class);
                    if (!allListeners.contains(listener)) {
                        allListeners.add(listener);
                    }
                } catch (NoSuchBeanDefinitionException ex) {
                    // Singleton listener instance (without backing bean definition) disappeared -
                    // probably in the middle of the destruction phase
                }
            }
        }
        AnnotationAwareOrderComparator.sort(allListeners);
        return allListeners;
    }
}

最终还是 持有了一个applicationListeners的集合,跟发布订阅设计模式一样。

剩下的逻辑就好去解释,顺着咱们前面讲过的发布订阅模式的使用套路撸下去就行,事件广播的方法multicastEvent不外乎就是遍历所有的监听器进行匹配。

参考

JavaGuide
Spring 事件使用-观察者、发布/订阅模式
Spring 中的事件机制