延迟队列设计

当订单⼀一直处于未⽀支付状态时,如何及时的关闭订单,并退还库存?
定时的爆款商品自动上下架、广告的定时更新如何做到?



注:文中架构设计以及实际开发为我司野菱大哥所设计,现入职网易考拉。本文不过是拾人牙慧,将其设计以文字的形式呈现出来而已。



目标:
可靠性:消息进⼊入到延迟队列列后, 保证⾄至少被消费⼀一次。



高可⽤用性:至少得⽀支持多实例例部署。挂掉一 个实例例后,还有后备实例例继续提供服务。



实时性:允许存在⼀一定的时间误 差,希望在秒级。



支持消息删除:业务使⽤用⽅方,可 以随时删除指定消息。

设计方案对比




  1. 轮训+DB
    原理:定时任务扫描扫描db。 优点:实现简单,保证了可靠性,高可用性,实时性。 缺点:很难实现到细粒度,否则数据库压力比较大,程序CPU很大浪费。




  2. RocketMQ
    原理:mq自带延迟消费。 优点:实现简单,保证了可靠性,高可用性,实时性,不需要轮训任务,MQ自带实现。 缺点:比较死的粒度,18个level,最大2小时。




  3. Redis键过期通知
    原理:Redis通过订阅过期的消息,做任务监听。 优点:高可用性,实时性。 缺点:大量键同一时间过期,对redis来说负载大;消息只会发送一次,没有确认机制,不能保证可靠性。




  4. 有赞的延时消息:Redis(zset)+轮训
    原理:Redis-Zset,无限循环扫描任务Bucket。 优点:高可用性,实时性,持久性。 缺点:独立线程的无限循环,CPU的浪费;如果单点任务多,是否会影响后面其他点的任务准时性;针对超长时间(30天以上)的延迟任务,如果继续用redis,想必是很浪费的。





消息结构
每个Job必须包含一下几个属性:



Topic:消息的Topic
Id:Job的唯一标识。用来检索和删除指定的Job信息。
Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
tag(消息 标签
msgJson:的内容,供消费者做具体的业务处理,以json格式存储。
具体结构如下图表示:



消息状态转换
每个Job只会处于某一个状态下:



ready:可执行状态,等待消费。
delay:不可执行状态,等待时钟周期。
reserved:已被消费者读取,但还未得到消费者的响应(delete、finish)。
deleted:已被消费完成或者已被删除。
容错机制:
当前节点挂了如何确保任务依然能够无误的执行。



采用zookeeper做任务节点集群,当前节点的任务放入redis缓存,key采用『ip:pid』的设计,保证在节点挂掉的情况,有其他节点接手任务,保证了可靠性和可用性。
极端情况下,可能多发,需要业务方,保证幂等性。
优化:10分钟的业务任务,放入redis缓存List。
附录:
1:JDK Timer(使用案例如HashedWheelTime)
java.util.Timer是一个单线程的定时器,定时调度所拥有的TimerTask任务,TimerTask类是一个定时任务类,实现了Runnable接口,并且是一个抽象类,需要定时执行的任务都需要重写他的run方法



TaskQueue是一个由平衡二叉树堆实现的优先级队列,每个Timer对象内部都有一个TaskQueue队列,用户线程调用Timer的Schedule方法就是把TimerTask任务添加到TaskQueue队列,在调用schedule方法时,long delay参数用来指明该任务延迟多少时间执行。



TimerThread是具体执行任务的线程,他从TaskQueue队列里面获取优先级最高的任务进行执行,只有执行了当前的任务才会从队列里获取下一个任务,而不管队列是否有任务已经达到了设置的delay时间,一个Timer只有一个TimerThrea线程,因此内部实现为多生产者单消费者模型



1:构建一个定时器
public Timer() {
this(“Timer-“ + serialNumber());
}



/**
* Creates a new timer whose associated thread may be specified to
* {@linkplain Thread#setDaemon run as a daemon}.
* A daemon thread is called for if the timer will be used to
* schedule repeating "maintenance activities", which must be
* performed as long as the application is running, but should not
* prolong the lifetime of the application.
*
* @param isDaemon true if the associated thread should run as a daemon.
*/
public Timer(boolean isDaemon) {
this("Timer-" + serialNumber(), isDaemon);
}

/**
* Creates a new timer whose associated thread has the specified name.
* The associated thread does <i>not</i>
* {@linkplain Thread#setDaemon run as a daemon}.
*
* @param name the name of the associated thread
* @throws NullPointerException if {@code name} is null
* @since 1.5
*/
public Timer(String name) {
thread.setName(name);
thread.start();
}

/**
* Creates a new timer whose associated thread has the specified name,
* and may be specified to
* {@linkplain Thread#setDaemon run as a daemon}.
*
* @param name the name of the associated thread
* @param isDaemon true if the associated thread should run as a daemon
* @throws NullPointerException if {@code name} is null
* @since 1.5
*/
public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
} 2:任务调度的几种方法


/**
* Schedules the specified task for execution at the specified time. If
* the time is in the past, the task is scheduled for immediate execution.
*
* @param task task to be scheduled.
* @param time time at which task is to be executed.
* @throws IllegalArgumentException if time.getTime() is negative.
* @throws IllegalStateException if task was already scheduled or
* cancelled, timer was cancelled, or timer thread terminated.
* @throws NullPointerException if {@code task} or {@code time} is null
*/
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}



/**
* Schedules the specified task for repeated <i>fixed-delay execution</i>,
* beginning after the specified delay. Subsequent executions take place
* at approximately regular intervals separated by the specified period.
*
* <p>In fixed-delay execution, each execution is scheduled relative to
* the actual execution time of the previous execution. If an execution
* is delayed for any reason (such as garbage collection or other
* background activity), subsequent executions will be delayed as well.
* In the long run, the frequency of execution will generally be slightly
* lower than the reciprocal of the specified period (assuming the system
* clock underlying <tt>Object.wait(long)</tt> is accurate).
*
* <p>Fixed-delay execution is appropriate for recurring activities
* that require "smoothness." In other words, it is appropriate for
* activities where it is more important to keep the frequency accurate
* in the short run than in the long run. This includes most animation
* tasks, such as blinking a cursor at regular intervals. It also includes
* tasks wherein regular activity is performed in response to human
* input, such as automatically repeating a character as long as a key
* is held down.
*
* @param task task to be scheduled.
* @param delay delay in milliseconds before task is to be executed.
* @param period time in milliseconds between successive task executions.
* @throws IllegalArgumentException if {@code delay < 0}, or
* {@code delay + System.currentTimeMillis() < 0}, or
* {@code period <= 0}
* @throws IllegalStateException if task was already scheduled or
* cancelled, timer was cancelled, or timer thread terminated.
* @throws NullPointerException if {@code task} is null
*/
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}

