背景
在项目中,往往需要执行数据库操作后,发送消息或事件来异步调用其他组件执行相应的操作,例如:
用户注册后发送激活码;
配置修改后发送更新事件等。
但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。伪代码如下:
void saveUser(User u) {
//保存用户信息
userDao.save(u);
//触发保存用户事件
applicationContext.publishEvent(new SaveUserEvent(u.getId()));
}
@EventListener
void onSaveUserEvent(SaveUserEvent event) {
//获取事件中的信息(用户id)
Integer id = event.getEventData();
//查询数据库,获取用户(此时如果用户还未插入数据库,则返回空)
User u = userDao.getUserById(id);
//这里可能报空指针异常!
String phone = u.getPhoneNumber();
MessageUtils.sendMessage(phone);
}
解决问题方案
解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 获取事务属性
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//加载配置中配置的TransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务的处理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
//......
retVal = invocation.proceedWithInvocation();
//......
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// 编程式事务的处理......
}
//......
}
这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:
public final void commit(TransactionStatus status) throws TransactionException {
// 这里省略很多代码,如事务回滚......
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 提交事务
doCommit(status);
}
//......
} catch (......) {
// 事务异常处理......
}
try {
// 事务提交成功后的处理-----这里是重点
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,
public static void triggerAfterCommit() {
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}
上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。
方式一
经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:
private void writeStatisticsData(Long studentId) {
if(TransactionSynchronizationManager.isActualTransactionActive()) {
// 当前存在事务
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
executor.execute(() -> {Student student = studentService.findById(studentId);
//........
});
}});
} else {
// 当前不存在事务
executor.execute(() -> {Student student = studentService.findById(studentId);
//........
});
}
}
方式二
使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:
// 事件
public class StudentEvent extends ApplicationEvent {
public StudentEvent(Long studentId) {
super(studentId);
}
}
// 监听器
public class StudentEventListener{
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void writeStatisticsData(StudentEvent studentEvent) {
executor.execute(() -> {
Student student = studentService.findById(studentEvent.getSource());
//........
});
}
}
@Service
public class StudentService {
// Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Transactional
public Long test() {
// ......
// 插入记录
Long studentId = studentService.insert(student);
// 发布事件
applicationEventPublisher.publishEvent(new StudentEvent(studentId));
// 插入学生地址记录
Long addressId = addressService.insert(address);
return studentId;
}
}
原理分析:
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:
public class TxNamespaceHandler extends NamespaceHandlerSupport {
//......
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
// 重点——将TransactionalEventListenerFactory加入到容器中
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
RootBeanDefinition def = new RootBeanDefinition();
def.setBeanClass(TransactionalEventListenerFactory.class);
parserContext.registerBeanComponent(new BeanComponentDefinition(def,
TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
}
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
//省略部分代码......
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
//省略部分代码......
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
//.......
}
processEvent(event);
} else {
// 当前不存在事务什么也不做
}
}
上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)
本文内容来自: