[总结]如何实现延时触发/定时器
1. 问题
- 微信公众平台后台有一个功能即定时群发消息,如明晚的20:00群发一条图文消息。那么这种延时触发的逻辑如何实现呢?
- 滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。
2. 方案一
每隔一定的时间扫描所有超时的事件
这是最容易想到的一种方案。此方案最关键的两点是轮训的频率以及如何高效地获取超时任务。
- 如果可以允许一秒左右的误差,每隔一秒轮训一次即可。
- 采用红黑树或者最小堆存储触发任务,按照触发时间戳排序。如此,每次扫描能够很快地获取超时的任务。实践中,一个很简单的方案就是使用Redis的SortedSet存储触发任务,这样只需要使用zrangeByScore获取超时的任务,再使用zremrangeByScore即可删除已经触发的任务。
此种方案的缺点在于即使频率到达一秒,也可能会有一秒的误差。此外,轮训的方式在很多情况下并没有可触发的任务,会浪费资源。
2-1.
2-2. 思路
启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价。
假设订单表的结构为:t_order(oid, finish_time, stars, status, …),更具体的,定时任务每隔一个小时会这么做一次:
如果数据量很大,需要分页查询,分页update,这将会是一个for循环。
方案的不足:
(1)轮询效率比较低
(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑
(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时
(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显
3. 方案二
阻塞线程等待时间超时
此方案思路来自于Nginx中定时器的实现(和Java中的DelayQueue原理类似)。任务的存储和上面的方案类似,采用最小堆或者红黑树即可。然后选择最近要被触发的任务的时间距离作为阻塞调用epoll_wait的超时(也可以使用其他可以设置超时的阻塞调用)。
阻塞超时后,依次获取最小触发时间戳的任务,超时则执行。
此种方案的最大优点在于不会有空的任务检查周期。
4. 方案三
采用环形队列
此方案详细可以见58沈剑的文章《1分钟实现“延迟消息”功能》。大体的思路如下:
采用环形队列,3600个slot,每隔1秒扫描一个slot,检查当前slot里面的所有任务,检查其cycleNum是否为0, 为0则触发,否则cycleNum-1。添加定时事件时,根据扫描指针的当前slot的index和事件触发的时间,计算cycleNum和要放入的slot。
此种方案的本质是栅格化与预计算,相比起前两种方案,大大提升了每次获取可触发任务的效率。但同样存在每次查询任务有可能做无用功的问题。此外,需要特别处理添加任务和扫描任务的临界点的问题,否则也可能会有时间上的误差。
这个方案原理是和时间轮(Netty中的HashedWheelTimer)一样的.
4-1. 思路
高效延时消息,包含两个重要的数据结构:
(1)环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
(2)任务集合,环上每一个slot是一个Set<Task>
同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。
Task结构中有两个很重要的属性:
(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
(2)Task-Function:需要执行的任务指针
假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:
(1)计算这个Task应该放在哪一个slot,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个slot的Set<Task>中
(2)计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1
Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set<Task>,每个Task看Cycle-Num是不是0:
(1)如果不是0,说明还需要多移动几圈,将Cycle-Num减1
(2)如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从Set<Task>中删除
使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好,精确到秒(控制timer移动频率可以控制精度)
还有相同算法的另一个例子:
例如:58到家APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。
1)30s超时,就创建一个index从0到30的环形队列(本质是个数组)
2)环上每一个slot是一个Set<uid>,任务集合
3)同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里
同时:
1)启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
2)有一个Current Index指针来标识刚检测过的slot
当有某用户uid有请求包到达时:
1)从Map结构中,查找出这个uid存储在哪一个slot里
2)从这个slot的Set结构中,删除这个uid
3)将uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到
(4)更新Map,这个uid对应slot的index值
哪些元素会被超时掉呢?
Current Index每秒种移动一个slot,这个slot对应的Set<uid>中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。
所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。
优势:
(1)只需要1个timer
(2)timer每1s只需要一次触发,消耗CPU很低
(3)批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉
三、总结
这个环形队列法是一个通用的方法,Set和Map中可以是任何task,本文的uid是一个最简单的举例。
更多参考:
https://github.com/ifesdjeen/hashed-wheel-timer
netty源码解读之时间轮算法实现-HashedWheelTimer
5. 方案四
延时消息队列
目前,RabbitMQ、RocketMQ都支持延时消息队列,直接使用即可。但这种依赖消息队列的方案,如果要取消定时任务,则无法实现。
其中,RabbitMQ的实现思路是基于TTL的,详细可见: 使用RabbitMQ实现延迟任务
6. 方案五
6-1. Redis
考虑前提:Redisson延时队列,代码redis已经封装好,可以直接拿来用。redisson.getBlockingQueue() 和 Redission.getDelayQueue()
大致原理思路:https://zhuanlan.zhihu.com/p/343811173
三个核心集合结构:
延时队列:数据入队的队列
目标 blocking 队列 :到期数据待consume
timeoutSet 过期时间zset:分数值为timeout,辅佐判断元素是否过期。
实现 Timer :
运用了 redis 的 sub/pub 功能,当有数据put的时候,先把它放到一个zset集合,同时发布订阅的key,发布内容为数据到期的timeout,此时客户端开启了一个延时任务(HashedWheelTimer),到了时间,从zset分页取出到期了的数据,放入 blocking 队列中。
缺点:
- 采用 sub/pub 机制的时候,可能会造成多个客户端同时开启一个时间段的延时任务,重复执行,也会有并发的安全问题,因为涉及的要数据加入阻塞队列,和将当前数据从zset移除操作。
- 默认是数据量小的时候比较稳定,数据量一大就需要构建 cluster模式,这一块需要自己开发
6-2. 基于Redisson方案进行改造思路
有赞的延时队列
https://tech.youzan.com/queuing_delay/
6-3.
6-4. 注意
一般实现的方法有几种:
- 使用 RocketMQ、RabbitMQ、Pulsar 等消息队列的延时投递功能
- 使用 Redisson 提供的 DelayedQueue
有一些方案虽然广为流传但存在着致命缺陷,不要用来实现延时任务:
- 使用 Redis 的过期监听
- 使用 RabbitMQ 的死信队列
- 使用非持久化的时间轮
Redisson DelayQueue 是一种基于 Redis Zset 结构的延时队列实现。DelayQueue 中有一个名为 timeoutSetName 的有序集合,其中元素的 score 为投递时间戳。
7. 总结
排序 | 找到到期job | |
---|---|---|
RocektMQ | 指定level,类似桶排序 | for 循环 |
HashedWheelTimer | 数组,桶排序 | for 循环 |
kafka 时间轮 | 堆排序,PriorityQueue | 底层api实现,Condition.awaitNanos()-> parkNanos() |
Redisson 延时队列 | Zset 跳表实现 | 先是 sub/pub 订阅功能,客户端到期从zset中拿数据,用的是 HashedWheelTimer |
基于有赞延时队列 | Zset 跳表实现 | for循环遍历,开启多个线程,每个bucket一个线程 |