返回
Featured image of post 再议 DDD 视角下的 EFCore 与 领域事件

再议 DDD 视角下的 EFCore 与 领域事件

在上家公司工作的时候,我们有部分业务是采用事件/消息驱动的形式。虽然,当时博主还没能用上诸如 KafkaRabbitMQ 这样的消息中间件,可数据库 + Quartz 这样一个堪称“简陋”的组合,完全不影响博主对事件/消息驱动这种思想的启蒙。后来,在实现数据库审计数据同步 等问题的时候,更是从实践层面上加深了这一印象。再后来,博主陆陆续续地接触了 DDD,其中 领域事件 的概念,第一次让博主意识到,原来事件可以和聚合根产生某种联系。退一步讲,即使你没有接触过 DDD,你只要听说过 MediatR 或者 CQRS,相信你立马就能明白我在说什么。最近的一次 Code Review,这个问题再次浮出水面,一个人在面对过去的时候,会非常容易生出物是人非的感慨,代码和人类最大的区别就在于,代码可以永远以某种永恒的形式存在,就像很多年后我打开高中时候用 Visual Basic 编写的程序,它依然可以像我第一次看见它一样运行。所以,一直在变化的大抵是我,无非是人类更擅长自我说服,它让你相信你一直“不忘初心”。因此,今天我想再聊聊 DDD 视角下的 EFCore 与 领域事件。

似曾相识燕归来

其实,人生中有特别多的似曾相识,就像 Wesley 老大哥和我说起 Kubernetes 的时候,我脑海中一直浮现着的画面,是第一次见到他的时候,他意气风发地给我讲 MSBuild 和 单元测试。为什么会记得他意气风发的样子呢?大概是有一天我到他这个年龄的时候,我终于羡慕彼时彼刻的他,还拥有着这样一副意气风发的面孔罢。对于大部分事件/消息驱动的业务,相信大家都见到过类似下面这样的代码片段:

 1// 保存订单
 2var orderInfo = new OrderInfo(
 3    address: "陕西省西安市雁塔区大雁塔北广场", 
 4    telephone: "13456789091", 
 5    quantity: 10, 
 6    remarak: "盛夏白瓷梅子汤,碎冰碰壁铛啷响"
 7); 
 8
 9_repository.Insert(orderInfo);
10_chinookContext.SaveChnages();
11
12// 发布消息
13var orderInfoCreateEvent = orderInfo.Adapt<OrderInfoCreateEvent>();
14eventBus.Publish(orderInfoCratedEvent)

这段代码非常容易理解,当我们创建完一个订单以后,需要发布一条订单创建的消息。当时组内做 Code Review 的时候,大家都普遍认为,Publish() 需要放在 SaveChanges() 后面,理由是:如果 Publish() 放在 SaveChanges() 前面,可能会出现消息发出去了,而数据没有保存成功的情况。这个想法当然没有问题,唯一的问题在于,实际业务中构造消息的过程绝不可能如此简单,如果它依赖中间过程的变量或者参数,你不可能总是有机会把这个过程放到 SaveChanges() 后面,更不必说,实际业务中可能会要求你在订单里处理客户相关的事件。显然,这种方案对代码的侵入非常严重。那么,有没有更好一点的方案呢?

 1// Entity 定义,适用于无单主键或使用联合主键
 2public abstract class Entity : IEntity
 3{
 4	private List<IDomainEvent> _domainEvents = null;
 5	public IReadOnlyCollection<IDomainEvent> DomainEvents 
 6        => _domainEvents?.AsReadOnly();
 7
 8    // 添加事件
 9	public void AddDomainEvent(IDomainEvent eventItem)
10	{
11        _domainEvents = _domainEvents ?? new List<IDomainEvent>();
12        _domainEvents.Add(eventItem);
13    }
14    
15    // 移除事件
16	public void RemoveDomainEvent(IDomainEvent eventItem)
17	{
18	    _domainEvents?.Remove(eventItem);
19	}
20    
21    // 清除事件
22	public void ClearDomainEvents()
23	{
24	    _domainEvents?.Clear();
25	}
26
27	public abstract object[] GetKeys();
28	public virtual DateTime CreatedAt { get; set; }
29	public virtual string CreatedBy { get; set; }
30}
31
32// Entity 定义,适用于单主键
33public abstract class Entity<TKey> : Entity, IEntity<TKey>
34{
35    public TKey Id { get; set; }
36	public override object[] GetKeys() => new object[] { Id };
37}

