返回

源代码探案系列之 .NET Core 并发限制中间件 ConcurrencyLimiter

打算开一个新的专栏——源代码探案系列,目的是通过源代码来探索更广阔的技术世界。因为我越来越意识到,我可能缺乏一个结构化的知识体系,虽然处在一个碎片化的时代,从外界接收了大量的信息,可这些碎片化的信息,到底能不能转化为自身可用的知识,其实是需要去认真思考一番。尤其是当我注意到,许多人工作多年,在经历过从“生手”到“熟练工”这种蜕变以后,居然还是会害怕原理性内容的考察。我承认,程序员这个职业更像是一个“手艺人”,可我更想说一句古人的话——君子不器。什么是器呢?“形而上者谓之道,形而下者谓之器”,用一句更直白的话来说,就是“不能知其然而不知其所以然”,这是我一个非CS科班出身的程序员,想去写这样一个专栏的初衷,因为在我看来,“”是永远学不完的,而“”虽然听起来虚无缥缈,实则“朝闻道,夕死可矣”。

作为这个专栏的第一篇博客,我打算从 ASP.NET Core 中的 ConcurrencyLimiter 这个中间件开始。并发是一个爱恨交织的话题,我们喜欢高并发,因为这是程序员跻身高手行列的好机会;我们厌恶并发,因为它引入了多线程、锁、信号量这些复杂的东西。相信大家都曾被并发困扰过,古人云:他山之石,可以攻玉,还有什么比阅读源代码更朴实无华的“学习”呢?你找大牛,大牛可能忙着开会、做PPT;你找同事,同事里可能十个有八个都不知道啊。这个中间件的核心是 IQueuePolicy ,其位于以下位置,它定义了两个核心的方法:TryEnterAsync()OnExit()

1public interface IQueuePolicy
2{
3    ValueTask<bool> TryEnterAsync();
4    void OnExit();
5}

在其默认实现QueuePolicy中,TryEnterAsync()方法,决定着一个请求是会被拒绝还是接受。具体是怎么做呢?它定义了一个最大的并发请求数目,如果实际数超过了最大的并发请求数目,那么请求将会被拒绝。反之,请求将被接受。再仔细看,我们就会发现,它内部使用了SeamphoreSlimInterlocked,所以,聪明的小伙伴们应该立马会联想到,这两种锁各自的作用是什么。

其中,Seamphore 是一个 Windows 内核中的一个同步信号量,适用于在多个有限的线程资源中共享内存资源,它就像一个栅栏,本身具有一定的容量,当线程数量达到这个容量后,新的线程就无法再通过,直到某个线程执行完成。SeamphoreSlimSeamphore优化后的版本,在性能上表现更好一点,更推荐大家使用SeamphoreSlim

Interlocked 的则是我们熟悉的原子操作,它可以在多个线程中,对共享的内存资源进行原子加或者原子减操作。在这里,Interlocked主要用来控制并发请求数的加和减。如果当前的并发请求数小于最大的并发请求数,表示还可以允许新的请求进来,此时,TryEnterAsync()方法会返回true。如果此时的并发请求数大于最大的并发请求数,则需要对当前请求数进行减操作,此时,TryEnterAsync()方法会返回false。

一旦搞清楚这一点,结合中间件的代码,我们可以非常容易地想明白,这个并发控制的实现思路。下面是QueuePolicyTryEnterAsync()OnExit()两个方法的实现,分别代表了“加锁”和“解锁”两个不同的阶段。某种程度上,Seamphore更像一个水闸,每次可以通过的“流量”是固定的,超出的部分会被直接“拒绝”:

 1//“加锁”
 2public ValueTask<bool> TryEnterAsync()
 3{
 4    // a return value of 'false' indicates that the request is rejected
 5    // a return value of 'true' indicates that the request may proceed
 6    // _serverSemaphore.Release is *not* called in this method, 
 7    // it is called externally when requests leave the server
 8    int totalRequests = Interlocked.Increment(ref _totalRequests);
 9     
10    //当前请求次数 > 最大请求次数,返回false表示拒绝
11    if (totalRequests > _maxTotalRequest) {
12        Interlocked.Decrement(ref _totalRequests);
13        return new ValueTask<bool>(false);
14    }
15
16    Task task = _serverSemaphore.WaitAsync();
17    if (task.IsCompletedSuccessfully) {
18        return new ValueTask<bool>(true);
19    }
20
21    return SemaphoreAwaited(task);
22}
23
24//“解锁”
25public void OnExit()
26{
27    _serverSemaphore.Release();
28    Interlocked.Decrement(ref _totalRequests);
29}

揭秘 StackPolicy

