yeskery

Spring使用TransactionalEventListener解决事务未提交读取不到数据问题

背景

在项目中,往往需要执行数据库操作后,发送消息或事件来异步调用其他组件执行相应的操作,例如:
用户注册后发送激活码;
配置修改后发送更新事件等。
但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。伪代码如下:

  1. void saveUser(User u) {
  2. //保存用户信息
  3. userDao.save(u);
  4. //触发保存用户事件
  5. applicationContext.publishEvent(new SaveUserEvent(u.getId()));
  6. }
  7. @EventListener
  8. void onSaveUserEvent(SaveUserEvent event) {
  9. //获取事件中的信息(用户id)
  10. Integer id = event.getEventData();
  11. //查询数据库,获取用户(此时如果用户还未插入数据库,则返回空)
  12. User u = userDao.getUserById(id);
  13. //这里可能报空指针异常!
  14. String phone = u.getPhoneNumber();
  15. MessageUtils.sendMessage(phone);
  16. }

解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

  1. protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  2. // 获取事务属性
  3. final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  4. //加载配置中配置的TransactionManager
  5. final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  6. final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  7. // 声明式事务的处理
  8. if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
  9. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
  10. Object retVal = null;
  11. //......
  12. retVal = invocation.proceedWithInvocation();
  13. //......
  14. commitTransactionAfterReturning(txInfo);
  15. return retVal;
  16. } else {
  17. // 编程式事务的处理......
  18. }
  19. //......
  20. }

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

  1. protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  2. if (txInfo != null && txInfo.hasTransaction()) {
  3. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  4. }
  5. }

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

  1. public final void commit(TransactionStatus status) throws TransactionException {
  2. // 这里省略很多代码,如事务回滚......
  3. processCommit(defStatus);
  4. }
  5. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  6. try {
  7. boolean beforeCompletionInvoked = false;
  8. try {
  9. prepareForCommit(status);
  10. triggerBeforeCommit(status);
  11. triggerBeforeCompletion(status);
  12. beforeCompletionInvoked = true;
  13. boolean globalRollbackOnly = false;
  14. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
  15. globalRollbackOnly = status.isGlobalRollbackOnly();
  16. }
  17. if (status.hasSavepoint()) {
  18. status.releaseHeldSavepoint();
  19. } else if (status.isNewTransaction()) {
  20. // 提交事务
  21. doCommit(status);
  22. }
  23. //......
  24. } catch (......) {
  25. // 事务异常处理......
  26. }
  27. try {
  28. // 事务提交成功后的处理-----这里是重点
  29. triggerAfterCommit(status);
  30. } finally {
  31. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  32. }
  33. }
  34. finally {
  35. cleanupAfterCompletion(status);
  36. }
  37. }
  38. private void triggerAfterCommit(DefaultTransactionStatus status) {
  39. if (status.isNewSynchronization()) {
  40. TransactionSynchronizationUtils.triggerAfterCommit();
  41. }
  42. }

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,

  1. public static void triggerAfterCommit() {
  2. invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
  3. }
  4. public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  5. if (synchronizations != null) {
  6. for (TransactionSynchronization synchronization : synchronizations) {
  7. synchronization.afterCommit();
  8. }
  9. }
  10. }

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

方式一

经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

  1. private void writeStatisticsData(Long studentId) {
  2. if(TransactionSynchronizationManager.isActualTransactionActive()) {
  3. // 当前存在事务
  4. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
  5. @Override
  6. public void afterCommit() {
  7. executor.execute(() -> {Student student = studentService.findById(studentId);
  8. //........
  9. });
  10. }});
  11. } else {
  12. // 当前不存在事务
  13. executor.execute(() -> {Student student = studentService.findById(studentId);
  14. //........
  15. });
  16. }
  17. }

方式二

使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

  1. // 事件
  2. public class StudentEvent extends ApplicationEvent {
  3. public StudentEvent(Long studentId) {
  4. super(studentId);
  5. }
  6. }
  7. // 监听器
  8. public class StudentEventListener{
  9. @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  10. public void writeStatisticsData(StudentEvent studentEvent) {
  11. executor.execute(() -> {
  12. Student student = studentService.findById(studentEvent.getSource());
  13. //........
  14. });
  15. }
  16. }
  17. @Service
  18. public class StudentService {
  19. // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
  20. @Autowired
  21. private ApplicationEventPublisher applicationEventPublisher;
  22. @Transactional
  23. public Long test() {
  24. // ......
  25. // 插入记录
  26. Long studentId = studentService.insert(student);
  27. // 发布事件
  28. applicationEventPublisher.publishEvent(new StudentEvent(studentId));
  29. // 插入学生地址记录
  30. Long addressId = addressService.insert(address);
  31. return studentId;
  32. }
  33. }

原理分析
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:

  1. public class TxNamespaceHandler extends NamespaceHandlerSupport {
  2. //......
  3. @Override
  4. public void init() {
  5. registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
  6. registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
  7. registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
  8. }
  9. }
  1. class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  2. @Override
  3. public BeanDefinition parse(Element element, ParserContext parserContext) {
  4. // 重点——将TransactionalEventListenerFactory加入到容器中
  5. registerTransactionalEventListenerFactory(parserContext);
  6. String mode = element.getAttribute("mode");
  7. if ("aspectj".equals(mode)) {
  8. // mode="aspectj"
  9. registerTransactionAspect(element, parserContext);
  10. }
  11. else {
  12. // mode="proxy"
  13. AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
  14. }
  15. return null;
  16. }
  17. private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
  18. RootBeanDefinition def = new RootBeanDefinition();
  19. def.setBeanClass(TransactionalEventListenerFactory.class);
  20. parserContext.registerBeanComponent(new BeanComponentDefinition(def,
  21. TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
  22. }
  23. }
  1. public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  2. //省略部分代码......
  3. @Override
  4. public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
  5. return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
  6. }
  7. }
  1. class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
  2. //省略部分代码......
  3. @Override
  4. public void onApplicationEvent(ApplicationEvent event) {
  5. if (TransactionSynchronizationManager.isSynchronizationActive()) {
  6. // 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
  7. TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
  8. TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
  9. } else if (this.annotation.fallbackExecution()) {
  10. //.......
  11. }
  12. processEvent(event);
  13. } else {
  14. // 当前不存在事务什么也不做
  15. }
  16. }

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)

本文内容来自:

  1. https://blog.csdn.net/zhuqiuhui/article/details/89299360
  2. https://www.jianshu.com/p/6f9cc1384cdf

评论

Post comment Click Refresh verification code

Tip

The feature is not yet available