redis一些总结

简介

redis提供亚毫秒级别的响应,是内存键值数据存储,每秒处理百万请求轻轻松松,在游戏,广告,金融,医疗,会话管理,游戏。排行榜,实时分析,网约车,聊天室,流媒体,发布订阅领域都占有很高的市场。

  • 性能极致:读速度110000(11w)次/s, 写 81000(8w) 次每秒
  • 丰富的数据类型:支持string,lists,hashes,set。
  • 原子:操作都是原子性
  • 特性:支持发布订阅,通知,key过期

与memcached对比

  • 数据结构比memcached(接下来我会用mc来代称)丰富,mc支持的是ky结构,redis丰富很多
  • redis比mc块很多,切单节点并发更高,多配置提高性能
  • 可靠性比mc好,提供数据持久化选项,可以保存数据到磁盘上
  • 扩展性比mc,主从复制,集群模式。
  • 场景更多,可用于复杂数据结构类型,典型场景有:消息队列mq,缓存,会话管理,分布式锁。mc则是用于简单的kv缓存用来提高web性能。
    redis典型场景很多

缓存:用于web或移动端的会话数据管理,api调用查询结果。

实时数据处理:大型社交网络,游戏,用户活动跟踪,实时事件统计。

分布式锁:协调分布式系统的并发控制。

消息队列:异步处理任务,推送通知等等。

地理位置:查找附近商店等等定位。

数据类型

对于redis而言,所有的key都是字符串,value则有五种。

分别是:

  • 1 string: 可以是字符串,可以是整数,浮点数

  • 2 List:链表,节点包含一个字符串,可以push,pop

  • 3 Set:无序集合,基础方式查看存在,添加,获取,删除,计算交集,并集,差集。

  • 4 Hash: 哈希表

  • 5 Sorted Set:类似集合但是有序,每个元素有一个分数,根据分数进行排序(排行榜最多)
    还有其他的五个特殊数据类型:

  • 1 Streams: 有序,持久化的,高性能消息传递系统,类似消息队列,支持多个消费者并发消费。

  • 2 Geospatial indexes:支持地理位置信息存储和索引,经纬度进行地理位置查询。

  • 3 Bitmaps: 存储只有1和0数据,可以进行位运算和统计。

  • 4 Bitfields:可以存储不同长度的二进制数据,位运算和类型转换。

  • 5 HpyerLogLog:基数估计算法,可以低内存估算一个集合的元素个数,帮助统计和分析。
    问题:一个字符串类型的值能存储最大容量是多少?是 512 MB(536,870,912 bytes)

字符串

字符串在这里几个特性:

  • 快速读写:纳秒级别
  • 支持各种编码:raw,int,数据内容自适应选择,节省内存空间
  • 支持各种操作:设置,获取,删除,添加,自增自减
  • 批量操作:batch操作提高性能
  • 过期时间:设置好,到点了自己删除kv值
  • 计数器:可以作为计数器使用,自增自减

基本操作

常见操作:GET,SET,GETSET,还有INCR & DECR & INCRBY & DECRBY:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import redis

# 连接到 Redis 服务器
client = redis.StrictRedis(host='localhost', port=6379, db=0)

# SET 和 GET 示例
client.set('mykey', 'Hello, Redis!')
value = client.get('mykey')
print(value.decode('utf-8'))  # 输出: Hello, Redis!

# GETSET 示例
client.set('mykey', 'Hello, Redis!')
old_value = client.getset('mykey', 'New Value')
print(old_value.decode('utf-8'))  # 输出: Hello, Redis!
new_value = client.get('mykey')
print(new_value.decode('utf-8'))  # 输出: New Value

# INCR 和 DECR 示例
client.set('counter', 0)
client.incr('counter')
print(client.get('counter').decode('utf-8'))  # 输出: 1
client.decr('counter')
print(client.get('counter').decode('utf-8'))  # 输出: 0

# INCRBY 和 DECRBY 示例
client.set('counter', 0)
client.incrby('counter', 10)
print(client.get('counter').decode('utf-8'))  # 输出: 10
client.decrby('counter', 5)
print(client.get('counter').decode('utf-8'))  # 输出: 5

