返回

源代码探案系列之 .NET Core 限流中间件 AspNetCoreRateLimit

在上一篇文章中,博主带领大家一起深入了解 ConcurrencyLimiter 这个中间件,正当我得意洋洋地向 Catcher Wong 大佬吹嘘这一点小收获时,大佬一脸嫌弃地说,一个单机版的方案有什么好得意的啊。大佬言下之意,显然是指,这个中间件在分布式环境中毫无用武之地。其实,你只需要稍微想一下,就能想明白这个问题。毕竟,它只是通过SeamphoreSlim控制线程数量而已,一旦放到分布式环境中,这个并发控制就被大大地削弱。所以,在今天这篇文章中,博主会带领大家一起“探案ASP.NET Core 中的限流中间件 AspNetCoreRateLimite,希望大家可以从中感悟到不一样的东西。对我而言,这可能是人到中年的焦虑感所催生出来的一种源动力,同时亦是为了不让那些订阅专栏的同学失望。

关于“限流”这个话题,我个人以为,它可以引申出非常多的东西,譬如“熔断”和“限流”,其实可以看作是同一类问题的“一体两面”。最早接触熔断,是源于 Spring Cloud 中的 Hystrix,它其实是指当服务不可用的时候,客户端应该采取什么样的措施去应对,实际使用中我们可能会考虑重试、超时、降级等策略。相应地,当服务端在面对来自客户端的异常流量时,就产生了“限流”这个概念,“限流”可以是线程隔离**(线程数 + 队列大小限制),可以是信号量隔离(设置最大并发请求数目),可以是限制QPS。这里,我们讨论的主要是第三种,而实现限流的常见算法主要有计数器算法漏桶算法令牌桶算法。这里,AspNetCoreRateLimit 这个中间件,则主要使用了计数器算法,并配合 IMemoryCacheIDistributedCache 分别实现了基于内存和基于分布式缓存的持久化逻辑。

源代码解读

首先,使用者通过配置定义了一个或者多个规则,这些规则决定了每个客户端在访问特定终结点时,一段时间内可以访问的最大次数。 RateLimitMiddleware 通过注入的IRateLimitProcessor 来匹配规则,然后依次判断每个规则是否达到了限流条件。一旦达到限流条件,中间件会改变 HTTP 响应的状态码、响应头、返回值,告知使用者已达到最大调用次数。而针对每一种 IRateLimitProcessor ,主要通过ProcessRequestAsync() 方法来实现计数,如果上一次的请求对应的时间戳 + 规则中时间间隔 >= 当前时间,则说明请求没有过期,此时,就需要给这个计数增加1。好了,现在我们来针对 AspNetCoreRateLimit 中的核心部件逐个进行解读。

RateLimitProcessor

RateLimitProcessor,是一个抽象类,实现了IRateLimitProcessor接口,公开的方法有 3 个:ProcessRequestAsync()IsWhitelisted()GetRateLimitHeaders()。在此基础上,派生出ClientRateLimitProcessorIpRateLimitProcessor两个子类。两者最大的不同在于,其所依赖的Store不同,前者为IClientPolicyStore,后者IIpPolicyStore,它们都实现了同一个接口IRateLimitStore

1public interface IRateLimitStore<T>
2{
3    Task<bool> ExistsAsync(string id, CancellationToken cancellationToken = default);
4    Task<T> GetAsync(string id, CancellationToken cancellationToken = default);
5    Task RemoveAsync(string id, CancellationToken cancellationToken = default);
6    Task SetAsync(string id, T entry, TimeSpan? expirationTime = null, 
7        CancellationToken cancellationToken = default
8    );
9}

