返回

源代码探案系列之 .NET Core 并发限制中间件 ConcurrencyLimiter

AI 摘要
本文是 “源代码探案系列” 的开篇,旨在通过深入分析源代码来构建结构化的知识体系。文章首先指出了程序员在从 “生手” 到 “熟练工” 转变后,仍需掌握原理性知识的重要性。接着,作者以ASP.NET Core 中的 ConcurrencyLimiter 中间件为例,探讨了并发控制的实现原理。文中详细解释了 IQueuePolicy 接口及其两种策略实现:QueuePolicy 和 StackPolicy,并通过代码示例阐释了它们的工作机制。文章还介绍了如何使用 ConcurrencyLimiterMiddleware 中间件来控制请求并发,并提供了具体的使用示例。最后,作者总结了学习并发控制中间件的重要性,并鼓励读者深入学习相关技术。

打算开一个新的专栏——源代码探案系列,目的是通过源代码来探索更广阔的技术世界。因为我越来越意识到,我可能缺乏一个结构化的知识体系,虽然处在一个碎片化的时代,从外界接收了大量的信息,可这些碎片化的信息,到底能不能转化为自身可用的知识,其实是需要去认真思考一番。尤其是当我注意到,许多人工作多年,在经历过从“生手”到“熟练工”这种蜕变以后,居然还是会害怕原理性内容的考察。我承认,程序员这个职业更像是一个“手艺人”,可我更想说一句古人的话——君子不器。什么是器呢?“形而上者谓之道,形而下者谓之器”,用一句更直白的话来说,就是“不能知其然而不知其所以然”,这是我一个非CS科班出身的程序员,想去写这样一个专栏的初衷,因为在我看来,“”是永远学不完的,而“”虽然听起来虚无缥缈,实则“朝闻道,夕死可矣”。

作为这个专栏的第一篇博客,我打算从 ASP.NET Core 中的 ConcurrencyLimiter 这个中间件开始。并发是一个爱恨交织的话题,我们喜欢高并发,因为这是程序员跻身高手行列的好机会;我们厌恶并发,因为它引入了多线程、锁、信号量这些复杂的东西。相信大家都曾被并发困扰过,古人云:他山之石,可以攻玉,还有什么比阅读源代码更朴实无华的“学习”呢?你找大牛,大牛可能忙着开会、做PPT;你找同事,同事里可能十个有八个都不知道啊。这个中间件的核心是 IQueuePolicy ,其位于以下位置,它定义了两个核心的方法:TryEnterAsync()OnExit()

public interface IQueuePolicy
{
    ValueTask<bool> TryEnterAsync();
    void OnExit();
}

在其默认实现QueuePolicy中,TryEnterAsync()方法,决定着一个请求是会被拒绝还是接受。具体是怎么做呢?它定义了一个最大的并发请求数目,如果实际数超过了最大的并发请求数目,那么请求将会被拒绝。反之,请求将被接受。再仔细看,我们就会发现,它内部使用了SeamphoreSlimInterlocked,所以,聪明的小伙伴们应该立马会联想到,这两种锁各自的作用是什么。

其中,Seamphore 是一个 Windows 内核中的一个同步信号量,适用于在多个有限的线程资源中共享内存资源,它就像一个栅栏,本身具有一定的容量,当线程数量达到这个容量后,新的线程就无法再通过,直到某个线程执行完成。SeamphoreSlimSeamphore优化后的版本,在性能上表现更好一点,更推荐大家使用SeamphoreSlim

Interlocked 的则是我们熟悉的原子操作,它可以在多个线程中,对共享的内存资源进行原子加或者原子减操作。在这里,Interlocked主要用来控制并发请求数的加和减。如果当前的并发请求数小于最大的并发请求数,表示还可以允许新的请求进来,此时,TryEnterAsync()方法会返回true。如果此时的并发请求数大于最大的并发请求数,则需要对当前请求数进行减操作,此时,TryEnterAsync()方法会返回false。

一旦搞清楚这一点,结合中间件的代码,我们可以非常容易地想明白,这个并发控制的实现思路。下面是QueuePolicyTryEnterAsync()OnExit()两个方法的实现,分别代表了“加锁”和“解锁”两个不同的阶段。某种程度上,Seamphore更像一个水闸,每次可以通过的“流量”是固定的,超出的部分会被直接“拒绝”:

//“加锁”
public ValueTask<bool> TryEnterAsync()
{
    // a return value of 'false' indicates that the request is rejected
    // a return value of 'true' indicates that the request may proceed
    // _serverSemaphore.Release is *not* called in this method, 
    // it is called externally when requests leave the server
    int totalRequests = Interlocked.Increment(ref _totalRequests);
     
    //当前请求次数 > 最大请求次数,返回false表示拒绝
    if (totalRequests > _maxTotalRequest) {
        Interlocked.Decrement(ref _totalRequests);
        return new ValueTask<bool>(false);
    }

    Task task = _serverSemaphore.WaitAsync();
    if (task.IsCompletedSuccessfully) {
        return new ValueTask<bool>(true);
    }

    return SemaphoreAwaited(task);
}

