打算开一个新的专栏——源代码探案系列,目的是通过源代码来探索更广阔的技术世界。因为我越来越意识到,我可能缺乏一个结构化的知识体系,虽然处在一个碎片化的时代,从外界接收了大量的信息,可这些碎片化的信息,到底能不能转化为自身可用的知识,其实是需要去认真思考一番。尤其是当我注意到,许多人工作多年,在经历过从“生手”到“熟练工”这种蜕变以后,居然还是会害怕原理性内容的考察。我承认,程序员这个职业更像是一个“手艺人”,可我更想说一句古人的话——君子不器。什么是器呢?“形而上者谓之道,形而下者谓之器”,用一句更直白的话来说,就是“不能知其然而不知其所以然”,这是我一个非CS科班出身的程序员,想去写这样一个专栏的初衷,因为在我看来,“器”是永远学不完的,而“道”虽然听起来虚无缥缈,实则“朝闻道,夕死可矣”。
作为这个专栏的第一篇博客,我打算从 ASP.NET Core 中的 ConcurrencyLimiter 这个中间件开始。并发是一个爱恨交织的话题,我们喜欢高并发,因为这是程序员跻身高手行列的好机会;我们厌恶并发,因为它引入了多线程、锁、信号量这些复杂的东西。相信大家都曾被并发困扰过,古人云:他山之石,可以攻玉,还有什么比阅读源代码更朴实无华的“学习”呢?你找大牛,大牛可能忙着开会、做PPT;你找同事,同事里可能十个有八个都不知道啊。这个中间件的核心是 IQueuePolicy
,其位于以下位置,它定义了两个核心的方法:TryEnterAsync()
和 OnExit()
:
public interface IQueuePolicy
{
ValueTask<bool> TryEnterAsync();
void OnExit();
}
在其默认实现QueuePolicy
中,TryEnterAsync()
方法,决定着一个请求是会被拒绝还是接受。具体是怎么做呢?它定义了一个最大的并发请求数目,如果实际数超过了最大的并发请求数目,那么请求将会被拒绝。反之,请求将被接受。再仔细看,我们就会发现,它内部使用了SeamphoreSlim
和Interlocked
,所以,聪明的小伙伴们应该立马会联想到,这两种锁各自的作用是什么。
其中,Seamphore
是一个 Windows 内核中的一个同步信号量,适用于在多个有限的线程资源中共享内存资源,它就像一个栅栏,本身具有一定的容量,当线程数量达到这个容量后,新的线程就无法再通过,直到某个线程执行完成。SeamphoreSlim
是Seamphore
优化后的版本,在性能上表现更好一点,更推荐大家使用SeamphoreSlim
。
而 Interlocked
的则是我们熟悉的原子操作,它可以在多个线程中,对共享的内存资源进行原子加或者原子减操作。在这里,Interlocked
主要用来控制并发请求数的加和减。如果当前的并发请求数小于最大的并发请求数,表示还可以允许新的请求进来,此时,TryEnterAsync()
方法会返回true。如果此时的并发请求数大于最大的并发请求数,则需要对当前请求数进行减操作,此时,TryEnterAsync()
方法会返回false。
一旦搞清楚这一点,结合中间件的代码,我们可以非常容易地想明白,这个并发控制的实现思路。下面是QueuePolicy
中TryEnterAsync()
和OnExit()
两个方法的实现,分别代表了“加锁”和“解锁”两个不同的阶段。某种程度上,Seamphore
更像一个水闸,每次可以通过的“流量”是固定的,超出的部分会被直接“拒绝”:
//“加锁”
public ValueTask<bool> TryEnterAsync()
{
// a return value of 'false' indicates that the request is rejected
// a return value of 'true' indicates that the request may proceed
// _serverSemaphore.Release is *not* called in this method,
// it is called externally when requests leave the server
int totalRequests = Interlocked.Increment(ref _totalRequests);
//当前请求次数 > 最大请求次数,返回false表示拒绝
if (totalRequests > _maxTotalRequest) {
Interlocked.Decrement(ref _totalRequests);
return new ValueTask<bool>(false);
}
Task task = _serverSemaphore.WaitAsync();
if (task.IsCompletedSuccessfully) {
return new ValueTask<bool>(true);
}
return SemaphoreAwaited(task);
}
//“解锁”
public void OnExit()
{
_serverSemaphore.Release();
Interlocked.Decrement(ref _totalRequests);
}
揭秘 StackPolicy
除了QueuePolicy
这种实现以外,官方还提供了StackPolicy
的实现。从名称上,我们就能大致区分出它们的不同,因为我相信大家都能拎得清“队列”和“栈”。在实现StackPolicy
的过程中,首先会判断是否还有访问请求次数_freeServerSpots
,直接返回true,确保中间件可以继续执行。如果_queueLength
和我们设置的队列最大容量相同,此时,表示队列已满,需要先取消之前的请求,并保留后来的请求。
public ValueTask<bool> TryEnterAsync()
{
lock (_bufferLock) {
if (_freeServerSpots > 0) {
_freeServerSpots--;
return new ValueTask<bool>(true);
}
// 队列已满,则取消之前的请求,即_head
if (_queueLength == _maxQueueCapacity) {
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}
var tcs = _cachedResettableTCS ??
= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;
if (_hasReachedCapacity || _queueLength < _buffer.Count) {
_buffer[_head] = tcs;
} else {
_buffer.Add(tcs);
}
_queueLength++;
// increment _head for next time
// 如果_head = 最大队列容量,则_head需要移动至首位
_head++;
if (_head == _maxQueueCapacity) {
_head = 0;
}
return tcs.GetValueTask();
}
}
public void OnExit()
{
lock (_bufferLock) {
if (_queueLength == 0) {
_freeServerSpots++;
f (_freeServerSpots > _maxConcurrentRequests) {
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}
return;
}
// step backwards and launch a new task
if (_head == 0) {
_head = _maxQueueCapacity - 1;
} else {
_head--;
}
_buffer[_head].Complete(true);
_queueLength--;
}
}
所以,现在,你可以感受到这两种策略的差异了,QueuePolicy
是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy
是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。此时,如果我们再回过头来看 ConcurrencyLimiterMiddleware 这个中间件的实现,就会有种恍然大悟的感觉。
揭秘 Middleware
public async Task Invoke(HttpContext context)
{
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
// 以下代码片段,其实都是调用IQueuePolicy.TryEnterAsync()
var waitInQueueTask = _queuePolicy.TryEnterAsync();
bool result;
if (waitInQueueTask.IsCompleted) {
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else {
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
// 当result为true,表示请求被接收,此时,让中间件继续执行
// 切记:调用_queuePolicy.OnExit()来释放锁
if (result) {
try {
await _next(context);
}
finally {
_queuePolicy.OnExit();
}
} else {
//这里就是请求被拒绝的情况,修改状态码以及输出错误信息
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
至此,我们就理清了整个中间件的运作机制,ConcurrencyLimiterMiddleware 中注入了IQueuePolicy
这个接口,当一个新的请求进来,中间件会调用IQueuePolicy
接口的TryEnterAsync()
方法,该方法决定了一个请求是会被接受还是拒绝。当请求被接受的时候,中间件会调用_next(context)
让请求继续往下走;当请求被拒绝的时候,中间件会修改 HTTP 状态码(503) 和 返回值,保证调用者可以收到错误信息。这就是这个中间件全部的秘密。而如果要在项目中使用这个中间件,同样是非常简单的:
// 中间件基本法,先注册后使用
// ConfigureServices()
// 或者 services.AddQueuePolicy()
services.AddStackPolicy(options =>
{
options.MaxConcurrentRequests = 2;
options.RequestQueueLimit = 25;
})
// Configure()
app.UseConcurrencyLimiter();
本文小结
这篇博客,主要揭秘了 ASP.NET Core 中的 ConcurrencyLimiter 中间件,这个中间件的主要功能是控制 ASP.NET Core 中的请求并发。作为这个中间件的核心,微软为 IQueuePolicy
接口提供了 QueuePolicy
和 StackPolicy
两种不同的策略实现。其中,QueuePolicy
是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy
是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。对于我们而言,这个中间件最值得学习的地方,其实是SeamphoreSlim
和Interlocked
,我们经常提到“锁”,其实,“锁”不单单是指 .NET 中Monitor
的语法糖,即lock
关键字,在同步信号量以及线程同步的相关话题中,我们还会接触到譬如 Mutex(互斥锁)、ReaderWriterLockSlim、Interlocked(原子操作)、SpinLock(自旋锁) 以及 SeamphoreSlim 等等不同的“锁”。除此之外,还有譬如AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 等等的同步信号量。如果有读者朋友对此感兴趣,可以到 MSDN 上去搜索相关的关键字,能让博主本人和大家从中有所收获,这是我坚持写下去的理由。好了,以上就是这篇博客的全部内容啦,欢迎大家在评论区留言、讨论。