redis-cli pipe模式

Mysql百万数据量级数据快速导入Redis



随着系统的运行,数据量变得越来越大,单纯的将数据存储在mysql中,已然不能满足查询要求了,此时我们引入Redis作为查询的缓存层,将业务中的热数据保存到Redis,扩展传统关系型数据库的服务能力,用户通过应用
直接从Redis中快速获取常用数据,或者在交互式应用中使用Redis保存活跃用户的会话,都可以极大地降低后端关系型数据库的负载,提升用户体验

传统命令的缺点
使用传统的redis client命令在大数据量的导入场景下存在如下缺陷:



由于redis是单线程模型,虽然避免了多线程下线程切换所耗费的时间,单一顺序的执行命令也很快,但是在大批量数据导入的场景下,发送命令所花费的时间和接收服务器响应结果耗费的时间就会被放大。



假如需要导入100万条数据,那光是命令执行时间,就需要花费100万*(t1 + t2)。



file



除了逐条命令发送,当然redis设计肯定也会考虑这个问题,所以出现了pipelining管道模式。



file



但是pipelining在命令行中是没有的,使得我们又需要编写新的处理代码,来接收批量的响应。但是只有很少很少的客户端代码支持,比如php-redis的扩展就不支持异步。



pipelining管道模式,其实就是减少了TCP连接的交互时间,当一批命令执行完毕后,一次性发送结果。



其实现原理是采用FIFO(先进先出)的队列来保证数据的顺序性。



只有一小部分客户端支持非阻塞I/O,并不是所有的客户端都能够以一种有效的方式解析应答,以最大化吞吐量。



由于这些原因,将庞大数据导入到Redis的首选方法是生成一个包含Redis协议数据格式,批量的发送过去。



数据导入Redis热身
采用nc命令导入数据
nc是netcat的简写,nc的作用有:



(1)实现任意TCP/UDP端口的侦听,增加-l参数后,nc可以作为server以TCP或UDP方式侦听指定端口



(2)端口的扫描,nc可以作为client发起TCP或UDP连接



(3)机器之间传输文件



(4)机器之间网络测速



file



file



采用pipe模式导入数据
然而,使用nc监听并不是一个非常可靠的方式来执行大规模的数据导入,因为netcat并不真正知道何时传输了所有数据,也无法检查错误。在2.6或更高版本的Redis中,Redis -cli脚本支持一种称为pipe管道模式的新模式,这种模式是为了执行大规模插入而设计的。
使用管道模式的命令运行如下:



file



由上图,可以看到pipe命令的返回结果,txt文件中有多少行命令,返回的replies数就是多少,
errors表示其中执行错误的命令条数。



redis协议学习
协议的格式为:



*<参数数量> \r\n
$<参数 1 的字节数量> \r\n
<参数 1 的数据> \r\n
...
$<参数 N="" 的字节数量=""> \r\n


<参数 N="" 的数据=""> \r\n
比如:
插入一条hash类型的数据。

HSET id book1 book_description1
根据Redis协议,总共有4个部分,所以开头为*4,其余内容解释如下:

内容 长度 协议命令
HSET 4 $4
id 2 $2
book1 5 $5
book_description1 17 $17
注意一下:HSET命令本身也作为协议的其中一个参数来发送。

构造出来的协议数据结构:

*4\r\n$4\r\nHSET\r\n$2\r\nid\r\n$5\r\nbook1\r\n$17\r\nbook_description1\r\n

格式化一下:

*4\r\n
$4\r\n
HSET\r\n
$2\r\n
idvvvv\r\n
$5\r\n
book1\r\n
$17\r\n
book_description1\r\n
RESP协议 bulk
Redis客户机使用一种称为RESP (Redis序列化协议)的协议与Redis服务器通信。

redis-cli pipe模式需要和nc命令一样快,并且解决了nc命令不知道何时命令结束的问题。

在发送数据的同时,它同样会去读取响应,尝试去解析。

一旦输入流中没有读取到更多的数据之后,它就会发送一个特殊的20比特的echo命令,标识最后一个命令已经发送完毕
如果在响应结果中匹配到这个相同数据后,说明本次批量发送是成功的。