/**
* Schedules the specified task for repeated <i>fixed-delay execution</i>,
* beginning at the specified time. Subsequent executions take place at
* approximately regular intervals, separated by the specified period.
*
* <p>In fixed-delay execution, each execution is scheduled relative to
* the actual execution time of the previous execution. If an execution
* is delayed for any reason (such as garbage collection or other
* background activity), subsequent executions will be delayed as well.
* In the long run, the frequency of execution will generally be slightly
* lower than the reciprocal of the specified period (assuming the system
* clock underlying <tt>Object.wait(long)</tt> is accurate). As a
* consequence of the above, if the scheduled first time is in the past,
* it is scheduled for immediate execution.
*
* <p>Fixed-delay execution is appropriate for recurring activities
* that require "smoothness." In other words, it is appropriate for
* activities where it is more important to keep the frequency accurate
* in the short run than in the long run. This includes most animation
* tasks, such as blinking a cursor at regular intervals. It also includes
* tasks wherein regular activity is performed in response to human
* input, such as automatically repeating a character as long as a key
* is held down.
*
* @param task task to be scheduled.
* @param firstTime First time at which task is to be executed.
* @param period time in milliseconds between successive task executions.
* @throws IllegalArgumentException if {@code firstTime.getTime() < 0}, or
* {@code period <= 0}
* @throws IllegalStateException if task was already scheduled or
* cancelled, timer was cancelled, or timer thread terminated.
* @throws NullPointerException if {@code task} or {@code firstTime} is null
*/
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}

/**
* Schedules the specified task for repeated <i>fixed-rate execution</i>,
* beginning after the specified delay. Subsequent executions take place
* at approximately regular intervals, separated by the specified period.
*
* <p>In fixed-rate execution, each execution is scheduled relative to the
* scheduled execution time of the initial execution. If an execution is
* delayed for any reason (such as garbage collection or other background
* activity), two or more executions will occur in rapid succession to
* "catch up." In the long run, the frequency of execution will be
* exactly the reciprocal of the specified period (assuming the system
* clock underlying <tt>Object.wait(long)</tt> is accurate).
*
* <p>Fixed-rate execution is appropriate for recurring activities that
* are sensitive to <i>absolute</i> time, such as ringing a chime every
* hour on the hour, or running scheduled maintenance every day at a
* particular time. It is also appropriate for recurring activities
* where the total time to perform a fixed number of executions is
* important, such as a countdown timer that ticks once every second for
* ten seconds. Finally, fixed-rate execution is appropriate for
* scheduling multiple repeating timer tasks that must remain synchronized
* with respect to one another.
*
* @param task task to be scheduled.
* @param delay delay in milliseconds before task is to be executed.
* @param period time in milliseconds between successive task executions.
* @throws IllegalArgumentException if {@code delay < 0}, or
* {@code delay + System.currentTimeMillis() < 0}, or
* {@code period <= 0}
* @throws IllegalStateException if task was already scheduled or
* cancelled, timer was cancelled, or timer thread terminated.
* @throws NullPointerException if {@code task} is null
*/
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}

/**
* Schedules the specified task for repeated <i>fixed-rate execution</i>,
* beginning at the specified time. Subsequent executions take place at
* approximately regular intervals, separated by the specified period.
*
* <p>In fixed-rate execution, each execution is scheduled relative to the
* scheduled execution time of the initial execution. If an execution is
* delayed for any reason (such as garbage collection or other background
* activity), two or more executions will occur in rapid succession to
* "catch up." In the long run, the frequency of execution will be
* exactly the reciprocal of the specified period (assuming the system
* clock underlying <tt>Object.wait(long)</tt> is accurate). As a
* consequence of the above, if the scheduled first time is in the past,
* then any "missed" executions will be scheduled for immediate "catch up"
* execution.
*
* <p>Fixed-rate execution is appropriate for recurring activities that
* are sensitive to <i>absolute</i> time, such as ringing a chime every
* hour on the hour, or running scheduled maintenance every day at a
* particular time. It is also appropriate for recurring activities
* where the total time to perform a fixed number of executions is
* important, such as a countdown timer that ticks once every second for
* ten seconds. Finally, fixed-rate execution is appropriate for
* scheduling multiple repeating timer tasks that must remain synchronized
* with respect to one another.
*
* @param task task to be scheduled.
* @param firstTime First time at which task is to be executed.
* @param period time in milliseconds between successive task executions.
* @throws IllegalArgumentException if {@code firstTime.getTime() < 0} or
* {@code period <= 0}
* @throws IllegalStateException if task was already scheduled or
* cancelled, timer was cancelled, or timer thread terminated.
* @throws NullPointerException if {@code task} or {@code firstTime} is null
*/
public void scheduleAtFixedRate(TimerTask task, Date firstTime,
long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), period);
} 3:schedul()与scheduleAtFixedRate() 两者功能相似,但schedule()更注重保持时间间隔的稳定,保证每隔period的时间可调用一次。而scheduleAtFixRate更注重保持执行频率的稳定,保证多次调用的时间趋近于period的时间,如果执行任务的时间大于period的时间,就会在任务执行完马上执行下一次任务。


4:Timer类的缺陷
时间不准确延迟:由于Timer在执行任务的时候是单线程的,如果存在多个任务,某个任务耗时过长,就会导致任务时间延迟,即使是周期执行的,也会影响下一个周期。



异常终止:如果抛出未捕获的异常,那么就会导致Timer线程终止,还会终止所有的任务,而且Timer不会重新恢复线程后再执行。



执行周期任务依赖系统时间,由于Timer执行周期依赖于系统时间,所以如果当前系统时间出现了变化,则会出现一些执行上的变化。



2:scheduledExecutor(在Sentinel中大量使用,如熔断以及warmup模式的限流)
基于线程池设计的SceduledExecutor,就是改成了多线程的方式,任务并发执行,相互之间不会受到干扰。



具体实现类为ScheduledThreadPoolExecutor,这是一个ExecutorService线程池,除了具有线程池的基本功能,还具有定时以及周期执行任务的功能,常用作定时任务。
核心方法



/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}



/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
} 两者的一个对比 1:ScheduledExecutor可以解决Timer的不准确延迟问题,因为内部是线程池实现的,可以并发执行,互相之间不受影响