我们不妨来换一种思路,既然我们期待这些 Publish() 相关的代码片段总是在 SaveChanges() 后面执行,那么,我们是不是可以将这些事件/消息存储下来,然后在某个合适的时机进行触发呢?当然,利用 .NET 里的委托就能达到这种延迟执行的目的,我们这里采用的方案是为每个 Entity 增加一个 DomainEvents 的属性,并通过重写 DbContextSaveChanges() 方法来实现消息的分发,这里我们暂时不考虑事务,因为对于像 KafkaRabbitMQ 这样的消息队列,基本上都不支持消息的撤回,换言之,其实这里是保证不了 SavaChanges()Publish() 的一致性的:

 1// OrderInfoCreatedEvent 继承自 DomainEvent
 2public class OrderInfoCreatedEvent : DomainEvent
 3{
 4    public string Remark { get; set; }
 5    public string Address { get; set; }
 6    public string Telephone { get; set; }
 7    public decimal Quantity { get; set; }
 8}
 9
10// DomainEvent 实现了 IDomainEvent 接口
11public class DomainEvent : IDomainEvent
12{
13    public Guid EventId { get; set; } = Guid.NewGuid();
14}

此时,注意到,所有的消息都实现了 IDomainEvent 接口,所以,对于一开始的示例,我们可以像下面这样来改造。这里,我们采用 DDD 的思想来改造这段代码,即按照“充血模型”,为 OrderInfo 类添加更多的行为,我们不妨假设,当创建订单的时候,需要产生一条 OrderInfoCreatedEvent 消息;当修改订单地址的时候,需要产生一条 OrderInfoUpdatedEvent 消息,我们来看看改造以后的代码会变成什么样子:

 1var orderInfo = new OrderInfo(
 2    address: "陕西省西安市雁塔区大雁塔北广场", 
 3    telephone: "13456789091", 
 4    quantity: 10, 
 5    remarak: "盛夏白瓷梅子汤,碎冰碰壁铛啷响"
 6); 
 7// 确认订单
 8orderInfo.Confirm();
 9_repository.Insert(orderInfo);
10
11// 修改地址
12orderInfo.ModifyAddress("陕西省西安市雁塔区卜蜂莲花超市");
13await _chinookContext.SaveChangesAsync();

其中,OrderInfo 内部定义了 Confirm()ModifyAddress 两个方法用来处理对应的领域事件:

 1public class OrderInfo : Entity<Guid>
 2{
 3    // .... 
 4
 5    public void Confirm()
 6    {
 7        CreatedBy = "System";
 8        CreatedAt = DateTime.Now;
 9        AddDomainEvent(this.Adapt<OrderInfoCreatedEvent>());
10    }
11
12    public void ModifyAddress(string address)
13    {
14        Address = address;
15        AddDomainEvent(this.Adapt<OrderInfoUpdatedEvent>());
16    }
17}

可以注意到,鉴于 Entity 这个基类中可以操作领域事件,所以,我们只需要在合适的位置调用 AddDomainEvent() 即可。当然,按照 DDD 的思想,业务通常是针对某个聚合根来开展的,所以,你会看到人们更倾向于让 DomainEvent 成为聚合根的一部分。这里,博主的主要目的是想证明这种方案的可行性,个人以为,即使是放在实体上,一样是无伤大雅。大家可能会疑惑,博主你这样改造完以后,Publish()相关的代码片段哪里去了呢?还记得博主说过要对 DbContext 动一点小手术吗?我们一起来看看:

 1public class ChinookContext : DbContext
 2{
 3    private readonly IDomainEventDispatcher _domainEventDispatcher;
 4    public ChinookContext(IDomainEventDispatcher domainEventDispatcher)
 5    {
 6        _domainEventDispatcher = domainEventDispatcher;
 7    }
 8
 9    // .... 
10
11    public override async Task<int> SaveChangesAsync(
12        CancellationToken cancellationToken = default)
13    {
14        var entities = ChangeTracker.Entries()
15            .Where(x => x.Entity is Entity && ((Entity)x.Entity).DomainEvents.Any())
16            .Select(x => (Entity)x.Entity)
17            .ToList();
18
19        foreach (var entity in entities)
20        {
21            await _domainEventDispatcher.DispatchDomainEvent(entity.DomainEvents, cancellationToken);
22            entity.ClearDomainEvents();
23        }
24
25        return await base.SaveChangesAsync(cancellationToken);
26    }
27}