使用这个技巧,我们不需要解析发送给服务器的协议来了解我们发送了多少命令,只需要解析应答即可。

在解析应答时,redis会对解析的应答进行一个计数,在最后能够告诉用户大量插入会话向服务器传输的命令的数量。也就是上面我们使用pipe模式实际操作的响应结果。

将输入数据源换成mysql
上面的例子中,我们以一个txt文本为输入数据源,使用了pipe模式导入数据。

基于上述协议的学习和理解,我们只需要将mysql中的数据按照既定的协议通过pipe模式导入Redis即可。

实际案例--从Mysql导入百万级数据到Redis
首先造数据
由于环境限制,所以这里没有用真实数据来实现导入,那么我们就先使用一个存储过程来造一百万条数据把。使用存储过程如下:

DELIMITER $$
USE `cb_mon`$$

DROP PROCEDURE IF EXISTS `test_insert`$$
CREATE DEFINER=`root`@`%` PROCEDURE `test_insert`()
BEGIN

DECLARE i INT DEFAULT 1;
WHILE i<= 1000000
DO
INSERT INTO t_book(id,number,NAME,descrition)
VALUES (i, CONCAT("00000",i) , CONCAT('book',i)
, CONCAT('book_description',i));
SET i=i+1;
END WHILE ;
COMMIT;
END$$

DELIMITER ;
调用存储过程:

CALL test_insert();
查看表数据:

按协议构造查询语句
按照上述redis协议,我们使用如下sql来构造协议数据

SELECT
CONCAT(
"*4\r\n",
"$",
LENGTH(redis_cmd),
"\r\n",
redis_cmd,
"\r\n",
"$",
LENGTH(redis_key),
"\r\n",
redis_key,
"\r\n",
"$",
LENGTH(hkey),
"\r\n",
hkey,
"\r\n",
"$",
LENGTH(hval),
"\r\n",
hval,
"\r"
)
FROM
(SELECT
"HSET" AS redis_cmd,
id AS redis_key,
NAME AS hkey,
descrition AS hval
FROM
cb_mon.t_book
) AS t limit 1000000
并将内容保存至redis.sql 文件中。

编写脚本使用pipe模式导入redis
编写shell脚本。由于我在主机上是通过docker安装的redis和mysql,以下脚本供参考:

file

#!/bin/bash
starttime=`date +'%Y-%m-%d %H:%M:%S'`

docker exec -i 899fe01d4dbc mysql --default-character-set=utf8
--skip-column-names --raw < ./redis.sql
| docker exec -i 4c90ef506acd redis-cli --pipe

endtime=`date +'%Y-%m-%d %H:%M:%S'`
start_seconds=$(date --date="$starttime" +%s);
end_seconds=$(date --date="$endtime" +%s);

echo "脚本执行耗时: "$((end_seconds-start_seconds))"s"
执行截图:

file

可以看到百万级的数据导入redis,只花费了7秒,效率非常高。

注意事项
如果mysql表特别大,可以考虑分批导入,或者将表拆分,否则在导入过程中可能会发生

lost connection to mysql server during query
由于max_allowed_packed和超时时间限制,查询数据的过程中,可能会造成连接断开,所以在数据表的数据量特别大的时候,需要分页或者将表拆分导入。

总结
本篇文章主要探讨了,Mysql百万级数据量级下,如何高效的迁移到Redis中去,逐步实现目标的过程中,总结了如下几点

redis单线程执行命令,避免了线程切换所消耗的时间,但是在超大数据量级下,其发送、响应接收的时延不可忽视。
网络nc命令的应用场景,及在数据导入时存在的缺点。
redis RESP协议的理解和应用。
百万量级Mysql数据的Redis快速导入案例。

Redis的pipeline(管道)功能在命令行中没有,但redis是支持pipeline的,而且在各个语言版的client中都有相应的实现。 由于网络开销延迟,就算redis server端有很强的处理能力,也会由于收到的client消息少,而造成吞吐量小。当client 使用pipelining 发送命令时,redis server必须将部分请求放到队列中(使用内存),执行完毕后一次性发送结果;如果发送的命令很多的话,建议对返回的结果加标签,当然这也会增加使用的内存;

  Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP连接中减少了“交互往返”的时间。

  不过在编码时请注意,pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力。

