转载

Spring事件发布机制和源码

最近打算整理一些工作中用到的设计模式和应用到这些设计模式的例子。每篇文章都会以“一个例子,一个模式,一点原理"的方式展开。将会陈述用到这个模式的场景,这个模式本身的介绍,模式背后的源码。

1 : 一个例子

业务场景是用户报名一个活动。活动报名成功以后,完成3件事。

  • 用户与活动关系入库
  • 用户如果是受邀请来报名的,需要告知邀请人邀请成功,邀请关系入库。
  • 用户收到报名成功的消息推送

考虑到业务可以拆分成相对独立的3个部分,方便以后拓展。这里选择了spring自带的事件发布机制做业务解耦。

2: 流程执行

看下具体的流程是如何执行的。

  1. 引入事件发布器
@Slf4j
@Service
@AllArgsConstructor
public class ActivityService extends BaseService {
    private final ApplicationEventPublisher publisher; }
复制代码
  1. 发布3种业务对应的事件
// 邀请事件
log.info("邀请事件,活动id:{},邀请人用户id:{},接受人用户id:{},已经邀请人数:{},还需要邀请人数:{}", activity.getId(), inviteUserId, userId, invitedCount, inviteToCount);
               publisher.publishEvent(ActivityWrapper.ActivityInviteEventForSend());
               
// 消息事件        
publisher.publishEvent(ActivityWrapper.ActivityNoticeEventForSend();
           
// 报名事件           publisher.publishEvent(ActivityWrapper.ActivityEnrollEventForSend());

复制代码
  1. 事件对象封装 以消息事件举例
/**
     * 封装报名事件
     *
     * @param activityEnrollDTO dto
     * @param userId            用户id
     * @return event
     */
public static ActivityEnrollEvent ActivityEnrollEventForSend(ActivityEnrollDTO activityEnrollDTO) {
        ActivityEnrollEvent activityEnrollEvent = CloneUtils.clone(activityEnrollDTO, ActivityEnrollEvent.class);
        return activityEnrollEvent
                .setUserId(userId)
                .setPicbookCollectionId(picbookCollectionId)
                .setPid(pid)
                .setAppId(appId)
                .setInviteUserId(inviteUserId);
    }
复制代码
  1. 构造事件对应的消息监听器并处理业务逻辑。这里以消息事件为例子。消息监听器在接收到事件以后,封装事件发往mq。
@Slf4j
@Service
@AllArgsConstructor
public class ActivityNoticeListener {

    private final IRocketMqService rocketMqService;

    @Async
    @EventListener
    public void activityNoticeHandler(ActivityNoticeEvent activityNoticeEvent) {

        List<ActivityNoticeEvent.ActivityNotice> activityNoticeList = activityNoticeEvent.getActivityNoticeList();
        if (CollectionUtil.isNotEmpty(activityNoticeList)) {

            // 1 将消息发往MQ
            for (ActivityNoticeEvent.ActivityNotice activityNotice : activityNoticeList) {

                log.info("消息事件,活动id:{},目标用户:{},消息类型:{},", activityNotice.getId(),
                        activityNotice.getOpenId()
                        , activityNotice.getMsgType());


                // 2.1 获取消息类型
                ActivityMsgTemplateTypeEnum typeEnum = ActivityMsgTemplateTypeEnum.get(activityNotice.getMsgType());


                // 2.2 投送消息体
                String messageRequest = JSON.toJSONString(activityNotice);
                sendRocketMqMsg(messageRequest, typeEnum);
            }

        }
    }
复制代码

3: 效果复盘

业务模型大概是在4月份完成,历经了很多次迭代。很明显的好处是:

  • 迭代中多次调整了消息业务代码。包括发什么样的消息,携带什么样的参数,什么时间发。在这些迭代中只需要修改wrapper中封装的事件对象,就可以控制上层的消息对象。即便是后来大改一次,修了消息的触发逻辑,由于过程的清晰改起来也很省力。

  • 定位bug非常方便。

4: 实现一个简单的事件机制

spring的事件机制实现起来非常简单。无非就是3步,搞一个事件发布器【spring自带】,定义一个事件,注册一个监听器【spring4.2以后只需要注解】。工作起来就是,事件发布器发布事件,监听器接受事件,结束。

  1. 定义事件,需要继承ApplicationEvent,需要传入事件源
@Data
@Accessors(chain = true)
public class ActivityEnrollEvent extends AbstractEvent {

public ActivityEnrollEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
	}
}
复制代码
  1. 定义监听器,添加注解 @EventListener
@Component
public class DemoListener {

    @EventListener
    public void doSomething(Event event){
    }
}
复制代码
  1. 发布事件
private final ApplicationEventPublisher publisher;
    publisher.publishEvent(Event event)
    
复制代码

Demo链接 zhuanlan.zhihu.com/p/85067174 【搬运 如有侵权立即删除】

5 源码

spring事件是非常重要的一个机制,在spring application的生命周期中有很重要的地位。 下面介绍源码

  1. 事件发布器初始化 spring事件是由事件发布器ApplicationEventMulticaster发布的。事件发布器在spring application context启动时初始化。就是非常熟悉的refresh()中可以找到initApplicationEventMulticaster()。
try {
				// Allows post-processing of the bean factory in context subclasses.
				postProcessBeanFactory(beanFactory);

				// Invoke factory processors registered as beans in the context.
				invokeBeanFactoryPostProcessors(beanFactory);

				// Register bean processors that intercept bean creation.
				registerBeanPostProcessors(beanFactory);

				// Initialize message source for this context.
				initMessageSource();

				// Initialize event multicaster for this context.
				initApplicationEventMulticaster();

				// Initialize other special beans in specific context subclasses.
				onRefresh();

				// Check for listener beans and register them.
				registerListeners();

				// Instantiate all remaining (non-lazy-init) singletons.
				finishBeanFactoryInitialization(beanFactory);

				// Last step: publish corresponding event.
				finishRefresh();
			}
           
复制代码

这个方法点进去很简单。如果自定义了applicationEventMulticaster就用自定义的,如果没有自定义就用spring默认的。容器是通过检查核心容器beanFactory来检查是不是有自定义事件发布器的注入的。

/**
	 * Initialize the ApplicationEventMulticaster.
	 * Uses SimpleApplicationEventMulticaster if none defined in the context.
	 * @see org.springframework.context.event.SimpleApplicationEventMulticaster
	 */
	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}
