返回
Featured image of post 七种武器:延迟队列的原理和实现总结

七种武器:延迟队列的原理和实现总结

这是最好的时代,这是最坏的时代”,英国作家查尔斯·狄更斯在两百多年前写下的这句话,如果从辩证的角度来看,它或许可以适用于任何一个时代。我们生活在一个怎样的时代呢?我想,或许是一个矛盾的时代。因为,有时它让你对未来有无限的期待,有时它又会让你陷入无尽的绝望,特别是当集体和个人的命运形成强烈反差的时候,当实用主义、精致利己主义开始盛行的时候,我们偶尔会感慨罗曼蒂克的消亡、怀念从前慢、追忆芳华,可下一秒就被卷入到同时间赛跑的庸庸碌碌当中。生活节奏越来越快,人们越来越追求实时、速度、效率,选择当下的同时,意味着选择实时满足,譬如,我想吃一块美味的蛋糕,我现在就要吃。与之相对的,则被称之延迟满足,譬如,制定一个长期的写作计划以实现个人知识网络的构建。由此可见,人生本来就有快有慢、有张有弛,此时,便引入了这篇文章的主题——延迟队列。

什么是延迟队列

延迟队列,即 DelayQueue,所以,顾名思义,首先,它是一个队列,对于队列这种数据结构,相信大家都不陌生啦!这是一种先入先出(FIFO)的数据结构,就像现实生活中排队讲究先来后到一样,普通队列中的元素都是有序的。相比普通队列,延迟队列主要多了一个延迟的属性,此时,元素何时出队不再取决于入队顺序,而是入队时指定的延迟时间,它表示该元素希望在经过该指定时间后被处理。从某种意义上来讲,延迟队列更像是一种以时间作为权重的集合。我想,单纯地介绍概念,不一定能真正深入人心,所以,请允许我举几个生活中的例子:当你在网上购物的时候,如果下单后一段时间内没有完成付款,那这个订单就会被自动取消;当你通过 Outlook 预约了会议以后,Outlook 会在会议开始前 15 分钟提醒所有与会人员;当你在网上叫外卖以后,平台会在订单即将超时前 10 分钟通知外卖小哥…这样看起来,是不是顿时觉得延迟队列的使用场景还是挺广泛的呢?因为工作上的关系,博主接触类似场景的机会还是蛮多的,所以,想系统地研究下相关的技术,最终,就有了今天这篇博客,下面我们来看看具体的实现方式有哪些。

延迟队列的实现方式

延迟队列思维导图
延迟队列思维导图

我知道,在一个短视频横行的时代,人们的注意力注定要被那些实时满足的事物消耗掉,在我有预感到,不会有多少人愿意在我这篇自以为是的文字前驻留的时候,我唯有识趣地放出这个思维导图,TLDR的这种心理,其实我完全可以感同身受,因为看一部电影永远比看一本书容易,当媒介从文字变成图片再到视频,本质上是我们获取信息的能力下降了,我们变得只能接受低密度的信息。当然,这是一个时代的症结,你可以拥有你的选择,是独善其身还是随波逐流?

数据结构

JDK 中提供了一个延迟队列的实现 DelayQueue,位于 Java.util.concurrent 这个包下面,它是一个 BlockingQueue,本质上封装了一个 PriorityQueue,队列中的元素只有到达了Delay时间,才允许从队列中取出。如下图所示,队列中放入三个订单,分别设置订单在当前时间的第 5、10、15 秒后取消:

延迟队列示意图
延迟队列示意图

对于 Java 中的 DelayQueue 而言,其对应的代码实现如下面所示:

Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);

DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);

System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
    Order task = delayQueue.poll();
    if (task != null) {
        System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }
    Thread.sleep(1000);
}

其中,Order 类要求实现 Delayed 接口,可以注意到这个 compareTo() 方法和 .NET 里的 IComparable 完全一样 :)

public class Order implements Delayed {
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;
    
    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Order Order = (Order) o;
        long diff = this.time - Order.time;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }
}

此时,我们可以得到下面的结果,三个订单分别在第 5、10、15 秒后被执行,这样就实现了一个最简单的延时队列。我不会告诉你,为了得到这个演示结果,我特意搭建了一个 Java 环境:

Java 中的 DelayQueue 效果演示
Java 中的 DelayQueue 效果演示

.NET 中一直没有提供类似的实现,直到 .NET 6.0 中新增了 PriorityQueue 这个数据结构,它允许我们为队列中的元素定义一个优先级,此时,我们可以用下面的方法实现上面的功能:

var utcNow = DateTime.UtcNow;
var queue = new PriorityQueue<FooBar, long>();
queue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, new DateTimeOffset(utcNow.AddSeconds(5)).ToUnixTimeSeconds());
queue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, new DateTimeOffset(utcNow.AddSeconds(10)).ToUnixTimeSeconds());
queue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, new DateTimeOffset(utcNow.AddSeconds(15)).ToUnixTimeSeconds());

while (queue.Count > 0)
{
    var current = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds();
    var flag = queue.TryPeek(out var item, out var timestamp);
    if (!flag || current < timestamp){
        continue;
    } else {
        item = queue.Dequeue();
        _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
    }
}

基本思路是,每次生成一个时间戳作为队列元素的“权重”,然后用当前时间和这个时间戳进行比较,如果时间到了,则从队列中出队,否则继续轮询:

.NET 中的 PriorityQueue 效果演示
.NET 中的 PriorityQueue 效果演示

可以注意到,它可以按照我们预期的时间和顺序,从队列中取出相应的元素,考虑到这个方法里使用了轮询,做法着实算不上优秀,不过对于我们理解 DelayQueue 非常有帮助,属于一种最基础的的实现。

定时任务

接下来,我们来说第二种实现方式,定时任务,这种方式就非常的朴实无华啦,因为对于一个延迟执行的任务而言,其本质就是一个定点执行、执行一次的定时任务啦,所以,理论上普通的 Timer 一样可以做这件事情。不过,考虑到任务的持久化、分布式等等的问题,我们还是建议使用相对成熟的定时任务框架,例如 Quartz.NETHangfire 等等来实现。这里博主以 Quartz.NET 为例:

public async Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
{
    var jobDetail = JobBuilder.Create<DelayJob<T>>()
        .WithIdentity(Guid.NewGuid().ToString("N"), JobGroup)
        .UsingJobData(JobParameters, JsonConvert.SerializeObject(jobData))
        .Build();

    jobDetail.JobDataMap[JobDelegate] = callback;

    var trigger = TriggerBuilder.Create()
        .WithIdentity($"{jobDetail.Key.Name}Trigger", JobGroup)
        .ForJob(jobDetail.Key)
        .StartAt(DateTimeOffset.UtcNow.Add(delay))
        .WithSimpleSchedule(x => x
            .WithRepeatCount(0)
            .WithIntervalInSeconds(0)
        )
        .Build();

    await _scheduler.ScheduleJob(jobDetail, trigger);
}

对于 Quartz 而言,核心的对象只有三个:JobTriggerSchedulerb,通过这三个对象,我们就可以创建一个定时任务,其中, DelayJob<T> 是表示一个带参数的任务,它实现了 IJob 接口,可以在任务执行时触发对应的委托:

internal class DelayJob<T> : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        var jobDetail = context.JobDetail;
        var callback = jobDetail.JobDataMap[QuartzDelayQueue.JobDelegate] as Action<T>;
        var jobData = context.MergedJobDataMap[QuartzDelayQueue.JobParameters]?.ToString();
        var jobParam = JsonConvert.DeserializeObject<T>(jobData);
        callback?.Invoke(jobParam);
        return Task.CompletedTask;
    }
}

使用时非常简单,只要给一个延迟时间和回调函数即可:

await _delayQueue.PutJob(
    TimeSpan.FromSeconds(10),
    new FooBar() { Foo = "Foo", Bar = "Bar" },
    x => _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {x.Foo}, {x.Bar}.")
);

基于 Quartz 实现延时任务
基于 Quartz 实现延时任务

可以注意到,Quartz 在指定时间成功触发了回调函数,这样就达到了延时执行的目的。

Redis

接下来,分享两种基于 Redis 实现延迟队列的做法,分别基于 Redis 的 Key 过期机制 和 RedisZSet 结构,前者依赖 Redis 提供的发布-订阅机制,后者则是利用 ZSet 里每个成员的 score 属性实现排序。

基于 Redis 的 Key 过期机制

这个做法主要是利用 Redis 中的 Key 过期机制,简单来讲,就是利用 Redis 中的发布/订阅功能,如果我们开启了 Redis 的 Key 过期事件监听,那么,当某个 Key 过期的时候,Redis 就会把这条消息发布出来,通过订阅这个事件,从而达到延迟队列的效果。首先,确保 Redis 开启了 Key 过期事件监听,修改 Redis 的配置文件 redis.conf 如下:

notify-keyspace-events Ex

在这种情况下,如果我们为某一个 Key 指定了过期时间,那么,当到达这个过期时间以后,Redis 会向名为 __keyevent@0__:expired 的频道中推送一条消息,消息的内容为过期的这个 Key,其中 @0 表示默认的 Redis 库,这里以 CSRedis 这个库为例来进行演示:

public Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
{
    var guid = Guid.NewGuid().ToString("N");

    // Default Database
    // EXPIRED_KEYS_CHANNEL = "__keyevent@{0}__:expired";
    var channel = string.Format(EXPIRED_KEYS_CHANNEL, 0);

    _redisClient.Set(guid, jobData, delay);
    _redisClient.Subscribe((channel, new Action<CSRedisClient.SubscribeMessageEventArgs>(msg =>
    {
        if (msg.Body != guid) return;
        callback?.Invoke(jobData);
    })));

    _logger.LogInformation($"{DateTimeOffset.UtcNow}:Put a new delay job.");

    return Task.CompletedTask;
}

代码非常好理解,写入 Key 的时候设置一个过期时间,然后订阅 Key 过期的事件,因为 Key 过期事件的内容就是对应的 Key,所以,需要做一次判断避免重复触发。此时,我们可以得到下面的结果:

基于 Redis 的 Key 过期机制实现延迟队列
基于 Redis 的 Key 过期机制实现延迟队列

可以注意到,该任务在第 29 秒时创建,经过 5 秒后,因为 Key 过期而触发回调函数。需要说明的是,Redis 里的发布/订阅是不保证可靠性的,针对所有试图通过 Redis 实现消息队列的想法,我只想说,如果数据量不大,并且不需要可靠性保证的话,可以凑活着用一用,否则,还是建议使用专业的消息队列。

基于 Redis 的 ZSet 结构

接下来,我想介绍的是 Redis 中的 ZSet,即有序集合。其实,从一开始的 DelayQueue 大家就能注意到一件事情,那就是这个延迟队列最重要的是,要给一个“权重”来实现排序。所以,在 .NET 6.0 没有发布以前,人们为了实现类似 DelayQueue 的数据结构,通常只能通过 SortedList 这个类型来实现,感兴趣的朋友不妨参考这个项目:DelayQueue,这里面最大的难点是什么呢?SortedList是一个线程不安全的集合,需要考虑锁的问题,这说明什么呢?这说明模拟 DelayQueue 的关键是找到这样一个有序集合,显然 ZSet 刚好就是这样一个类型,它里面有一个 score 属性,我们只需要把延迟时间放到这个属性上即可。

public class ZSetDelayQueue<T> where T : class
{
    private readonly CSRedisClient _redisClient;
    private const string QueueName = "DelayQueue";

    public ZSetDelayQueue(CSRedisClient redisClient)
    {
        _redisClient = redisClient;
    }

    public Task Enqueue(T item, TimeSpan delay)
    {
        var score = new DateTimeOffset(DateTime.UtcNow.Add(delay)).ToUnixTimeSeconds();
        _redisClient.ZAdd(QueueName, (score, JsonConvert.SerializeObject(item)));
        return Task.CompletedTask;
    }

    public async Task<T> Dequeue()
    {
        var score = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds(); ;
        var records = _redisClient.ZRangeByScore(QueueName, 0, score, 1);
        if (records.Count() > 0) {
            var item = JsonConvert.DeserializeObject<T>(records[0]);
            await _redisClient.ZRemAsync(QueueName, item);
            return item;
        }

        return null;
    }

    public bool IsEmpty()
    {
        var count = _redisClient.ZCount(QueueName, 0, decimal.MaxValue);
        return count == 0;
    }
}

好了,现在一切都顺利成章了,元素入队的时候计算出对应的时间戳,这个时间戳就是 ZSet 里的 score 属性,调用ZAdd() 即可;同理,元素出队,则是利用 ZRangeByScore() 返回从 0 到 当前时间戳内的一个元素,显然,如果当前时间戳大于或者等于该元素的时间戳,表示这个元素设定的延迟时间已经到了,此时,我们需要调用ZRem() 命令将其从集合中移除,和 Java 里面的 DelayQueue 类似,Redis 会按照 score 属性由小到大排序,这样时间早的会被先取出来,时间晚的会被后取出来,不得不说,这一切堪称完美,接下来就非常简单啦!

