登录后台

页面导航

本文编写于 1160 天前,最后修改于 1160 天前,其中某些信息可能已经过时。

背景

在项目中,往往需要执行数据库操作后,发送消息或事件来异步调用其他组件执行相应的操作,例如:
用户注册后发送激活码;
配置修改后发送更新事件等。
但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。伪代码如下:

void saveUser(User u) {
    //保存用户信息
    userDao.save(u);
    //触发保存用户事件
    applicationContext.publishEvent(new SaveUserEvent(u.getId()));
}

@EventListener
void onSaveUserEvent(SaveUserEvent event) {
    //获取事件中的信息(用户id)
    Integer id = event.getEventData();
    //查询数据库,获取用户(此时如果用户还未插入数据库,则返回空)
    User u = userDao.getUserById(id);
    //这里可能报空指针异常!
    String phone = u.getPhoneNumber();
    MessageUtils.sendMessage(phone);
}

解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 获取事务属性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加载配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  
  // 声明式事务的处理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 编程式事务的处理......
  }
  //......
}

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

public final void commit(TransactionStatus status) throws TransactionException {
    // 这里省略很多代码,如事务回滚......
        processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;
            try {
        prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                boolean globalRollbackOnly = false;
                if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                    globalRollbackOnly = status.isGlobalRollbackOnly();
                }
                if (status.hasSavepoint()) {
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
          // 提交事务
                    doCommit(status);
                }
                //......
            } catch (......) {
                // 事务异常处理......
            }

            
            try {
        // 事务提交成功后的处理-----这里是重点
                triggerAfterCommit(status);
            } finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {
            cleanupAfterCompletion(status);
    }
}

private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,

public static void triggerAfterCommit() {
        invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}

public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
        if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
                synchronization.afterCommit();
      }
        }
}

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

方式一

经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 当前存在事务
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            // 当前不存在事务
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

方式二

使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}

// 监听器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}

@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  
  @Transactional
  public Long test() {
    // ......
    // 插入记录
    Long studentId = studentService.insert(student);
    // 发布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入学生地址记录
    Long addressId = addressService.insert(address);
    return studentId;
  }
 
}

原理分析
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:

public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
    @Override
    public void init() {
        registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
        registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    }
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  
  @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重点——将TransactionalEventListenerFactory加入到容器中
        registerTransactionalEventListenerFactory(parserContext);
        String mode = element.getAttribute("mode");
        if ("aspectj".equals(mode)) {
            // mode="aspectj"
            registerTransactionAspect(element, parserContext);
        }
        else {
            // mode="proxy"
            AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
        }
        return null;
    }
  
  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
        RootBeanDefinition def = new RootBeanDefinition();
        def.setBeanClass(TransactionalEventListenerFactory.class);
        parserContext.registerBeanComponent(new BeanComponentDefinition(def,
                TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
    }
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  
  //省略部分代码......
  
    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
    }
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
  
   //省略部分代码......
  
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
        } else if (this.annotation.fallbackExecution()) {
            //.......
            }
            processEvent(event);
        } else {
            // 当前不存在事务什么也不做
        }
    }

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)

本文内容来自:

  1. https://blog.csdn.net/zhuqiuhui/article/details/89299360
  2. https://www.jianshu.com/p/6f9cc1384cdf