(一)简介
  Redis使用的是客户端-服务器(CS)模型和请求/响应协议的TCP服务器。这意味着通常情况下一个请求会遵循以下步骤:

客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
服务端处理命令,并将结果返回给客户端。
  Redis客户端与Redis服务器之间使用TCP协议进行连接,一个客户端可以通过一个socket连接发起多个请求命令。每个请求命令发出后client通常会阻塞并等待redis服务器处理,redis处理完请求命令后会将结果通过响应报文返回给client,因此当执行多条命令的时候都需要等待上一条命令执行完毕才能执行。比如:

  

  其执行过程如下图所示:

  

  由于通信会有网络延迟,假如client和server之间的包传输时间需要0.125秒。那么上面的三个命令6个报文至少需要0.75秒才能完成。这样即使redis每秒能处理100个命令,而我们的client也只能一秒钟发出四个命令。这显然没有充分利用 redis的处理能力。

  而管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline通过减少客户端与redis的通信次数来实现降低往返延时时间,而且Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。 Pipeline 的默认的同步的个数为53个,也就是说arges中累加到53条数据时会把数据提交。其过程如下图所示:client可以将三个命令放到一个tcp报文一起发送,server则可以将三条命令的处理结果放到一个tcp报文返回。

  

  需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。具体多少合适需要根据具体情况测试。

(二)比较普通模式与PipeLine模式
  测试环境:
  Windows:Eclipse + jedis2.9.0 + jdk 1.7
  Ubuntu:部署在虚拟机上的服务器 Redis 3.0.7

/*
* 测试普通模式与PipeLine模式的效率:
* 测试方法:向redis中插入10000组数据
*/
public static void testPipeLineAndNormal(Jedis jedis)
throws InterruptedException {
Logger logger = Logger.getLogger("javasoft");
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
jedis.set(String.valueOf(i), String.valueOf(i));
}
long end = System.currentTimeMillis();
logger.info("the jedis total time is:" + (end - start));

Pipeline pipe = jedis.pipelined(); // 先创建一个pipeline的链接对象
long start_pipe = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
pipe.set(String.valueOf(i), String.valueOf(i));
}
pipe.sync(); // 获取所有的response
long end_pipe = System.currentTimeMillis();
logger.info("the pipe total time is:" + (end_pipe - start_pipe));

BlockingQueue logQueue = new LinkedBlockingQueue();
long begin = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
logQueue.put("i=" + i);
}
long stop = System.currentTimeMillis();
logger.info("the BlockingQueue total time is:" + (stop - begin));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  

  从上述代码以及结果中可以明显的看到PipeLine在“批量处理”时的优势。

(三)适用场景
  有些系统可能对可靠性要求很高,每次操作都需要立马知道这次操作是否成功,是否数据已经写进redis了,那这种场景就不适合。

  还有的系统,可能是批量的将数据写入redis,允许一定比例的写入失败,那么这种场景就可以使用了,比如10000条一下进入redis,可能失败了2条无所谓,后期有补偿机制就行了,比如短信群发这种场景,如果一下群发10000条,按照第一种模式去实现,那这个请求过来,要很久才能给客户端响应,这个延迟就太长了,如果客户端请求设置了超时时间5秒,那肯定就抛出异常了,而且本身群发短信要求实时性也没那么高,这时候用pipeline最好了。

(四)管道(Pipelining) VS 脚本(Scripting)
  大量 pipeline 应用场景可通过 Redis 脚本(Redis 版本 >= 2.6)得到更高效的处理,后者在服务器端执行大量工作。脚本的一大优势是可通过最小的延迟读写数据,让读、计算、写等操作变得非常快(pipeline 在这种情况下不能使用,因为客户端在写命令前需要读命令返回的结果)。

  应用程序有时可能在 pipeline 中发送 EVAL 或 EVALSHA 命令。Redis 通过 SCRIPT LOAD 命令(保证 EVALSHA 成功被调用)明确支持这种情况。

Category storage