返回
Featured image of post 浅议 CancellationToken 在前后端协同取消场景中的应用

浅议 CancellationToken 在前后端协同取消场景中的应用

AI 摘要
本文深入探讨了在生成式AI领域中,前后端协同取消机制的重要性和实现方式。作者首先回顾了流式传输技术,然后通过 .NET 中的 CancellationToken 和 CancellationTokenSource 的使用示例,展示了如何在异步编程模型中处理任务取消请求。文章进一步分析了在 Web 服务中,如何通过 HttpContext.RequestAborted 属性感知客户端的取消请求,并给出了具体的代码示例。最后,作者讨论了在实际开发中,如何优化取消机制的实现,以提高代码的效率和可维护性。

两个月前,我写过一篇题为为《关于 ChatGPT 的流式传输,你需要知道的一切》的文章。当时,我主要聚焦于 “流式传输” 这个概念。因此,Server-Sent EventsWebSocket 等技术,便顺理成章地成为了我的写作内容。然而,当我们将视野扩展到整个生成式 AI 领域时,我们会发现 “取消” 是一个非常普遍的业务场景。尽管我曾在这篇文章中提到了 AbortControllerCancellationToken,但我并不认为我完全解决了当时的问题,即:如何让前、后端的取消动作真正地协同起来?言下之意,我希望前端的 AbortController 发起取消请求以后,后端的 CancellationToken 能够及时感知并响应这一变化。这一切就好比,AI 智能体固然可以通过 “观察” 来感知外部的变化,可当用户决定停止生成的时候,一切都应该戛然而止,无论你是不是为了节省那一点点 token。所以,当两个月前的子弹正中眉心时,我决定继续探讨这个话题。由此,便有了今天这篇稍显多余的博客。

前后端协同取消

我必须承认,在推崇前后端分离的当下,我这个想法难免显得不合时宜。可什么是合时宜呢?在刚刚落幕的巴黎奥运会上,35 岁的 “龙队” 马龙,斩获了个人第 6 枚奥运金牌。对此,这个被誉为 “六边形战士” 的男人表示,“只要心怀热爱,永远都是当打之年”。这是否说明,一切的不合时宜都是自我设限,而年龄不过是个数字。在以往的工作中,我接触的主要是 “Fire and Forget” 这类场景。特别是当一个任务相对短暂时,有没有真正地取消从来都不会成为讨论的重点。直到最近做 Agent 的时候,我发觉这一切其实可以做得更好,即便我的原动力是为了省钱。

async Task Main() {
    Console.WriteLine("[HeartBeat] 服务运行中,请按 Ctrl + C 键取消...");
    
    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (sender, e) => {
        e.Cancel = true;
        cts.Cancel();
    };

    try {
        await HeartBeatAsync(cts.Token);
    } catch (OperationCanceledException) {
        Console.WriteLine("[HeartBeat] 服务已停止.");
    }
}

首先,我们来考虑一般场景下的 CancellationToken 的使用,通常它需要搭配 CancellationTokenSource 来使用。当然,产生这一切的背景是 .NET 中提供了基于任务(Task)的异步编程模型 TAP,我们需要一种机制来处理任务的取消请求。考虑到线程本身只有一个 Abort() 方法,CancellationTokenSource 以及 CancellationToken 这种在多线程环境中传达取消请求、无需终止线程的技术应运而生。参考上述示例,CancellationTokenSource 在其中扮演着调度者的角色,它向 HeartBeatAsync() 这个方法传递了一个令牌,并在某个时间点通过 Cancel() 方法触发了取消动作。相对应地,在 HeartBeatAsync() 方法中,我们需要及时检查 CancellationToken 的 IsCancellationRequested 属性,以判断是否有取消请求产生,如以下代码片段所示:

async Task HeartBeatAsync(CancellationToken cancellationToken) {
    while (!cancellationToken.IsCancellationRequested) {
        Console.WriteLine($"[HeartBeat] {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
        await Task.Delay(1000, cancellationToken);
    }
}

可以注意到,async 关键字会造成对方法的异步化污染,类似地,CancellationToken 需要在所有的异步方法中传播下去,这可能会造成一定的副作用。关于这个问题,我们先暂时将其搁置在一边。现在,让我们来思考一个问题:在这个示例中取消请求由我们来发起,那么,在一个 Web 服务中,取消请求应该有谁来发起呢?你可能会说,你这个问题问得可真没水平,这肯定是客户端嘛!好,顺着这个思路继续深挖下去,客户端发起了取消请求以后,服务器端如何感知到这一变化的呢?在 ASP.NET Core 中,我们可以通过 HttpContext 中的 RequestAborted 属性来进行判断,它本身就是一个 CancellationToken。当客户端连接到服务器端时,ASP.NET Core 负责构建 HttpContext,其中的 RequestAborted 则来自于一个 CancellationTokenSource 实例;当客户端从服务器端断开连接时,框架将会调用这个 CancellationTokenSource 实例的 Cancel() 方法。下面是一个简化的流程示意图:

服务器端如何感知客户端的取消请求
服务器端如何感知客户端的取消请求

为了验证这个猜想,我们可以通过下面的代码来验证。设想我们拥有一个经典的接口 Echo(),它可以像一个复读机一样重复我们的话语。在此,我们人为设置了 2s 的延迟,目的是便于观察 RequestAborted 属性:

[HttpGet("echo")]
public async Task<string> Echo(string text) {
    _logger.LogInformation("[{0}] IsCancellationRequested: {1}", 
        DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), 
        HttpContext.RequestAborted.IsCancellationRequested
    );

    await Task.Delay(1000 * 2);

    _logger.LogInformation("[{0}] IsCancellationRequested: {1}", 
        DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), 
        HttpContext.RequestAborted.IsCancellationRequested
    );

    return text;
}