其他操作可以参考网络

实现方式sds

redis实现字符串底层是靠 Simple Dynamic String,简单动态字符串。

1
2
3
4
5
6
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
  • 获取字符串长度的时间复杂度位O(1),因为sds里面的len存储了长度值。
  • 不容易缓冲区溢出,因为传统c字符串不会记录长度值,sds会,空间不足则申请,不容易溢出。
  • c语系的字符串stl容器之类修改内容是会导致重新分配内存,性能不高,redis则采用了两种方式去解决,第一是空间预分配:当对sds进行拓展,redis会位sds分配好内存,根据公式分配多余的free空间,用于备用。第二是,惰性空间释放,分配多余的空间后,如果字符串内容缩水,则不会立即回收,而是当再次操作依旧未使用多余空间,才会导致回收。

zset

zset用处很大,它不允许重复元素,而且每个元素会关联一个double类型的分数,redis正是通过这个分数对元素进行从小到大排序。

zset的一个元素具备三个元素,分别是key(键),score(排序用的),member(值)。

zset的常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import redis
# 连接到 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db=0)
# 添加和更新成员
r.zadd('myzset', {'one': 1, 'two': 2})
# 获取成员的分数
score_one = r.zscore('myzset', 'one')
print(f"Score of 'one': {score_one}")
# 获取集合中成员的数量
count = r.zcard('myzset')
print(f"Number of members in 'myzset': {count}")
# 获取成员的排名(从小到大)
rank_one = r.zrank('myzset', 'one')
print(f"Rank of 'one': {rank_one}")
# 获取成员的逆序排名(从大到小)
revrank_one = r.zrevrank('myzset', 'one')
print(f"Reverse rank of 'one': {revrank_one}")
# 获取有序集合中所有成员
members = r.zrange('myzset', 0, -1, withscores=True)
print(f"All members in 'myzset': {members}")
# 删除有序集合中的成员
r.zrem('myzset', 'one')
print(f"After removing 'one': {r.zrange('myzset', 0, -1, withscores=True)}")

zset的底层结构

分两种场景,分别是元素多的时候,还有元素少的时候(元素数量少于 128 的时候

每个元素的长度小于 64 字节)。

  • 当元素多的时候,使用的是跳跃表按小到大存([score,value]),和哈希表,由哈希表维护元素和分数值,跳跃表维护顺序。
  • 当元素少的时候。使用压缩列表,元素被编码连续的二进制块,按排序分数大小排序,可以将多个元素压缩到一个连续的内存空间内。
    对于跳跃表可以参考:https://zhuanlan.zhihu.com/p/54869087

为什么不用平衡树

  • 在各种数据库内(AVL,红黑树,B+, B树)都是常客,但是为什么不用这类树结构维护有序。因为范围查找的时候,树操作比skiplist复杂,树还要进行中序遍历不超过最大值的节点。而skiplist只是简单的遍历。
  • 树的插入和删除都会引发树结构的变化,skiplist插入删除只是调整指针而已,然后读取,插入,和删除操作对于redis来说非常高发。

常见场景

  • 排行榜: redis zset常客,把点击次数,时间当score。
  • 时间轴:将元素的时间当作socre。
  • 队列系统:将元素的优先级别当作socre。
  • 去重:zset不允许重复元素。

与哈希表对比

哈希表的底层实现是依靠字典来实现,内部使用链地址法解决哈希冲突,当kv值增加的时候渐进式对哈希表rehash扩缩,避免一次性花大时间来重新计算哈希值。

可以参考代码:

https://github.com/redis/redis/blob/unstable/src/dict.h

https://github.com/redis/redis/blob/unstable/src/dict.c

使用场景就不多赘述。

hepyerLogLog

对于如何统计海量数据?这是一个基数算法,搞笑计算一个大集合里的独立元素个数,而不需要加载集合到内存中。最主要优势在于使用非常小的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import redis

# 连接到 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db=0)

