Redis常用数据处理

主要引入如下模块

1
2
import redis
import rediscluster

一.删除指定模式的key

1.单个节点

使用scan + pipeline + del(每次扫描100个)

1
2
3
4
5
6
7
8
9
10
11
12
13
def delInstancePatternKeys(host, port, pattern, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
pipeline = client.pipeline(transaction=False)
cursor = 0
while True:
currentCursor, keys = client.scan(cursor=cursor, match=pattern, count=100)
if len(keys) > 0:
for key in keys:
pipeline.delete(key)
pipeline.execute()
cursor = currentCursor
if cursor == 0:
break

例如删除127.0.0.1:6380中以video开头的key,可以执行如下

1
delInstancePatternKeys("127.0.0.1",6380,"video*")

2.集群

借助cluster nodes获取所有master节点,执行上面的函数:

1
2
3
4
5
6
7
8
9
10
11
12
def delClusterPatternKey(startupNodes, pattern):
clusterClient = rediscluster.StrictRedisCluster(startup_nodes=startupNodes, decode_responses=True)
clusterInfos = clusterClient.cluster_nodes()
for clusternode in clusterInfos:
slots = clusternode["slots"]
# only check master
if len(slots) > 0:
print "{tempHost}:{tempPort} start to del pattern keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])
delInstancePatternKeys(clusternode["host"], clusternode["port"], pattern)
print "{tempHost}:{tempPort} end to del pattern keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])

对于使用者来讲,只需要提供集群中一个可用的节点即可,例如如下:

1
2
startupNodes = [{"host":"127.0.0.1","port":7000}]
delClusterPatternKey(startupNodes, "video*")

二.寻找bigkey

1.单个节点

使用scan + strlen(字符串) | debug object(非字符串)找到字节数在startBytes与endBytes之间的key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def findInstanceBigKey(host, port, startBytes, endBytes, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
cursor = 0
while True:
currentCursor, keys = client.scan(cursor=cursor, match=None, count=100)
for key in keys:
try:
keyType = client.type(key)
if "string" == keyType.lower():
strlen = client.strlen(key)
if (strlen > startBytes and strlen < endBytes):
print "{key} strlen is {strlen} byte".format(key=key, strlen=strlen)
else:
keyObjectDict = client.debug_object(key)
serializedlength = keyObjectDict["serializedlength"]
if (serializedlength > startBytes and serializedlength < endBytes):
print "{key} serializedlength is {serializedlength} byte".format(key=key,
serializedlength=serializedlength)
except Exception, ex:
print ex
cursor = currentCursor
if cursor == 0:
break

因为debug object中的s是键值序列化后的长度,而且对于非字符串类型debug object命令是有成本的,如下:(每个item都在1~10个字节)

类型 长度 每个item长度 耗时
hash 100000 4字节 27毫秒
hash 500000 4字节 137毫秒
hash 1000000 4字节 255毫秒
list 100000 4字节 4毫秒
list 500000 4字节 8毫秒
list 1000000 4字节 12毫秒
set 100000 4字节 16毫秒
set 500000 4字节 85毫秒
set 1000000 4字节 181毫秒
zset 100000 4字节 78毫秒
zset 500000 4字节 355毫秒
zset 1000000 4字节 733毫秒

而且实际上非字符串类型,所谓的bigkey其实我们更关系的是它的长度,所以上述代码稍作改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def findInstanceBigKeyV2(host, port, startBytes, endBytes, maxLength, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
cursor = 0
while True:
currentCursor, keys = client.scan(cursor=cursor, match=None, count=100)
for key in keys:
try:
keyType = client.type(key)
if "string" == keyType.lower():
strlen = client.strlen(key)
if (strlen > startBytes and strlen < endBytes):
print "{key} strlen is {strlen} byte".format(key=key, strlen=strlen)
else:
length = 0
if "hash" == keyType.lower():
length = client.hlen(key)
elif "list" == keyType.lower():
length = client.llen(key)
elif "set" == keyType.lower():
length = client.scard(key)
elif "zset" == keyType.lower():
length = client.zcard(key)
if length > maxLength:
print "{keyType} key {key} length is {length} ".format(keyType=keyType, key=key, length=length)
except Exception, ex:
print ex
cursor = currentCursor
if cursor == 0:
break

例如寻找127.0.0.1:6380中10KB~20KB(大约)的key,可以执行如下:

1
findInstanceBigKey("127.0.0.1",6380,10000,20000)

2.集群

1
2
3
4
5
6
7
8
9
10
11
12
def findClusterBigKey(startupNodes, startBytes, endBytes):
clusterClient = rediscluster.StrictRedisCluster(startup_nodes=startupNodes, decode_responses=True)
clusterInfos = clusterClient.cluster_nodes()
for clusternode in clusterInfos:
slots = clusternode["slots"]
# only check master
if len(slots) > 0:
print "{tempHost}:{tempPort} start to scan bigkeys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])
findInstanceBigKey(clusternode["host"], clusternode["port"], startBytes, endBytes)
print "{tempHost}:{tempPort} end to scan bigkeys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])

对于使用者来讲,只需要提供集群中一个可用的节点即可,例如如下:

1
2
startupNodes = [{"host":"127.0.0.1","port":7000}]
findClusterBigKey(startupNodes, 10000, 20000)

三.空闲key

1.单个节点

scan + debug object找到出空闲时间大于idleDays(单位:天)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def findInstanceIdleKeys(host, port, idleDays, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
cursor = 0
while True:
currentCursor, keys = client.scan(cursor=cursor, match=None, count=100)
if len(keys) > 0:
for key in keys:
keyObjectDict = client.debug_object(key)
lruSecondsIdle = keyObjectDict["lru_seconds_idle"]
if lruSecondsIdle > idleDays * 3600 *24:
print "{key} idle days is {lruSecondsIdle}".format(key=key, lruSecondsIdle=lruSecondsIdle/24/3600)
currentCursor = cursor
if cursor == 0:
break

例如寻找127.0.0.1:6380中闲置时间大于1天的key,可以执行如下:

1
findInstanceIdleKeys("127.0.0.1",6380,1)

2.集群

1
2
3
4
5
6
7
8
9
10
11
12
def findClusterIdleKeys(startupNodes, idleDays):
clusterClient = rediscluster.StrictRedisCluster(startup_nodes=startupNodes, decode_responses=True)
clusterInfos = clusterClient.cluster_nodes()
for clusternode in clusterInfos:
slots = clusternode["slots"]
# only check master
if len(slots) > 0:
print "{tempHost}:{tempPort} start to find idle keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])
findInstanceIdleKeys(host, port, idleDays)
print "{tempHost}:{tempPort} end to find idle keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])

对于使用者来讲,只需要提供集群中一个可用的节点即可,例如如下:

1
2
startupNodes = [{"host":"127.0.0.1","port":7000}]
findClusterIdleKeys(startupNodes, 1)

四.死键

1.单个节点

对于死键问题仅仅需要scan即可解决,有关什么是死键可以参考如下:Redis的“死键”问题

1
2
3
4
5
6
7
8
9
def delInstanceDieKeys(host, port, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
pipeline = client.pipeline(transaction=False)
cursor = 0
while True:
currentCursor, keys = client.scan(cursor=cursor, match=pattern, count=100)
cursor = currentCursor
if cursor == 0:
break

2.集群

1
2
3
4
5
6
7
8
9
10
11
12
def delClusterDieKeys(startupNodes):
clusterClient = rediscluster.StrictRedisCluster(startup_nodes=startupNodes, decode_responses=True)
clusterInfos = clusterClient.cluster_nodes()
for clusternode in clusterInfos:
slots = clusternode["slots"]
# only check master
if len(slots) > 0:
print "{tempHost}:{tempPort} start to find die keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])
delInstanceDieKeys(host,port)
print "{tempHost}:{tempPort} end to find idle keys".format(tempHost=clusternode["host"],
tempPort=clusternode["port"])

五、删除bigkey

1.hash类型

hscan + pipeline + hdel + del

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def delBigHashKey(host, port, delKey, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
pipeline = client.pipeline(transaction=False)
keyType = client.type(delKey)
if "hash" != keyType:
print "{0} is not hash type".format(delKey)
else:
cursor = 0
while True:
currentCursor,fields = client.hscan(name=delKey,cursor=cursor,match=None,count=100)
for field in fields:
pipeline.hdel(delKey,field)
pipeline.execute()
cursor = currentCursor
if cursor == 0:
break
client.delete(delKey)

2.set类型

sscan + pipeline + srem + del

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def delBigSetKey(host, port, delKey, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
pipeline = client.pipeline(transaction=False)
keyType = client.type(delKey)
if "set" != keyType:
print "{0} is not set type".format(delKey)
else:
cursor = 0
while True:
currentCursor,items = client.sscan(name=delKey,cursor= cursor,match=None,count=100)
for item in items:
pipeline.srem(delKey,item)
pipeline.execute()
cursor = currentCursor
if cursor == 0:
break
client.delete(delKey)

3.zset类型

zscan + pipeline + zrem + del

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def delBigZsetKey(host, port, delKey, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
pipeline = client.pipeline(transaction=False)
keyType = client.type(delKey)
if "zset" != keyType:
print "{0} is not zset type".format(delKey)
else:
cursor = 0
while True:
currentCursor,items = client.zscan(name=delKey,cursor= cursor,match=None,count=100)
for item in items:
pipeline.zrem(delKey, item)
pipeline.execute()
cursor = currentCursor
if cursor == 0:
break
client.delete(delKey)

4.list类型

llen + ltrim + del

1
2
3
4
5
6
7
8
9
def delBigListKey(host, port, delKey, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
keyType = client.type(delKey)
if "list" != keyType:
print "{0} is not list type".format(delKey)
else:
while client.llen(delKey) > 0:
client.ltrim(delKey, 100, -1)
client.delete(delKey)

六、寻找可疑的客户端

client list中的重要属性

1.空闲超过idleSeconds秒的连接

1
2
3
4
5
def findIdleClientList(host, port, idleSeconds, password=None):
client = redis.StrictRedis(host=host, port=port, password=password)
for client in client.client_list():
if client["idle"] > idleSeconds:
print client

每个client中若干属性,例如下面

1
id=9 addr=127.0.0.1:57839 fd=6 name= age=3574 idle=3574 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL

也可以通过awk快速找到idle大的连接

1
2
redis-cli -h 127.0.0.1 -p 6379 client list | awk '{if(int(substr($6,6))>10) print $0}'
id=34 addr=127.0.0.1:64300 fd=6 name= age=77 idle=77 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=NULL

2.输出缓冲区过大的连接

重点关注:obl、oll、omem

3.输入缓冲区过大的连接

重点关注: quf、qbuf-free

4.重点命令

例如:monitor、flush等

也综合考虑age、idle、qbuf、qbuf-free、obl、oll、omem、cmd等检查集群中不合理的客户端连接

七、短时间热点key的寻找

可以使用redis-faina.py短时间利用monitor找到热点key,具体可以参考Redis热点key寻找与优化