//“解锁”
public void OnExit()
{
    _serverSemaphore.Release();
    Interlocked.Decrement(ref _totalRequests);
}

揭秘 StackPolicy

除了QueuePolicy这种实现以外,官方还提供了StackPolicy的实现。从名称上,我们就能大致区分出它们的不同,因为我相信大家都能拎得清“队列”和“栈”。在实现StackPolicy的过程中,首先会判断是否还有访问请求次数_freeServerSpots,直接返回true,确保中间件可以继续执行。如果_queueLength和我们设置的队列最大容量相同,此时,表示队列已满,需要先取消之前的请求,并保留后来的请求。

public ValueTask<bool> TryEnterAsync()
{
    lock (_bufferLock) {
        if (_freeServerSpots > 0) {
            _freeServerSpots--;
            return new ValueTask<bool>(true);
        }

        // 队列已满,则取消之前的请求,即_head
        if (_queueLength == _maxQueueCapacity) {
            _hasReachedCapacity = true;
            _buffer[_head].Complete(false);
            _queueLength--;
        }

        var tcs = _cachedResettableTCS ?? 
            = new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;

        if (_hasReachedCapacity || _queueLength < _buffer.Count) {
            _buffer[_head] = tcs;
        } else {
            _buffer.Add(tcs);
        }
        _queueLength++;

        // increment _head for next time
        // 如果_head = 最大队列容量,则_head需要移动至首位
        _head++;
        if (_head == _maxQueueCapacity) {
            _head = 0;
        }

        return tcs.GetValueTask();
    }
}

public void OnExit()
{
    lock (_bufferLock) {
        if (_queueLength == 0) {
            _freeServerSpots++;

            f (_freeServerSpots > _maxConcurrentRequests) {
                _freeServerSpots--;
                throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
            }

            return;
    }

    // step backwards and launch a new task
    if (_head == 0) {
        _head = _maxQueueCapacity - 1;
    } else {
        _head--;
    }

    _buffer[_head].Complete(true);
    _queueLength--;

    }
}

所以,现在,你可以感受到这两种策略的差异了,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。此时,如果我们再回过头来看 ConcurrencyLimiterMiddleware 这个中间件的实现,就会有种恍然大悟的感觉。

揭秘 Middleware

public async Task Invoke(HttpContext context)
{
    // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
    // 以下代码片段,其实都是调用IQueuePolicy.TryEnterAsync()
    var waitInQueueTask = _queuePolicy.TryEnterAsync();
    bool result;

    if (waitInQueueTask.IsCompleted) {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
    }
    else {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
            result = await waitInQueueTask;
        }
    }
    
    // 当result为true,表示请求被接收,此时,让中间件继续执行
    // 切记:调用_queuePolicy.OnExit()来释放锁
    if (result) {
        try {
            await _next(context);
        }
        finally {
            _queuePolicy.OnExit();
        }
    } else {
        //这里就是请求被拒绝的情况,修改状态码以及输出错误信息
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
    }
}

至此,我们就理清了整个中间件的运作机制,ConcurrencyLimiterMiddleware 中注入了IQueuePolicy这个接口,当一个新的请求进来,中间件会调用IQueuePolicy接口的TryEnterAsync()方法,该方法决定了一个请求是会被接受还是拒绝。当请求被接受的时候,中间件会调用_next(context)让请求继续往下走;当请求被拒绝的时候,中间件会修改 HTTP 状态码(503) 和 返回值,保证调用者可以收到错误信息。这就是这个中间件全部的秘密。而如果要在项目中使用这个中间件,同样是非常简单的:


// 中间件基本法,先注册后使用
// ConfigureServices()
// 或者 services.AddQueuePolicy()
services.AddStackPolicy(options =>
{
    options.MaxConcurrentRequests = 2;
    options.RequestQueueLimit = 25;
})

// Configure()
app.UseConcurrencyLimiter();

本文小结

这篇博客,主要揭秘了 ASP.NET Core 中的 ConcurrencyLimiter 中间件,这个中间件的主要功能是控制 ASP.NET Core 中的请求并发。作为这个中间件的核心,微软为 IQueuePolicy 接口提供了 QueuePolicyStackPolicy 两种不同的策略实现。其中,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。对于我们而言,这个中间件最值得学习的地方,其实是SeamphoreSlimInterlocked,我们经常提到“”,其实,“”不单单是指 .NET 中Monitor的语法糖,即lock关键字,在同步信号量以及线程同步的相关话题中,我们还会接触到譬如 Mutex(互斥锁)、ReaderWriterLockSlim、Interlocked(原子操作)SpinLock(自旋锁) 以及 SeamphoreSlim 等等不同的“”。除此之外,还有譬如AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 等等的同步信号量。如果有读者朋友对此感兴趣,可以到 MSDN 上去搜索相关的关键字,能让博主本人和大家从中有所收获,这是我坚持写下去的理由。好了,以上就是这篇博客的全部内容啦,欢迎大家在评论区留言、讨论。

Built with Hugo v0.126.1
Theme Stack designed by Jimmy
已创作 273 篇文章,共计 1032286 字