可以注意到,我们对 DbContextSaveChangesAsync() 方法进行了重写,并利用 EntityFrameworkChangeTracker 特性对附加在每个实体的 DomainEvents进行收集,此时,我们只需要把每一个领域事件发布出去即可。请注意,DbContextSaveChanges() 方法拥有多个重载形式,保险起见,你应该重写所有方法,这里仅仅以 SaveChangesAsync() 方法作为演示。对于 IDomainEventDispatcher 这个接口而言,它的定义其实非常简单,就单纯是一个事件分发器,你可以提供任何消息中间件,如 KafkaRabbitMQ 等等的实现:

1public interface IDomainEventDispatcher
2{
3    public Task DispatchDomainEvent<TDomainEvent>(
4        IEnumerable<TDomainEvent> domainEvents, 
5        CancellationToken cancellationToken = default
6    ) where TDomainEvent : IDomainEvent;
7}

在大多数关于 DDD 的文章中,当提到“事件”这个概念的时候,下面的这个事件处理器的定义一定不会缺席。这其实可以牵扯出领域事件和集成事件的区别,领域事件,可以认为是一个领域内,不同聚合根之间互相传递事件,因此,领域事件通常都是进程内的事件,像 MediatR 这样的库就非常合适;而集成事件,则是不同微服务间互相传递事件,因此,集成事件一定是跨服务、跨进程的事件,像 KafkaRabbitMQ 这样的消息队列就非常合适。

1public interface IEventHandler<TEventData> : IEventHandler 
2    where TEventData : IEventData
3{
4    void HandleEvent(TEventData eventData);
5}

虽然,博主目前工作中接触的主要是集成事件,甚至我们发布到 Kafka 中的是二进制形式的 Protobuf,可博主还是在尝试不断思考,是不是当下这个处境就是做好的方案。坦白讲,这篇博客里的内容并不算新颖,因为类似的 EventBus,我在两年前左右就曾亲手实现过,至少在上家公司的时候,面对同样的侵入式的“消息服务”,我个人当时是非常推崇这种事件/消息驱动业务的理念的。现在回过头再看,其实是因为整个物流的生命周期是确定的,业务上的分歧更多的是在中间环节产生,所以,至少在当时看来,事件/消息驱动业务的理念是完全正确的,唯一的问题在于“面条式”的业务代码被这些“消息服务”侵入地面目全非。直到来到现在的公司,发觉业务被 Kafka 肢解地支离破碎,平时工作中大家问的最多的问题居然是,Topic 是啥?Protobuf 是啥?Command 还是 Event?这无疑又让我对这种方案的合理性产生怀疑,作为一个人类,果然都拥有着始终都打不破的历史局限性啊!

且将新火试新茶

