eda 事件驱动架构

事件驱动架构(Event Driven Architecture,EDA)一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间。一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理者将保留这个事件,一段间隔之后再次转送该事件消费者。这种事件传送方法在基于消息的系统里就是:储存(store)和转送(forward)

构建一个包含事件驱动构架的应用程序和系统,会使这些应用程序和系统响应更灵敏,因为事件驱动的系统更适合应用在不可预知的和异步的环境里。
事件驱动架构在具体实现中是指由一系列相关组件构成的应用,而组件之间通过事件机制完成一定的业务功能。由于在一个EDA系统中各个组件都只专注于处理输入的消息与发布输出的消息,因而EDA系统能够更有加效地对管道化(pipelined)的、由多软件模块链接而成的并发事件流(concurrent processing of events)进行处理。
EDA系统中各组件以异步方式响应事件,在本质上是可以并行的,因而在政府部门的电子政务应用中具有极大的优势。其具备以下特点:
◆ 并发执行
◆ 事件触发/数据触发/时间规则触发
◆ 实时/增量响应
◆ 分布式事件系统处理
事件驱动架构优势编辑
事件驱动设计和开发所提供的优势如下所示:
◆ EDA提高了对不断变化的业务需求的响应,最大限度地减少了对现有业务应用的影响,也常消除了对新打包应用的需要。如果采用特有的粗颗粒服务模型可以基于业务目标快速确定可控的业务变更,并直接、迅速、有效地实施变更以达到业务敏捷性和完整性。
◆ 可以更容易开发和维护大规模分布式应用程序和不可预知的服务或异步服务;
◆ 可以很容易,低成本地集成、再集成、再配置新的和已存在的应用程序和服务。
◆ 促进远程组件和服务的再使用,拥有一个更灵敏、没有Bug的开发环境。
从时间维度来看EDA的优势:
◆ 短期利益:更容易定制,因为设计对动态处理有更好的响应;
◆ 长期利益:系统和组织的状态变得更精准,对实时变化的响应接近于同步。



事件驱动的方法编辑
·综合数据来源
·Alert定义和通知
·业务流程的定义
·多种来源事件关联
·alerts自动回复
上述前三个综合数据来源,alert定义/通知和业务流程定义是至关重要的组成部分。在第四清单中,多种来源事件关联使系统关联可能出现在无关的系统级事件表面上,制作一个有关业务级别的活动。在上述清单的第五能力,alerts自动回复。自动回复,很简单,因为记录错误或复杂援引一个自动的过程中对内部或外部的制度。这种技术,用户不仅要干预真正的特殊条件。还应该使用户定义的各种视觉的隐喻,让事件都最有意义的显示给用户。mash最多的活动通知与charts, maps (2D or 3D)使用Enterprise 2.0技术在这种情况下将是非常有效的。代表事件的表格数据掩盖了真正发生的事情,破坏了事件驱动的业务流程的实现。



https://baike.baidu.com/item/%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%9E%B6%E6%9E%84/883637?fr=aladdin



EDA 是一种侧重于以生成/消费为基础的异步通信的架构模式。这主要对照于传统的基于线程的同步系统。
EDA 是一种以事件 (event)为核心,提供事件产生,路由,消费已经结果回调等机制的架构模式。



简单地说, 面向服务架构 (Service-Oriented Architecture, SOA) 是一种 IT 架构策略,其基于面向服务的概念之上。自从 2002 开始为大家熟知以来,SOA 已经逐渐地成为业界的标准,也得到了广泛的应用。



SOA != Web Service



因为 SOA 经常和 Web Service 相提并论,所以导致大家一直有一个误区,认为这两者是等同的,其实不然。虽然两者有很多的关联,但它们是截然不同的两个概念,或者说考虑问题的角度不同。虽然 SOA 最初的流行离不开 Web Service 的贡献,但如今 SOA 早已超越了 Web Service 的范畴,变成一个独立的设计理念。



SOA 的局限性



SOA 主要从系统解构的角度入手,它侧重于将整个应用分解为一系列独立的服务,并指定各种标准和基础设施来使得这些服务易于重用,能够很容易地被各种平台上的应用来使用。但是在服务实际业务时出现了一些问题,因为 SOA 更多地是关注静态的信息,所以不能很好地与动态业务匹配。比如 SOA 不能很好地回答类似下面的一些问题:



比如在一个证券公司,有很完善的交易系统、后台系统、账务系统和头寸管理系统等,当一个客户的下单在交易所执行后,所有的这些系统都应该得到通知,并做出相应的处理。这里面其实包含了几个问题:谁来负责触发这样一个事件,各个系统如何得到通知?如何保证各个系统行动的一致性? SOA 架构由于其关注点的限制,并不能很好地解决上述问题,而这些问题往往又是实际系统非常需要的特性。因此,EDA 与 SOA 的集成引起了人们的注意。



通过引入面向事件的机制,使得系统具备了感知和快速响应业务事件的能力。其实不管是 SOA 还是 EDA 都不是什么新技术,无非是在一些旧的概念上添加了一些新元素。



什么是事件(Event)?
事件就是状态的显著变化,比如说前面提到的客户下单被执行。从来源来分,事件可以分为系统内部事件和外部事件。从类型来分,可以分为业务事件和系统事件。



其实你可以从 SOA 或 EDA 的身上很容易看到以前的技术 ( 比如 CORBA 或者 DCOM) 的影子。任何系统都可以简化为组件 / 服务加上通道 (channel,解决通讯的问题 ),如果说 SOA 关注于组件和服务的话,那么 EDA 更多地关注通道。



回页首



Event-Driven SOA



我们一般将 SOA 和 EDA 的集成体称之为事件驱动的面向服务架构 (Event-Driven SOA),可以将其理解为 SOA 的一种衍生。SOA 和 EDA 的交互主要体现在以下几个方面:



将事件处理的能力引入到 SOA



一个事件的产生可以触发一个或多个服务被调用,这样就把这些静态的功能动态地串联起来。



服务本身也可以产生事件



服务除了完成特定的功能外,也可以根据自身需要产生某个事件。



有人将 EDA 和 SOA 的关系与人体做了一个形象的比喻,如果把 SOA 比作手和脚的话,那 EDA 就像人的眼睛和耳朵。当眼睛发现一只狮子正朝你奔来时,一个消息被发送到大脑,然后大脑向你的手脚发出指令:赶快跑。



回页首



Event-Driven SOA 架构的特点



当然,任何一种架构模式都有其适用的场景,Event-Driven SOA 自然也不例外。



首先,它适用于异步的环境。如果你的系统对实时性要求比较高,请不要使用该架构。



第二,如果你的系统需要面对复杂的异构环境——跨平台 / 跨语言,那么面向服务的架构能够很好地应对。



第三,将系统功能分解为适当粒度并且重用性高的一个个服务,可以显著地提高 IT 系统的适应性和效率,进而提高投资回报率 (ROI)。



第四,引入事件处理的能力以后,每个服务都是由不同的事件驱动,这样当某个事件发生后,系统的不同服务就能够自动地进行触发。这对那些有更高自动化要求的系统来说非常适合。



第五,与面向过程的系统中客户端必须轮询更改请求 ( 通过 API 调用 ) 不同,事件驱动架构允许系统和组件在事件发生时实时动态地做出响应。事件驱动架构通过引入长时间运行的处理功能来弥补 SOA 的不足。这一点对于金融系统来说尤其重要,比如说一次股票买卖从客户下单到最终交割会经历几天的生命周期。



最后,Event-Driven SOA 使得增加事件的 consumer 和 producer 非常容易,这样就使得增加系统吞吐量也变得很简单,系统的弹性非常好,非常适合那些业务量持续增加的系统。在这方面,有一个 EDA 的变体 SEDA(Staged Event-Driven Architecture)将这方面的设计发挥到了极致,详细的介绍请参考正文后的参考资料。



回页首



Event-Driven SOA 在金融系统的应用



金融系统的实际需求



在当今社会,市场变化莫测,商机稍纵即逝,企业需要有极强的灵活性和应变能力,金融行业尤其如此,特别是在中国这样一个金融行业处于快速发展的市场里。企业要求 IT 系统能够快速地对业务需求做出应对,否则就会丧失先发优势。这有点类似于现代战争条件下,各国都要求部队具备快速反应能力,这种能力主要体现在 IT 部门能够通过快速开发或者重用 / 整合现有资源来达到快速响应业务需求。还有,金融行业业务越来越庞大复杂,所涉及的第三方系统或者遗留系统非常多,这就要求 IT 系统有很强的整合能力及对异构环境的适应能力。最后,由于金融行业的发展日新月异,特定金融业务都会在其初期发展后迎来一个快速膨胀期,业务量和业务类型会急剧增加,这也要求 IT 系统有很好的可扩展性。



对照前面提到的 Event-Driven SOA 的特点,我们可以很直观地发现该架构可以很好地满足金融系统的实际需求。当然,金融系统也是包罗万象,特点各不一样,这里可能更偏重于金融行业的交易系统。



为什么选择 Event-Driven SOA ——适用性讨论



除了上面提到的这些大的因素之外,我们还可以深入到具体系统的内部,从一些微观层面来考虑 Event-Driven SOA 是否仍然能够符合我们的要求。下图是一个证券公司股票交易系统的简图:



图 1. 证券公司股票交易系统概略图
图 1. 证券公司股票交易系统概略图