# 添加元素到 HyperLogLog
r.pfadd('myhll', 'a', 'b', 'c', 'd', 'e')
r.pfadd('myhll', 'a', 'f', 'g')

# 获取基数估算值
count = r.pfcount('myhll')
print(f"Estimated number of unique elements: {count}")

# 合并多个 HyperLogLog
r.pfadd('another_hll', 'h', 'i', 'j')
r.pfmerge('merged_hll', 'myhll', 'another_hll')
merged_count = r.pfcount('merged_hll')
print(f"Estimated number of unique elements in merged HLL: {merged_count}")

使用场景:

1、统计每日活跃用户数、月活跃用户数等指标,特别是针对大数据场景。

2、统计不同用户的去重个数,比如计算网站独立访客数量。

3、统计不同 IP 地址的去重数量,可以用于网络安全领域,比如 DDoS 攻击的防御。

底层算法:

主要利用了前导0,就是左边开数0的数目,举个例子,把所有ip地址通过哈希函数转化为多个随机数,然后分成多个子集后,通过前导0计算基础,之后计算的子集的估计值的调和平均值。

redis线程

对于redis而言,是一个事件启动型引擎,第一种事件是文件事件(主要是和客户端产生交互,连接,接受,读取,写入,关闭),第二种事件是时间事件(清除过期键,服务状态统计)。

但是redis在事件处理器模块是单线程,只是使用了多路复用技术:

(多路复用技术)

  1. 监听多个 socket:使用 I/O 多路复用技术(如 epoll)来监听多个客户端 socket。
  2. 事件触发:当某个 socket 上有事件发生时(如客户端发送数据到服务器),I/O 多路复用会将此事件通知 Redis。
  3. 事件处理:Redis 在事件触发后,调用预先注册的事件处理函数来处理这个事件。
  4. 继续监听:处理完事件后,Redis 继续监听所有的 socket,以等待下一个事件的到来。
    来以此监听多个socket,实现多个client的并发处理(不是并行),这样不需要锁,而不需要切换线程。在一台linux上,redis实例每秒传递的请求百万级别,cpu根本不是其瓶颈所在,而是受限于内存,网络。但是这也不妨碍在cpu层,进行对它的优化,比如部署多个redis实例。

redis多线程

在redis上只有IO操作才是多线程,使用多个cpu核心来分担IO读写负载。具体使用了一个IO线程池子来处理IO事件,主线程乃单线程用来监听连接事件,线程池子处理读写事件。

redis事务

一联想其事务,都知道一个事务包含多个操作,redis的事务时一次性,顺序性,排他性的。

事务命令有五个:

1 MULTI 开启事务块

2 EXEC 执行事事务

3 DISCARD 取消事务

4 WATCH 监控键,键被修改,事务则打断

5 UNWATCN 取消监控

执行EXEC的时候,事务被放进队列里,当轮到的时候才执行里面所有的操作。

但是redis事务有点不太一样。

  • 隔离操作没有那么复杂只是事务执行过程不会被其他客户端打断。

  • 没有多隔离级别概念,没有可重复读之类的level,并发执行多个事务时会互相影响的。

  • 事务不保证原子操作,不支持因为被其他事务影响而回滚的行为。
    但是这里的它的回滚不能称之完全回滚

  • 在入队之前,已经检查出事务里的操作错误,那入队都错误。

  • 在执行前,如果检查出现异常,比如键不存在,也不会执行。

  • 在执行过程种,出现错误,则事务取消,但是!成功的部分是生效的。
    所以不能说是事务回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import redis
# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# WATCH 键 'foo'
r.watch('foo')
# 开始一个事务
pipeline = r.pipeline()
try:
    # 添加一些命令到事务中
    pipeline.multi()
    pipeline.set('foo', 'bar')
    pipeline.incr('counter')

    # 执行事务
    pipeline.execute()
except redis.WatchError:
    # 事务被取消,因为 'foo' 被其他客户端修改
    print("Transaction aborted due to concurrent modification")
# 取消监视键 'foo'
r.unwatch()

redis消息队列

