spring中使用异步事件实现同步事务

摘要:
在Spring中使用异步事件实现同步事务结合Scala+Spring,我们将采取一个很简单的场景:下订单,然后发送一封电子邮件。事件监听者代码如下:@ServiceclassOrderMailNotifierextendsApplicationListener[OrderPlacedEvent]{defonApplicationEvent{//sendinge-mail...}}在监听者方法中真正实现邮件发送。,默认情况下,Spring使用SimpleAsyncTaskExecutor类创建新的线程。开始事务存储order到数据库发送一个包装order的消息确认异步线程获得OrderPlacedEvent并开始处理。@TransactionaldefplaceOrder{orderDaosaveorderafterCommit{eventPublisherpublishEventOrderPlacedEvent}}privatedefafterCommit[T]{TransactionSynchronizationManager.registerSynchronization}当前事务提交后afterCommit()接受调用,可以安全地调用registerSynchronization()多次-监听器存储在Set并且本地保存到当前事务中,事务提交后消失。
在Spring中使用异步事件实现同步事务
结合Scala+Spring,我们将采取一个很简单的场景:下订单,然后发送一封电子邮件。
编制一个服务:
@Service
class OrderService @Autowired() (orderDao: OrderDao, mailNotifier: OrderMailNotifier) {
@Transactional
def placeOrder(order: Order) {
orderDao save order //保存订单
mailNotifier sendMail order //发送邮件
}
}
上面代码是在保存订单和发送邮件两个同步执行,发送邮件需要连接邮件服务器,比较耗时,拖延了整个性能,我们采取异步发送电子邮件,利用Spring内置的自定义事件,与JMS或其他生产者 - 消费者类似。
case class OrderPlacedEvent(order: Order) extendsApplicationEvent
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher: ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
区别是继承了ApplicationEvent之前是直接用OrderMailNotifier直接发送,而现在我们使用ApplicationEventPublisher发送发邮件事件了。
事件监听者代码如下:
@Service
class OrderMailNotifier extends ApplicationListener[OrderPlacedEvent] {
def onApplicationEvent(event: OrderPlacedEvent) {
//sending e-mail...
}
}
在监听者方法中真正实现邮件发送。
但是Spring的ApplicationEvents是同步事件,意味着我们并没有真正实现异步,程序还会在这里堵塞,如果希望异步,我们需要重新定义一个ApplicationEventMulticaster,实现类型SimpleApplicationEventMulticaster和TaskExecutor:
@Bean
def applicationEventMulticaster() = {
val multicaster = new SimpleApplicationEventMulticaster()
multicaster.setTaskExecutor(taskExecutor())
multicaster
}
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
Spring通过使用TaskExecutor已经支持广播事件了,对onApplicationEvent()标注@Async
@Async
def onApplicationEvent(event: OrderPlacedEvent) { //...
如果你希望使用@Async,可以编制自己的异步执行器:
@Configuration
@EnableAsync
class ThreadingConfig extends AsyncConfigurer {
def getAsyncExecutor = taskExecutor()
@Bean
def taskExecutor() = {
val pool = new ThreadPoolTaskExecutor()
pool.setMaxPoolSize(10)
pool.setCorePoolSize(10)
pool.setThreadNamePrefix("Spring-Async-")
pool
}
}
@ EnableAsync是足够了。,默认情况下,Spring使用SimpleAsyncTaskExecutor类创建新的线程。
以上所有设置暴露一个真正的问题。现在,我们虽然使用其他线程发送一个异步消息处理。不幸的是,我们引入竞争条件。
  1. 开始事务
  2. 存储order到数据库
  3. 发送一个包装order的消息
  4. 确认
异步线程获得OrderPlacedEvent并开始处理。现在的问题是,它发生(3)之后,还是(4)之前或者(4)之后?这有一个很大的区别!在前者的情况下,交易也尚未提交订单所以不存在于数据库中。另一方面,延迟加载可能已经在工作,致使订单对象仍然然绑定在 PersistenceContext(缺省我们使用JPA)。
解决办法是使用TransactionSynchronizationManager.,可以注册很多监听者TransactionSynchronization,它对于事务的提交或回滚都有事件发送。
@Transactional
def placeOrder(order: Order) {
orderDao save order
afterCommit {
eventPublisher publishEvent OrderPlacedEvent(order)
}
}
private def afterCommit[T](fun: => T) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter {
override def afterCommit() {
fun
}
})
}
当前事务提交后 afterCommit()接受调用,可以安全地调用registerSynchronization()多次 - 监听器存储在Set并且本地保存到当前事务中,事务提交后消失。
我们将afterCommit方法单独抽象成一个类,分离关注。
class TransactionAwareApplicationEventPublisher(delegate: ApplicationEventPublisher)
extendsApplicationEventPublisher{
override def publishEvent(event: ApplicationEvent) {
if (TransactionSynchronizationManager.isActualTransactionActive) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter {
override def afterCommit() {
delegate publishEvent event
}
})
}
else
delegate publishEvent event
}
}
TransactionAwareApplicationEventPublisher是实现Spring的ApplicationEventPublisher。
我们要将这个新的实现告诉Spring替换掉旧的,用@Primary:
@Resource
val applicationContext: ApplicationContext = null
@Bean
@Primary
def transactionAwareApplicationEventPublisher() =
new TransactionAwareApplicationEventPublisher(applicationContext)
再看看原来的订单服务:
@Service
class OrderService @Autowired() (orderDao: OrderDao, eventPublisher:ApplicationEventPublisher) {
@Transactional
def placeOrder(order: Order) {
orderDao save order
eventPublisher publishEvent OrderPlacedEvent(order)
}
注意这里ApplicationEventPublisher已经是我们自己实现的TransactionAwareApplicationEventPublisher,将被自动注入这个服务。
最后,要在真正订单保存的业务代码上放置事务:
def placeOrder(order: Order) {
storeOrder(order)
eventPublisher publishEvent OrderPlacedEvent(order)
}
@Transactional
def storeOrder(order: Order) = orderDao save order
当然这没有根本解决问题,如果placeOrder有一个更大的事务怎么办?

免责声明:文章转载自《spring中使用异步事件实现同步事务》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇MySQL优化服务器设置(MySQL优化配置文件)图片在 canvas 中的 选中/平移/缩放/旋转,包含了所有canvas的2D变化,让你认识到数学的重要性下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

为什么阿里规定需要在事务注解@Transactional中指定rollbackFor?

作者:Mint6 来源:http://39sd.cn/53D5D Java阿里巴巴规范提示:方法【edit】需要在Transactional注解指定rollbackFor或者在方法中显示的rollback。 异常的分类 先来看看异常的分类 error是一定会回滚的。 这里Exception是异常,他又分为运行时异常RuntimeException和非运...

spring事务配置步骤

spring事务配置流程 第一步:配置事务管理器 第二步:配置通知--》传播行为 第三步:配置切入点--》切面 AOP <!-- 事务管理器 --> <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">...

分布式系统的一致性问题

分布式系统的一致性问题  参考: https://blog.csdn.net/zheng0518/article/details/51194942 https://blog.csdn.net/kangbin825/article/details/71006546?locationNum=7&fps=1 http://iamzhongyong.ite...

DataSourceTransactionManager手动提交事务和回滚事务

注入 @Autowiredprivate DataSourceTransactionManager transactionManager; TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());t...

Informatica_(3)组件

  一、Informatica介绍Informatica PowerCenter 是Informatica公司开发的世界级的企业数据集成平台,也是业界领先的ETL工具。Informatica PowerCenter使用户能够方便地从异构的已有系统和数据源中抽取数据,用来建立、部署、管理企业的数据仓库,从而帮助企业做出快速、正确的决策。此产品为满足企业级要...

数据库连接,事务以及Java线程的关系

0. 前言 Spring作为Java框架王者,当前已经是基础容器框架的实际标准。Spring 除了提供了IoC、AOP特性外,还有一个极其核心和重要的特性:数据库事务。事务管理涉及到的技术点比较多,想完全理解需要花费一定的时间,本系列《Spring设计思想-事务篇》将通过如下几个方面来阐述Spring的数据库事务: 数据库连接java.sql.Conne...