复制代码
  1. 注册监听器Bean 在上一步的refresh方法中可以找到registerListeners()。
// Check for listener beans and register them.
registerListeners();

复制代码

点开以后这里的源码很有意思。首先是注入了静态的监听器。在注册我们写的实现了ApplicationListener接口的自定义监听器时候,这里只向事件监听器multiCaster传递了beanName,并不是bean的实例本身。这里的注释是 【Do not initialize FactoryBeans here: We need to leave all regular beans uninitialized to let post-processors apply to them!】。意思是等待bean post-processors初始化监听器bean。bean post-processors通常是在容器级别对bean生命周期进行增强。在这里,翻阅了一些网上的资料,个人觉得作用是识别出实现了applicationEventLister接口的监听器bean并注入到容器中。

protected void registerListeners() {
		// Register statically specified listeners first.
		for (ApplicationListener<?> listener : getApplicationListeners()) {
			getApplicationEventMulticaster().addApplicationListener(listener);
		}

		// Do not initialize FactoryBeans here: We need to leave all regular beans
		// uninitialized to let post-processors apply to them!
		String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
		for (String listenerBeanName : listenerBeanNames) {
			getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
		}

		// Publish early application events now that we finally have a multicaster...
		Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
		this.earlyApplicationEvents = null;
		if (earlyEventsToProcess != null) {
			for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
				getApplicationEventMulticaster().multicastEvent(earlyEvent);
			}
		}
	}

复制代码

具体的listener实例加载确实是由一个实现了BeanPostProcessor接口的类实现。这个类的名字是ApplicationListenerDetector.这个类的注解要印证了之前的猜测。看注释。detect beans which implement the ApplicationListener.

