打算开一个新的专栏——源代码探案系列,目的是通过源代码来探索更广阔的技术世界。因为我越来越意识到,我可能缺乏一个结构化的知识体系,虽然处在一个碎片化的时代,从外界接收了大量的信息,可这些碎片化的信息,到底能不能转化为自身可用的知识,其实是需要去认真思考一番。尤其是当我注意到,许多人工作多年,在经历过从“生手”到“熟练工”这种蜕变以后,居然还是会害怕原理性内容的考察。我承认,程序员这个职业更像是一个“手艺人”,可我更想说一句古人的话——君子不器。什么是器呢?“形而上者谓之道,形而下者谓之器”,用一句更直白的话来说,就是“不能知其然而不知其所以然”,这是我一个非CS科班出身的程序员,想去写这样一个专栏的初衷,因为在我看来,“”是永远学不完的,而“”虽然听起来虚无缥缈,实则“朝闻道,夕死可矣”。

作为这个专栏的第一篇博客,我打算从 ASP.NET Core 中的 ConcurrencyLimiter 这个中间件开始。并发是一个爱恨交织的话题,我们喜欢高并发,因为这是程序员跻身高手行列的好机会;我们厌恶并发,因为它引入了多线程、锁、信号量这些复杂的东西。相信大家都曾被并发困扰过,古人云:他山之石,可以攻玉,还有什么比阅读源代码更朴实无华的“学习”呢?你找大牛,大牛可能忙着开会、做PPT;你找同事,同事里可能十个有八个都不知道啊。这个中间件的核心是 IQueuePolicy ,其位于以下位置,它定义了两个核心的方法:TryEnterAsync()OnExit()

1
2
3
4
5
public interface IQueuePolicy
{
ValueTask<bool> TryEnterAsync();
void OnExit();
}

在其默认实现QueuePolicy中,TryEnterAsync()方法,决定着一个请求是会被拒绝还是接受。具体是怎么做呢?它定义了一个最大的并发请求数目,如果实际数超过了最大的并发请求数目,那么请求将会被拒绝。反之,请求将被接受。再仔细看,我们就会发现,它内部使用了SeamphoreSlimInterlocked,所以,聪明的小伙伴们应该立马会联想到,这两种锁各自的作用是什么。

其中,Seamphore 是一个 Windows 内核中的一个同步信号量,适用于在多个有限的线程资源中共享内存资源,它就像一个栅栏,本身具有一定的容量,当线程数量达到这个容量后,新的线程就无法再通过,直到某个线程执行完成。SeamphoreSlimSeamphore优化后的版本,在性能上表现更好一点,更推荐大家使用SeamphoreSlim

Interlocked 的则是我们熟悉的原子操作,它可以在多个线程中,对共享的内存资源进行原子加或者原子减操作。在这里,Interlocked主要用来控制并发请求数的加和减。如果当前的并发请求数小于最大的并发请求数,表示还可以允许新的请求进来,此时,TryEnterAsync()方法会返回true。如果此时的并发请求数大于最大的并发请求数,则需要对当前请求数进行减操作,此时,TryEnterAsync()方法会返回false。

一旦搞清楚这一点,结合中间件的代码,我们可以非常容易地想明白,这个并发控制的实现思路。下面是QueuePolicyTryEnterAsync()OnExit()两个方法的实现,分别代表了“加锁”和“解锁”两个不同的阶段。某种程度上,Seamphore更像一个水闸,每次可以通过的“流量”是固定的,超出的部分会被直接“拒绝”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//“加锁”
public ValueTask<bool> TryEnterAsync()
{
// a return value of 'false' indicates that the request is rejected
// a return value of 'true' indicates that the request may proceed
// _serverSemaphore.Release is *not* called in this method,
// it is called externally when requests leave the server
int totalRequests = Interlocked.Increment(ref _totalRequests);

//当前请求次数 > 最大请求次数,返回false表示拒绝
if (totalRequests > _maxTotalRequest) {
Interlocked.Decrement(ref _totalRequests);
return new ValueTask<bool>(false);
}

Task task = _serverSemaphore.WaitAsync();
if (task.IsCompletedSuccessfully) {
return new ValueTask<bool>(true);
}

return SemaphoreAwaited(task);
}

//“解锁”
public void OnExit()
{
_serverSemaphore.Release();
Interlocked.Decrement(ref _totalRequests);
}

揭秘 StackPolicy

