两个月前,我写过一篇题为为《关于 ChatGPT 的流式传输,你需要知道的一切》的文章。当时,我主要聚焦于 “流式传输” 这个概念。因此,Server-Sent Events、WebSocket 等技术,便顺理成章地成为了我的写作内容。然而,当我们将视野扩展到整个生成式 AI 领域时,我们会发现 “取消” 是一个非常普遍的业务场景。尽管我曾在这篇文章中提到了 AbortController
和 CancellationToken
,但我并不认为我完全解决了当时的问题,即:如何让前、后端的取消动作真正地协同起来?言下之意,我希望前端的 AbortController 发起取消请求以后,后端的 CancellationToken 能够及时感知并响应这一变化。这一切就好比,AI 智能体固然可以通过 “观察” 来感知外部的变化,可当用户决定停止生成的时候,一切都应该戛然而止,无论你是不是为了节省那一点点 token。所以,当两个月前的子弹正中眉心时,我决定继续探讨这个话题。由此,便有了今天这篇稍显多余的博客。
前后端协同取消
我必须承认,在推崇前后端分离的当下,我这个想法难免显得不合时宜。可什么是合时宜呢?在刚刚落幕的巴黎奥运会上,35 岁的 “龙队” 马龙,斩获了个人第 6 枚奥运金牌。对此,这个被誉为 “六边形战士” 的男人表示,“只要心怀热爱,永远都是当打之年”。这是否说明,一切的不合时宜都是自我设限,而年龄不过是个数字。在以往的工作中,我接触的主要是 “Fire and Forget” 这类场景。特别是当一个任务相对短暂时,有没有真正地取消从来都不会成为讨论的重点。直到最近做 Agent 的时候,我发觉这一切其实可以做得更好,即便我的原动力是为了省钱。
async Task Main() {
Console.WriteLine("[HeartBeat] 服务运行中,请按 Ctrl + C 键取消...");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, e) => {
e.Cancel = true;
cts.Cancel();
};
try {
await HeartBeatAsync(cts.Token);
} catch (OperationCanceledException) {
Console.WriteLine("[HeartBeat] 服务已停止.");
}
}
首先,我们来考虑一般场景下的 CancellationToken 的使用,通常它需要搭配 CancellationTokenSource 来使用。当然,产生这一切的背景是 .NET 中提供了基于任务(Task)的异步编程模型 TAP,我们需要一种机制来处理任务的取消请求。考虑到线程本身只有一个 Abort() 方法,CancellationTokenSource 以及 CancellationToken 这种在多线程环境中传达取消请求、无需终止线程的技术应运而生。参考上述示例,CancellationTokenSource 在其中扮演着调度者的角色,它向 HeartBeatAsync()
这个方法传递了一个令牌,并在某个时间点通过 Cancel() 方法触发了取消动作。相对应地,在 HeartBeatAsync()
方法中,我们需要及时检查 CancellationToken 的 IsCancellationRequested
属性,以判断是否有取消请求产生,如以下代码片段所示:
async Task HeartBeatAsync(CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
Console.WriteLine($"[HeartBeat] {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
await Task.Delay(1000, cancellationToken);
}
}
可以注意到,async
关键字会造成对方法的异步化污染,类似地,CancellationToken
需要在所有的异步方法中传播下去,这可能会造成一定的副作用。关于这个问题,我们先暂时将其搁置在一边。现在,让我们来思考一个问题:在这个示例中取消请求由我们来发起,那么,在一个 Web 服务中,取消请求应该有谁来发起呢?你可能会说,你这个问题问得可真没水平,这肯定是客户端嘛!好,顺着这个思路继续深挖下去,客户端发起了取消请求以后,服务器端如何感知到这一变化的呢?在 ASP.NET Core 中,我们可以通过 HttpContext
中的 RequestAborted
属性来进行判断,它本身就是一个 CancellationToken。当客户端连接到服务器端时,ASP.NET Core 负责构建 HttpContext,其中的 RequestAborted
则来自于一个 CancellationTokenSource 实例;当客户端从服务器端断开连接时,框架将会调用这个 CancellationTokenSource 实例的 Cancel() 方法。下面是一个简化的流程示意图:
为了验证这个猜想,我们可以通过下面的代码来验证。设想我们拥有一个经典的接口 Echo(),它可以像一个复读机一样重复我们的话语。在此,我们人为设置了 2s 的延迟,目的是便于观察 RequestAborted
属性:
[HttpGet("echo")]
public async Task<string> Echo(string text) {
_logger.LogInformation("[{0}] IsCancellationRequested: {1}",
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
HttpContext.RequestAborted.IsCancellationRequested
);
await Task.Delay(1000 * 2);
_logger.LogInformation("[{0}] IsCancellationRequested: {1}",
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
HttpContext.RequestAborted.IsCancellationRequested
);
return text;
}
接下来,我们将编写客户端代码。运用前文中掌握的技巧,我们可以让 CancellationTokenSource 在 2s 以后触发取消请求。让我们来一起看看,这一变化将带来什么结果?
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(2));
try {
using var httpClient = new HttpClient();
var response = await httpClient.GetStringAsync("https://localhost:7261/api/Chat/Echo?text=你好", cts.Token);
} catch (OperationCanceledException) {
Console.WriteLine("请求已取消");
}
此时,我们可以注意到,RequestAborted
中的 IsCancellationRequested
中的属性,在 2s 前后发生了变化:
作为对比,我们这里放一张正常请求的日志截图,相信大家可以都发现其中的差异:
这说明什么呢?这表明,在 Web 服务中,我们可以利用 RequestAborted
来判断客户端是否发起了取消请求。实际上,我们还有更简单的做法,我们可以直接在控制器方法中添加一个 CancellationToken
类型的参数,ASP.NET Core 会自动将 RequestAborted
注入到其中,而这恰好解释了我们的疑问:我们都知道可以在 Controller 中使用这个参数,但我们从来没有想过这个参数从哪里来。对于《关于 ChatGPT 的流式传输,你需要知道的一切》这篇文章的困惑,此刻终于破云见日,这个当时没有解决的问题,现在看来竟然是如此的简单:
[HttpGet("streaming")]
[HttpPost("streaming")]
public async Task GetStreamingAsync(CancellationToken cancellationToken) {
try {
cancellationToken.ThrowIfCancellationRequested();
var text = "天之道,损有余而补不足,是故虚胜实,不足胜有余。其意博,其理奥,其趣深,天地之象分,阴阳之候列,变化之由表,死生之兆彰,不谋而遗迹自同";
HttpContext.Response.ContentType = "text/event-stream";
foreach (var item in text) {
var payload = JsonSerializer.Serialize(new { text = item.ToString() });
var message = $"data: {payload}\n\n";
await HttpContext.Response.WriteAsync(message, Encoding.UTF8, cancellationToken);
await HttpContext.Response.Body.FlushAsync(cancellationToken);
await Task.Delay(200);
}
await HttpContext.Response.WriteAsync("data: [DONE]", cancellationToken);
await HttpContext.Response.Body.FlushAsync(cancellationToken);
await HttpContext.Response.CompleteAsync();
} catch (OperationCanceledException ex) {
_logger.LogInformation("Operation is canceled.");
}
}
此时,我们只需要在前端使用 fetch()
函数以及 AbortController,即可实现真正意义上的取消,如下图所示:
日志表明,此刻后端成功了响应了前端发起的取消请求:
当然,此前我用 SignalR 实现的双向取消机制,现在同样可以工作:
public async Task Generate(string requestId, string prompt) {
var cts = new CancellationTokenSource();
_cancellationTokens[requestId] = cts;
try {
cts.Token.ThrowIfCancellationRequested();
await foreach (var item in _textGenerator.Generate(prompt, cts.Token)) {
await Clients.Caller.SendAsync("ReceiveChunks", JsonSerializer.Serialize(new { text = item }), requestId, cts.Token);
}
} catch (OperationCanceledException ex) {
_logger.LogInformation("The task is canceled.");
}
}
public async Task Cancel(string requestId) {
if (_cancellationTokens.TryGetValue(requestId, out var cts)) {
await cts.CancelAsync();
await Clients.Caller.SendAsync("GenerationCancelled", requestId, cts.Token);
_cancellationTokens.Remove(requestId);
}
}
这个时候,你可能会意识到,你的代码充被着大量的 CancellationToken
入侵了。例如,你会注意到,这里我必须将这个令牌传递给 ITextGenerator
这个接口,而如果该接口内部还有更多的依赖项,这无疑就会变成一种莫名的负担。我一直在考虑优化 Wikit 在 “停止生成” 功能上的代码,现在看来,“长痛不如短痛”,或许我应该尽快开始这块的工作,虽然我预感到这个过程可能会有些痛苦。如图所示,下面是一种解决 CancellationToken
侵入性问题的解决思路,即通过 IHttpContextAccessor
接口封装一个全新的 ICancellationTokenProvider
接口:
interface ICancellationTokenProvider
{
CancellationToken GetCancellationToken();
Task<CancellationToken> GetCancellationTokenAsync();
}
一个基本的实现如下。此时,我们只需要在应该 “取消” 的地方,注入该服务即可:
class CancellationTokenProvider : ICancellationTokenProvider
{
private readonly IHttpContextAccessor _httpContextAccessor;
public CancellationTokenProvider(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
public CancellationToken GetCancellationToken()
{
return _httpContextAccessor?.HttpContext?.RequestAborted ?? CancellationToken.None;
}
public Task<CancellationToken> GetCancellationTokenAsync()
{
var cancellationToken = _httpContextAccessor?.HttpContext?.RequestAborted ?? CancellationToken.None;
return Task.FromResult(cancellationToken);
}
}