2:由于是ScheduledExecutor并发执行,可以起到很好的线程隔离,即使产生异常,也不会导致其他任务受到影响。



3:ScheduledExecutor执行的周期任务,如果在执行过程中发生异常,则会终止,不会影响其他任务,而且不会周期性执行。所以,使用过程中尽可能的捕获一切异常。



应用场景
创建订单 10 分钟之后自动支付
订单超时取消
…….等等…
实现方式
最简单的方式,定时扫表;例如每分钟扫表一次十分钟之后未支付的订单进行主动支付 ;
优点: 简单
缺点: 每分钟全局扫表,浪费资源,有一分钟延迟



使用 RabbitMq 实现 RabbitMq 实现延迟队列
优点: 开源,现成的稳定的实现方案;
缺点: RabbitMq 是一个消息中间件;延迟队列只是其中一个小功能,如果团队技术栈中本来就是使用 RabbitMq 那还好,如果不是,那为了使用延迟队列而去部署一套 RabbitMq 成本有点大;



使用 Java 中的延迟队列,DelayQueue
优点: java.util.concurrent 包下一个延迟队列,简单易用;拿来即用
缺点: 单机、不能持久化、宕机任务丢失等等;



基于 Redis 自研延迟队列
既然上面没有很好的解决方案,因为 Redis 的 zset、list 的特性,我们可以利用 Redis 来实现一个延迟队列 RedisDelayQueue



设计目标
实时性: 允许存在一定时间内的秒级误差
高可用性:支持单机,支持集群
支持消息删除:业务费随时删除指定消息
消息可靠性: 保证至少被消费一次
消息持久化: 基于 Redis 自身的持久化特性,上面的消息可靠性基于 Redis 的持久化,所以如果 Redis 数据丢失,意味着延迟消息的丢失,不过可以做主备和集群保证;
数据结构
Redis_Delay_Table: 是一个 Hash_Table 结构;里面存储了所有的延迟队列的信息;KV 结构;K=TOPIC:ID V=CONENT; V 由客户端传入的数据,消费的时候回传;
RD_ZSET_BUCKET: 延迟队列的有序集合; 存放 member=TOPIC:ID 和 score= 执行时间戳; 根据时间戳排序;
RD_LIST_TOPIC: list 结构; 每个 Topic 一个 list;list 存放的都是当前需要被消费的延迟 Job;
设计图



738 x 439
2365 x 1407



任务的生命周期
新增一个Job,会在Redis_Delay_Table中插入一条数据,记录了业务消费方的 数据结构; RD_ZSET_BUCKET 也会插入一条数据,记录了执行时间戳;
搬运线程会去RD_ZSET_BUCKET中查找哪些执行时间戳runTimeMillis比现在的时间小;将这些记录全部删除;同时会解析出来每个任务的Topic是什么,然后将这些任务push到Topic对应的列表RD_LIST_TOPIC中;
每个 Topic 的 List 都会有一个监听线程去批量获取 List 中的待消费数据;获取到的数据全部扔给这个Topic的消费线程池
消息线程池执行会去 Redis_Delay_Table 查找数据结构,返回给回调接口,执行回调方法;
以上所有操作,都是基于 Lua 脚本做的操作,Lua 脚本执行的优点在于,批量命令执行具有原子性,事务性, 并且降低了网络开销,毕竟只有一次网络开销;



搬运线程操作流程图



571 x 577
920 x 929



设计细节
搬运操作
1.搬运操作的时机



为了避免频繁的执行搬运操作, 我们基于 wait(time)/notify 的方式来通知执行搬运操作;



738 x 614
1065 x 886



我们用一个AtomicLong nextTime 来保存下一次将要搬运的时间;服务启动的时候nextTime=0;所以肯定比当前时间小,那么就会先去执行一次搬运操作,然后返回搬运操作之后的ZSET的表头时间戳,这个时间戳就是下一次将要执行的时间戳, 把这个时间戳赋值给 nextTime; 如果表中没有元素了则将nextTime=Long.MaxValue ;因为 while 循环,下一次又会跟当前时间对比;如果nextTime比当前时间大,则说明需要等待; 那么我们wait(nextTime-System.currentTimeMills()); 等到时间到了之后,再次去判断一下,就会比当前时间小,就会执行一次搬运操作;



那么当有新增延迟任务 Job 的时间怎么办,这个时候又会将当前新增 Job 的执行时间戳跟nextTime做个对比;如果小的话就重新赋值;
重新赋值之后,还是调用一下 notifyAll() 通知一下搬运线程;让他重新去判断一下 新的时间是否比当前时间小;如果还是大的话,那么就继续wait(nextTime-System.currentTimeMills()); 但是这个时候wait的时间又会变小;更精准;



2.一次搬运操作的最大数量
Redis 的执行速度非常快,在一个 Lua 里面循环遍历 1000 个 10000 个根本没差; 而且是在 Lua 里面操作,就只有一次网络开销;一次操作多少个元素根本就不会是问题;



搬运操作的防护机制
1.每分钟唤醒定时线程



在消费方多实例部署的情况下, 如果某一台机器挂掉了,但是这台机器的 nextTime 是最小的,就在一分钟之后( 新增 job 的时候落到这台机器,刚好时间戳很小), 其他机器可能是 1 个小时之后执行搬运操作; 如果这台机器立马重启,那么还会立马执行一次搬运操作;万一他没有重启;那可能就会很久之后才会搬运;
所以我们需要一种防护手段来应对这种极端情况;
比如每分钟将 nextTime=0;并且唤醒 wait;
那么就会至少每分钟会执行一次搬运操作! 这是可以接受的



LrangeAndLTrim 批量获取且删除待消费任务
1.执行时机以及如何防止频繁请求 Redis
这是一个守护线程,循环去做这样的操作,把拿到的数据给线程池去消费;
但是也不能一直不停的去执行操作,如果 list 已经没有数据了去操作也没有任何意义,不然就太浪费资源了,幸好 List 中有一个BLPOP阻塞原语,如果 list 中有数据就会立马返回,如果没有数据就会一直阻塞在那里,直到有数据返回,可以设置阻塞的超时时间,超时会返回 NULL;
第一次去获取 N 个待消费的任务扔进到消费线程池中;如果获取到了 0 个,那么我们就立马用BLPOP来阻塞,等有元素的时候 BLPOP 就返回数据了,下次就可以尝试去LrangeAndLTrim一次了。 通过BLPOP阻塞,我们避免了频繁的去请求 redis,并且更重要的是提高了实时性;