除了QueuePolicy这种实现以外,官方还提供了StackPolicy的实现。从名称上,我们就能大致区分出它们的不同,因为我相信大家都能拎得清“队列”和“栈”。在实现StackPolicy的过程中,首先会判断是否还有访问请求次数_freeServerSpots,直接返回true,确保中间件可以继续执行。如果_queueLength和我们设置的队列最大容量相同,此时,表示队列已满,需要先取消之前的请求,并保留后来的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public ValueTask<bool> TryEnterAsync()
{
lock (_bufferLock) {
if (_freeServerSpots > 0) {
_freeServerSpots--;
return new ValueTask<bool>(true);
}

// 队列已满,则取消之前的请求,即_head
if (_queueLength == _maxQueueCapacity) {
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}

var tcs = _cachedResettableTCS ??
= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;

if (_hasReachedCapacity || _queueLength < _buffer.Count) {
_buffer[_head] = tcs;
} else {
_buffer.Add(tcs);
}
_queueLength++;

// increment _head for next time
// 如果_head = 最大队列容量,则_head需要移动至首位
_head++;
if (_head == _maxQueueCapacity) {
_head = 0;
}

return tcs.GetValueTask();
}
}

public void OnExit()
{
lock (_bufferLock) {
if (_queueLength == 0) {
_freeServerSpots++;

f (_freeServerSpots > _maxConcurrentRequests) {
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}

return;
}

// step backwards and launch a new task
if (_head == 0) {
_head = _maxQueueCapacity - 1;
} else {
_head--;
}

_buffer[_head].Complete(true);
_queueLength--;

}
}

所以,现在,你可以感受到这两种策略的差异了,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。此时,如果我们再回过头来看 ConcurrencyLimiterMiddleware 这个中间件的实现,就会有种恍然大悟的感觉。

揭秘 Middleware

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public async Task Invoke(HttpContext context)
{
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
// 以下代码片段,其实都是调用IQueuePolicy.TryEnterAsync()
var waitInQueueTask = _queuePolicy.TryEnterAsync();
bool result;

if (waitInQueueTask.IsCompleted) {
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else {
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}

// 当result为true,表示请求被接收,此时,让中间件继续执行
// 切记:调用_queuePolicy.OnExit()来释放锁
if (result) {
try {
await _next(context);
}
finally {
_queuePolicy.OnExit();
}
} else {
//这里就是请求被拒绝的情况,修改状态码以及输出错误信息
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}

至此,我们就理清了整个中间件的运作机制,ConcurrencyLimiterMiddleware 中注入了IQueuePolicy这个接口,当一个新的请求进来,中间件会调用IQueuePolicy接口的TryEnterAsync()方法,该方法决定了一个请求是会被接受还是拒绝。当请求被接受的时候,中间件会调用_next(context)让请求继续往下走;当请求被拒绝的时候,中间件会修改 HTTP 状态码(503) 和 返回值,保证调用者可以收到错误信息。这就是这个中间件全部的秘密。而如果要在项目中使用这个中间件,同样是非常简单的:

1
2
3
4
5
6
7
8
9
10
11
12

// 中间件基本法,先注册后使用
// ConfigureServices()
// 或者 services.AddQueuePolicy()
services.AddStackPolicy(options =>
{
options.MaxConcurrentRequests = 2;
options.RequestQueueLimit = 25;
})

// Configure()
app.UseConcurrencyLimiter();

本文小结

这篇博客,主要揭秘了 ASP.NET Core 中的 ConcurrencyLimiter 中间件,这个中间件的主要功能是控制 ASP.NET Core 中的请求并发。作为这个中间件的核心,微软为 IQueuePolicy 接口提供了 QueuePolicyStackPolicy 两种不同的策略实现。其中,QueuePolicy是一个水闸,“多”出来的流量会被直接拒绝掉。StackPolicy是一个垂直的管道,每次都是先取消底部的请求,再让新的请求从顶部进来。对于我们而言,这个中间件最值得学习的地方,其实是SeamphoreSlimInterlocked,我们经常提到“”,其实,“”不单单是指 .NET 中Monitor的语法糖,即lock关键字,在同步信号量以及线程同步的相关话题中,我们还会接触到譬如 Mutex(互斥锁)、ReaderWriterLockSlim、Interlocked(原子操作)SpinLock(自旋锁) 以及 SeamphoreSlim 等等不同的“”。除此之外,还有譬如AutoResetEvent、ManualResetEvent 和 ManualResetEventSlim 等等的同步信号量。如果有读者朋友对此感兴趣,可以到 MSDN 上去搜索相关的关键字,能让博主本人和大家从中有所收获,这是我坚持写下去的理由。好了,以上就是这篇博客的全部内容啦,欢迎大家在评论区留言、讨论。