redis zset 延迟队列

Redis中的zset主要支持以下命令:



zadd、zincrby zrem、zremrangebyrank、zremrangebyscore、zremrangebyrank zrange、zrevrange、zrangebyscore、zrevrangebyscore、zrangebylex、zrevrangebylex zcount、zcard、zscore、zrank、zrevrank zunionstore、zinterstore
zset的源码主要涉及redis.h和t_zset.c两个文件。

Redis中的zset在实现时用到了跳跃表skiplist这种数据结构。skiplist是一种基于并联链表的、随机化的数据结构,由 William Pugh 在论文《Skip lists: a probabilistic alternative to balanced trees》中首次提出,可以实现平均复杂度为O(longN)的插入、删除和查找操作。
1.1、跳跃表的存储结构
Redis中的跳跃表实现和William Pugh在《Skip Lists: A Probabilistic Alternative to Balanced Trees》一文中描述的跳跃表基本一致,主要有以下三点进行了修改:



Redis中的跳跃表允许有重复的分值score,以支持有序集合中多个元素可以有相同的分值score。 节点的比较操作不仅仅比较其分值score,同时还要比较其关联的元素值value。 每个节点还有一个后退指针(相当于双向链表中的prev指针),通个该指针,我们可以从表尾向表头遍历列表。这个属性可以实现zset的一些逆向操作命令如zrevrange。



有序集合zset主要有两种编码方式:REDIS_ENCODING_ZIPLIST和REDIS_ENCODING_SKIPLIST。ziplist可以表示较小的有序集合, skiplist表示任意大小的有序集合。



前面我们介绍List数据类型时,List以ziplist作为默认编码。但在zset中则采取不同的策略,zset会根据zadd命令添加的第一个元素的长度大小来选择创建编码方式。具体而言:如果满足下面两个条件之一则使用ziplist编码方式



队列设计
目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件。



开发前需要考虑的问题?



及时性 消费端能按时收到
同一时间消息的消费权重
可靠性 消息不能出现没有被消费掉的情况
可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
消费端如何消费



当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。



运行原理:



用Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
将id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。
客户端
因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。



添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
删除延时消息 需要传递消息ID GET /delete?id=
恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
恢复单个延时消息 需要传递消息ID GET /reStore/id
获取消息 需要长连接 GET /get/topic
用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。



目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。



消息可恢复
实现恢复的原理 正常情况下一般都是记录日志,比如mysql的binlog等。



这里我们直接采用mysql数据库作为记录日志。



目前打算创建以下2张表:



消息表 字段包括整个消息体
消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip
定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key



1.Java中java.util.concurrent.DelayQueue
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.Rocketmq延时队列
优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
3.Rabbitmq延时队列(TTL+DLX实现)
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列



根据自身业务和公司情况,如果实现一个自己的延时队列服务需要考虑一下几点:




  • 消息存储

  • 过期延时消息实时获取

  • 高可用性
    设计主要包含以下几点:



将整个Redis当做消息池,以kv形式存储消息
使用ZSET做优先队列,按照score维持优先级
使用LIST结构,以先进先出的方式消费
zset和list存储消息地址(对应消息池的每个key)
自定义路由对象,存储zset和list名称,以点对点的方式将消息从zset路由到正确的list
使用定时器维持路由
根据TTL规则实现消息延迟



Category storage