2.批量获取的数量和消费线程池的阻塞队列



执行上面的一次获取 N 个元素是不定的,这个要看线程池的 maxPoolSize 最大线程数量; 因为避免消费的任务过多而放入线程池的阻塞队列, 放入阻塞队列有宕机丢失任务的风险,关机重启的时候还要讲阻塞队列中的任务重新放入 List 中增加了复杂性;



所以我们每次LrangeAndLTrim获取的元素不能大于当前线程池可用的线程数; 这样的一个控制可用用信号量Semaphore来做



Codis 集群对 BLPOP 的影响
如果 Redis 集群用了 codis 方案或者 Twemproxy 方案; 他们不支持 BLPOP 的命令;
codis 不支持的命令集合
那么就不能利用 BLPOP 来防止频繁请求 redis;那么退而求其次改成每秒执行一次 LrangeAndLTrim 操作;



集群对 Lua 的影响
Lua 脚本的执行只能在单机器上, 集群的环境下如果想要执行 Lua 脚本不出错,那么 Lua 脚本中的所有 key 必须落在同一台机器;
为了支持集群操作 Lua,我们利用 hashtag; 用{}把三个 jey 的关键词包起来;
{projectName}:Redis_Delay_Table
{projectName}:Redis_Delay_Table
{projectName}:RD_LIST_TOPIC
那么所有的数据就会在同一台机器上了



重试机制
消费者回调接口如果抛出异常了,或者执行超时了,那么会将这个 Job 重新放入到 RD_LIST_TOPIC 中等待被下一次消费;默认重试 2 次;可以设置不重试;



超时机制
超时机制的主要思路都一样,就是监听一个线程的执行时间超过设定值之后抛出异常打断方法的执行;



这是使用的方式是 利用 Callable 接口实现异步超时处理



public class TimeoutUtil {



/**执行用户回调接口的 线程池;    计算回调接口的超时时间           **/
private static ExecutorService executorService = Executors.newCachedThreadPool();

/**
* 有超时时间的方法
* @param timeout 时间秒
* @return
*/
public static void timeoutMethod(long timeout, Function function) throws InterruptedException, ExecutionException, TimeoutException {
FutureTask futureTask = new FutureTask(()->(function.apply("")));
executorService.execute(futureTask);
//new Thread(futureTask).start();
try {
futureTask.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//e.printStackTrace();
futureTask.cancel(true);
throw e;
}

} }


这种方式有一点不好就是太费线程了,相当于线程使用翻了一倍;但是相比其他的方式,这种算是更好一点的



优雅停机
在 Jvm 那里注册一个 Runtime.getRuntime().addShutdownHook(Runnable)停机回调接口;在这里面做好善后工作;



关闭异步 AddJob 线程池
关闭每分钟唤醒线程
关闭搬运线程 while(!stop)的形式
关闭所有的 topic 监听线程 while(!stop)的形式
关闭关闭所有 topic 的消费线程 ;先调用 shutdown;再 executor.awaitTermination(20, TimeUnit.SECONDS);检查是否还有剩余的线程任务没有执行完; 如果还没有执行完则等待执行完;最多等待 20 秒之后强制调用 shutdownNow 强制关闭;
关闭重试线程 while(!stop)的形式
关闭 异常未消费 Job 重入 List 线程池
优雅停止线程一般是用下面的方式
① 、 while(!stop)的形式 用标识位来停止线程
② .先 调用 executor.shutdown(); 阻止接受新的任务;然后等待当前正在执行的任务执行完; 如果有阻塞则需要调用 executor.shutdownNow()强制结束;所以要给一个等待时间;



/**
* shutdownNow 终止线程的方法是通过调用Thread.interrupt()方法来实现的
* 如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。
* 上面的情况中断之后还是可以再执行finally里面的方法的;
* 但是如果是其他的情况 finally是不会被执行的
* @param executor
*/
public static void closeExecutor(ExecutorService executor, String executorName) {
try {
//新的任务不进队列
executor.shutdown();
//给10秒钟没有停止完强行停止;
if(!executor.awaitTermination(20, TimeUnit.SECONDS)) {
logger.warn(“线程池: {},{}没有在20秒内关闭,则进行强制关闭”,executorName,executor);
List droppedTasks = executor.shutdownNow();
logger.warn("线程池: {},{} 被强行关闭,阻塞队列中将有{}个将不会被执行.", executorName,executor,droppedTasks.size() );
}
logger.info("线程池:{},{} 已经关闭...",executorName,executor);
} catch (InterruptedException e) {
logger.info("线程池:{},{} 打断...",executorName,executor);
}
}
BLPOP 阻塞的情况如何优雅停止监听 Redis 的线程



如果不是在codis集群的环境下,BLPOP 是可以很方便的阻塞线程的;但是停机的时候可能会有点问题;



假如正在关机,当前线程正在BLPOP阻塞, 那关机线程等我们 20 秒执行, 刚好在倒数 1 秒的时候BLPOP获取到了数据,丢给消费线程去消费;如果消费线程 1 秒执行不完,那么 20 秒倒计时到了,强制关机,那么这个任务就会被丢失了; 怎么解决这个问题呢?



① . 不用BLPOP, 每次都 sleep 一秒去调用LrangeAndLTrim操作;
② .关机的时候杀掉 Redis 的 blpop 客户端; 杀掉之后 BLPOP 会立马返回 null; 进入下一个循环体;



不足
因为 Redis 的持久化特性,做不到消息完全不丢失,如果要保证完成不丢失,Redis 的持久化刷盘策略要收紧
因为 Codis 不能使用 BLPOP 这种阻塞的形式,在获取消费任务的时候用了每秒一次去获取,有点浪费性能;
支持消费者多实例部署,但是可能存在不能均匀的分配到每台机器上去消费;
虽然支持 Redis 集群,但是其实是伪集群,因为 Lua 脚本的原因,让他们都只能落在一台机器上;
总结
实时性
正常情况下 消费的时间误差不超过 1 秒钟; 极端情况下,一台实例宕机,另外的实例 nextTime 很迟; 那么最大误差是 1 分钟; 真正的误差来自于业务方的接口的消费速度



QPS
完全视业务方的消费速度而定; 延迟队列不是瓶颈



目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件。



开发前需要考虑的问题?



及时性 消费端能按时收到
同一时间消息的消费权重
可靠性 消息不能出现没有被消费掉的情况
可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
消费端如何消费



当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。



