“这是最好的时代,这是最坏的时代”,英国作家查尔斯·狄更斯在两百多年前写下的这句话,如果从辩证的角度来看,它或许可以适用于任何一个时代。我们生活在一个怎样的时代呢?我想,或许是一个矛盾的时代。因为,有时它让你对未来有无限的期待,有时它又会让你陷入无尽的绝望,特别是当集体和个人的命运形成强烈反差的时候,当实用主义、精致利己主义开始盛行的时候,我们偶尔会感慨罗曼蒂克的消亡、怀念从前慢、追忆芳华,可下一秒就被卷入到同时间赛跑的庸庸碌碌当中。生活节奏越来越快,人们越来越追求实时、速度、效率,选择当下的同时,意味着选择实时满足,譬如,我想吃一块美味的蛋糕,我现在就要吃。与之相对的,则被称之延迟满足,譬如,制定一个长期的写作计划以实现个人知识网络的构建。由此可见,人生本来就有快有慢、有张有弛,此时,便引入了这篇文章的主题——延迟队列。
什么是延迟队列
延迟队列,即 DelayQueue
,所以,顾名思义,首先,它是一个队列,对于队列这种数据结构,相信大家都不陌生啦!这是一种先入先出(FIFO
)的数据结构,就像现实生活中排队讲究先来后到一样,普通队列中的元素都是有序的。相比普通队列,延迟队列主要多了一个延迟的属性,此时,元素何时出队不再取决于入队顺序,而是入队时指定的延迟时间,它表示该元素希望在经过该指定时间后被处理。从某种意义上来讲,延迟队列更像是一种以时间作为权重的集合。我想,单纯地介绍概念,不一定能真正深入人心,所以,请允许我举几个生活中的例子:当你在网上购物的时候,如果下单后一段时间内没有完成付款,那这个订单就会被自动取消;当你通过 Outlook
预约了会议以后,Outlook
会在会议开始前 15 分钟提醒所有与会人员;当你在网上叫外卖以后,平台会在订单即将超时前 10 分钟通知外卖小哥…这样看起来,是不是顿时觉得延迟队列的使用场景还是挺广泛的呢?因为工作上的关系,博主接触类似场景的机会还是蛮多的,所以,想系统地研究下相关的技术,最终,就有了今天这篇博客,下面我们来看看具体的实现方式有哪些。
延迟队列的实现方式
我知道,在一个短视频横行的时代,人们的注意力注定要被那些实时满足的事物消耗掉,在我有预感到,不会有多少人愿意在我这篇自以为是的文字前驻留的时候,我唯有识趣地放出这个思维导图,TLDR
的这种心理,其实我完全可以感同身受,因为看一部电影永远比看一本书容易,当媒介从文字变成图片再到视频,本质上是我们获取信息的能力下降了,我们变得只能接受低密度的信息。当然,这是一个时代的症结,你可以拥有你的选择,是独善其身还是随波逐流?
数据结构
JDK
中提供了一个延迟队列的实现 DelayQueue
,位于 Java.util.concurrent
这个包下面,它是一个 BlockingQueue
,本质上封装了一个 PriorityQueue
,队列中的元素只有到达了Delay
时间,才允许从队列中取出。如下图所示,队列中放入三个订单,分别设置订单在当前时间的第 5、10、15 秒后取消:
对于 Java
中的 DelayQueue
而言,其对应的代码实现如下面所示:
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
其中,Order
类要求实现 Delayed
接口,可以注意到这个 compareTo()
方法和 .NET 里的 IComparable
完全一样 :)
public class Order implements Delayed {
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
此时,我们可以得到下面的结果,三个订单分别在第 5、10、15 秒后被执行,这样就实现了一个最简单的延时队列。我不会告诉你,为了得到这个演示结果,我特意搭建了一个 Java
环境:
.NET
中一直没有提供类似的实现,直到 .NET 6.0 中新增了 PriorityQueue
这个数据结构,它允许我们为队列中的元素定义一个优先级,此时,我们可以用下面的方法实现上面的功能:
var utcNow = DateTime.UtcNow;
var queue = new PriorityQueue<FooBar, long>();
queue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, new DateTimeOffset(utcNow.AddSeconds(5)).ToUnixTimeSeconds());
queue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, new DateTimeOffset(utcNow.AddSeconds(10)).ToUnixTimeSeconds());
queue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, new DateTimeOffset(utcNow.AddSeconds(15)).ToUnixTimeSeconds());
while (queue.Count > 0)
{
var current = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds();
var flag = queue.TryPeek(out var item, out var timestamp);
if (!flag || current < timestamp){
continue;
} else {
item = queue.Dequeue();
_logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
}
}
基本思路是,每次生成一个时间戳作为队列元素的“权重”,然后用当前时间和这个时间戳进行比较,如果时间到了,则从队列中出队,否则继续轮询:
可以注意到,它可以按照我们预期的时间和顺序,从队列中取出相应的元素,考虑到这个方法里使用了轮询,做法着实算不上优秀,不过对于我们理解 DelayQueue
非常有帮助,属于一种最基础的的实现。
定时任务
接下来,我们来说第二种实现方式,定时任务,这种方式就非常的朴实无华啦,因为对于一个延迟执行的任务而言,其本质就是一个定点执行、执行一次的定时任务啦,所以,理论上普通的 Timer
一样可以做这件事情。不过,考虑到任务的持久化、分布式等等的问题,我们还是建议使用相对成熟的定时任务框架,例如 Quartz.NET、Hangfire 等等来实现。这里博主以 Quartz.NET 为例:
public async Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
{
var jobDetail = JobBuilder.Create<DelayJob<T>>()
.WithIdentity(Guid.NewGuid().ToString("N"), JobGroup)
.UsingJobData(JobParameters, JsonConvert.SerializeObject(jobData))
.Build();
jobDetail.JobDataMap[JobDelegate] = callback;
var trigger = TriggerBuilder.Create()
.WithIdentity($"{jobDetail.Key.Name}Trigger", JobGroup)
.ForJob(jobDetail.Key)
.StartAt(DateTimeOffset.UtcNow.Add(delay))
.WithSimpleSchedule(x => x
.WithRepeatCount(0)
.WithIntervalInSeconds(0)
)
.Build();
await _scheduler.ScheduleJob(jobDetail, trigger);
}
对于 Quartz
而言,核心的对象只有三个:Job
、Trigger
和 Schedulerb
,通过这三个对象,我们就可以创建一个定时任务,其中, DelayJob<T>
是表示一个带参数的任务,它实现了 IJob
接口,可以在任务执行时触发对应的委托:
internal class DelayJob<T> : IJob
{
public Task Execute(IJobExecutionContext context)
{
var jobDetail = context.JobDetail;
var callback = jobDetail.JobDataMap[QuartzDelayQueue.JobDelegate] as Action<T>;
var jobData = context.MergedJobDataMap[QuartzDelayQueue.JobParameters]?.ToString();
var jobParam = JsonConvert.DeserializeObject<T>(jobData);
callback?.Invoke(jobParam);
return Task.CompletedTask;
}
}
使用时非常简单,只要给一个延迟时间和回调函数即可:
await _delayQueue.PutJob(
TimeSpan.FromSeconds(10),
new FooBar() { Foo = "Foo", Bar = "Bar" },
x => _logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {x.Foo}, {x.Bar}.")
);
可以注意到,Quartz
在指定时间成功触发了回调函数,这样就达到了延时执行的目的。
Redis
接下来,分享两种基于 Redis 实现延迟队列的做法,分别基于 Redis
的 Key 过期机制 和 Redis
的 ZSet
结构,前者依赖 Redis
提供的发布-订阅机制,后者则是利用 ZSet
里每个成员的 score
属性实现排序。
基于 Redis 的 Key 过期机制
这个做法主要是利用 Redis
中的 Key 过期机制,简单来讲,就是利用 Redis
中的发布/订阅功能,如果我们开启了 Redis
的 Key 过期事件监听,那么,当某个 Key 过期的时候,Redis
就会把这条消息发布出来,通过订阅这个事件,从而达到延迟队列的效果。首先,确保 Redis
开启了 Key 过期事件监听,修改 Redis 的配置文件 redis.conf
如下:
notify-keyspace-events Ex
在这种情况下,如果我们为某一个 Key 指定了过期时间,那么,当到达这个过期时间以后,Redis
会向名为 __keyevent@0__:expired
的频道中推送一条消息,消息的内容为过期的这个 Key,其中 @0
表示默认的 Redis
库,这里以 CSRedis 这个库为例来进行演示:
public Task PutJob<T>(TimeSpan delay, T jobData, Action<T> callback)
{
var guid = Guid.NewGuid().ToString("N");
// Default Database
// EXPIRED_KEYS_CHANNEL = "__keyevent@{0}__:expired";
var channel = string.Format(EXPIRED_KEYS_CHANNEL, 0);
_redisClient.Set(guid, jobData, delay);
_redisClient.Subscribe((channel, new Action<CSRedisClient.SubscribeMessageEventArgs>(msg =>
{
if (msg.Body != guid) return;
callback?.Invoke(jobData);
})));
_logger.LogInformation($"{DateTimeOffset.UtcNow}:Put a new delay job.");
return Task.CompletedTask;
}
代码非常好理解,写入 Key 的时候设置一个过期时间,然后订阅 Key 过期的事件,因为 Key 过期事件的内容就是对应的 Key,所以,需要做一次判断避免重复触发。此时,我们可以得到下面的结果:
可以注意到,该任务在第 29 秒时创建,经过 5 秒后,因为 Key 过期而触发回调函数。需要说明的是,Redis
里的发布/订阅是不保证可靠性的,针对所有试图通过 Redis
实现消息队列的想法,我只想说,如果数据量不大,并且不需要可靠性保证的话,可以凑活着用一用,否则,还是建议使用专业的消息队列。
基于 Redis 的 ZSet 结构
接下来,我想介绍的是 Redis
中的 ZSet
,即有序集合。其实,从一开始的 DelayQueue
大家就能注意到一件事情,那就是这个延迟队列最重要的是,要给一个“权重”来实现排序。所以,在 .NET 6.0 没有发布以前,人们为了实现类似 DelayQueue
的数据结构,通常只能通过 SortedList
这个类型来实现,感兴趣的朋友不妨参考这个项目:DelayQueue,这里面最大的难点是什么呢?SortedList
是一个线程不安全的集合,需要考虑锁的问题,这说明什么呢?这说明模拟 DelayQueue
的关键是找到这样一个有序集合,显然 ZSet
刚好就是这样一个类型,它里面有一个 score
属性,我们只需要把延迟时间放到这个属性上即可。
public class ZSetDelayQueue<T> where T : class
{
private readonly CSRedisClient _redisClient;
private const string QueueName = "DelayQueue";
public ZSetDelayQueue(CSRedisClient redisClient)
{
_redisClient = redisClient;
}
public Task Enqueue(T item, TimeSpan delay)
{
var score = new DateTimeOffset(DateTime.UtcNow.Add(delay)).ToUnixTimeSeconds();
_redisClient.ZAdd(QueueName, (score, JsonConvert.SerializeObject(item)));
return Task.CompletedTask;
}
public async Task<T> Dequeue()
{
var score = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeSeconds(); ;
var records = _redisClient.ZRangeByScore(QueueName, 0, score, 1);
if (records.Count() > 0) {
var item = JsonConvert.DeserializeObject<T>(records[0]);
await _redisClient.ZRemAsync(QueueName, item);
return item;
}
return null;
}
public bool IsEmpty()
{
var count = _redisClient.ZCount(QueueName, 0, decimal.MaxValue);
return count == 0;
}
}
好了,现在一切都顺利成章了,元素入队的时候计算出对应的时间戳,这个时间戳就是 ZSet
里的 score
属性,调用ZAdd()
即可;同理,元素出队,则是利用 ZRangeByScore()
返回从 0 到 当前时间戳内的一个元素,显然,如果当前时间戳大于或者等于该元素的时间戳,表示这个元素设定的延迟时间已经到了,此时,我们需要调用ZRem()
命令将其从集合中移除,和 Java
里面的 DelayQueue
类似,Redis
会按照 score
属性由小到大排序,这样时间早的会被先取出来,时间晚的会被后取出来,不得不说,这一切堪称完美,接下来就非常简单啦!
var redisZSetDelayQueue = _serviceProvider.GetService<ZSetDelayQueue<FooBar>>();
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "001", Bar = "100" }, TimeSpan.FromMinutes(1));
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "002", Bar = "200" }, TimeSpan.FromMinutes(2));
await redisZSetDelayQueue.Enqueue(new FooBar() { Foo = "003", Bar = "300" }, TimeSpan.FromMinutes(3));
while (!redisZSetDelayQueue.IsEmpty())
{
var item = await redisZSetDelayQueue.Dequeue();
if (item == null) {
continue;
} else {
_logger.LogInformation($"{DateTimeOffset.UtcNow}:Hello DelayQueue, {item.Foo}, {item.Bar}.");
}
}
这个就和一开始的例子非常接近了,对吧? 效果如何呢,我们一起来看看:
可以看到,三个任务分别在 1 分钟、2 分钟 和 3 分钟后执行,这个延迟队列,个人表示还行,哈哈!事实上,基于 Redis
的延迟队列,业界的方案还是蛮多的,个人比较推荐 有赞 技术团队的方案,感兴趣的朋友可以在本文的基础上做进一步的探究,我个人关注这个话题,是因为我不太喜欢定时任务轮询的做法,虽然这是一种万金油式的做法,我个人更喜欢下面的做法。
消息队列
OK,提到消息队列的话,参照面试八股文,我们会说,消息队列最主要的作用是削峰平谷,换句话说,消息队列可以将短时间内堆积的大量的请求任务“削峰”,然后“平摊”到平时请求任务较少的时段,所以,好像平时一提起 RabbitMQ 或者 Kafka 这样的东西,大家脑海中浮现出来的就是高并发、高吞吐、高性能这种类似糖尿病“三多一少”的存在,回顾我们一开始从生活中得到的启示,有没有一种可能,我们使用消息队列,并不单单是为了让这条消息被快速地消费,而是可以“让子弹飞一会儿”呢?我想,一切皆有可能。下面,我们以 RabbitMQ 为例,来展示如何实现一个延迟队列:
如图所示,假设消息发送方把消息投递到延迟交换机 default.delay.exchange
,该交换机绑定了延迟队列 default.delay.queue
,显然,正常情况下,消息会出现在这个延迟队列中。接下来,为了让死信机制生效,我们必须对这个延迟队列做一点设置,这里主要有三个参数,x-message-ttl
表示队列中消息的存活时间,x-dead-letter-exchange
表示消息过期以后再次投递时的死信交换器,x-dead-letter-routing-key
表示消息过期以后再次投递时的路由键名。通常情况下,在 RabbitMQ 中消息进入死信队列的前提有三种,即消息过期、队列已满和消息被拒绝。其中,x-max-length
和 x-max-length-bytes
这两个属性,可以分别用来指队列中的最大消息数、最大字节数、而消息被拒绝,则是指主动调用BasicReject()
方法,针对这两种情况触发的死信,我们这里可以不用太关心,因为我们显然考虑的是因为消息过期而触发的死信。OK,讲完了理论,我们来看看代码层面具体是如何实现的吧!
using (var channel = _connection.CreateModel())
{
// 普通/延迟交换机 default.delay.exchnage
var exchangeNormal = "default.delay.exchnage";
channel.ExchangeDeclare(exchangeNormal, "direct", true, false, null);
// 普通/延迟队列
var queueNormal = "default.delay.queue";
var arguments = new Dictionary<string, object>
{
["x-message-ttl"] = 5000,
["x-dead-letter-exchange"] = "default.deadletter.exchange",
["x-dead-letter-routing-key"] = "dead.routingKey"
};
channel.QueueDeclare(queue: queueNormal, true, false, false, arguments: arguments);
// 绑定交换器
channel.QueueBind(queueNormal, exchangeNormal, "normal.routingKey");
// 发送消息
var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(jobData));
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish(exchange: exchangeNormal, routingKey: "normal.routingKey", mandatory: true, basicProperties: properties, body: body);
}
简单来说,某一个队列如果需要死信队列,那么你就需要为其设置x-message-ttl
、x-dead-letter-exchange
、x-dead-letter-routing-key
这三个属性即可,你完全不用关心消息是如何投递到这个死信队列中,而对于消息的消费者来说,它只需要从这个死信队列中接收消息即可,因为能被投递到死信队列里的消息,一定是因为消息时间到了或者说过期了,这样就等于间接实现了延迟队列:
// 死信交换机 default.deadletter.exchange
var exchangeDead = "default.deadletter.exchange";
_consumerChannel.ExchangeDeclare(exchangeDead, "direct", true, false, null);
// 死信队列 default.deadletter.queue
var queueDead = "default.deadletter.queue";
_consumerChannel.QueueDeclare(queue: queueDead, true, false, false, null);
// 绑定交换器
_consumerChannel.QueueBind(queueDead, exchangeDead, "dead.routingKey");
// 消费消息
_basicConsumer = new EventingBasicConsumer(_consumerChannel);
_consumerChannel.BasicConsume(queue: queueDead, autoAck: false, consumer: _basicConsumer);
_basicConsumer.Received += (s, e) =>
{
var body = Encoding.UTF8.GetString(e.Body.ToArray());
// TODO:
_consumerChannel.BasicAck(e.DeliveryTag, false);
};
如下图所示,消息首先会被发送到延迟队列 default.delay.queue
中,此时,消息还没有过期,不会触发死信机制,注意到,这时候队列中会有 4 条消息:
一段时间后,消息过期,触发死信机制。此时,消息会被在再次转发到死信交换机 default.deadletter.exchange
中,并最终达到死信队列 default.deadletter.queue
:
至此,我们就利用 RabbitMQ 里的 TTL + DLX 特性实现了一个延迟队列,达到了延迟执行的目的。不过,只要你使用消息队列,就一定会遇到消息堆积的问题,而一旦发生消息堆积,延迟执行的这个时间可能就会不准,如果你特别看重这个时间准确与否,那么,实际运作中还有一部分工作完要做。我们目前用定时任务轮训的做法,最大的问题是它产生大量重复且无用的请求,每天单单是相关日志就上百兆,这就算是我下班以后的一点探索,我现在依然觉得,那个定时任务的 API 设计得莫名其妙。
最后,我们来说说 Kafka,虽然 Kafka 单机的 QPS 要远远超过 RabbitMQ 1 到 2 个数量级,但这种快是以牺牲一部分功能作为代价的,像典型的重试和死信,这两样儿都需要使用者自己去实现,比如死信,我们现在是为每个 topic 创建一个对应的死信的 topic 来实现的,比如,我们有一个 topic 叫做 orderInfo
,与之相对应地,我们会同时创建一个叫做 orderInfo_DLQ
的 topic,作为它的死信队列。当然,你还需要一个机制去收集和转发过期消息,基本上你还是需要一个 Timer 去做某种轮询,也许,是因为它选择了 Kafka,所以,需要一个定时任务系统来作为补充,毕竟,技术选型这种问题,注定是要政治正确的啦!
本文小结
《七种武器》是著名武侠小说家古龙先生的代表作之一,原本指长生剑、孔雀翎、碧玉刀、多情环、霸王枪、离别钩等七种精妙绝伦的武器,这里则是用来指实现延迟队列的各种方法,延迟队列适用于那些需要延迟执行的场合,在如今这样一个追求实时性、快节奏生活的时代,人们对快乐和满足的要求有实时和延时的区别,用罗翔老师的话来讲,即时快乐是一种低级的快乐,是一种短暂的、易得的快乐,从这个角度来看,延时满足则是一种需要培养和付出的高级快乐。此中优劣,我们不必去分个泾渭分明,就像这些不同的实现方式,更多的只是场景上的差异,而非功能上的差异,延迟队列可以认为是一种以时间作为权重的、有序的集合;Java 里的 DelayQueue
,.NET 里的 PriorityQueue
,可以实现进程内的、单机版的延迟队列;而像 Quartz
、Hangfire
这类任务调度系统,则可以更精确地控制时间;通过 Redis 里的发布-订阅、ZSet,我们让 DelayQueue 离分布式稍微接近了一点;而 RabbitMQ 里的 TTL + DLX 特性,则让博主比两年前更加理解死信队列……这难道不是一种延时满足吗?你以前不明白的概念,有一天突然有了新的认识,我想,这就是整个过程的意义所在。当然,时间轮算法对我来说还有一点点难,我能留到未来的某一天争取搞懂它吗?好了,以上就是这篇博客的全部内容啦,祝各位晚安,谢谢大家。