从上图我们可以看出,整个应用被分为很多子系统,各个子系统之间存在着大量的信息交互。而且大部分应用输入都需要经历一个比较长的生命周期,比如说一个客户订单输入到系统后,会先后经历前台系统 (Front Office),中台系统 (Middle Office) 以及后台系统 (Back Office),而且每个系统内部又包括很多服务组件。除了系统层面的跨度外, 这个生命周期也体现在时间长度上。而且,如今所有的金融系统都追求 STP (Straight Through Processing) 的能力,主张尽可能少的人工干预,这样所有的服务组件都需要具备自触发的能力。



回页首



Event-Driven SOA 架构设计



架构师在着手每次的架构设计时,其实都是在提出并回答一系列的问题,把这些问题都回答了,架构设计也就出来了。比如我们每次肯定都会问:系统的最终用户是谁,他们会如何来使用该系统,他们的核心诉求是什么。当然,不是所有的问题都能有一个圆满的答案,更多的时候其实是一个取舍的过程。比如说系统的关键指标我们很难一下子全部满足,就需要结合具体的业务需求和人力物力以及时间的具体情况来做取舍。下表就列出了一些我在做 Event-Driven SOA 架构设计时认为比较关键的问题(在遵循一般架构设计的原则的基础之上),看看你是否也有同感。



表一:Event-Driven SOA 架构设计时的几个关键考虑



领域 关键考虑
设计原则 业务为先
坚持简单适用的原则
系统的关键指标有哪些?互联互通最重要
设计演变 功能分解,服务定义,事件的定义及分类
基础架构的选择
EDA 的实现途径
最佳实践 如何重用已有的基础组件



下面我就其中的几点具体展开讨论一下:



业务为先



任何的技术或者架构思想都是由具体的业务需求驱动的,比如 SOA 的出现是由于人们打破竖井应用 (application silos) 并追求功能重用的强烈需求,而 EDA 的出现也迎合了业务流程化、自动化的趋势。所以,任何的架构设计都要服从于自身业务的具体需求,没有最好的架构设计,只有最合适的。



在 SOA 实践中,尤其强调业务为先的原则,因为我们必须先进行业务流程的整合重组,然后才是 IT 系统的服务化。业务流程本身的问题还是需要从业务本身去解决,再好的技术也解决不了业务的问题。试想一下,如果一个企业各个部门之间各自为战,缺乏协作和沟通,那么可能开发出一个好的面向服务的 IT 系统吗?



除了业务部门的努力外,IT 部门在做任何架构设计的决定前,必须确保理解清楚了业务部门的具体需求。所以说,项目前期 IT 部门和业务部门之间的协作和交流非常重要。



这里很容易有一个误区,尤其是对那些经验丰富的架构师。他们往往拥有丰富的 IT 经验和业务知识,自认为已经非常了解业务部门的需求,甚至有些时候都能够指导业务部门如何去改进。在这种自负的情绪中,他们觉得可以先把所谓先进的 IT 系统开发出来,然后再去推广,他们认为用户肯定会欣然接受这些系统,因为他们代表着先进的理念,但往往事与愿违。姑且不去深究究竟孰对孰错,退一万步讲,一个没有充分听取用户意见,没有用户参与的系统能够那么容易得到用户的认可吗?即便你是对的。



互联互通



在 Event-Driven SOA 的实施过程中,有几个关键指标:服务的分类和创建,事件的定义和管理,服务的互联互通,业务流程的理解和 IT 实现等。那我们应该更加关注哪个指标呢?因为我们往往很难一下子兼顾所有的指标。个人认为这其中最重要的就是服务的互通互联。当然这里所讲的互通互联并没有那么简单,并不是仅仅建立起通讯的通道就可以,它包括以下几个方面的内容:



无论通讯的方式如何,最好做到自动化
实现通讯的方式有很多种:同步调用,异步消息,Socket 甚至是文件,无论采用哪种,最好做到自动化的实现。任何人工的干预都容易引起错误和延迟。



通讯的双方之间需要定义清晰的接口,有共同的异常应对机制
特别是当通讯的双方是由不同的开发团队来完成,一定要在开始阶段就定义清楚接口,并在随后的开发过程中严格遵守,同时保持实时的沟通。这里面需要强调的一点就是异常的应对机制,要让双方都充分理解可能面对的异常情况及应对措施。



基础数据的共享
在金融系统中,会用到大量的基础数据(一般称之为 Reference Data),这些数据在各个系统都会用到。但事实上情况往往并不如此,经常是各个系统各自为战,不用或者是使用不同的数据源,导致在通讯过程中的识别歧义。



做到以上这些,技术上并不困难,更重要的是项目之间的协作和执行力强的领导团队。



结合到实际的例子,比如美国三军联合作战系统,其核心就是其“数据链”系统,它使得战场上的指挥中心、作战部队和武器平台能够实时交换数据,达到精确协作的目的。从下面这段描述我们就能感受到这种高效无缝协作的威力:



“在 7 年之后的海湾战争中,初级的“数据链”就已显威战场。以美军拦截导弹作战为例,就可以看到“数据链”的作用。伊军的“飞毛腿”导弹一发射,12 秒钟之后,位于太平洋上空的美国防支援计划(DSP)的导弹预警卫星就发现了“飞毛腿”,并迅速测出它的航行轨道及预定着陆地区,报警信息及有关数据迅速传递到位于澳大利亚的美国航天司令部的一个数据处理中心,数据中心的巨型计算机紧急处理这些数据之后,得到对“飞毛腿”导弹进行有效拦截的参数,然后航天司令部将这些参数通过卫星传给位于沙特阿拉伯的“爱国者”防空导弹指挥中心。防空导弹指挥中心立刻将数据装填到“爱国者”导弹上并发射,整个过程只需要 3 分钟左右的时间,而“飞毛腿”至少要飞行 4 ~ 5 分钟才能到达预定目标的上空,这就为拦截导弹创造了条件。…”



设计考虑



在明确了以上这些设计原则外, 我们需要一步步考虑整个架构的实现途径。首先面临的就是一些基础架构的选择。



基础架构的选择
在这里我们需要回答一系列的问题:自己开发还是购买?开源的还是商业的?选择什么 Web Service 的基础平台?选择什么样的消息中间件(Message Oriented Middleware, MOM)?是否采用企业服务总线(Enterprise Service Bus, ESB)?



这其中讨论的最多的就是是否以及如何使用 ESB。个人观点,ESB 是有价值的,仅当系统确实需要 ESB 的功能时。Accenture 首席技术官 Don Rippert 在他的一次早期访谈中提到发挥 SOA 的全部潜力大致需要以下 4 个步骤:



开始采用 SOA 架构,使用 XML 等标准的方式来使用应用程序接口
捕获一些业务过程,并将它们转化为 Web 服务
引入并全面使用企业服务总线
将业务过程执行语言(Business Process Execution Language, BPEL)集成进来,利用业务过程建模工具和 BPEL 可以创建不同的应用行为,而无需修改软件
为什么将 ESB 的使用放在第三个步骤呢,那我们需要从 ESB 的定义入手,来了解 ESB 究竟带给我们些什么。ESB 应该被理解为模式而不是产品,它应该至少具备以下这些功能:



服务的虚拟化,支持虚拟化通讯参与方之间的服务交互并对其进行管理。意思就是服务只需要关注完成自己的功能,不需要关心哪个服务调用它以及它需要调用哪个服务。
服务的转化、包装以及桥接
消息的传递、过滤以及路由
服务编制(Orchestration)
还记得前面将 EDA/SOA 和人体进行类比的例子吗?如果按照该思路,ESB 就可以看作是人体的中枢神经系统。其接受眼睛传入的“狮子来了”的信息,整体加工后成为协调的运动性传出,手脚也就开始动作了。



从上面的定义可以看出,ESB 更多地关注应用流程方面的信息,将业务流程剥离出来并将其交由 ESB 来统一管理。因此,有一个非常简单的标准来判断是否需要采用企业服务总线:就是看你的应用本身是否有很复杂的业务流程,而且可能这些流程会经常发生变化。依据这条标准,我觉得很多应用一开始都没有复杂到需要立即采用企业服务总线,比如说一个股票的后台管理系统,其业务流程相对来说比较简单固定,就没有必要引入企业服务总线这样重量级的解决方案。



当然,ESB 中分解流程信息的思想我们还是可以借鉴的,只不过我们可以用更简单的方法来实现。



EDA 的实现途径
在 EDA 中,按照事件简易程度的不同,事件处理模型可以分为以下三种:



简单事件处理 (Simple Event Processing)
流事件处理 (Stream Event Processing)
复杂事件处理 (Complex Event Processing, CEP)
在一个成熟的事件驱动架构中,这三种往往会混合在一起使用。目前,很多公司都推出了支持 CEP 功能的产品。但是在实际应用过程中,我们还是需要秉承由简入繁的原则。能用简单的事件处理解决问题,就没必要使用复杂的。



实现事件驱动架构最简单、直观的方式就是使用消息。在 JMS 的体系架构里,我们很容易来实现事件驱动的一些基础元素:事件的生产者、消费者和通道。下图为在发布 / 订阅模式下,消息发布者、订阅者以及消息通道和主题之间的交互。



图 2. 一个发布者、多个订阅者、事件通道和主题之间的交互
图 2. 一个发布者、多个订阅者、事件通道和主题之间的交互