简单定义一个消息数据结构



private String topic;/topic**/
private String id;/
自动生成 全局惟一 snowflake/
private String bizKey;
private long delay;/
延时毫秒数/
private int priority;//优先级
private long ttl;/
消费端消费的ttl/
private String body;/
消息体**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();
运行原理:



用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。
客户端
因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。



添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
删除延时消息 需要传递消息ID GET /delete?id=
恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
恢复单个延时消息 需要传递消息ID GET /reStore/id
获取消息 需要长连接 GET /get/topic
用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。



目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。



消息可恢复
实现恢复的原理 正常情况下一般都是记录日志,比如mysql的binlog等。



这里我们直接采用mysql数据库作为记录日志。



目前打算创建以下2张表:



消息表 字段包括整个消息体
消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip
定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。



举个栗子



假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。



当然恢复单个任务也可以这么干。



关于高可用
分布式协调还是选用zookeeper吧。



如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。



最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。



扩展
支持zset队列个数可配置 避免大数据带来高延迟的问题。



目前存在日志和redis元数据有可能不一致的问题 如mysql挂了,写日志不会成功。



https://gudaoxuri.gitbook.io/microservices-architecture/wei-fu-wu-hua-zhi-ji-shu-jia-gou/delay-queue



延时任务有别于定式任务,定式任务往往是固定周期的,有明确的触发时间。而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件。也就是说,任务事件生成时并不想让消费者立即拿到,而是延迟一定时间后才接收到该事件进行消费。



延迟任务相关的业务场景如下:



场景一:在订单系统中,一个用户某个时刻下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将自动进行过期处理。



场景二:用户某个时刻通过手机远程遥控家里的智能设备在指定的时间进行工作。这时就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。



下面我们来探讨一些方案,其实这些方案没有好坏之分,和系统架构一样,只有最适合。对于数据量较小的情况下,任意一种方案都可行,考虑的是简单明了和开发速度,尽量避免把系统搞复杂了。而对于数据量较大的情况下,就需要有一些选择,并不是所有的方案都适合了。



解决方式
定时器轮询遍历数据库记录
JDK的DelayQueue
JDK ScheduledExecutorService
时间轮(netty)
利用quartz等定时任务
Redis的ZSet实现
rabbitMq实现延时队列
定时器轮询遍历数据库记录
这是比较常见的一种方式,所有的订单或者所有的命令一般都会存储在数据库中。我们会起一个线程定时去扫数据库或者一个数据库定时Job,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。这种方式很简单,不会引入其他的技术,开发周期短。



如果数据量比较大,千万级甚至更多,插入频率很高的话,上面的方式在性能上会出现一些问题,查找和更新对会占用很多时间,轮询频率高的话甚至会影响数据入库。一种可以尝试的方式就是使用类似TBSchedule或Elastic-Job这样的分布式的任务调度加上数据分片功能,把需要判断的数据分到不同的机器上执行。



如果数据量进一步增大,那扫数据库肯定就不行了。另一方面,对于订单这类数据,我们也许会遇到分库分表,那上述方案就会变得过于复杂,得不偿失。



JDK的DelayQueue
Java中的DelayQueue位于java.util.concurrent包下,作为单机实现,它很好的实现了延迟一段时间后触发事件的需求。由于是线程安全的它可以有多个消费者和多个生产者,从而在某些情况下可以提升性能。DelayQueue本质是封装了一个PriorityQueue,使之线程安全,加上Delay功能,也就是说,消费者线程只能在队列中的消息“过期”之后才能返回数据获取到消息,不然只能获取到null。



之所以要用到PriorityQueue,主要是需要排序。也许后插入的消息需要比队列中的其他消息提前触发,那么这个后插入的消息就需要最先被消费者获取,这就需要排序功能。PriorityQueue内部使用最小堆来实现排序队列。队首的,最先被消费者拿到的就是最小的那个。使用最小堆让队列在数据量较大的时候比较有优势。使用最小堆来实现优先级队列主要是因为最小堆在插入和获取时,时间复杂度相对都比较好,都是O(logN)。



下面例子实现了未来某个时间要触发的消息。我把这些消息放在DelayQueue中,当消息的触发时间到,消费者就能拿到消息,并且消费,实现处理方法。示例代码:



/*




  • 定义放在延迟队列中的对象,需要实现Delayed接口
    */
    public class DelayedTask implements Delayed {



    private int _expireInSecond = 0;



    public DelayedTask(int delaySecond) {
    Calendar cal = Calendar.getInstance();
    cal.add(Calendar.SECOND, delaySecond);
    _expireInSecond = (int) (cal.getTimeInMillis() / 1000);
    }



    public int compareTo(Delayed o) {
    long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
    return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }



    public long getDelay(TimeUnit unit) {
    // TODO Auto-generated method stub



     Calendar cal = Calendar.getInstance();
    return _expireInSecond - (cal.getTimeInMillis() / 1000); }




}
下面定义了三个延迟任务,分别是10秒,5秒和15秒。依次入队列,期望5秒钟后,5秒的消息先被获取到,然后每个5秒钟,依次获取到10秒数据和15秒的那个数据。



public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub



    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

//定义延迟队列
DelayQueue<DelayedTask> delayQueue = new DelayQueue<DelayedTask>();

//定义三个延迟任务
DelayedTask task1 = new DelayedTask(10);
DelayedTask task2 = new DelayedTask(5);
DelayedTask task3 = new DelayedTask(15);

delayQueue.add(task1);
delayQueue.add(task2);
delayQueue.add(task3);

System.out.println(sdf.format(new Date()) + " start");

while (delayQueue.size() != 0) {

//如果没到时间,该方法会返回
DelayedTask task = delayQueue.poll();

if (task != null) {
Date now = new Date();
System.out.println(sdf.format(now));
}

Thread.sleep(1000);
}
} DelayQueue是一种很好的实现方式,虽然是单机,但是可以多线程生产和消费,提高效率。拿到消息后也可以使用异步线程去执行下一步的任务。如果有分布式的需求可以使用Redis来实现消息的分发,如果对消息的可靠性有非常高的要求可以使用消息中间件.


JDK ScheduledExecutorService
JDK自带的一种线程池,它能调度一些命令在一段时间之后执行,或者周期性的执行。文章开头的一些业务场景主要使用第一种方式,即,在一段时间之后执行某个操作。代码例子如下:



public static void main(String[] args) {
// TODO Auto-generated method stub
ScheduledExecutorService executor = Executors.newScheduledThreadPool(100);



    for (int i = 10; i > 0; i--) {
executor.schedule(new Runnable() {

public void run() {
// TODO Auto-generated method stub
System.out.println(
"Work start, thread id:" + Thread.currentThread().getId() + " " + sdf.format(new Date()));
}

}, i, TimeUnit.SECONDS);
}
} ScheduledExecutorService的实现类ScheduledThreadPoolExecutor提供了一种并行处理的模型,简化了线程的调度。DelayedWorkQueue是类似DelayQueue的实现,也是基于最小堆的、线程安全的数据结构,所以会有上例排序后输出的结果。


ScheduledExecutorService比上面一种DelayQueue更加实用。因为,一般来说,使用DelayQueue获取消息后触发事件都会实用多线程的方式执行,以保证其他事件能准时进行。而ScheduledThreadPoolExecutor就是对这个过程进行了封装,让大家更加方便的使用。同时在加强了部分功能,比如定时触发命令。



时间轮
时间轮是一种非常惊艳的数据结构。其在Linux内核中使用广泛,是Linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。我们延迟任务的场景一般只需要用到原始时间轮就可以了。



原始时间轮:如下图一个轮子,有8个“槽”,可以代表未来的一个时间。如果以秒为单位,中间的指针每隔一秒钟转动到新的“槽”上面,就好像手表一样。如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)。这个圈数需要记录在槽中的数据结构里面。这个数据结构最重要的是两个指针,一个是触发任务的函数指针,另外一个是触发的总第几圈数。时间轮可以用简单的数组或者是环形链表来实现。



相比DelayQueue的数据结构,时间轮在算法复杂度上有一定优势。DelayQueue由于涉及到排序,需要调堆,插入和移除的复杂度是O(lgn),而时间轮在插入和移除的复杂度都是O(1)。



时间轮比较好的开源实现是Netty的



// 创建Timer, 精度为100毫秒,
HashedWheelTimer timer = new HashedWheelTimer();



    System.out.println(sdf.format(new Date()));

MyTask task1 = new MyTask();
MyTask task2 = new MyTask();
MyTask task3 = new MyTask();

timer.newTimeout(task1, 5, TimeUnit.SECONDS);
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
timer.newTimeout(task3, 15, TimeUnit.SECONDS);

// 阻塞main线程
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} 其中HashedWheelTimer有多个构造函数。其中:


ThreadFactory :创建线程的类,默认Executors.defaultThreadFactory()。



TickDuration:多少时间指针顺时针转一格,单位由下面一个参数提供。



TimeUnit:上一个参数的时间单位。



TicksPerWheel:时间轮上的格子数。



如果一个任务要在120s后执行,时间轮是默认参数的话,那么这个任务在时间轮上需要经过



120000ms / (512 * 100ms) = 2轮



120000ms % (512 * 100ms) = 176格。



在使用HashedWheelTimer的过程中,延迟任务的实现最好使用异步的,因为如果HashedWheelTimer的任务管理和执行都在一个线程里面,任务会耗时,那么指针就会延迟,导致整个任务就会延迟。



Quartz
quartz是一个企业级的开源的任务调度框架,quartz内部使用TreeSet来保存Trigger,如下图。Java中的TreeSet是使用TreeMap实现,TreeMap是一个红黑树实现。红黑树的插入和删除复杂度都是logN。和最小堆相比各有千秋。最小堆插入比红黑树快,删除顶层节点比红黑树慢。
相比上述的三种轻量级的实现功能丰富很多。有专门的任务调度线程,和任务执行线程池。quartz功能强大,主要是用来执行周期性的任务,当然也可以用来实现延迟任务。但是如果只是实现一个简单的基于内存的延时任务的话,quartz就稍显庞大。



Redis ZSet
Redis中的ZSet是一个有序的Set,内部使用HashMap和跳表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。