可以注意到,这些都是典型的基于键-值的存储,所以,不管是基于内存的IMemeryCache,还是基于分布式缓存的IDistributedCache,都可以做到无缝切换。不同的Processor,本质上是它们生成缓存键的方式不同,例如,IpRateLimitProcessor是用一个前缀来表示一组IP,而ClientRateLimitProcessor则是用通过客户端前缀和客户端Id来作为区分:

 1// src/AspNetCoreRateLimit/Core/IpRateLimitProcessor.cs
 2public async Task<IEnumerable<RateLimitRule>> GetMatchingRulesAsync(
 3    ClientRequestIdentity identity, 
 4    CancellationToken cancellationToken = default
 5)
 6{
 7    var policies = await _policyStore.GetAsync(
 8        $"{_options.IpPolicyPrefix}",
 9        cancellationToken
10    );
11    var rules = new List<RateLimitRule>();
12    if (policies?.IpRules?.Any() == true)
13    {
14        // search for rules with IP intervals containing client IP
15        var matchPolicies = policies.IpRules
16            .Where(r => IpParser.ContainsIp(r.Ip, identity.ClientIp));
17        foreach (var item in matchPolicies)
18        {
19            rules.AddRange(item.Rules);
20        }
21    }
22
23    return GetMatchingRules(identity, rules);
24}
25
26// src/AspNetCoreRateLimit/Core/ClientRateLimitProcessor.cs
27public async Task<IEnumerable<RateLimitRule>> GetMatchingRulesAsync(
28    ClientRequestIdentity identity, 
29    CancellationToken cancellationToken = default
30)
31{
32    var policy = await _policyStore.GetAsync(
33        $"{_options.ClientPolicyPrefix}_{identity.ClientId}", 
34        cancellationToken
35    );
36    return GetMatchingRules(identity, policy?.Rules);
37}

对于RateLimitProcessor而言,其实现思路是,通过CounterKeyBuilder及其子类来生成计数器标识(CounterId),然后再通过AsyncKeyLock来实现计数,最终通过IRateLimitCounterStore来实现存储:

 1public virtual async Task<RateLimitCounter> ProcessRequestAsync(
 2    ClientRequestIdentity requestIdentity, 
 3    RateLimitRule rule, 
 4    CancellationToken cancellationToken = default
 5)
 6{
 7    var counter = new RateLimitCounter
 8    {
 9        Timestamp = DateTime.UtcNow,
10        Count = 1
11    };
12
13    // 生成CounterId
14    var counterId = BuildCounterKey(requestIdentity, rule);
15    
16    // 基于AsyncLock的计数器
17    // serial reads and writes on same key
18    using (await AsyncLock.WriterLockAsync(counterId).ConfigureAwait(false))
19    {
20        var entry = await _counterStore.GetAsync(counterId, cancellationToken);
21
22        if (entry.HasValue)
23        {
24            // entry has not expired
25            if (entry.Value.Timestamp + rule.PeriodTimespan.Value >= DateTime.UtcNow)
26            {
27                // increment request count
28                var totalCount = entry.Value.Count + _config.RateIncrementer?.Invoke() ?? 1;
29
30                // deep copy
31                counter = new RateLimitCounter
32                {
33                    Timestamp = entry.Value.Timestamp,
34                    Count = totalCount
35                };
36            }
37        }
38        
39        // 计数器存储
40        // stores: id (string) - timestamp (datetime) - total_requests (long)
41        await _counterStore.SetAsync(
42            counterId, 
43            counter, 
44            rule.PeriodTimespan.Value, 
45            cancellationToken
46        );
47    }
48
49    return counter;
50}

AsyncKeyLock

在分析RateLimitProcessor类的时候,我们提到了AsyncKeyLock。对于AsyncKeyLock的实现,我个人认为这是整个中间件的精华,因为这里出现了,和SeamphoreSlim一样经典的东西,这里用到了自旋锁SpinLock。我个人理解,SpinLock 约等于 Interlocked + 内核级别的while。这部分代码本身并不复杂,难就难在这样一个精妙的想法上面。其中,AsyncKeyLockDoorman 这个类的实现,应该是参考了微软的一篇博客—— Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock,因为ReaderLockAsync()WriterLockAsync()ReaderRelease()WriterRelease() 这 4 个关键方法完全一样。结合限流这个场景来看,它是典型的“多写”场景,因为如果是相同的请求,那么就会产生相同的计数器标识(CounterId),所以,这个AsyncLockDoorman这个类所定义的上下文边界,其实是“一读多写”的问题,所以,我们可以注意到,它里面定义了一个“写”操作的队列_waitingWriters,一个“读操作”的_waitingReader

 1public AsyncKeyLockDoorman(Action<AsyncKeyLockDoorman> reset)
 2{   
 3    // 多个写入者
 4    _waitingWriters = new Queue<TaskCompletionSource<Releaser>>();
 5    // 单个读取者
 6    _waitingReader = new TaskCompletionSource<Releaser>();
 7    _status = 0;
 8
 9    _readerReleaser = Task.FromResult(new Releaser(this, false));
10    _writerReleaser = Task.FromResult(new Releaser(this, true));
11    _reset = reset;
12}