{@code BeanPostProcessor} that detects beans which implement the {@code ApplicationListener}
 * interface. This catches beans that can't reliably be detected by {@code getBeanNamesForType}
 * and related operations which only work against top-level beans.

class ApplicationListenerDetector implements DestructionAwareBeanPostProcessor, MergedBeanDefinitionPostProcessor { }

复制代码

代码也很明确。if(instanceof ApplicationListener) --> context.addApplicationListener

@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof ApplicationListener) {
			// potentially not detected as a listener by getBeanNamesForType retrieval
			Boolean flag = this.singletonNames.get(beanName);
			if (Boolean.TRUE.equals(flag)) {
				// singleton bean (top-level or inner): register on the fly
				this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
			}
			else if (Boolean.FALSE.equals(flag)) {
				if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
					// inner bean with other scope - can't reliably process events
					logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
							"but is not reachable for event multicasting by its containing ApplicationContext " +
							"because it does not have singleton scope. Only top-level listener beans are allowed " +
							"to be of non-singleton scope.");
				}
				this.singletonNames.remove(beanName);
			}
		}
		return bean;
	}
复制代码
  1. 发布事件 整个事件的发布流程都在ApplicationEventMultiCaster中完成。 首先定义了一个缓存map.key由事件类型type和源类型sourceType定义。listenerRetriever维护了一个监听器列表。
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);

//key
public ListenerCacheKey(ResolvableType eventType, @Nullable Class<?> sourceType) {
			Assert.notNull(eventType, "Event type must not be null");
			this.eventType = eventType;
			this.sourceType = sourceType;
		}
//value
private class ListenerRetriever {

		public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
        }
复制代码

然后当当所发布的事件类型和事件源类型与Map中的key匹配时, 将直接返回value中的监听器列表作为匹配结果,通常这发生在事件不是第一次发布时,能避免遍历所有监听器并进行过滤,如果事件时第一次发布,则会遍历所有的事件监听器,并根据事件类型和事件源类型进行匹配。

/**
	 * Actually retrieve the application listeners for the given event and source type.
	 * @param eventType the event type
	 * @param sourceType the event source type
	 * @param retriever the ListenerRetriever, if supposed to populate one (for caching purposes)
	 * @return the pre-filtered list of application listeners for the given event and source type
	 */
	private Collection<ApplicationListener<?>> retrieveApplicationListeners(
			ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {

		//这是存放匹配的监听器的列表
		LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
		Set<ApplicationListener<?>> listeners;
		Set<String> listenerBeans;
		synchronized (this.retrievalMutex) {
			listeners = new LinkedHashSet<ApplicationListener<?>>(this.defaultRetriever.applicationListeners);
			listenerBeans = new LinkedHashSet<String>(this.defaultRetriever.applicationListenerBeans);
		}
		//遍历所有的监听器
		for (ApplicationListener<?> listener : listeners) {
			//判断该事件监听器是否匹配
			if (supportsEvent(listener, eventType, sourceType)) {
				if (retriever != null) {
					retriever.applicationListeners.add(listener);
				}
				//将匹配的监听器加入列表
				allListeners.add(listener);
			}
		}

		//对匹配的监听器列表进行排序
		AnnotationAwareOrderComparator.sort(allListeners);
		return allListeners;
	}
复制代码

最后调用匹配到的listener执行逻辑

@Override
	public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
		//获取事件类型
		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
		//遍历所有和事件匹配的事件监听器
		for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			//获取事件发布器内的任务执行器,默认该方法返回null
			Executor executor = getTaskExecutor();
			if (executor != null) {
                                //异步回调监听方法
				executor.execute(new Runnable() {
					@Override
					public void run() {
						invokeListener(listener, event);
					}
				});
			}
			else {
				//同步回调监听方法
				invokeListener(listener, event);
			}
		}
	}
复制代码

6 最后

夹带一些个人感受。最近几个月连续从0到1,也自己出去做一个项目。 深切感受到要做好业务,技术不可少,但是也需要很多软能力辅助。真的挺难的。 努力了不一定成功,不努力一定失败。 如履薄冰。

原文  https://juejin.im/post/5f16e8b16fb9a07e9a07c0db
正文到此结束
Loading...