public class ZSetTest {



private JedisPool jedisPool = null;
// Redis服务器IP
private String ADDR = "10.23.22.42";
// Redis的端口号
private int PORT = 6379;

private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public void intJedis() {
jedisPool = new JedisPool(ADDR, PORT);
}

public static void main(String[] args) {
// TODO Auto-generated method stub

ZSetTest zsetTest = new ZSetTest();
zsetTest.intJedis();

zsetTest.addItem();
zsetTest.getItem();

zsetTest.deleteZSet();
}

public void deleteZSet() {
Jedis jedis = jedisPool.getResource();
jedis.del("zset_test");
}

public void addItem() {
Jedis jedis = jedisPool.getResource();

Calendar cal1 = Calendar.getInstance();
cal1.add(Calendar.SECOND, 10);
int second10later = (int) (cal1.getTimeInMillis() / 1000);

Calendar cal2 = Calendar.getInstance();
cal2.add(Calendar.SECOND, 20);
int second20later = (int) (cal2.getTimeInMillis() / 1000);

Calendar cal3 = Calendar.getInstance();
cal3.add(Calendar.SECOND, 30);
int second30later = (int) (cal3.getTimeInMillis() / 1000);

Calendar cal4 = Calendar.getInstance();
cal4.add(Calendar.SECOND, 40);
int second40later = (int) (cal4.getTimeInMillis() / 1000);

Calendar cal5 = Calendar.getInstance();
cal5.add(Calendar.SECOND, 50);
int second50later = (int) (cal5.getTimeInMillis() / 1000);

jedis.zadd("zset_test", second50later, "e");
jedis.zadd("zset_test", second10later, "a");
jedis.zadd("zset_test", second30later, "c");
jedis.zadd("zset_test", second20later, "b");
jedis.zadd("zset_test", second40later, "d");

System.out.println(sdf.format(new Date()) + " add finished.");
}

public void getItem() {

Jedis jedis = jedisPool.getResource();

while (true) {
try {

Set<Tuple> set = jedis.zrangeWithScores("zset_test", 0, 0);

String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();

Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);

if (nowSecond >= score) {
jedis.zrem("zset_test", value);
System.out.println(sdf.format(new Date()) + " removed value:" + value);
}

if (jedis.zcard("zset_test") <= 0)
{
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


}
在用作延迟任务的时候,可以在添加数据的时候,使用zadd把score写成未来某个时刻的unix时间戳。消费者使用zrangeWithScores获取优先级最高的(最早开始的的)任务。注意,zrangeWithScores并不是取出来,只是看一下并不删除,类似于Queue的peek方法。程序对最早的这个消息进行验证,是否到达要运行的时间,如果是则执行,然后删除zset中的数据。如果不是,则继续等待。



由于zrangeWithScores 和 zrem是先后使用,所以有可能有并发问题,即两个线程或者两个进程都会拿到一样的一样的数据,然后重复执行,最后又都会删除。如果是单机多线程执行,或者分布式环境下,可以使用Redis事务,也可以使用由Redis实现的分布式锁,或者使用下例中Redis Script。你可以在Redis官方的Transaction章节找到事务的相关内容。



使用Redis的好处主要是:



解耦:把任务、任务发起者、任务执行者的三者分开,逻辑更加清晰,程序强壮性提升,有利于任务发起者和执行者各自迭代,适合多人协作。



异常恢复:由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。



分布式:如果数据量较大,程序执行时间比较长,我们可以针对任务发起者和任务执行者进行分布式部署。特别注意任务的执行者,也就是Redis的接收方需要考虑分布式锁的问题。



RabbitMQ TTL和DXL
AMQP和RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:



Time To Live(TTL)
RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。



A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。



x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange



x-dead-letter-routing-key:指定routing-key发送



队列出现dead letter的情况有:



消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费



消息队列使用介绍:
        在很多场景下我都有用到消息队列,有的使用Kafka,有的用RabbitMQ等都能满足消息的生产和消费,但是延迟队列呢?怎么设计,但能配合代码+数据库+MQ也能实现,只是逻辑繁琐一点。



 



需求背景:
         大家都用过支付宝|微信,都有对应的支付通知功能,且是有频次控制的哦,(像支付宝:如果商户反馈给支付宝的字符不是success这7个字符,支付宝服务器会不断重发通知,直到超过24小时22分钟。一般情况下,25小时以内完成8次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h),这种场景就可以设计对应的延迟队列来处理了)



 



Redis设计延迟队列:
         Redis的有序集合天然支持这种场景,



         $redis->zAdd(‘notify_msg_queue’, ‘1553053550’, ‘1553053550-1234567891-0’);//值:当前时间戳-订单号-通知次数



         $redis->zRangeByScore(‘notify_msg_queue’, 0, ‘1553053550’);//排序因子用时间戳,这样能去除之前到现在的所有数据



         $redis->zRem(‘notify_msg_queue’, ‘1553053550-1234567891-0’);//指定删除对应的值



 



         设计原理:队列名字为:notify_msg_queue,因子为当前时间戳,值为【当前时间戳-订单号-通知次数】,获取的时候



         如果通过zRangeByScore拿到对应的值能后处理值,根据 “-”符号分割值,如果时间戳 <= ,能后根据订单好通知业务方



         的通知接口,如果通知成功则zRem删除对应值从消息队列中移除,如果通知是吧删除原来的值,重新赋新值入消息队列:历史时间戳+4分钟(若都是败就依次增加:4m,10m,10m,1h,2h,6h,15h,知道15h后)



http://silence.work/2018/12/16/RocketMQ-%E5%BB%B6%E8%BF%9F%E6%B6%88%E6%81%AF%E7%9A%84%E4%BD%BF%E7%94%A8%E4%B8%8E%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90/



什么是定时消息和延迟消息?
定时消息:Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息。
延迟消息:Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
定时消息与延迟消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。



目前业界MQ对定时消息和延迟消息的支持情况



上图是阿里云上对业界MQ功能的对比,其中开源产品中只有阿里的RocketMQ支持延迟消息,且是固定的18个Level。



固定Level的含义是延迟是特定级别的,比如支持3秒、5秒的Level,那么用户只能发送3秒延迟或者5秒延迟,不能发送8秒延迟的消息。



消息队列RocketMQ的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定Level的限制)。



上图是CMQ中对MQ功能的对比,其中标明腾讯的CMQ支持延迟消息,但是没有具体写明支持到什么精度,支持任意时间还是特定的Level。



通过腾讯云上CMQ的API文档可以看到有一个秒级别的delaySeconds,应该是支持任意级别的延迟,即和收费版本的RocketMQ一致。



总结



开源版本中,只有RocketMQ支持延迟消息,且只支持18个特定级别的延迟
付费版本中,阿里云和腾讯云上的MQ产品都支持精度为秒级别的延迟消息
(真是有钱能使鬼推磨啊,有钱就能发任意延迟的消息了,没钱最多只能发特定Level了)



任意延迟的消息难点在哪里?
开源版本没有支持任意延迟的消息,我想可能有以下几个原因:



任意延迟的消息的需求不强烈
可能是一个比较有技术含量的点,不愿意开源
需求不强



对支持任意延迟的需求确实不强,因为:



延迟并不是MQ场景的核心功能,业务单独做一个替代方案的成本不大
业务上一般对延迟的需求都是固定的,比如下单后半小时check是否付款,发货后7天check是否收货
在我司,MQ上线一年多后才有业务方希望我能支持延迟消息,且不要求任意延迟,只要求和RocketMQ开源版本一致,支持一些业务上的级别即可。



不愿意开源



为了差异化(好在云上卖钱),只能降开源版本的功能进行阉割,所以开源版本的RocketMQ变成了只支持特定Level的延迟。



难点在哪里?



既然业务有需求,我们肯定也要去支持。



首先,我们先划清楚定义和边界:



在我们的系统范围内,支持任意延迟的消息指的是:



精度支持到秒级别



最大支持30天的延迟



本着对自己的高要求,我们并不满足于开源RocketMQ的18个Level的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在哪里呢?



排序
消息存储
首先,支持任意延迟意味着消息是需要在服务端进行排序的。



比如用户先发了一条延迟1分钟的消息,一秒后发了一条延迟3秒的消息,显然延迟3秒的消息需要先被投递出去。那么服务端在收到消息后需要对消息进行排序后再投递出去。



在MQ中,为了保证可靠性,消息是需要落盘的,且对性能和延迟的要求,决定了在服务端对消息进行排序是完全不可接受的。



其次,目前MQ的方案中都是基于WAL的方式实现的(RocketMQ、Kafka),日志文件会被过期删除,一般会保留最近一段时间的数据。



支持任意级别的延迟,那么需要保存最近30天的消息。



阿里内部 1000+ 核心应用使用,每天流转几千亿条消息,经过双11交易、商品等核心链路真实场景的验证,稳定可靠。



考虑一下一天几千亿的消息,保存30天的话需要堆多少服务器,显然是无法做到的。



知己知彼
虽然决定自己做,但是依旧需要先了解开源的实现,那么就只能看看RocketMQ开源版本中,支持18个Level是怎么实现的,希望能从中得到一些灵感。



上图是通过RocketMQ源码分析后简化一个实现原理方案示意图。



分为两个部分:



消息的写入
消息的Schedule
消息写入中:



在写入CommitLog之前,如果是延迟消息,替换掉消息的Topic和queueId(被替换为延迟消息特定的Topic,queueId则为延迟级别对应的id)
消息写入CommitLog之后,提交dispatchRequest到DispatchService
因为在第①步中Topic和QueueId被替换了,所以写入的ConsumeQueue实际上非真正消息应该所属的ConsumeQueue,而是写入到ScheduledConsumeQueue中(这个特定的Queue存放不会被消费)
Schedule过程中:



给每个Level设置定时器,从ScheduledConsumeQueue中读取信息
如果ScheduledConsumeQueue中的元素已近到时,那么从CommitLog中读取消息内容,恢复成正常的消息内容写入CommitLog
写入CommitLog后提交dispatchRequest给DispatchService
因为在写入CommitLog前已经恢复了Topic等属性,所以此时DispatchService会将消息投递到正确的ConsumeQueue中
回顾一下这个方案,最大的优点就是没有了排序:



先发一条level是5s的消息,再发一条level是3s的消息,因为他们会属于不同的ScheduleQueue所以投递顺序能保持正确
如果先后发两条level相同的消息,那么他们的处于同一个ConsumeQueue且保持发送顺序
因为level数固定,每个level的有自己独立的定时器,开销也不会很大
ScheduledConsumeQueue其实是一个普通的ConsumeQueue,所以可靠性等都可以按照原系统的M-S结构等得到保障
但是这个方案也有一些问题:



固定了Level,不够灵活,最多只能支持18个Level
业务是会变的,但是Level需要提前划分,不支持修改
如果要支持30天的延迟,CommitLog的量会很大,这块怎么处理没有看到



站在巨人的肩膀上
总结RocketMQ的方案,通过划分Level的方式,将排序操作转换为了O(1)的ConsumeQueue 的append操作。



我们去支持任意延迟的消息,必然也需要通过类似的方式避免掉排序。



此时我们想到了TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility 》



Netty中也是用TimeWheel来优化I/O超时的操作。



TimeWheel
TimeWheel的大致原理如下:



箭头按照一定方向固定频率移动(如手表指针),每一次跳动称为一个tick。ticksPerWheel表示一个定时轮上的tick数。
如每次tick为1秒,ticksPerWheel为60,那么这就和现实中的秒针走动完全一致。



TimeWheel应用到延迟消息中
无论定时消息还是延迟消息,最终都是投递后延迟一段时间对用户可见。



假设这个延迟时间为X秒,那么X%(ticksPerWheel * tick)可以计算出X所属的TimeWheel中位置。



这里存在一个问题,以上图为例,TimeWheel的size为8,那么延迟1秒和9秒的消息都处在一个链表中。如果用户先发了延迟9秒的消息再发了延迟1秒的消息,他们在一个链表中所以延迟1秒的消息会需要等待延迟9秒的消息先投递。显然这是不能接受的,那么如何解决这个问题?



排序



显然,如果对TimeWheel一个tick中的元素进行排序显然就解决了上面的问题。但是显而易见的是排序是不可能的。



扩大时间轮



最直观的方式,我们能不能通过扩大时间轮的方式避免延迟9和延迟1落到一个tick位置上?



假设支持30天,精度为1秒,那么ticksPerWheel=30 * 24 * 60 * 60,这样每一个tick上的延迟都是一致的,不存在上述的问题(类似于将RocketMQ的Level提升到了30 * 24 * 60 * 60个)。但是TimeWheel需要被加载到内存操作,这显然是无法接受的。



多级时间轮



单个TimeWheel无法支持,那么能否显示中的时针、分针的形式,构建多级时间轮来解决呢?



多级时间轮解决了上述的问题,但是又引入了新的问题:



在整点(tick指向0的位置)需要加载大量的数据会导致延迟,比如第二个时间轮到整点需要加载未来一天的数据
时间轮需要载入到内存,这个开销是不可接受的
延迟加载



多级定时轮的问题在于需要加载大量数据到内存,那么能否优化一下将这里的数据延迟加载到内存来解决内存开销的问题呢?



在多级定时轮的方案中,显然对于未来一小时或者未来一天的数据可以不加载到内存,而可以只加载延迟时间临近的消息。



进一步优化,可以将数据按照固定延迟间隔划分,那么每次加载的数据量是大致相同的,不会出tick约大的定时轮需要加载越多的数据,那么方案如下:



基于上述的方案,那么TimeWheel中存储未来30分钟需要投递的消息的索引,索引为一个long型,那么数据量为:30 * 60 * 8 * TPS,相对来说内存开销是可以接受的,比如TPS为1w那么大概开销为200M+。



之后的数据按照每30分钟一个块的形式写入文件,那么每个整点时的操作就是计算一下将30分钟的消息Hash到对应的TimeWheel上,那么排序问题就解决了。



到此为止就只剩下一个问题,如何保存30天的数据?



CommitLog保存超长延迟的数据
CommitLog是有时效性的,比如在我们只保存最近7天的消息,过期数据将被删除。对于延迟消息,可能需要30天之后投递,显然是不能被删除的。



那么我们怎么保存延迟消息呢?



直观的方法就是将延迟消息从CommitLog中剥离出来,独立存储以保存更长的时间。



通过DispatchService将WAL中的延迟消息写入到独立的文件中。这些文件按照延迟时间组成一个链表。



链表长度为最大延迟时间/每个文件保存的时间长度。



那么WAL可以按照正常的策略进行过期删除,Delay Msg File则在一个文件投递完之后进行删除。



唯一的问题是这里会有Delay Msg File带来的随机写问题,但是这个对系统整体性能不会有很大影响,在可接受范围内。



BOUNS
结合TimeWheel和CommitLog保存超长延迟数据的方案,加上一些优化手段,基本就完成了支持任意延迟时间的方案:



消息写入WAL
Dispatcher处理延迟消息
延迟消息一定时间的直接写入TimeWheel
延迟超过一定时间写入DelayMessageStorage
DelayMessageStorage对DelayMsgFile构建一层索引,这样在映射到TimeWheel时只需要做一次Hash操作
通过TimeWheel将消息投递到ConsumeQueue中完成对Consumer的可见
通过这个方案解决了最初提出来的任意延迟消息的两个难点:



消息的排序问题
超长延迟消息的存储问题



https://www.cnblogs.com/hzmark/p/mq-delay-msg.html


Category web