消息队列的特点总共就4个:

  • 1 可靠性:网络故障系统崩溃,或者消费者暂时的离线,都不应该影响到消息传递。
  • 2 顺序性:消息按顺序发送。
  • 3 并发:支持多个消费者,多个生产者。
  • 4 扩展性:根据系统负载动态缩小或增大。
    redis可以实现消息队列的方式有三种

List作为消息队列

Redis 的 LIST 数据结构可以用作简单的队列。使用 LPUSH 将消息推入队列,使用 RPOP 从队列中取出消息。

  • 优点:实现简单,适合小规模、简单的消息队列。
  • 缺点:不支持消息持久化、消息重放和消费者组。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import redis

    # 连接到 Redis
    r = redis.Redis(host='localhost', port=6379, db=0)

    # 生产者:将消息推入队列
    def producer(queue_name, message):
        r.lpush(queue_name, message)

    # 消费者:从队列中取出消息
    def consumer(queue_name):
        while True:
            message = r.rpop(queue_name)
            if message:
                print(f"Consumed message: {message.decode('utf-8')}")
            else:
                break

    # 使用示例
    queue_name = 'message_queue'

    # 生产者推入消息
    producer(queue_name, 'Message 1')
    producer(queue_name, 'Message 2')

    # 消费者消费消息
    consumer(queue_name)

发布订阅

发布订阅是一种简单的消息传递模式,消息被发布到频道,所有订阅该频道的客户端都能接收到消息。

  • 优点:实时消息传递,适合广播消息。
  • 缺点:消息不持久化,订阅者必须在线才能接收到消息。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    import redis
    import threading

    # 连接到 Redis
    r = redis.Redis(host='localhost', port=6379, db=0)

    # 发布者:发布消息到频道
    def publisher(channel, message):
        r.publish(channel, message)

    # 订阅者:订阅频道并处理消息
    def subscriber(channel):
        pubsub = r.pubsub()
        pubsub.subscribe(channel)

        for message in pubsub.listen():
            if message['type'] == 'message':
                print(f"Received message: {message['data'].decode('utf-8')}")

    # 使用示例
    channel_name = 'my_channel'

    # 启动订阅者线程
    subscriber_thread = threading.Thread(target=subscriber, args=(channel_name,))
    subscriber_thread.start()

    # 发布者发布消息
    publisher(channel_name, 'Message 1')
    publisher(channel_name, 'Message 2')

stream