接下来,我们将编写客户端代码。运用前文中掌握的技巧,我们可以让 CancellationTokenSource 在 2s 以后触发取消请求。让我们来一起看看,这一变化将带来什么结果?

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(2));

try {
    using var httpClient = new HttpClient();
    var response = await httpClient.GetStringAsync("https://localhost:7261/api/Chat/Echo?text=你好", cts.Token);
} catch (OperationCanceledException) {
    Console.WriteLine("请求已取消");
}

此时,我们可以注意到,RequestAborted 中的 IsCancellationRequested 中的属性,在 2s 前后发生了变化:

HttpContext 中 RequestAborted 的变化(1)
HttpContext 中 RequestAborted 的变化(1)

作为对比,我们这里放一张正常请求的日志截图,相信大家可以都发现其中的差异:

HttpContext 中 RequestAborted 的变化(2)
HttpContext 中 RequestAborted 的变化(2)

这说明什么呢?这表明,在 Web 服务中,我们可以利用 RequestAborted 来判断客户端是否发起了取消请求。实际上,我们还有更简单的做法,我们可以直接在控制器方法中添加一个 CancellationToken 类型的参数,ASP.NET Core 会自动将 RequestAborted 注入到其中,而这恰好解释了我们的疑问:我们都知道可以在 Controller 中使用这个参数,但我们从来没有想过这个参数从哪里来。对于《关于 ChatGPT 的流式传输,你需要知道的一切》这篇文章的困惑,此刻终于破云见日,这个当时没有解决的问题,现在看来竟然是如此的简单:

[HttpGet("streaming")]
[HttpPost("streaming")]
public async Task GetStreamingAsync(CancellationToken cancellationToken) {
    try {
        cancellationToken.ThrowIfCancellationRequested();

        var text = "天之道,损有余而补不足,是故虚胜实,不足胜有余。其意博,其理奥,其趣深,天地之象分,阴阳之候列,变化之由表,死生之兆彰,不谋而遗迹自同";

        HttpContext.Response.ContentType = "text/event-stream";

        foreach (var item in text) {
            var payload = JsonSerializer.Serialize(new { text = item.ToString() });
            var message = $"data: {payload}\n\n";

            await HttpContext.Response.WriteAsync(message, Encoding.UTF8, cancellationToken);
            await HttpContext.Response.Body.FlushAsync(cancellationToken);
            await Task.Delay(200);
        }

        await HttpContext.Response.WriteAsync("data: [DONE]", cancellationToken);
        await HttpContext.Response.Body.FlushAsync(cancellationToken);
        await HttpContext.Response.CompleteAsync();
    } catch (OperationCanceledException ex) {
        _logger.LogInformation("Operation is canceled.");
    }
}

此时,我们只需要在前端使用 fetch() 函数以及 AbortController,即可实现真正意义上的取消,如下图所示:

前端发起取消请求
前端发起取消请求

日志表明,此刻后端成功了响应了前端发起的取消请求:

后端响应取消请求
后端响应取消请求

当然,此前我用 SignalR 实现的双向取消机制,现在同样可以工作:

public async Task Generate(string requestId, string prompt) {
    var cts = new CancellationTokenSource();
    _cancellationTokens[requestId] = cts;

    try {
        cts.Token.ThrowIfCancellationRequested();

        await foreach (var item in _textGenerator.Generate(prompt, cts.Token)) {
            await Clients.Caller.SendAsync("ReceiveChunks", JsonSerializer.Serialize(new { text = item }), requestId, cts.Token);
        }
    } catch (OperationCanceledException ex) {
        _logger.LogInformation("The task is canceled.");
    }

}

public async Task Cancel(string requestId) {
    if (_cancellationTokens.TryGetValue(requestId, out var cts)) {
        await cts.CancelAsync();
        await Clients.Caller.SendAsync("GenerationCancelled", requestId, cts.Token);

        _cancellationTokens.Remove(requestId);
    }
}

这个时候,你可能会意识到,你的代码充被着大量的 CancellationToken 入侵了。例如,你会注意到,这里我必须将这个令牌传递给 ITextGenerator 这个接口,而如果该接口内部还有更多的依赖项,这无疑就会变成一种莫名的负担。我一直在考虑优化 Wikit 在 “停止生成” 功能上的代码,现在看来,“长痛不如短痛”,或许我应该尽快开始这块的工作,虽然我预感到这个过程可能会有些痛苦。如图所示,下面是一种解决 CancellationToken 侵入性问题的解决思路,即通过 IHttpContextAccessor 接口封装一个全新的 ICancellationTokenProvider 接口:

interface ICancellationTokenProvider
{
    CancellationToken GetCancellationToken();
    Task<CancellationToken> GetCancellationTokenAsync();
}

一个基本的实现如下。此时,我们只需要在应该 “取消” 的地方,注入该服务即可:

class CancellationTokenProvider : ICancellationTokenProvider
{
    private readonly IHttpContextAccessor _httpContextAccessor;
    public CancellationTokenProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public CancellationToken GetCancellationToken()
    {
        return _httpContextAccessor?.HttpContext?.RequestAborted ?? CancellationToken.None;
    }

    public Task<CancellationToken> GetCancellationTokenAsync()
    {
        var cancellationToken =  _httpContextAccessor?.HttpContext?.RequestAborted ?? CancellationToken.None;
        return Task.FromResult(cancellationToken);
    }
}

参考资料

Built with Hugo v0.126.1
Theme Stack designed by Jimmy
已创作 273 篇文章,共计 1032286 字