(图片来自 http://www.ibm.com/developerworks/cn/opensource/os-ag-eventdriven/index.html)



严格意义上来说,事件和消息是不同的概念。消息代表非直接交互时简短的信息,而事件往往代表状态的显著变化。可以把事件看作消息的子类,因为后者还包括包含数据的消息等。而且,在实际应用中,一个消息中往往同时包含事件和数据的内容。比如系统接收客户的订单后,它会发布一条消息:其中既包括事件(新增客户订单),又包括新订单的具体数据。



基础组件



在确定了系统的架构后,我们需要着手来实现它。经过这么多年的实践,人们也总结出一些基础的组件,这些组件对于事件驱动的面向服务架构来说是必不可少的,或者说经常被使用到的。



Web 服务基础架构 (XML,SOAP,WSDL,UDDI 和 Quality of services)
企业服务总线(针对复杂应用)
消息中间件
监控体系
异常处理的讨论
配置和规则引擎
其中第一、二项大家讨论得最多,第三项也经常被提及。作为消息运转的基础,消息中间件(Message-Oriented Middleware,MOM)必须做到安全、可靠和快捷。市面上有很多很成熟的产品,比如 WebSphere MQ,Apache ActiveMQ 等。而且还有些针对特定行业的特色化产品,比如 WebSphere MQ Low Latency Messaging 是一款专门针对金融行业的中间件,用来满足高吞吐量、低延迟的业务需求。



而后三项讨论的并不多,但这些对于我们的应用来说又都是非常关键的。我会在后续的文章中逐一进行介绍。



图 3. 各个子系统和基础组件之间的协作
图 3. 各个子系统和基础组件之间的协作



回页首



结束语



采用某个概念非常简单,我们实际需要的是如何结合自身项目的实际需求,真正地利用这些概念背后那些好的思想。利用这些智慧结晶来解决面临的问题,这就需要大家多从实际出发来思考问题。很多时候,过多的概念只会让你更加混淆,我们真正需要记住的不是这些名词,而是这些名词背后的思想——这些在软件架构中一直被传承的东西



https://www.cnblogs.com/SUNSHINEC/p/8301910.html



事件驱动架构模式是一种非常流行的分布式异步架构模式,经常被用与构建高可伸缩性的应用程序。当然它也适合小型应用,复杂应用和规模比较大的应用。这种架构模式由一系列高度解耦的、异步接收和处理事件的单一职责的组件所组成。
  事件驱动架构由两个主要的拓扑组成,分别是调停者拓扑和代理者拓扑。调停者拓扑通过一个中央的调停者来编排各种处理步骤。然而代理者拓扑适用于那些当你想将事件链式的聚在一起但不使用中央调停者的情况。由于这两种模式特性以及实现均不一样,所以理解哪一个模式最适合你的实际情况是非常重要的。



调停者拓扑
调停者拓扑适合那些有很多步骤需要处理,并且需要按照某种程度的编排来处理的事件。举个例子,一个处理股票交易的事件首先需要你首先验证交易的本身合法性,然后检查这个股票交易是否合规,然后把股票交给股票代理商,计算佣金,然后通过代理商将股票移送给客户。这些步骤都需要一个编排中心来决定这些步骤的顺序,并且决定哪些能是串行的,哪些是并行的。



调停者拓扑主要有4个主要的架构组件组成:事件队列(Event Queue)、调停者(Mediator)、事件通道(Event Channel)和事件处理器(Event Processor)。
  事件流通过客户端发送到消息队列,事件队则传递消息到调停者。调停者接收到队列传递过来的原始消息,然后编排成异步的消息发送到事件通道,事件通道则通过事件处理器执行处理过程的每一步。事件处理器则监听事件通道,根据自身不同的业务逻辑来处理从调停者接受的事件。图2-1说明了事件驱动架构中通用的调停者拓扑架构:



图2-1
通常来说可以形成拥有十几个到几百个事件队列的事件驱动的体系结构。这种架构并没有规定事件队列组件如何实现,可以是一个消息队列,一个 Web Service终端,等等,或者任何包含这些方式的组合。



这种模式有两种事件类型:初始化事件和待处理事件。初始化事件是调停者接受到的原始事件,待处理事件是由调停者生成的事件,并由事件处理器接收。



事件调停者的责任就是编排传递进来的初始化事件。对于初始化事件的每一步,调停者都会通过事件通道发送一个具体的待处理事件。然后被事件处理器接收并处理。很重要的一点需要记住,事件调停者并不处理任何的业务逻辑,它只是知道处理这些初始化时候需要哪些步骤。



事件通道被事件调停者用来异步传递待处理消息到事件处理器。事件通道可以是一个消息队列(点对点),也可以是一个消息主题(发布/订阅),尽管消息主题的传输模式被调停者拓扑广泛使用,以便待处理事件能够同时被多个事件处理器处理(每个事件处理器根据接受的事件处理的任务是不同的)。



事件处理器包含了处理待处理事件的业务逻辑。每一个事件处理器相对于整个应用程序来说都是自包含的,独立的,高度解耦的,并且只处理指定的任务。然而每一个事件处理器的划分粒度可以从细粒度(比如只计算一个订单的销售税)到粗粒度(比如处理保险理赔)。但是无论怎样,记住重要的一点,每一个事件处理器都只处理单个业务任务并且不依赖其他事件处理器的结果。



事件调停者可以有多种方式来实现。作为一个架构师,你应该了解每一种实现的选项以便选择一个更适合你的需求的实现。



最简单和最通用的调停者实现就是用开源的集成中心,比如Spring Integration, Apache Camel或者Mule ESB。这些开源的集成中心的事件流通常都是Java实现或者通过DSL(领域特定语言)来实现。对于更复杂的调解与编排,可以通过与开源的BPEL引擎比如Apache ODE相结合,使用BPEL(业务流程执行语言)来实现。
BPEL是标准的类XML的语言,用来描述处理初始化事件需要的数据和步骤。对于那些更复杂的应用程序,可以使用业务流程管理,比如jBPM实现。



理解你的需求并且选择合适的实现对于使用这种拓扑结构的事件驱动架构的成功起着决定性的作用。与使用BPM只是来解决简单的路由逻辑一样,使用开源的集成中心来处理非常复杂的业务流程的编排,必然会也会导致失败。



为了阐明事件调停者是如何工作的,假设你向一家保险公司投保了并且现在打算搬家。在这种情况下,初始的事件可能会叫做“重新安置事件”。事件调停者处理这个“重新安置事件”的步骤包含在图2.2中。对于每一个初始化的事件步骤而言,事件调停者会创建一系列的待处理事件(比如“修改地址”,“重新计算报价”等待),发送这些待处理事件到事件通道并且等待相应的事件处理器处理(比如客户处理器,报价处理器等)这个处理过程会一直等到所有的步骤都处理完成才结束。在重新计算报价和更新索赔之间有个小棒,表示这可以并行地处理
https://www.jianshu.com/p/ffc408ae257f



简单来说,事件驱动架构就是基于事件进行通信的软件架构,它具有以下的特点:



分布式异步架构,事件生产者和消费者高度解耦(生产者不知道有多少消费者要消费对应事件),事件消费者之间也高度解耦(消费者之间也互不感知)
更好的性能,由于事件的异步本质,软件不易产生拥堵,能够处理更高的流量。
事件处理器可以独立的开发,测试,部署,并容易接入到整个生态系统,故可扩展性好。
Orchestration vs Choreography
我们在聊事件驱动架构时,先了解下Orchestration 和 Choreography的区别:



他们都是服务组合的方式,一种是中心化的方式,另外一种是去中心化的方式,直接从下图就可以看出他们之间的区别:



Orchestration vs choreography.png
在上一篇文章里面已经介绍了,由于HTTP的同步特性(https://www.jianshu.com/p/a2c6126262d6),在很多场景下,Blocking式的http调用其实并不适用,因此为了提高系统性能,缩短总体响应时间(e.g后端多个服务编排的时间过长),提高系统的整体可用性(e.g: 下游downstream服务不稳定,导致的可用性问题),越来越多的系统都采用了Event Driven Architecture的方式进行服务的集成。



在Event-Based的系统里,服务的集成是通过Choreography的方式,而不是Orchestration的方式来实现的,举个列子如下图:



Orchestration (1).png



Choreography.png
Orchestration,在图中Customers API在创建Customer的过程中,直接调用了Account API进行了账号的创建,在返回体中包含了一个hypermedia link,允许consumer通过这个链接获取到Account的信息。
Choreography,在图中Customers API并没有试图直接创建账号,而是抛出一个“CustomerCreated”的事件到queue或者stream里面,然后就将用户的数据反馈给消费者了,之后,相应的事件订阅者,比如Accounts API就被通知了,或者主动去拉取“CustomerCreated”事件,获取到事件后,执行相应的操作,创建了Account并抛出了“AccountCreated”事件,然后Customer API作为这个事件的订阅者,被通知,进行了相应的关联操作,因此,当consumer获取customer的时候,Account的信息已经被内联到customer数据结构了。



事件驱动和异步IO
通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式



看图说话讲事件驱动模型
在UI编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢?
方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:



  1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?

  2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;

  3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
    所以,该方式是非常不好的。



方式二:就是事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:



  1. 有一个事件(消息)队列;

  2. 鼠标按下时,往这个队列中增加一个点击事件(消息);

  3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;

  4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;



事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。



让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。



在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。



在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。



在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。



当我们面对如下的环境时,事件驱动模型通常是一个好的选择:



程序中有许多任务,而且…
任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
在等待事件到来时,某些任务会阻塞。
当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。



网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。



事件驱动机制跟消息驱动机制相比



事件:按下鼠标,按下键盘,按下游戏手柄,将U盘插入USB接口,都将产生事件。比如说按下鼠标左键,将产生鼠标左键被按下的事件。
消息:当鼠标被按下,产生了鼠标按下事件,windows侦测到这一事件的发生,随即发出鼠标被按下的消息到消息队列中,这消息附带了一系列相关的事件信息,比如鼠标哪个键被按了,在哪个窗口被按的,按下点的坐标是多少?如此等等。



1.要理解事件驱动和程序,就需要与非事件驱动的程序进行比较。实际上,现代的程序大多是事件驱动的,比如多线程的程序,肯定是事件驱动的。早期则存在许多非事件驱动的程序,这样的程序,在需要等待某个条件触发时,会不断地检查这个条件,直到条件满足,这是很浪费cpu时间的。而事件驱动的程序,则有机会释放cpu从而进入睡眠态(注意是有机会,当然程序也可自行决定不释放cpu),当事件触发时被操作系统唤醒,这样就能更加有效地使用cpu.
2.再说什么是事件驱动的程序。一个典型的事件驱动的程序,就是一个死循环,并以一个线程的形式存在,这个死循环包括两个部分,第一个部分是按照一定的条件接收并选择一个要处理的事件,第二个部分就是事件的处理过程。程序的执行过程就是选择事件和处理事件,而当没有任何事件触发时,程序会因查询事件队列失败而进入睡眠状态,从而释放cpu。
3.事件驱动的程序,必定会直接或者间接拥有一个事件队列,用于存储未能及时处理的事件。
4.事件驱动的程序的行为,完全受外部输入的事件控制,所以,事件驱动的系统中,存在大量这种程序,并以事件作为主要的通信方式。
5.事件驱动的程序,还有一个最大的好处,就是可以按照一定的顺序处理队列中的事件,而这个顺序则是由事件的触发顺序决定的,这一特性往往被用于保证某些过程的原子化。
6.目前windows,linux,nucleus,vxworks都是事件驱动的,只有一些单片机可能是非事件驱动的。



事件模式耦合高,同模块内好用;消息模式耦合低,跨模块好用。事件模式集成其它语言比较繁琐,消息模式集成其他语言比较轻松。事件是侵入式设计,霸占你的主循环;消息是非侵入式设计,将主循环该怎样设计的自由留给用户。如果你在设计一个东西举棋不定,那么你可以参考win32的GetMessage,本身就是一个藕合度极低的接口,又足够自由,接口任何语言都很方便,具体应用场景再在其基础上封装成事件并不是难事,接口耦合较低,即便哪天事件框架调整,修改外层即可,不会伤经动骨。而如果直接实现成事件,那就完全反过来了。



https://www.cnblogs.com/lguow/p/10750296.html



https://blog.csdn.net/bkxiaoc/article/details/76020360
简单记录自己对于 消息驱动 和 事件驱动的理解。



关于这二者的具体区别,于实现上来说,二者都是 注册绑定,然后交付执行。



消息驱动模型在注册的时候仅仅注册一个回调函数作为处理函数。
而事件驱动模型则需要注册多个函数作为处理函数。



消息驱动模型由于处理函数只有一个的缘故,
故需要在回调函数中使用switch等手段,
对消息进行派发并具体处理。
而事件驱动模型则需要在各个回调函数中处理各自的事物。



所以从框架角度说,
消息驱动模型的复用性高于事件驱动模型,
或者说事件驱动模型一般用于处理某个特定的问题。



而造成这种情形的原因是,
消息驱动模型不需要知道具体的消息含义,
而事件驱动模型则需要知道具体的事件含义,
否则无法通过回调函数处理。



细节说明:
事件驱动机制跟消息驱动机制相比



9102年,重新审视 事件驱动 与 消息驱动。



事件
驱动
消息
需要理解的是这三个词的含义。



首先聊一下何为驱动,
驱动具象化理解就像开车一样,点一脚油就动一下。



抽象的代码表示驱动一词



func driver() {
while(condition) {
watch(worker);
}
}
1
2
3
4
5
6
何为事件,字面意思,
意思是比较重大、对一定的人群会产生一定影响的事情。



简单用代码描述如下



// 用户定义
register(read_fn, flag);



// 用户定义
func read_fn(bytes []) {
dosomething();
}



// 框架提供
func register(do, flag) {
fn[flag] = do;
}



// 框架提供
func watch(worker) {
// 例子,此处也可能是 epoll poll select 等
// 或者是经过包装的高级模型
which= WaitForMultipleObjecct();
fnctx[which].flag;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
何为消息,同字面意思



// 用户定义
func worker(msg) {
switch(msg) {
case 0: break;
case 1: break;
default: break;
//….
}
}



// 框架提供
func watch(worker) {
// 阻塞队列常用于实现消息队列和事件队列
msg = GetHeadFromBlockQueue();
worker(msg);
}



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
上述就是常见框架的缩影,具体到日常框架中,再对比一下使用方式,如下。



例如 libuv 或libevent的使用示例



struct event* ev_listen = event_new(xxx);



event_add(ev_listen, NULL);
bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg);



// 事件循环
event_base_dispatch(base);
1
2
3
4
5
6
7
再例如



// 省略注册窗口类时对于消息分发函数注册等代码。
// ….



// 消息泵
BOOL ret;
while((ret=GetMessage(&msg,hWnd,0,0))!=0) {
if(ret==-1) {
//handle the error and possibly exit
return -1;
}
else {
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
实现上二者差异并不算大,例如消息驱动中,可以向消息队列中压入消息唤醒,而事件驱动中也可以从外部写fd 唤醒。



事件的作用范围为系统提供的事件种类。
消息的作用范围为系统提供的消息种类。



可以实现自定义消息,却无法自定义事件。
当然消息本身可以看做是一种特殊的事件。



二者应用领域有一定交集,各有所长。



事件驱动模型主要应用于 网络框架,有着固定的几大基础事件。
[被]连接,[被]断开,收,发。



消息驱动模型常见于 窗口框架。需要应付种类繁多的消息。
WM_xxxxx
https://www.zhihu.com/question/30393750



消息驱动
A发送一个消息到消息队列,B收到该消息。A很明确这个消息是发给B的。通常是点对点模式。



事件驱动
A发出一个事件,B或者C收到这个事件,或者没人收到这个事件,A只会产生一个事件,不会关心谁会处理这个事件 。通常是发布订阅模式。



https://www.jianshu.com/p/df711376c33e
消息驱动和事件驱动很类似,都是先有一个事件,然后产生一个相应的消息,再把消息放入消息队列,由需要的项目获取。他们的区别是消息是谁产生的



消息驱动:鼠标管自己点击不需要和系统有过多的交互,消息由系统(第三方)循环检测,来捕获并放入消息队列。消息对于点击事件来说是被动产生的,高内聚。



事件驱动:鼠标点击产生点击事件后要向系统发送消息“我点击了”的消息,消息是主动产生的。再发送到消息队列中。



ps:二者都可以根据消息队列判断有没有事件,然后没有的话也都可以进入睡眠。



事件模式耦合高,同模块内好用;消息模式耦合低,跨模块好用。事件模式集成其它语言比较繁琐,消息模式集成其他语言比较轻松。事件是侵入式设计,霸占你的主循环;消息是非侵入式设计,将主循环该怎样设计的自由留给用户。如果你在设计一个东西举棋不定,那么你可以参考win32的GetMessage,本身就是一个藕合度极低的接口,又足够自由,接口任何语言都很方便,具体应用场景再在其基础上封装成事件并不是难事,接口耦合较低,即便哪天事件框架调整,修改外层即可,不会伤经动骨。而如果直接实现成事件,那就完全反过来了。



Jfinal中dreamlu的事件驱动插件Jfinal-event
准备:先新建一个插件。初始化时,扫描指定包里面的有@Listen.classannotion的类放入List中,然后从每一个监听器的泛型type中获取事件类。生成一个map容器,其中键是事件,值是监听器。然后初始化EventKit(将上面的map传入),方便接下来使用。
工作:在需要监听的地方用EventKit.postEvent(XXXEvent.class)表示发生了一个XXXEvent.class事件,然后EventKit从之前的map中通过XXXEvent.class知道有哪些监听器会监听这个事件,然后建立一个新的线程执行这些监听器的新的实例并start()。



https://www.cnblogs.com/davenkin/p/eda-coding-pratices.html



单纯地讲事件驱动架构(Event Driven Architecture, EDA),那是几十年前就出现了的话题;单纯地讲领域事件,那也是这些年被大量提及并讨论得快熟透了的软件用语。然而,就笔者的观察看,事件驱动架构远没有想象中那样普遍地被开发团队所接受。即便搞微服务的人都知道除了同步的HTTP还有异步的消息机制,即便搞DDD的人都知道领域事件是其中的一等公民,事件驱动架构所带来的优点并没有相应地转化为软件从业者的青睐。



我尝试着去思考其中的原因,总结出了两点:第一是事件驱动可能是客观世界的运作方式,但不是人的自然思考问题的方式;第二是事件驱动架构在给软件带来好处的同时,又会增加额外的复杂性,比如调试的困难性,又比如并不直观的最终一致性。



当然,事实上有不少软件项目都使用了消息队列,但是这里需要明确的是,对消息队列的使用并不意味着你的项目就一定是事件驱动架构,很多项目只是由于技术方面的驱动,小范围地采用了某些消息队列(比如RabbitMQ和Kafka等)的产品而已。偌大一个系统,如果你的消息队列只是用作邮件发送的通知,那么这样系统自然谈不上采用了事件驱动架构。



放到当下,微服务兴起,DDD重现,在采用事件驱动架构时, 我们需要考虑业务的建模、领域事件的设计、DDD的约束、限界上下文的边界以及更多技术方面的因素,这一个系统工程应该如何从头到尾的落地,是需要经过思考和推敲的。还是那句话,有讲究的编程并不是一件易事。



诚然,用好事件驱动架构存在实践上的难处,然而它的优点也委实诱人,本文希望形成一定的“条理”和“套路”,让事件驱动架构能够更简单的落地。



本文主要分为两大部分,第一部分独立于具体的消息队列实现来讲解通用的对领域事件的建模,第二部分以一个真实的微服务系统为例,采用RabbitMQ作为消息队列,并以此分享完整的事件驱动架构落地实践。



本文以DDD为基础进行编码,其中会涉及到DDD中的不少概念,比如聚合根、资源库和应用服务等,对DDD不熟悉的读者可以参考笔者的DDD编码实践文章。



本文的示例代码请参考github上的e-commerce-sample项目。



第一部分:领域事件的建模
领域事件是DDD中的一个概念,表示的是在一个领域中所发生的一次对业务有价值的事情,落到技术层面就是在一个业务实体对象(通常来说是聚合根)的状态发生了变化之后需要发出一个领域事件。虽然事件驱动架构中的“事件”不一定指“领域事件”,但本文由于密切结合DDD,因此当提到事件时,我们特指“领域事件”。



创建领域事件
关于领域事件的基础知识,请参考笔者的在微服务中使用领域事件文章,本文直接进入编码实践环节。



在建模领域事件时,首先需要记录事件的一些通用信息,比如唯一标识ID和创建时间等,为此创建事件基类DomainEvent:



public abstract class DomainEvent {
private final String _id;
private final DomainEventType _type;
private final Instant _createdAt;
}
在DDD场景下,领域事件一般随着聚合根状态的更新而产生,另外,在事件的消费方,有时我们希望监听发生在某个聚合根下的所有事件,为此笔者建议为每一个聚合根对象创建相应的事件基类,其中包含聚合根的ID,比如对于订单(Order)类,创建OrderEvent:



public abstract class OrderEvent extends DomainEvent {
private final String orderId;
}
然后对于实际的Order事件,统一继承自OrderEvent,比如对于创建订单的OrderCreatedEvent事件:



public class OrderCreatedEvent extends OrderEvent {
private final BigDecimal price;
private final Address address;
private final List items;
private final Instant createdAt;
}
领域事件的继承链如下:



领域事件继承链



在创建领域事件时,需要注意2点:



领域事件本身应该是不变的(Immutable);
领域事件应该携带与事件发生时相关的上下文数据信息,但是并不是整个聚合根的状态数据,例如,在创建订单时可以携带订单的基本信息,而对于产品(Product)名称更新的ProductNameUpdatedEvent事件,则应该同时包含更新前后的产品名称:
public class ProductNameUpdatedEvent extends ProductEvent {
private String oldName; //更新前的名称
private String newName; // 更新后的名称
}
发布领域事件
发布领域事件有多种方式,比如可以在应用服务(ApplicationService)中发布,也可以在资源库(Repository)中发布,还可以引入事件表的方式,这3种发布方式的详细比较可以参考笔者的在微服务中使用领域事件文章。笔者建议采用事件表方式,这里展开讨论一下。



通常的业务处理过程都会更新数据库然后发布领域事件,这里一个比较重要的点是:我们需要保证数据库更新和事件发布之间的原子性,也即要么二者都成功,要么都失败。在传统的实践方式中,全局事务(Global Transaction/XA Transaction)通常用于解决此类问题。然而,全局事务本身的效率是很低的,另外,一些技术框架并不提供对全局事务的支持。当前,一种比较受推崇的方式是引入事件表,其流程大致如下:



在更新业务表的同时,将领域事件一并保存到数据库的事件表中,此时业务表和事件表在同一个本地事务中,即保证了原子性,又保证了效率。



在后台开启一个任务,将事件表中的事件发布到消息队列中,发送成功之后删除掉事件。



但是,这里又有一个问题:在第2步中,我们如何保证发布事件和删除事件之间的原子性呢?答案是:我们不用保证它们的原子性,我们需要保证的是“至少一次投递”,并且保证消费方幂等。此时的大致场景如下:



代码中先发布事件,成功后再从事件表中删除事件;
发布消息成功,事件删除也成功,皆大欢喜;
如果消息发布不成功,那么代码中不会执行事件删除逻辑,就像事情没有发生一样,一致性得到保证;
如果消息发布成功,但是事件删除失败,那么在第二次任务执行时,会重新发布消息,导致消息的重复发送。然而,由于我们要求了消费方的幂等性,也即消费方多次消费同一条消息是ok的,整个过程的一致性也得到了保证。
发布领域事件的整个流程如下:



领域事件的发布



接受用户请求;
处理用户请求;
写入业务表;
写入事件表,事件表和业务表的更新在同一个本地数据库事务中;
事务完成后,即时触发事件的发送(比如可以通过Spring AOP的方式完成,也可以定时扫描事件表,还可以借助诸如MySQL的binlog之类的机制);
后台任务读取事件表;
后台任务发送事件到消息队列;
发送成功后删除事件。
更多有关事件表的介绍,请参考Chris Richardson的”Transaction Outbox模式”和Udi Dahan的”在不使用分布式事务条件下如何处理消息可靠性”的视频。



在事件表场景下,一种常见的做法是将领域事件保存到聚合根中,然后在Repository保存聚合根的时候,将事件保存到事件表中。这种方式对于所有的Repository/聚合根都采用的方式处理,因此可以创建对应的抽象基类。



创建所有聚合根的基类DomainEventAwareAggregate如下:



public abstract class DomainEventAwareAggregate {
@JsonIgnore
private final List events = newArrayList();



protected void raiseEvent(DomainEvent event) {
this.events.add(event);
}

void clearEvents() {
this.events.clear();
}

List<DomainEvent> getEvents() {
return Collections.unmodifiableList(events);
} } 这里的raiseEvent()方法用于在具体的聚合根对象中产生领域事件,然后在Repository中获取到事件,与聚合根对象一起完成持久化,创建DomainEventAwareRepository基类如下:


public abstract class DomainEventAwareRepository {
@Autowired
private DomainEventDao eventDao;



public void save(AR aggregate) {
eventDao.insert(aggregate.getEvents());
aggregate.clearEvents();
doSave(aggregate);
}

protected abstract void doSave(AR aggregate); } 具体的聚合根在实现业务逻辑之后调用raiseEvent()方法生成事件,以“更改Order收货地址”业务过程为例:


public class Order extends DomainEventAwareAggregate {



//......

public void changeAddressDetail(String detail) {
if (this.status == PAID) {
throw new OrderCannotBeModifiedException(this.id);
}

this.address = this.address.changeDetailTo(detail);
raiseEvent(new OrderAddressChangedEvent(getId().toString(), detail, address.getDetail()));
}

//...... } 在保存Order的时候,只需要处理Order自身的持久化即可,事件的持久化已经在DomainEventAwareRepository基类中完成:


@Component
public class OrderRepository extends DomainEventAwareRepository {



//......

@Override
protected void doSave(Order order) {
String sql = "INSERT INTO ORDERS (ID, JSON_CONTENT) VALUES (:id, :json) " +
"ON DUPLICATE KEY UPDATE JSON_CONTENT=:json;";
Map<String, String> paramMap = of("id", order.getId().toString(), "json", objectMapper.writeValueAsString(order));
jdbcTemplate.update(sql, paramMap);
}

//......


}
当业务操作的事务完成之后,需要通知消息发送设施即时发布事件到消息队列。发布过程最好做成异步的后台操作,这样不会影响业务处理的正常返回,也不会影响业务处理的效率。在Spring Boot项目中,可以考虑采用AOP的方式,在HTTP的POST/PUT/PATCH/DELETE方法完成之后统一发布事件:



@Aspect
@Component
public class DomainEventPublishAspect {



//......
@After("@annotation(org.springframework.web.bind.annotation.PostMapping) || " +
"@annotation(org.springframework.web.bind.annotation.PutMapping) || " +
"@annotation(org.springframework.web.bind.annotation.PatchMapping) || " +
"@annotation(org.springframework.web.bind.annotation.DeleteMapping) ||")
public void publishEvents(JoinPoint joinPoint) {
logger.info("Trigger domain event publish process.");
taskExecutor.execute(() -> publisher.publish());
}
//...... } 以上,我们使用了TaskExecutor在后台开启新的线程完成事件发布,实际的发布由RabbitDomainEventPublisher完成:


@Component
public class DomainEventPublisher {



// ......
public void publish() {
Instant now = Instant.now();
LockConfiguration configuration = new LockConfiguration("domain-event-publisher", now.plusSeconds(10), now.plusSeconds(1));
distributedLockExecutor.execute(this::doPublish, configuration);
}

//...... }


这里,我们使用了分发布锁来处理并发发送的情况,doPublish()方法将调用实际的消息队列(比如RabbitMQ/Kafka等)API完成消息发送。更多的代码细节,请参考本文的示例代码。



消费领域事件
在事件消费时,除了完成基本的消费逻辑外,我们需要重点关注以下两点:



消费方的幂等性
消费方有可能进一步产生事件
对于“消费方的幂等性”,在上文中我们讲到事件的发送机制保证的是“至少一次投递”,为了能够正确地处理重复消息,要求消费方是幂等的,即多次消费事件与单次消费该事件的效果相同。为此,在消费方创建一个事件记录表,用于记录已经消费过的事件,在处理事件时,首先检查该事件是否已经被消费过,如果是则不做任何消费处理。



对于第2点,我们依然沿用前文讲到的事件表的方式。事实上,无论是处理HTTP请求,还是作为消息的消费方,对于聚合根来讲都是无感知的,领域事件由聚合根产生进而由Repository持久化,这些过程都与具体的业务操作源头无关。



综上,在消费领域事件的过程中,程序需要更新业务表、事件记录表以及事件发送表,这3个操作过程属于同一个本地事务,此时整个事件的发布和消费过程如下:



事件的发布和消费全流程



在编码实践时,可以考虑与事件发布过程相同的AOP方式完成对事件的记录,以Spring和RabbitMQ为例,可以将@RabbitListener通过AOP代理起来:



@Aspect
@Component
public class DomainEventRecordingConsumerAspect {



//......
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitHandler) || " +
"@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public Object recordEvents(ProceedingJoinPoint joinPoint) throws Throwable {
return domainEventRecordingConsumer.recordAndConsume(joinPoint);
}
//...... } 然后在代理过程中通过DomainEventRecordingConsumer完成事件的记录:


@Component
public class DomainEventRecordingConsumer {



//......
@Transactional
public Object recordAndConsume(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
Optional<Object> optionalEvent = Arrays.stream(args)
.filter(o -> o instanceof DomainEvent)
.findFirst();

if (optionalEvent.isPresent()) {
DomainEvent event = (DomainEvent) optionalEvent.get();
try {
dao.recordEvent(event);
} catch (DuplicateKeyException dke) {
logger.warn("Duplicated {} skipped.", event);
return null;
}

return joinPoint.proceed();
}
return joinPoint.proceed();
}
//...... } 这里的DomainEventRecordingConsumer通过直接向事件记录表中插入事件的方式来判断消息是否重复,如果发生重复主键异常DuplicateKeyException,即表示该事件已经在记录表中存在了,因此直接return null;而不再执行业务过程。


需要特别注意的一点是,这里的封装方法recordAndConsume()需要打上@Transactional注解,这样才能保证对事件的记录和业务处理在同一个事务中完成。



此外,由于消费完毕后也需要即时发送事件,因此需要在发布事件的AOP配置DomainEventPublishAspect中加入@RabbitListener:



@Aspect
@Component
public class DomainEventPublishAspect {



//......
@After("@annotation(org.springframework.web.bind.annotation.PostMapping) || " +
"@annotation(org.springframework.web.bind.annotation.PutMapping) || " +
"@annotation(org.springframework.web.bind.annotation.PatchMapping) || " +
"@annotation(org.springframework.web.bind.annotation.DeleteMapping) ||" +
"@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener) ||")
public void publishEvents(JoinPoint joinPoint) {
logger.info("Trigger domain event publish process.");
taskExecutor.execute(() -> publisher.publish());
}
//...... } 事件驱动架构的2种风格 事件驱动架构存在多种风格,本文就其中的2种主要风格展开讨论,它们是:


事件通知
事件携带状态转移(Event-Carried State Transfer)
在“事件通知”风格中,事件只是作为一种信号传递到消费方,消费方需要的数据需要额外API请求从源事件系统获取,如图:



事件通知



在上图的事件通知风格中,对事件的处理流程如下:



发布方发布事件
消费方接收事件并处理
消费方调用发布方的API以获取事件相关数据
消费方更新自身状态
这种风格的好处是,事件可以设计得非常简单,通常只需要携带聚合根的ID即可,由此进一步降低了事件驱动系统中的耦合度。然而,消费方需要的数据依然需要额外的API调用从发布方获取,这又从另一个角度增加了系统之间的耦合性。此外,如果源系统宕机,消费方也无法完成后续操作,因此可用性会受到影响。



在“事件携带状态转移”中,消费方所需要的数据直接从事件中获取,因此不需要额外的API请求:



事件携带状态转移



这种风格的好处在于,即便发布方系统不可用,消费方依然可以完成对事件的处理。



笔者的建议是,对于发布方来说,作为一种数据提供者的“自我修养”,事件应该包含足够多的上下文数据,而对于消费方来讲,可以根据自身的实际情况确定具体采用哪种风格。在同一个系统中,同时采用2种风格是可以接受的。比如,对于基于事件的CQRS而言,可以采用“事件通知”,此时的事件只是一个“触发器”,一个聚合下的所有事件所触发的结果是一样的,即都是告知消费方需要从源系统中同步数据,因此此时的消费方可以对聚合下的所有事件一并处理,而不用为每一种事件单独开发处理逻辑。



事实上,事件驱动还存在第3种风格,即事件溯源,本文不对此展开讨论。更多有关事件驱动架构不同风格的介绍,请参考Martin Fowler的“事件风格”文章。



第二部分:基于RabbitMQ的示例项目
在本部分中,我将以一个简单的电商平台微服务系统为例,采用RabbitMQ作为消息机制讲解事件驱动架构落地的全过程。



该电商系统包含3个微服务,分别是:



订单(Order)服务:用于用户下单
产品(Product)服务:用于管理/展示产品信息
库存(Inventory)服务:用于管理产品对应的库存
整个系统包含以下代码库:



代码库 用途 地址
order-backend Order服务 https://github.com/e-commerce-sample/order-backend
product-backend Product服务 https://github.com/e-commerce-sample/product-backend
inventory-backend Inventory服务 https://github.com/e-commerce-sample/inventory-backend
common 共享依赖包 https://github.com/e-commerce-sample/common
devops 基础设施 https://github.com/e-commerce-sample/devops
其中,common代码库包含了所有服务所共享的代码和配置,包括所有服务中的所有事件(请注意,这种做法只是笔者为了编码上的便利,并不是一种好的实践,一种更好的实践是各个服务各自管理自身产生的事件),以及RabbitMQ的通用配置(即每个服务都采用相同的方式配置RabbitMQ设施),同时也包含了异常处理和分布式锁等配置。devops库中包含了RabbitMQ的Docker镜像,用于在本地测试。



整个系统中涉及到的领域事件如下:



示例电商系统中的领域事件



其中:



Order服务自己消费了自己产生的所有OrderEvent用于CQRS同步读写模型;
Inventory服务消费了Order服务的OrderCreatedEvent事件,用于在下单之后即时扣减库存;
Inventory服务消费了Product服务的ProductCreatedEvent和ProductNameChangedEvent事件,用于同步产品信息;
Product服务消费了Inventory服务的InventoryChangedEvent用于更新产品库存。
配置RabbitMQ
阅读本小节需要熟悉RabbitMQ中的基本概念,建议不熟悉RabbitMQ的读者事先参考RabbitMQ入门文章。



这里介绍2种RabbitMQ的配置方式,一种简单的,一种稍微复杂的。两种配置过程中会反复使用到以下概念,读者可以先行熟悉:



概念 类型 解释 命名 示例
发送方Exchange Exchange 用于接收某个微服务中所有消息的Exchange,一个服务只有一个发送方Exchange xxx-publish-x order-publish-x
发送方DLX Exchange 用于接收发送方无法路由的消息 xxx-publish-dlx order-publish-dlx
发送方DLQ Queue 用于存放发送方DLX的消息 xxx-publish-dlq order-publish-dlq
接收方Queue Queue 用于接收发送方Exchange的消息,一个服务只有一个接收方Queue用于接收所有外部消息 xxx-receive-q product-receive-q
接收方DLX Exchange 死信Exchange,用于接收消费失败的消息 xxx-receive-dlx product-receive-dlx
接收方DLQ Queue 死信队列,用于存放接收方DLX的消息 xxx-receive-dlq product-receive-dlq
接收方恢复Exchange Exchange 用于接收从接收方DLQ中手动恢复的消息,接收方Queue应该绑定到接收方恢复Exchange xxx-receive-recover-x product-receive-recover-x
在简单配置方式下,消息流向图如下:



简单方式的RabbitMQ配置



发送方发布事件到发送方Exchange
消息到达消费方的接收方Queue
消费成功处理消息,更新本地数据库
如果消息处理失败,消息被放入接收方DLX
消息到达死信队列接收方DLQ
对死信消息做手工处理(比如作日志记录等)
对于发送方而言,事件驱动架构提倡的是“发送后不管”机制,即发送方只需要保证事件成功发送即可,而不用关心是谁消费了该事件。因此在配置发送方的RabbitMQ时,可以简单到只配置一个发送方Exchange即可,该Exchange用于接收某个微服务中所有类型的事件。在消费方,首先配置一个接收方Queue用于接收来自所有发送方Exchange的所有类型的事件,除此之外对于消费失败的事件,需要发送到接收方DLX,进而发送到接收方DLQ中,对于接收方DLQ的事件,采用手动处理的形式恢复消费。



在简单方式下的RabbitMQ配置如下:



简单方式的RabbitMQ配置



在第2种配置方式稍微复杂一点,其建立在第1种基础之上,增加了发送方的死信机制以及消费方用于恢复消费的Exchange,此时的消息流向如下:



配置发送方DLQ和接收方恢复Exchange



发送方发布事件
事件发布失败时被放入死信Exchange发送方DLX
消息到达死信队列发送方DLQ
对于发送方DLQ中的消息进行人工处理,重新发送
如果事件发布正常,则会到达接收方Queue
正常处理事件,更新本地数据库
事件处理失败时,发到接收方DLX,进而路由到接收方DLQ
手工处理死信消息,将其发到接收方恢复Exchange,进而重新发到接收方Queue
此时的RabbitMQ配置如下:



在以上2种方式中,我们都启用了RabbitMQ的“发送方确认”和“消费方确认”,另外,发送方确认也可以通过RabbitMQ的事务(不是分布式事务)替代,不过效率更低。更多关于RabbitMQ的知识,可以参考笔者的Spring AMQP学习笔记和RabbitMQ最佳实践。



系统演示
启动RabbitMQ,切换到ecommerce-sample/devops/local/rabbitmq目录,运行:
./start-rabbitmq.sh
启动Order服务:切换到ecommerce-sample/order-backend项目,运行:
./run.sh //监听8080端口,调试5005端口
启动Product服务:切换到ecommerce-sample/product-backend项目,运行:
./run.sh //监听8082端口,调试5006端口
启动Inventory服务:切换到ecommerce-sample/inventory-backend项目,运行:
./run.sh //监听8083端口,调试5007端口
创建Product:
curl -X POST

http://localhost:8082/products

-H ‘Content-Type: application/json’

-H ‘cache-control: no-cache’

-d ‘{
“name”:”好吃的苹果”,
“description”:”原生态的苹果”,
“price”: 10.0
}’
此时返回Product ID:



{“id”:”3c11b3f6217f478fbdb486998b9b2fee”}
查看Product:
curl -X GET

http://localhost:8082/products/3c11b3f6217f478fbdb486998b9b2fee

-H ‘cache-control: no-cache’
返回如下:



{
“id”: {
“id”: “3c11b3f6217f478fbdb486998b9b2fee”
},
“name”: “好吃的苹果”,
“price”: 10,
“createdAt”: 1564361781956,
“inventory”: 0,
“description”: “原生态的苹果”
}
可以看到,新创建的Product的库存(inventory)默认为0。



创建Product时,会创建ProductCreatedEvent,Inventory服务接收到该事件后会自动创建对应的Inventory,日志如下:
2019-07-29 08:56:22.276 – INFO [taskExecutor-1] c.e.i.i.InventoryEventHandler : Created inventory[5e3298520019442b8a6d97724ab57d53] for product[3c11b3f6217f478fbdb486998b9b2fee].
增加Inventory为10:
curl -X POST

http://localhost:8083/inventories/5e3298520019442b8a6d97724ab57d53/increase

-H ‘Content-Type: application/json’

-H ‘cache-control: no-cache’

-d ‘{
“increaseNumber”:10
}’
增加Inventory之后,会发送InventoryChangedEvent,Product服务接收到该事件后会自动同步自己的库存,再次查看Product:
curl -X GET

http://localhost:8082/products/3c11b3f6217f478fbdb486998b9b2fee

-H ‘cache-control: no-cache’
返回如下:



{
“id”: {
“id”: “3c11b3f6217f478fbdb486998b9b2fee”
},
“name”: “好吃的苹果”,
“price”: 10,
“createdAt”: 1564361781956,
“inventory”: 10,
“description”: “原生态的苹果”
}
可以看到,Product的库存已经更新为10。



至此,Product和Inventory都准备好了,让我们下单吧:
curl -X POST

http://localhost:8080/orders

-H ‘Content-Type: application/json’

-H ‘cache-control: no-cache’

-d ‘{
“items”: [
{
“productId”: “3c11b3f6217f478fbdb486998b9b2fee”,
“count”: 2,
“itemPrice”: 10
}
],
“address”: {
“province”: “四川”,
“city”: “成都”,
“detail”: “天府软件园1号”
}
}’
返回Order ID:



{
“id”: “d764407855d74ff0b5bb75250483229f”
}
创建订单之后,会发送OrderCreatedEvent,Inventory服务接收到该事件会自动扣减相应库存:
2019-07-29 09:11:31.202 – INFO [taskExecutor-1] c.e.i.i.InventoryEventHandler : Inventory[5e3298520019442b8a6d97724ab57d53] decreased to 8 due to order[d764407855d74ff0b5bb75250483229f] creation.
同时,Inventory将发送InventoryChangedEvent,Product服务接收到该事件会自动更新Product的库存,再次查看Product:



curl -X GET

http://localhost:8082/products/3c11b3f6217f478fbdb486998b9b2fee

-H ‘cache-control: no-cache’
返回如下:



{
“id”: {
“id”: “3c11b3f6217f478fbdb486998b9b2fee”
},
“name”: “好吃的苹果”,
“price”: 10,
“createdAt”: 1564361781956,
“inventory”: 8,
“description”: “原生态的苹果”
}
可以看到,Product的库存从10减少到了8,因为先前下单时我们选了2个Product。



总结
本文首先独立于消息队列的技术实现,讲到了事件驱动架构在落地过程中的诸多方面以及问题,包括领域事件的建模、通过聚合根暂存事件然后由Repository完成存储,再由后台任务读取事件表完成事件的实际发布。在消费方,通过幂等性解决在“至少一次投递”的情况下所带来的重复消费问题。另外,还讲到了事件驱动架构的2种常见风格,即事件通知和事件携带状态转移,以及他们之间的优劣势。在第二部分,以RabbitMQ为例,分享了如何在一个微服务化的系统中落地事件驱动架构。



https://www.jianshu.com/p/fbd3951259eb
https://martinfowler.com/articles/201701-event-driven.html



https://blog.csdn.net/weixin_30892763/article/details/101438259



https://insights.thoughtworks.cn/use-domain-events-in-microservices/



稍微回想一下计算机硬件的工作原理我们便不难发现,整个计算机的工作过程其实就是一个对事件的处理过程。当你点击鼠标、敲击键盘或者插上U盘时,计算机便以中断的形式处理各种外部事件。在软件开发领域,事件驱动架构(Event Driven Architecture,EDA)早已被开发者用于各种实践,典型的应用场景比如浏览器对用户输入的处理、消息机制以及SOA。最近几年重新进入开发者视野的响应式编程(Reactive Programming)更是将事件作为该编程模型中的一等公民。可见,“事件”这个概念一直在计算机科学领域中扮演着重要的角色。



认识领域事件
领域事件(Domain Events)是领域驱动设计(Domain Driven Design,DDD)中的一个概念,用于捕获我们所建模的领域中所发生过的事情。领域事件本身也作为通用语言(Ubiquitous Language)的一部分成为包括领域专家在内的所有项目成员的交流用语。比如,在用户注册过程中,我们可能会说“当用户注册成功之后,发送一封欢迎邮件给客户。”,此时的“用户已经注册”便是一个领域事件。



当然,并不是所有发生过的事情都可以成为领域事件。一个领域事件必须对业务有价值,有助于形成完整的业务闭环,也即一个领域事件将导致进一步的业务操作。举个咖啡厅建模的例子,当客户来到前台时将产生“客户已到达”的事件,如果你关注的是客户接待,比如需要为客户预留位置等,那么此时的“客户已到达”便是一个典型的领域事件,因为它将用于触发下一步——“预留位置”操作;但是如果你建模的是咖啡结账系统,那么此时的“客户已到达”便没有多大存在的必要——你不可能在用户到达时就立即向客户要钱对吧,而”客户已下单“才是对结账系统有用的事件。



在微服务(Microservices)架构实践中,人们大量地借用了DDD中的概念和技术,比如一个微服务应该对应DDD中的一个限界上下文(Bounded Context);在微服务设计中应该首先识别出DDD中的聚合根(Aggregate Root);还有在微服务之间集成时采用DDD中的防腐层(Anti-Corruption Layer, ACL);我们甚至可以说DDD和微服务有着天生的默契。更多有关DDD的内容,请参考笔者的另一篇文章或参考《领域驱动设计》及《实现领域驱动设计》。



在DDD中有一条原则:一个业务用例对应一个事务,一个事务对应一个聚合根,也即在一次事务中,只能对一个聚合根进行操作。但是在实际应用中,我们经常发现一个用例需要修改多个聚合根的情况,并且不同的聚合根还处于不同的限界上下文中。比如,当你在电商网站上买了东西之后,你的积分会相应增加。这里的购买行为可能被建模为一个订单(Order)对象,而积分可以建模成账户(Account)对象的某个属性,订单和账户均为聚合根,并且分别属于订单系统和账户系统。显然,我们需要在订单和积分之间维护数据一致性,通常的做法是在同一个事务中同时更新两者,但是这会存在以下问题:



违背DDD中”单个事务修改单个聚合根”的设计原则;
需要在不同的系统之间采用重量级的分布式事务(Distributed Transactioin,也叫XA事务或者全局事务);
在不同系统之间产生强耦合。
通过引入领域事件,我们可以很好地解决上述问题。 总的来说,领域事件给我们带来以下好处:



解耦微服务(限界上下文);
帮助我们深入理解领域模型;
提供审计和报告的数据来源;
迈向事件溯源(Event Sourcing)和CQRS等。
还是以上面的电商网站为例,当用户下单之后,订单系统将发出一个“用户已下单”的领域事件,并发布到消息系统中,此时下单便完成了。账户系统订阅了消息系统中的“用户已下单”事件,当事件到达时进行处理,提取事件中的订单信息,再调用自身的积分引擎(也有可能是另一个微服务)计算积分,最后更新用户积分。可以看到,此时的订单系统在发送了事件之后,整个用例操作便结束了,根本不用关心是谁收到了事件或者对事件做了什么处理。事件的消费方可以是账户系统,也可以是任何一个对事件感兴趣的第三方,比如物流系统。由此,各个微服务之间的耦合关系便解开了。值得注意的一点是,此时各个微服务之间不再是强一致性,而是基于事件的最终一致性。



事件风暴(Event Storming)
事件风暴是一项团队活动,旨在通过领域事件识别出聚合根,进而划分微服务的限界上下文。在活动中,团队先通过头脑风暴的形式罗列出领域中所有的领域事件,整合之后形成最终的领域事件集合,然后对于每一个事件,标注出导致该事件的命令(Command),再然后为每个事件标注出命令发起方的角色,命令可以是用户发起,也可以是第三方系统调用或者是定时器触发等。最后对事件进行分类整理出聚合根以及限界上下文。事件风暴还有一个额外的好处是可以加深参与人员对领域的认识。需要注意的是,在事件风暴活动中,领域专家是必须在场的。更多有关事件风暴的内容,请参考这里。



创建领域事件
领域事件应该回答“什么人什么时候做了什么事情”这样的问题,在实际编码中,可以考虑采用层超类型(Layer Supertype)来包含事件的某些共有属性:



public abstract class Event {
private final UUID id;
private final DateTime createdTime;



public Event() {
this.id = UUID.randomUUID();
this.createdTime = new DateTime();
} } 可以看到,领域事件还包含了ID,但是该ID并不是实体(Entity)层面的ID概念,而是主要用于事件追溯和日志。另外,由于领域事件描述的是过去发生的事情,我们应该将领域事件建模成不可变的(Immutable)。从DDD概念上讲,领域事件更像一种特殊的值对象(Value Object)。对于上文中提到的咖啡厅例子,创建“客户已到达”事件如下:


public final class CustomerArrivedEvent extends Event {
private final int customerNumber;



public CustomerArrivedEvent(int customerNumber) {
super();
this.customerNumber = customerNumber;
} } 在这个CustomerArrivedEvent事件中,除了继承自Event的属性外,还自定义了一个与该事件密切关联的业务属性——客户人数(customerNumber)——这样后续操作便可预留相应数目的座位了。另外,我们将所有属性以及CustomerArrivedEvent本身都声明成了final,并且不向外暴露任何可能修改这些属性的方法,这样便保证了事件的不变性。


发布领域事件
在使用领域事件时,我们通常采用“发布-订阅”的方式来集成不同的模块或系统。在单个微服务内部,我们可以使用领域事件来集成不同的功能组件,比如在上文中提到的“用户注册之后向用户发送欢迎邮件”的例子中,注册组件发出一个事件,邮件发送组件接收到该事件后向用户发送邮件。



在微服务内部使用领域事件时,我们不一定非得引入消息中间件(比如ActiveMQ等)。还是以上面的“注册后发送欢迎邮件”为例,注册行为和发送邮件行为虽然通过领域事件集成,但是他们依然发生在同一个线程中,并且是同步的。另外需要注意的是,在限界上下文之内使用领域事件时,我们依然需要遵循“一个事务只更新一个聚合根”的原则,违反之往往意味着我们对聚合根的拆分是错的。即便确实存在这样的情况,也应该通过异步的方式(此时需要引入消息中间件)对不同的聚合根采用不同的事务,此时可以考虑使用后台任务。



除了用于微服务的内部,领域事件更多的是被用于集成不同的微服务,如上文中的“电商订单”例子。



通常,领域事件产生于领域对象中,或者更准确的说是产生于聚合根中。在具体编码实现时,有多种方式可用于发布领域事件。



一种直接的方式是在聚合根中直接调用发布事件的Service对象。以上文中的“电商订单”为例,当创建订单时,发布“订单已创建”领域事件。此时可以考虑在订单对象的构造函数中发布事件:



public class Order {
public Order(EventPublisher eventPublisher) {
//create order

//…

eventPublisher.publish(new OrderPlacedEvent());

}
}
注:为了把焦点集中在事件发布上,我们对Order对象做了简化,Order对象本身在实际编码中不具备参考性。



可以看到,为了发布OrderPlacedEvent事件,我们需要将Service对象EventPublisher传入,这显然是一种API污染,即Order作为一个领域对象只需要关注和业务相关的数据,而不是诸如EventPublisher这样的基础设施对象。另一种方法是由NServiceBus的创始人Udi Dahan提出来的,即在领域对象中通过调用EventPublisher上的静态方法发布领域事件:



public class Order {
public Order() {
//create order
//…
EventPublisher.publish(new OrderPlacedEvent());
}
}
这种方法虽然避免了API污染,但是这里的publish()静态方法将产生副作用,对Order对象的测试带来了难处。此时,我们可以采用“在聚合根中临时保存领域事件”的方式予以改进:



public class Order {



private List<Event> events;

public Order() {
//create order
//...
events.add(new OrderPlacedEvent());
}

public List<Event> getEvents() {
return events;
}

public void clearEvents() {
events.clear();

} } 在测试Order对象时,我们便你可以通过验证events集合保证Order对象在创建时的确发布了OrderPlacedEvent事件:


@Test
public void shouldPublishEventWhenCreateOrder() {
Order order = new Order();
List events = order.getEvents();
assertEquals(1, events.size());
Event event = events.get(0);
assertTrue(event instanceof OrderPlacedEvent);
}
在这种方式中,聚合根对领域事件的保存只能是临时的,在对该聚合根操作完成之后,我们应该将领域事件发布出去并及时清空events集合。可以考虑在持久化聚合根时进行这样的操作,在DDD中即为资源库(Repository):



public class OrderRepository {
private EventPublisher eventPublisher;



public void save(Order order) {
List<Event> events = order.getEvents();
events.forEach(event -> eventPublisher.publish(event));
order.clearEvents();

//save the order
//...
} } 除此之外,还有一种与“临时保存领域事件”相似的做法是“在聚合根方法中直接返回领域事件”,然后在Repository中进行发布。这种方式依然有很好的可测性,并且开发人员不用手动清空先前的事件集合,不过还是得记住在Repository中将事件发布出去。另外,这种方式不适合创建聚合根的场景,因为此时的创建过程既要返回聚合根本身,又要返回领域事件。


这种方式也有不好的地方,比如它要求开发人员在每次更新聚合根时都必须记得清空events集合,忘记这么做将为程序带来严重的bug。不过虽然如此,这依然是笔者比较推荐的方式。



业务操作和事件发布的原子性
虽然在不同聚合根之间我们采用了基于领域事件的最终一致性,但是在业务操作和事件发布之间我们依然需要采用强一致性,也即这两者的发生应该是原子的,要么全部成功,要么全部失败,否则最终一致性根本无从谈起。以上文中“订单积分”为例,如果客户下单成功,但是事件发送失败,下游的账户系统便拿不到事件,导致最终客户的积分并不增加。



要保证业务操作和事件发布之间的原子性,最直接的方法便是采用XA事务,比如Java中的JTA,这种方式由于其重量级并不被人们所看好。但是,对于一些对性能要求不那么高的系统,这种方式未尝不是一个选择。一些开发框架已经能够支持独立于应用服务器的XA事务管理器(如Atomikos和Bitronix),比如Spring Boot作为一个微服务框架便提供了对Atomikos和Bitronix的支持。



如果JTA不是你的选项,那么可以考虑采用事件表的方式。这种方式首先将事件保存到聚合根所在的数据库中,由于事件表和聚合根表同属一个数据库,整个过程只需要一个本地事务就能完成。然后,在一个单独的后台任务中读取事件表中未发布的事件,再将事件发布到消息中间件中。



这种方式需要注意两个问题,第一个是由于发布了事件之后需要将表中的事件标记成“已发布”状态,即依然涉及到对数据库的操作,因此发布事件和标记“已发布”之间需要原子性。当然,此时依旧可以采用XA事务,但是这违背了采用事件表的初衷。一种解决方法是将事件的消费方创建成幂等的,即消费方可以多次消费同一个事件而不污染系统数据。这个过程大致为:整个过程中事件发送和数据库更新采用各自的事务管理,此时有可能发生的情况是事件发送成功而数据库更新失败,这样在下一次事件发布操作中,由于先前发布过的事件在数据库中依然是“未发布”状态,该事件将被重新发布到消息系统中,导致事件重复,但由于事件的消费方是幂等的,因此事件重复不会存在问题。



另外一个需要注意的问题是持久化机制的选择。其实对于DDD中的聚合根来说,NoSQL是相比于关系型数据库更合适的选择,比如用MongoDB的Document保存聚合根便是种很自然的方式。但是多数NoSQL是不支持ACID的,也就是说不能保证聚合更新和事件发布之间的原子性。还好,关系型数据库也在向NoSQL方向发展,比如新版本的PostgreSQL(版本9.4)和MySQL(版本5.7)已经能够提供具备NoSQL特征的JSON存储和基于JSON的查询。此时,我们可以考虑将聚合根序列化成JSON格式的数据进行保存,从而避免了使用重量级的ORM工具,又可以在多个数据之间保证ACID,何乐而不为?



总结
领域事件主要用于解耦微服务,此时各个微服务之间将形成最终一致性。事件风暴活动有助于我们对微服务进行拆分,并且有助于我们深入了解某个领域。领域事件作为已经发生过的历史数据,在建模时应该将其创建为不可变的特殊值对象。存在多种方式用于发布领域事件,其中“在聚合中临时保存领域事件”的方式是值得推崇的。另外,我们需要考虑到聚合更新和事件发布之间的原子性,可以考虑使用XA事务或者采用单独的事件表。为了避免事件重复带来的问题,最好的方式是将事件的消费方创建为幂等的



https://microservices.io/patterns/data/transactional-outbox.html
https://vimeo.com/channels/946231/111998645


Category architect