对于“写”操作而言,当一个新的写入者希望进来的时候,如果此时锁没有被别人占用,那么这个新的写入者会获得这个锁,状态值m_status会被修改为-1。反之,如果此时这个锁已经被别人占用了,那么这个新的写入者将会进入等待队列。

 1public Task<Releaser> WriterLockAsync()
 2{
 3    lock (_waitingWriters)
 4    {
 5        if (_status == 0)
 6        {
 7            _status = -1;
 8            return _writerReleaser;
 9        }
10        else
11        {
12            var waiter = new TaskCompletionSource<Releaser>();
13            _waitingWriters.Enqueue(waiter);
14            return waiter.Task;
15        }
16    }
17}

对于“读”操作而言,我们来思考这样一个问题,什么时候“读”操作会被允许呢?答案是这一时刻没有写入者正在“写”或者“等”,因为如果不这样的话,就会发生我们平常所说的“脏读”,所以,这种情况下,就必须强迫“读取者”去等待写入者“空闲”下来。此时,不难理解ReadLockAsync()的实现:

 1public Task<Releaser> ReaderLockAsync()
 2{
 3    lock (_waitingWriters)
 4    {
 5        if (_status >= 0 && _waitingWriters.Count == 0)
 6        {
 7            ++_status;
 8            return _readerReleaser;
 9        }
10        else
11        {
12            ++_readersWaiting;
13            return _waitingReader.Task.ContinueWith(t => t.Result);
14        }
15    }
16}

现在,让我们把视线拉回到AsyncKeyLock,它负责维护一组AsyncKeyLockDoorman,其内部部通过一个字典来维护CounterIdAsyncKeyLockDoorman间的关系。与此同时,为了减少创建·AsyncKeyLockDoorman·带来的性能损耗,它使用一个栈来存储AsyncKeyLockDoorman。每次获取AsyncKeyLockDoorman的过程,本质上就是为指定的Key分配AsyncKeyLockDoorman的过程,同时会更新其引用数RefCount。相应地,释放AsyncKeyLockDoorman的过程,本质上就是减少其引用数RefCount,从字典中移除指定Key,“归还”对象池的过程:

 1// GetDoorman()
 2private static AsyncKeyLockDoorman GetDoorman(string key)
 3{
 4    AsyncKeyLockDoorman doorman;
 5    bool lockTaken = false;
 6    try
 7    {
 8        _spinLock.Enter(ref lockTaken);
 9        if (!Keys.TryGetValue(key, out doorman))
10        {
11            doorman = (Pool.Count > 0) ? Pool.Pop() : 
12                new AsyncKeyLockDoorman(ReleaseDoorman);
13            doorman.Key = key;
14            Keys.Add(key, doorman);
15        }
16
17        doorman.RefCount++;
18    }
19    finally
20    {
21        if (lockTaken)
22        {
23            _spinLock.Exit();
24        }
25    }
26
27    return doorman;
28}
29
30// ReleaseDoorman()
31private static void ReleaseDoorman(AsyncKeyLockDoorman doorman)
32{
33    bool lockTaken = false;
34    try
35    {
36        _spinLock.Enter(ref lockTaken);
37        if (--doorman.RefCount == 0)
38        {
39            Keys.Remove(doorman.Key);
40            if (Pool.Count < MaxPoolSize)
41            {
42                doorman.Key = null;
43                Pool.Push(doorman);
44            }
45        }
46    }
47    finally
48    {
49        if (lockTaken)
50        {
51            _spinLock.Exit();
52        }
53    }
54}

