Spring使用TransactionalEventListener解决事务未提交读取不到数据问题
背景
在项目中,往往需要执行数据库操作后,发送消息或事件来异步调用其他组件执行相应的操作,例如:
用户注册后发送激活码;
配置修改后发送更新事件等。
但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。伪代码如下:
void saveUser(User u) {//保存用户信息userDao.save(u);//触发保存用户事件applicationContext.publishEvent(new SaveUserEvent(u.getId()));}@EventListenervoid onSaveUserEvent(SaveUserEvent event) {//获取事件中的信息(用户id)Integer id = event.getEventData();//查询数据库,获取用户(此时如果用户还未插入数据库,则返回空)User u = userDao.getUserById(id);//这里可能报空指针异常!String phone = u.getPhoneNumber();MessageUtils.sendMessage(phone);}
解决问题方案
解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {// 获取事务属性final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);//加载配置中配置的TransactionManagerfinal PlatformTransactionManager tm = determineTransactionManager(txAttr);final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);// 声明式事务的处理if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);Object retVal = null;//......retVal = invocation.proceedWithInvocation();//......commitTransactionAfterReturning(txInfo);return retVal;} else {// 编程式事务的处理......}//......}
这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {if (txInfo != null && txInfo.hasTransaction()) {txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}}
再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:
public final void commit(TransactionStatus status) throws TransactionException {// 这里省略很多代码,如事务回滚......processCommit(defStatus);}private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {boolean beforeCompletionInvoked = false;try {prepareForCommit(status);triggerBeforeCommit(status);triggerBeforeCompletion(status);beforeCompletionInvoked = true;boolean globalRollbackOnly = false;if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {globalRollbackOnly = status.isGlobalRollbackOnly();}if (status.hasSavepoint()) {status.releaseHeldSavepoint();} else if (status.isNewTransaction()) {// 提交事务doCommit(status);}//......} catch (......) {// 事务异常处理......}try {// 事务提交成功后的处理-----这里是重点triggerAfterCommit(status);} finally {triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);}}finally {cleanupAfterCompletion(status);}}private void triggerAfterCommit(DefaultTransactionStatus status) {if (status.isNewSynchronization()) {TransactionSynchronizationUtils.triggerAfterCommit();}}
最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,
public static void triggerAfterCommit() {invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());}public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {if (synchronizations != null) {for (TransactionSynchronization synchronization : synchronizations) {synchronization.afterCommit();}}}
上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。
方式一
经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:
private void writeStatisticsData(Long studentId) {if(TransactionSynchronizationManager.isActualTransactionActive()) {// 当前存在事务TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {executor.execute(() -> {Student student = studentService.findById(studentId);//........});}});} else {// 当前不存在事务executor.execute(() -> {Student student = studentService.findById(studentId);//........});}}
方式二
使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:
// 事件public class StudentEvent extends ApplicationEvent {public StudentEvent(Long studentId) {super(studentId);}}// 监听器public class StudentEventListener{@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)public void writeStatisticsData(StudentEvent studentEvent) {executor.execute(() -> {Student student = studentService.findById(studentEvent.getSource());//........});}}@Servicepublic class StudentService {// Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;@Transactionalpublic Long test() {// ......// 插入记录Long studentId = studentService.insert(student);// 发布事件applicationEventPublisher.publishEvent(new StudentEvent(studentId));// 插入学生地址记录Long addressId = addressService.insert(address);return studentId;}}
原理分析:
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:
public class TxNamespaceHandler extends NamespaceHandlerSupport {//......@Overridepublic void init() {registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());}}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {@Overridepublic BeanDefinition parse(Element element, ParserContext parserContext) {// 重点——将TransactionalEventListenerFactory加入到容器中registerTransactionalEventListenerFactory(parserContext);String mode = element.getAttribute("mode");if ("aspectj".equals(mode)) {// mode="aspectj"registerTransactionAspect(element, parserContext);}else {// mode="proxy"AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);}return null;}private void registerTransactionalEventListenerFactory(ParserContext parserContext) {RootBeanDefinition def = new RootBeanDefinition();def.setBeanClass(TransactionalEventListenerFactory.class);parserContext.registerBeanComponent(new BeanComponentDefinition(def,TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));}}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {//省略部分代码......@Overridepublic ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);}}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {//省略部分代码......@Overridepublic void onApplicationEvent(ApplicationEvent event) {if (TransactionSynchronizationManager.isSynchronizationActive()) {// 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);} else if (this.annotation.fallbackExecution()) {//.......}processEvent(event);} else {// 当前不存在事务什么也不做}}
上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)
本文内容来自:
评论