除了QueuePolicy这种实现以外,官方还提供了StackPolicy的实现。从名称上,我们就能大致区分出它们的不同,因为我相信大家都能拎得清“队列”和“栈”。在实现StackPolicy的过程中,首先会判断是否还有访问请求次数_freeServerSpots,直接返回true,确保中间件可以继续执行。如果_queueLength和我们设置的队列最大容量相同,此时,表示队列已满,需要先取消之前的请求,并保留后来的请求。

 1public ValueTask<bool> TryEnterAsync()
 2{
 3    lock (_bufferLock) {
 4        if (_freeServerSpots > 0) {
 5            _freeServerSpots--;
 6            return new ValueTask<bool>(true);
 7        }
 8
 9        // 队列已满,则取消之前的请求,即_head
10        if (_queueLength == _maxQueueCapacity) {
11            _hasReachedCapacity = true;
12            _buffer[_head].Complete(false);
13            _queueLength--;
14        }
15
16        var tcs = _cachedResettableTCS ?? 
17            = new ResettableBooleanCompletionSource(this);
18        _cachedResettableTCS = null;
19
20        if (_hasReachedCapacity || _queueLength < _buffer.Count) {
21            _buffer[_head] = tcs;
22        } else {
23            _buffer.Add(tcs);
24        }
25        _queueLength++;
26
27        // increment _head for next time
28        // 如果_head = 最大队列容量,则_head需要移动至首位
29        _head++;
30        if (_head == _maxQueueCapacity) {
31            _head = 0;
32        }
33
34        return tcs.GetValueTask();
35    }
36}
37
38public void OnExit()
39{
40    lock (_bufferLock) {
41        if (_queueLength == 0) {
42            _freeServerSpots++;
43
44            f (_freeServerSpots > _maxConcurrentRequests) {
45                _freeServerSpots--;
46                throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
47            }
48
49            return;
50    }
51
52    // step backwards and launch a new task
53    if (_head == 0) {
54        _head = _maxQueueCapacity - 1;
55    } else {
56        _head--;
57    }
58
59    _buffer[_head].Complete(true);
60    _queueLength--;
61
62    }
63}

所以,现在,你可以感受到这两种策略的差异了,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。此时,如果我们再回过头来看 ConcurrencyLimiterMiddleware 这个中间件的实现,就会有种恍然大悟的感觉。

揭秘 Middleware

 1public async Task Invoke(HttpContext context)
 2{
 3    // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
 4    // 以下代码片段,其实都是调用IQueuePolicy.TryEnterAsync()
 5    var waitInQueueTask = _queuePolicy.TryEnterAsync();
 6    bool result;
 7
 8    if (waitInQueueTask.IsCompleted) {
 9        ConcurrencyLimiterEventSource.Log.QueueSkipped();
10        result = waitInQueueTask.Result;
11    }
12    else {
13        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
14        {
15            result = await waitInQueueTask;
16        }
17    }
18    
19    // 当result为true,表示请求被接收,此时,让中间件继续执行
20    // 切记:调用_queuePolicy.OnExit()来释放锁
21    if (result) {
22        try {
23            await _next(context);
24        }
25        finally {
26            _queuePolicy.OnExit();
27        }
28    } else {
29        //这里就是请求被拒绝的情况,修改状态码以及输出错误信息
30        ConcurrencyLimiterEventSource.Log.RequestRejected();
31        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
32        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
33        await _onRejected(context);
34    }
35}

至此,我们就理清了整个中间件的运作机制,ConcurrencyLimiterMiddleware 中注入了IQueuePolicy这个接口,当一个新的请求进来,中间件会调用IQueuePolicy接口的TryEnterAsync()方法,该方法决定了一个请求是会被接受还是拒绝。当请求被接受的时候,中间件会调用_next(context)让请求继续往下走;当请求被拒绝的时候,中间件会修改 HTTP 状态码(503) 和 返回值,保证调用者可以收到错误信息。这就是这个中间件全部的秘密。而如果要在项目中使用这个中间件,同样是非常简单的:

 1
 2// 中间件基本法,先注册后使用
 3// ConfigureServices()
 4// 或者 services.AddQueuePolicy()
 5services.AddStackPolicy(options =>
 6{
 7    options.MaxConcurrentRequests = 2;
 8    options.RequestQueueLimit = 25;
 9})
10
11// Configure()
12app.UseConcurrencyLimiter();

本文小结

这篇博客,主要揭秘了 ASP.NET Core 中的 ConcurrencyLimiter 中间件,这个中间件的主要功能是控制 ASP.NET Core 中的请求并发。作为这个中间件的核心,微软为 IQueuePolicy 接口提供了 QueuePolicyStackPolicy 两种不同的策略实现。其中,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。对于我们而言,这个中间件最值得学习的地方,其实是SeamphoreSlimInterlocked,我们经常提到“”,其实,“”不单单是指 .NET 中Monitor的语法糖,即lock关键字,在同步信号量以及线程同步的相关话题中,我们还会接触到譬如 Mutex(互斥锁)、ReaderWriterLockSlim、Interlocked(原子操作)SpinLock(自旋锁) 以及 SeamphoreSlim 等等不同的“”。除此之外,还有譬如AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 等等的同步信号量。如果有读者朋友对此感兴趣,可以到 MSDN 上去搜索相关的关键字,能让博主本人和大家从中有所收获,这是我坚持写下去的理由。好了,以上就是这篇博客的全部内容啦,欢迎大家在评论区留言、讨论。

Built with Hugo
Theme Stack designed by Jimmy