Redis Streams 是一种强大的消息队列系统,支持消息持久化、消费者组和消息确认等高级功能。

  • 优点:支持持久化、消费者组、消息确认和重放,适合复杂、高可靠性需求的消息队列。

  • 缺点:相对复杂,学习成本较高

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import redis

    # 连接到 Redis
    r = redis.Redis(host='localhost', port=6379, db=0)

    # 生产者:添加消息到流
    def producer(stream_name, message):
        r.xadd(stream_name, message)

    # 消费者:读取流中的消息
    def consumer(stream_name, group_name, consumer_name):
        try:
            r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
        except redis.exceptions.ResponseError:
            pass  # 消费组已经存在

        while True:
            messages = r.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1, block=5000)
            if messages:
                for stream, msgs in messages:
                    for msg_id, msg in msgs:
                        print(f"Consumed message ID: {msg_id}, message: {msg}")
                        # 确认消息已被处理
                        r.xack(stream_name, group_name, msg_id)

    # 使用示例
    stream_name = 'mystream'
    group_name = 'mygroup'
    consumer_name = 'consumer1'

    # 启动消费者线程
    import threading
    consumer_thread = threading.Thread(target=consumer, args=(stream_name, group_name, consumer_name))
    consumer_thread.start()

    # 生产者推入消息
    producer(stream_name, {'field1': 'value1'})
    producer(stream_name, {'field2': 'value2'})

    这里三者最大的区别在于只有stream是支持持久化的,也就是可以再系统崩溃各种情况下不影响消息的传递。也支持消费组。但是比较复杂。所以如果想要构造一个可靠的,消息队列应该选择的是Streams。
    (PS:

  • Redis 适用于需要低延迟和高吞吐的小规模实时应用,如缓存和短期任务队列。

  • Kafka 适用于需要高可用性和可扩展性的大规模数据流处理,如日志收集、实时数据流处理和事件流系统。

分布式锁

分布式锁需要满足几个特性

  • 1 互斥性:就像一个单机锁上多个进程只能获得一个。
  • 2 高可用性:再多个机器小部分机器挂了也不会影响。
  • 3 锁超时:如果某机器获得了锁,然后挂机了,那么应该规定时间内还锁。
  • 4 独占性:加锁解锁都由一个机器来执行,不能A机器加锁,B机器解锁。

加锁操作:

1
SET lock_name my_random_value NX PX 30000

值是自己随机设置的,NX是表示lockname不存在才能SET成功,PX 30000是代表30秒是自动过期时间,一个合理的值。
当A机器的进程加锁了后,B机器的进程也想过来加锁,发现无法SET,因为NX表示lockname不存在才可以SET成功,所以进入了自旋遍历SET的循环里。

解锁操作:

1
2
3
GET lock_name
#进行value与自己的lock value进行对比,对比成功则删除
DEL lock_name

当A机器删除了lockname之后,下一瞬间B机器发现自己可以set了,所以锁被B机器获取了。

分布锁的原子性

有那么一种场景:

A机器set lockname成功了

A访问共享资源

A get lockname,进行value对比

A 突然挂机了

B 等30秒获得了锁,set lockname成功

A 恢复了执行 del lockname操作

(也就是A删除了B添加的锁)

正好redis的事务可以有watch异常,这个时候释放锁使用multi()命令,这样B set lockname 成功

户后,A恢复上线了,会马上抛异常watcherror,放弃事务了。

集群redlock算法

假设redis单个节点挂了,那导致分布式锁没用了,这是不可以接受的。

所以redis集群会基于一个半数议会制度来实现分布式锁。

假设机器A,会依次对多个独立的redis节点加锁,如果机器A能够对半数以上的redis节点加锁成功,则认为A获得了锁。由于A已经锁了半数以上的redis节点,B机器是绝对无法获得半数以上的redis节点支持。

当对每个redis节点加锁的时候,要对加锁操作设置超时时间,一旦A机器完成对所有节点加锁,就要计算总耗时t。

  • 成功条件1:超过半数的redis节点支持加锁成功。
  • 成功条件2:整个加锁时间没有超过锁的有效时间,且重新计算锁的有效时间。
    PS 如果加锁失败,只能把解锁操作发给所有的redis节点。

秒杀的场景

秒杀系统在如今这个环境特别常见,假如某平台打算低价送飞天茅台。

  • 1 造成大量用户同一时间点击网页
  • 2 访问量远大于库存数目
  • 3 业务流程极其简单,就是下单减少库存

cpu内存

ssd固态硬盘读写速度是磁盘的100倍,内存又是ssd的10倍以上,也就是说使用ssd是写入磁盘的1000倍以上,原有的写入磁盘的秒杀系统需要1000台机器的话,那么现在1台机器就嫩搞定。但是会有人问,假设是多个redis实例来应付海量的请求,那么访问的redis实例挂了,“为之奈何?“, 那只能说该用户倒霉了,没秒到。

最后得到了订单序列再持久化到硬盘里,不会丢失数据,只需要写入内存返回秒杀成功给客户端就ok了。

异步处理

用户请求将写入内存将立即返回,通常使用MQ来实现,所以需要从内存读取该数据,使用多个线程来异步从内存中异步读取数据。

分布式处理

如果cpu内存,和异步处理,一台机器处理不来海量内存,可以分布式使用10台甚至百台机器,把请求通过hash均匀分布(paxos算法,hash ring算法)打到不同机器上,redis cluster就是这样的分布式系统。

具体操作

用redis的列表list作为商品库存队列,安装商品id和库存数目存入list里面。

用redis的hash表当作订单列表,把用户id和商品id存入列表里。

当请求抢购,则商品队列pop出一个商品id,并且将用户id和商品id存入订单系统中。

如果商品队列空了,说明卖完了,返回抢购完了返回给用用户。