如何执行一个延迟任务?

背景

前段时间做到临时版本管理系统的项目中有这样一个需求:

最新发布的版本发布记录状态为未处理,用户手动确认后状态修改为已处理。超过七天后状态为未处理的记录状态自动修改为过期提醒,同时向用户发送一封邮件。

“修改状态为过期提醒” 的任务就是一个延迟任务,如何执行这样一个任务呢?思考了两种方案:

轮询

启动一个定时任务,每隔一天执行一次,执行的内容如下:

  1. select id from release where status = 2 and release_time > 7days

  2. update release set status = 3 where id in (…)

  3. 向用户发送过期通知邮件(可以交给专门的线程去做)

在代码中,我们可以使用ScheduledThreadPoolExecutor 实现定时任务。ScheduledThreadPoolExecutor的核心使用了DelayQueue这样一个数据结构。DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。由于PriorityQueue内部使用最小堆来实现排序队列,其时间复杂度为O(logN)。

这样的方案是我们第一时间可以想到的,也是实现起来最简单的方案。那么,他有什么不足呢?

  1. 查询语句返回的数据量可能会很大,可能需要分页查询

  2. 查询语句的执行可能需要全表扫描

  3. 时效性不好,最大的发送邮件时间误差可以达到一天

环形队列

参考Netty的HashedWheelTimer

需要实现两个数据结构:

  • 环形队列

创建一个长度为24的队列(本质上是数组)

环形队列
  • 任务集合

队列中的每个元素是一个任务集合Set

然后,启动一个定时任务,每隔一个小时,在环形队列中移动一格,使用一个currentIndex指向队列当前的元素。

Task有两个属性:

  1. cycleNum:当currentIndex第几圈扫描到这个元素时,执行任务

  2. runnable:需要执行的任务,任务内容取出对应id的版本发布记录,查询其状态如果是未处理,则更新其状态为过期提醒并发送通知邮件

设现在currentIndex指向第一格,此时发布了一条版本发布记录,希望在7天之后,触发一个延迟任务:

  1. 计算这个Task应该放在哪个元素的集合当中

    现在指向1,7天之后(7*24),应该放在第一个元素的Set当中

  2. 计算这个Task的cycleNum

    环形队列是24格,每隔一小时currentIndex移动一次,即移动一圈是24小时,所以应该是 (7*24)/24 = 7 圈后再执行

currentIndex每隔一个小时移动一格,每移动一次,就取出当前位置的Set,观察每一个Task的cycleNum:

  1. 如果不是0,说明还需要再移动几圈才可执行,将cycleNum减一

  2. 如果是0,说明这个Task可以马上执行,取出这个Task中的runnable执行(可以交给别的线程执行),将此Task从Set中删除

使用环形队列之后,每次发布一条版本发布记录时,在环形队列中插入一个Task即可,7天之后,这个Task将会被触发:

  1. 不需要全表扫描,只需查询对应id的记录即可(主键索引)

  2. 时效性好,精确到一个小时。可以通过修改环形队列的大小和控制currentIndex的移动频率来控制精度。

  3. 操作环形队列时间复杂度为O(1)

使用环形队列的缺点是增加了复杂度,如果数据量不大,对性能要求不高的场景下,使用DelayQueue即可。

参考

1分钟实现“延迟消息”功能

推荐文章