Redis批量插入

有时候需要在短时间内加载大量的之前存在或者用户产生的数据,Redis能尽可能快的创建数以百万计的的key。这就是所谓的批量插入,本文档旨在提供有关如何给Redis尽可能快的填充数据。

Redis 批量插入


使用协议,Luke

使用正常的Redis客户端执行批量插入不是一个很好的主意,有几个原因:发送一个命令之后,另外一个是非常缓慢的,因为你必须为每一个命令付出往返时间。你也可以使用管道,但是对于大量的插入,你需要在写新命令的同时读取响应以确保Redis能尽可能快的完成插入。

只有一小部分客户端支持非阻塞IO,而且并不是所有的客户端能够以有效地方式来解析响应,以最大限度的提高吞吐量。

这些原因致使批量导入数据到Redis的首选方式是 生成包含Redis 协议的文本文件、以RAW格式来调用命令完成插入所需的数据。

例如,如果我需要产生一个大的数据集,这些数据有数十亿的键值对(KeyN--->ValueN),接下来我将创建一个文件,文件中包含如下以Redis协议格式的命令:

SET Key0 Value0
SET Key1 Value1

SET KeyN ValueN

一旦创建此文件,剩下的动作就是尽可能快的去填充数据。过去这样做是使用如下netcat命令:

(cat data.txt; sleep 10) | nc localhost 6379 > /dev/null

然而对于执行批量插入来说这并不是一个可靠的方式,因为netcat并不知道数据什么时候传输完成而且也不做错误检查。在Redis 2.6和之后的高级版本中,redis-cli支持一种新的模式,该模式被称为管道模式,它就是为了批量插入而设计的。
使用管道模式运行命令如下:

cat data.txt | redis-cli –pipe

它将会产生如下输出:

All data transferred. Waiting for the last reply…
Last reply received from server.
errors: 0, replies: 1000000

redis-cli工具也将确保只将Redis实例的错误重定向到标注输出。


生成Redis协议

生成和解析Redis协议非常的简单。
为了生成批量插入数据的协议,不需要了解协议的每一个细节,但是每一个命令必须都以下面的方式表示:

*<args><cr><lf>
$<len><cr><lf>
<arg0><cr><lf>
<arg1><cr><lf>

<argN><cr><lf>

其中<cr>表示\r(或ASCII字符13),<lf> 表示 \n (或ASCII字符10)。例如 SET key value命令如下:

*3<cr><lf>
$3<cr><lf>
SET<cr><lf>
$3<cr><lf>
key<cr><lf>
$5<cr><lf>
value<cr><lf>

或者表示为如下字符串:

"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"

你只需要生成由如上方式表示的命令组成的文件。下面是使用Ruby函数生成的协议

1
2
3
4
5
6
7
8
9
10
11
def gen_redis_proto(*cmd)
proto = ""
proto << "*"+cmd.length.to_s+"\r\n"
cmd.each{|arg|
proto << "$"+arg.to_s.bytesize.to_s+"\r\n"
proto << arg.to_s+"\r\n"
}
proto
end
puts gen_redis_proto("SET","mykey","Hello World!").inspect

使用上面的函数可以很容易的生成上面示例中键值对,程序如下:

1
2
3
(0...1000).each{|n|
STDOUT.write(gen_redis_proto("SET","Key#{n}","Value#{n}"))
}

我们可以通过管道去运行redis-cli来完成批量插入:

$ ruby proto.rb | redis-cli –pipe
All data transferred. Waiting for the last reply…
Last reply received from server.
errors: 0, replies: 1000


在Hoods下管道模式是如何工作的

魔术发生在 redis-cli的管道模式内部,和netcat一样快,而且同时能够知道服务器发送的最后响应是什么时候。

通过以下方式可以得知:

  • redis-cli --pipe尝试尽可能快的发送数据到服务器。
  • 同时,当数据到达时尝试去解析它。
  • 一旦发现没有数据输入,就会发送一个随机的20个字节组成的特殊的ECHO命令:我们能够确保这是发送的最后的命令,并且如果我们收到同样的20字节的响应,我们就可以对响应进行匹配检查。
  • 一旦这个特殊的最终命令被发送,接收响应的代码就开始和这20字节的响应进行匹配。当匹配响应到达时,就可以成功推出了。

使用这个技巧,我们不需要解析我们发送到服务器的协议来了解我们发送的命令,除了响应。

然而,在解析响应时我们为所有的响应开启计数器,以便在最后,能够被告知批量插入到服务器的命令的数量。


实战

需求

使用redis来存储用户投递过的职位Id列表,类似于我的关注,粉丝列表。

方案 :

使用Sorted Set来存储,生成协议文件,然后使用 –pipe导入

格式 :

userId:1231312 deliverTime(投递时间) positionId(职位Id)

查看用户投递的所有职位

zrange userId:645009 0 -1 withscores

查看用户投递数

zcard userId:xxx

查看用户某一时间段的投递数

zcount userId:xxx minScore maxScore

查看用户是否投递过

zrank userId:xxx positionId 获取用户排名(从0开始)
zscore userId:xxx positionId 获取score

移除单个

zrem userId:xxx positionId

移除多个

zrem userId:xxx positionId positionId positionId

移除不存在元素

zrem userId:xxx non-exists-element

生成redis协议文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
其中文件内容生成方式示例(zadd)如下:
/**
* 生成redis proto协议命令
* @param key
* @param score
* @param member
* @return
*/
public static String createRedisProtocolCommonds(String key,Long score,Integer member){
StringBuffer a = new StringBuffer();
// zadd
a.append("*4\r\n");
a.append("$4\r\n");
a.append("ZADD");
a.append("\r\n");
// zadd userId:userId key
a.append("$");
a.append(key.length());
a.append("\r\n");
a.append(key);
a.append("\r\n");
// zadd userId:userId key score
a.append("$");
a.append(String.valueOf(score).length());
a.append("\r\n");
a.append(score);
a.append("\r\n");
// zadd userId:userId key score value
a.append("$");
a.append(String.valueOf(member).length());
a.append("\r\n");
a.append(member);
a.append("\r\n");
return a.toString();
}

以下是我用sql直接生成的(只是示例代码,跟上面的需求无关)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SELECT CONCAT(
"*4\r\n",
'$', LENGTH(redis_cmd), '\r\n',
redis_cmd, '\r\n',
'$', LENGTH(redis_key), '\r\n',
redis_key, '\r\n',
'$', LENGTH(hkey), '\r\n',
hkey, '\r\n',
'$', LENGTH(hval), '\r\n',
hval, '\r'
)
FROM (
SELECT
'ZADD' AS redis_cmd,
'createTime' AS redis_key,
userId AS hkey,
city AS hval
FROM user_ip_location
) AS t

导入数据命令 :

cat xxx.log | redis-cli –pipe

耗时:千万数据,20S左右,Windows机器。

译自:Redis Mass Insertion 欢迎指正。

实战来自之前的工作内容,因为要将历史数据导入到redis,当时想到mysql有个批量导入,redis应该也有吧,于是就调研了一下。
过去大半年了,今天才落实到文档。