OK,到目前为止,我们基本上讲清楚了整个方案的运作机制,其实,早在两年前,博主就曾使用过类似的技术来实现 数据库审计,彼时彼刻,我对于 DDD领域事件,更多的是一种浅尝辄止的态度,甚至在上家公司工作时的核心冲突,是源于它有大量的数据同步的需求,我们需要一种更优雅的方式来“通知”数据的变更,而不是在代码里到处“埋点”,所以,从某种意义上来讲,今时今日与那年那月是如此的似曾相识,我还是想找到一种方法来规避这些“埋点”。诚然,重写 DbContextSaveChnages() 方法是一种方案,不过,自从 EntityFramework支持 [https://github.com/dotnet/efcore/pull/21862](SaveChanges Events) 特性以后,我们又有了一种新的选择。

1public event EventHandler<SavingChangesEventArgs> SavingChanges;
2public event EventHandler<SavedChangesEventArgs> SavedChanges;
3public event EventHandler<SaveChangesFailedEventArgs> SaveChangesFailed;

简单来说,这个特性为 DbContext 增加了三个事件,分别表示保存中、保存后、保存失败,所以,像这种保存到数据库以后再发消息的“一心流”,我们可以使用用下面的方法:

 1_chinookContext.SavedChanges += async (s, e) => 
 2{
 3    var context = s as ChinookContext;
 4
 5    var entities = context.ChangeTracker.Entries()
 6        .Where(x => x.Entity is Entity && ((Entity)x.Entity).DomainEvents.Any())
 7        .Select(x => (Entity)x.Entity)
 8        .ToList();
 9
10    foreach (var entity in entities)
11    {
12        await _domainEventDispatcher.DispatchDomainEvent(entity.DomainEvents);
13        entity.ClearDomainEvents();
14    }
15};

这个方案看起来还不错,特别是你没有机会去修改 DbContext 的时候,这会是个非常完美的思路。当然,如果觉得这个方案还不过瘾,你还可以考虑繼承 SaveChangesInterceptor 实现一个拦截器,下面以其中一个虚方法 SavedChanges() 为例:

 1public class DbContextInterceptor : SaveChangesInterceptor
 2{
 3    public override int SavedChanges(
 4        SaveChangesCompletedEventData eventData, int result)
 5    {
 6        // 获取 DbContext
 7        var context = eventData.Context;
 8
 9        // ....
10
11        return base.SavedChanges(eventData, result);
12    }
13}

显然,只要在这里拿到 DbContext,剩下的事情就变得非常简单啦!什么叫做“且将新火试新茶”呢?大概就是当你看到这段代码的时候,突然意识到两年前写的数据库审计,应该可以使用这个方法重构一下。这个世界上的东西,每一分每一秒都在发生着变化,连同在这里写下这些只言片语的我,连同在屏幕前看到这些文过饰非的你,也许,追求永恒不变的人都是贪心的人,只是我们自己不愿意承认罢了。可恰恰是因为这个世界纷繁多变,所以,矢志不渝、海枯石烂这种只在童话故事里出现的字眼,始能衬托出这人世间的弥足珍贵,不是吗?就像我以前以为代码不会变,可当你看到过去的代码,突然有了新的体会的时候,这一切终究还是变了。大抵,一切事物间的关系,像磁场一样有强弱的区别,只需要一个人轻轻地走开,留给对方熟悉世界里的仓促与陌生。

谁道人生无再少

从写下这篇文章的那一刻,我已然三十岁啦,至少从年龄上我已不能再被叫做少年。现在写博客更像是一种心态的描摹,甚至有非常多的内容或者想法,都是在写作过程中一点点追加上去的,所以,你看到我选用了几句诗做了这篇文章的二级标题,根本原因是一开始计划写的时候,只有一个非常模糊的大纲。写这篇文章的动机,一来是先后在两家公司遇到伴随着“埋点”而生的代码侵入问题,二来则是对消息/事件驱动这种业务模式的反思。譬如过去实现 EventBus 都是用 EventHandler 来处理消息,强类型/泛型带来是依赖注入、程序集扫描方面的便利。而新公司则是采用委托来实现消息的订阅处理,两种模式都可以工作得非常好,我属实说不上来哪一种会更好一点。在这个过程中,渐渐地理解了过去理解不了的原理,所以,这应该可以算作一种收获,DDD对当下业务而言是否合适,我还没有找到答案,不过,单纯地去使用“充血模型”应该可以算做一种进步,就像以前实现 ValueObject 要考虑很多东西,而现在可以直接使用 Record,这种意识有时候是潜移默化的、甚至是可遇不可求的,也许,这就是一个人开始变老的征兆,谁知道呢?谢谢大家,本文完!

Built with Hugo
Theme Stack designed by Jimmy