RateLimitMiddleware

OK,到这里,我们再回过头去看源代码解读这里的内容,大概就可以串起来整合中间件的调用链路,Middleware->RateLimteProcessor->AsyncKeyLock->AsyncKeyLockDoorman,坦白来讲,我一直没能想明白为什么要用SpinLock?难道仅仅是为了减少等待时间、提高性能吗?经过精简,我们发现,整个中间件的Invoke()方法,大致要经历下面几个阶段:

 1public async Task Invoke(HttpContext context)
 2{
 3    // 检查限流是否启用
 4    if (_options == null)
 5    {
 6        await _next.Invoke(context);
 7        return;
 8    }
 9
10    // 获取用户身份
11    var identity = await ResolveIdentityAsync(context);
12
13    // 检查白名单
14    if (_processor.IsWhitelisted(identity))
15    {
16        await _next.Invoke(context);
17        return;
18    }
19
20    //获取限流规则
21    var rulesDict = new Dictionary<RateLimitRule, RateLimitCounter>();
22    var rules = await _processor.GetMatchingRulesAsync(
23        identity, 
24        context.RequestAborted
25    );
26    foreach (var rule in rules)
27    {
28        // 获取计数器数目
29        var rateLimitCounter = await _processor.ProcessRequestAsync(
30            identity, 
31            rule, 
32            context.RequestAborted
33        );
34        if (rule.Limit > 0)
35        {
36            // 请求未过期
37            if (rateLimitCounter.Timestamp + rule.PeriodTimespan.Value < DateTime.UtcNow)
38            {
39                continue;
40            }
41
42            // 请求过期
43            if (rateLimitCounter.Count > rule.Limit)
44            {
45                // 各种记日志,告诉调用者多长时间后再重试
46                var retryAfter = rateLimitCounter.Timestamp.RetryAfterFrom(rule);
47                
48                // ...
49
50                // 中止请求
51                await ReturnQuotaExceededResponse(context, rule, retryAfter);
52                return;
53            }
54        }
55        else
56        {
57            // Limit <= 0, 相当于直接不允许放行,中止请求
58            await ReturnQuotaExceededResponse(
59                context, 
60                rule, 
61                int.MaxValue.ToString(System.Globalization.CultureInfo.InvariantCulture)
62            ); 
63        }
64
65        // ...
66    }
67
68    // 设置X-Rate-Limit头
69    // ...
70
71    await _next.Invoke(context);
72}

本文小结

作为 并发限制 这一篇的“姊妹篇”,这一篇的难度相对上一篇堪称“高山仰止”,主要的难点是 SpinLock 、“一读多写”的异步读写锁 AsyncKeyLock 以及 AsyncKeyLockDoorman 。如果大家感兴趣的话,可以去搜索一下 AsyncKeyLock 这个关键字,大家就会发现在好多开源项目 中都能找到类似的代码片段,莫非这是某种神奇的算法吗?阅读源代码,其实是一个无法“立竿见影”的学习方法,有时候我们要通过叙述或者表达来输出我们对待一件事物的看法。这是因为,我们自以为是的“学会”和真正的“学会”,这两者间可能千差万别,就像我最近在用 ABP vNext 搭建一个小项目,阅读文档的时候,眼睛觉得它“学会”了,而实际需要需要扩展或者替换 ABP 的实体/服务的时候。我的手会告诉我,它真的“不会”。做一个知难行易的“调包”侠也许会非常容易,可正因为如此,你要凸显自我就会非常困难。世上的事情,“夫夷以近,则游者众;险以远,则至者少。而世之奇伟、瑰怪,非常之观,常在于险远,而人之所罕至焉,故非有志者不能至也”,哪怕就是增长一下见识呢,你说对吧……

Built with Hugo
Theme Stack designed by Jimmy