var redisZSetDelayQueue = _serviceProvider.GetService<ZSetDelayQueue<FooBar>>();
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, TimeSpan.FromMinutes(1));
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, TimeSpan.FromMinutes(2));
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, TimeSpan.FromMinutes(3));

while (!redisZSetDelayQueue.IsEmpty())
{
    var item = await redisZSetDelayQueue.Dequeue();
    if (item == null) {
        continue;
    } else {
        _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
    }
}

这个就和一开始的例子非常接近了,对吧? 效果如何呢,我们一起来看看:

基于 Redis 的 ZSet 类型实现延迟队列
基于 Redis 的 ZSet 类型实现延迟队列

可以看到,三个任务分别在 1 分钟、2 分钟 和 3 分钟后执行,这个延迟队列,个人表示还行,哈哈!事实上,基于 Redis 的延迟队列,业界的方案还是蛮多的,个人比较推荐 有赞 技术团队的方案,感兴趣的朋友可以在本文的基础上做进一步的探究,我个人关注这个话题,是因为我不太喜欢定时任务轮询的做法,虽然这是一种万金油式的做法,我个人更喜欢下面的做法。

消息队列

OK,提到消息队列的话,参照面试八股文,我们会说,消息队列最主要的作用是削峰平谷,换句话说,消息队列可以将短时间内堆积的大量的请求任务“削峰”,然后“平摊”到平时请求任务较少的时段,所以,好像平时一提起 RabbitMQ 或者 Kafka 这样的东西,大家脑海中浮现出来的就是高并发、高吞吐、高性能这种类似糖尿病“三多一少”的存在,回顾我们一开始从生活中得到的启示,有没有一种可能,我们使用消息队列,并不单单是为了让这条消息被快速地消费,而是可以“让子弹飞一会儿”呢?我想,一切皆有可能。下面,我们以 RabbitMQ 为例,来展示如何实现一个延迟队列:

RabbitMQ 死信队列工作流程示意图
RabbitMQ 死信队列工作流程示意图

如图所示,假设消息发送方把消息投递到延迟交换机 default.delay.exchange,该交换机绑定了延迟队列 default.delay.queue,显然,正常情况下,消息会出现在这个延迟队列中。接下来,为了让死信机制生效,我们必须对这个延迟队列做一点设置,这里主要有三个参数,x-message-ttl 表示队列中消息的存活时间,x-dead-letter-exchange 表示消息过期以后再次投递时的死信交换器,x-dead-letter-routing-key 表示消息过期以后再次投递时的路由键名。通常情况下,在 RabbitMQ 中消息进入死信队列的前提有三种,即消息过期、队列已满和消息被拒绝。其中,x-max-lengthx-max-length-bytes 这两个属性,可以分别用来指队列中的最大消息数、最大字节数、而消息被拒绝,则是指主动调用BasicReject() 方法,针对这两种情况触发的死信,我们这里可以不用太关心,因为我们显然考虑的是因为消息过期而触发的死信。OK,讲完了理论,我们来看看代码层面具体是如何实现的吧!

using (var channel = _connection.CreateModel())
{
    // 普通/延迟交换机 default.delay.exchnage
    var exchangeNormal = "default.delay.exchnage";
    channel.ExchangeDeclare(exchangeNormal, "direct", true, false, null);

    // 普通/延迟队列
    var queueNormal = "default.delay.queue";
    var arguments = new Dictionary<string, object>
    {
        ["x-message-ttl"] = 5000,
        ["x-dead-letter-exchange"] = "default.deadletter.exchange",
        ["x-dead-letter-routing-key"] = "dead.routingKey"
    };
    channel.QueueDeclare(queue: queueNormal, true, false, false, arguments: arguments);

    // 绑定交换器
    channel.QueueBind(queueNormal, exchangeNormal, "normal.routingKey");

    // 发送消息
    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(jobData));
    var properties = channel.CreateBasicProperties();
    properties.DeliveryMode = 2;
    channel.BasicPublish(exchange: exchangeNormal, routingKey: "normal.routingKey", mandatory: true, basicProperties: properties, body: body);
}

简单来说,某一个队列如果需要死信队列,那么你就需要为其设置x-message-ttlx-dead-letter-exchangex-dead-letter-routing-key 这三个属性即可,你完全不用关心消息是如何投递到这个死信队列中,而对于消息的消费者来说,它只需要从这个死信队列中接收消息即可,因为能被投递到死信队列里的消息,一定是因为消息时间到了或者说过期了,这样就等于间接实现了延迟队列:

// 死信交换机 default.deadletter.exchange
var exchangeDead = "default.deadletter.exchange";
_consumerChannel.ExchangeDeclare(exchangeDead, "direct", true, false, null);

// 死信队列 default.deadletter.queue
var queueDead = "default.deadletter.queue";
_consumerChannel.QueueDeclare(queue: queueDead, true, false, false, null);

// 绑定交换器
_consumerChannel.QueueBind(queueDead, exchangeDead, "dead.routingKey");

// 消费消息
_basicConsumer = new EventingBasicConsumer(_consumerChannel);
_consumerChannel.BasicConsume(queue: queueDead, autoAck: false, consumer: _basicConsumer);
_basicConsumer.Received += (s, e) =>
{
    var body = Encoding.UTF8.GetString(e.Body.ToArray());

    // TODO:

    _consumerChannel.BasicAck(e.DeliveryTag, false);
};

如下图所示,消息首先会被发送到延迟队列 default.delay.queue 中,此时,消息还没有过期,不会触发死信机制,注意到,这时候队列中会有 4 条消息:

RabbitMQ 死信队列工作流程-01
RabbitMQ 死信队列工作流程-01

一段时间后,消息过期,触发死信机制。此时,消息会被在再次转发到死信交换机 default.deadletter.exchange 中,并最终达到死信队列 default.deadletter.queue

RabbitMQ 死信队列工作流程-02
RabbitMQ 死信队列工作流程-02

至此,我们就利用 RabbitMQ 里的 TTL + DLX 特性实现了一个延迟队列,达到了延迟执行的目的。不过,只要你使用消息队列,就一定会遇到消息堆积的问题,而一旦发生消息堆积,延迟执行的这个时间可能就会不准,如果你特别看重这个时间准确与否,那么,实际运作中还有一部分工作完要做。我们目前用定时任务轮训的做法,最大的问题是它产生大量重复且无用的请求,每天单单是相关日志就上百兆,这就算是我下班以后的一点探索,我现在依然觉得,那个定时任务的 API 设计得莫名其妙。

RabbitMQ 死信队列工作流程-03
RabbitMQ 死信队列工作流程-03

最后,我们来说说 Kafka,虽然 Kafka 单机的 QPS 要远远超过 RabbitMQ 1 到 2 个数量级,但这种快是以牺牲一部分功能作为代价的,像典型的重试和死信,这两样儿都需要使用者自己去实现,比如死信,我们现在是为每个 topic 创建一个对应的死信的 topic 来实现的,比如,我们有一个 topic 叫做 orderInfo,与之相对应地,我们会同时创建一个叫做 orderInfo_DLQ 的 topic,作为它的死信队列。当然,你还需要一个机制去收集和转发过期消息,基本上你还是需要一个 Timer 去做某种轮询,也许,是因为它选择了 Kafka,所以,需要一个定时任务系统来作为补充,毕竟,技术选型这种问题,注定是要政治正确的啦!

本文小结

《七种武器》是著名武侠小说家古龙先生的代表作之一,原本指长生剑、孔雀翎、碧玉刀、多情环、霸王枪、离别钩等七种精妙绝伦的武器,这里则是用来指实现延迟队列的各种方法,延迟队列适用于那些需要延迟执行的场合,在如今这样一个追求实时性、快节奏生活的时代,人们对快乐和满足的要求有实时和延时的区别,用罗翔老师的话来讲,即时快乐是一种低级的快乐,是一种短暂的、易得的快乐,从这个角度来看,延时满足则是一种需要培养和付出的高级快乐。此中优劣,我们不必去分个泾渭分明,就像这些不同的实现方式,更多的只是场景上的差异,而非功能上的差异,延迟队列可以认为是一种以时间作为权重的、有序的集合;Java 里的 DelayQueue,.NET 里的 PriorityQueue,可以实现进程内的、单机版的延迟队列;而像 QuartzHangfire 这类任务调度系统,则可以更精确地控制时间;通过 Redis 里的发布-订阅、ZSet,我们让 DelayQueue 离分布式稍微接近了一点;而 RabbitMQ 里的 TTL + DLX 特性,则让博主比两年前更加理解死信队列……这难道不是一种延时满足吗?你以前不明白的概念,有一天突然有了新的认识,我想,这就是整个过程的意义所在。当然,时间轮算法对我来说还有一点点难,我能留到未来的某一天争取搞懂它吗?好了,以上就是这篇博客的全部内容啦,祝各位晚安,谢谢大家。

Built with Hugo
Theme Stack designed by Jimmy