数据类型和命令
官方 redis 命令文档
https://redis.io/docs/latest/commands/
通用命令
切换数据库
# 切换到0号库
select 0
# 切换到1号库
select 1判断 key 是否存在
set key1 v1
# 判断key1是否存在
exists key1keys 返回匹配指定模式的所有 key
set key1 v1
set key2 v2
# 返回当前数据库所有key
keys *
# 返回匹配 k* 的所有key
keys k*type 查看指定 key 的数据类型
set key1 v1
# 查看key1的数据类型
type key1del 删除指定 key
set key1 v1
# 删除指定key1
del key1unlink 异步删除指定的 key
set key1 v1
# 异步删除key1
unlink key1expire 设置指定 key 过期时间
set key1 v1
# 设置 key1 过期时间5秒
expire key1 5
keys *ttl 查看指定 key 剩余过期时间
-1 永不过期,-2 已经过期
set key1 v1
expire key1
# 查看 key1 剩余过期秒数
ttl key1查看 key 个数
dbsizeflushdb 清空当前数据库
set key1 v1
flushdb
keys *flushall 清空所有数据库
set key1 v1
select 1
set key2 v2
select 0
flushall
keys *数据类型
string 类型
string 类型 value 最多存储 512M 字符串
set、get
# 设置 key1,如果不存在 key1 则新增,否则更新 key1 内容为 v1
set key1 v1
# 获取 key1 的值
get key1mset、mget
# 一次设置多个key
mset k1 v1 k2 v2 k3 v3
keys *
# 一次获取多个key对应的值
mget k1 k2 k3
mset k1 v11 k2 v21 k4 v41
keys *
mget k1 k2 k4incr、decr、incrby、decrby、incrbyfloat
# key1 的值为数值类型时增加 1,否则报告错误 (error) ERR value is not an integer or out of range
incr key1
# key1 的值为数值类型时减少 1
decr key1
# key1增加指定步长
incrby key1 2
# key1减少指定步长
decrby key1 2
# 浮点类型数据指定浮点类型自增量
set key2 10.1
incrbyfloat key2 0.5append
set key1 v1
# 在 key1 中追加v2值,此时 key1 的值变为 v1v2
append key1 v2strlen
set key1 v123456
# 获取字符串长度
strlen key1setnx
# key1不存在时才设置v3值
setnx key1 v3
# 上面命令等价于下面命令
set key1 v3 nx
# 获取 key1 值
get key1setex
# 设置键值对时同时设置过期时间,设置键为key1,值为v1,过期时间为10秒的键值对
setex key1 10 v1
# 上面命令等价于下面命令
set key1 v1 ex 10
ttl key1setrange、getrange
# 获取值片段
set key1 123456
# 返回索引位置1-3(左闭右闭)的值片段,结果为"234"
getrange key1 1 3
# 设置值片段
set key1 123456
# 覆盖索引位置为3开始的值片段,结果为"123abc"
setrange key1 3 abcgetset
# 以新换旧,替换新值,返回旧值
set key1 v1
getset key1 v2hash 类型
hset、hget
# 设置和读取hash值
hset user1001 id 1001
hset user1001 name zhangsan
hget user1001 id
hget user1001 namehmset、hmget
# 一次设置和读取多个field值
hmset user1001 id 1001 name zhangsan
hmget user1001 id namehgetall
# 返回 hash 数据类型中所有 field 和 value
hmset user1001 id 1001 name zhangsan age 21
hgetall user1001hkeys、hvals
# 列出 hash 中所有 field
hmset user1001 id 1001 name zhangsan
hkeys user1001
# 列出 hash 中所有 value
hvals user1001hexists
# 判断field是否存在
hmset user1001 id 1001 name zhangsan
# 结果返回1
hexists user1001 id
# 结果返回0
hexists user1001 id1hincby
# 为指定的field增加指定的值
hmset user1001 id 1001 name zhangsan
# id field增加10
hincrby user1001 id 10hsetnx
# field不存在时才设置指定值
hmset user1001 id 1001 name zhangsan
# field id已经存在设置不成功
hsetnx user1001 id 1002
# field gender不存在设置成功
hsetnx user1001 gender femalelist 类型
lpush、lpop、rpush、rpop
# 从左边插入值
lpush k1 v3 v2 v1
# 使用lpop/rpop取出一个元素
lpush k1 v3 v2 v1
# 从左边取出一个元素
lpop k1
del k1
lpush k1 v3 v2 v1
# 从右边取出一个元素
rpop k1rpoplpush
从第一个 list 中 rpop 元素后 lpush 到第二个 list 中。
# rpoplpush用法
rpush k1 v1 v2 v3
rpush k2 1 2 3
rpoplpush k1 k2
lrange k1 0 -1
lrange k2 0 -1lrange
lpush k1 v3 v2 v1
# 从左边开始获取索引 0-最后一个元素 列表,-1 表示列表的最后一个元素,-2 表示列表的倒数第二个元素
# https://www.runoob.com/redis/lists-lrange.html
lrange k1 0 -1
rpush k2 v1 v2 v3
lrange k2 0 -1
lrange k2 0 1lindex
获取指定索引的元素,不会删除元素。
rpush k1 v1 v2 v3
# 结果为v1
lindex k1 0
# 结果为v2
lindex k2 1llen
获取列表长度。
rpush k1 v1 v2 v3
llen k1linsert
# 在指定元素前插入元素
rpush k1 v1 v2 v3
linsert k1 before v2 newv2
lrange k1 0 -1
# 在指定元素后插入元素
rpush k2 v1 v2 v3
linsert k2 after v2 newv2
lrange k2 0 -1lrem
# 从左边起删除2个v2元素
rpush k1 v1 v2 v2 v3
lrem k1 2 v2
lrange k1 0 -1lset
# 替换指定索引元素
rpush k1 v1 v2 v3
lset k1 0 v11
lrange k1 0 -1blpop、brpop
blpop 和 brpop 与 lpop 和 rpop 类似,只不过在没有元素时等待指定时间,而不是直接返回 nil。
rpush k1 1
# rpop 超时 5 秒
brpop k1 5使用 list 类型模拟栈
使用 lpush、lpop 命令组合,使用 rpush、rpop 命令组合。
使用 list 类型模拟队列
使用 lpush、rpop 命令组合,使用 rpush、lpop 命令组合。
使用 list 类型模拟阻塞队列
使用 lpush、brpop 命令组合,使用 rpush、blpop 命令组合。
set 类型
元素不重复,无序排列
sadd、srem
# 添加元素
sadd k1 v1 v2 v3
smembers k1
# 删除集合元素
sadd k1 v1 v2 v3
srem k1 v2
smembers k1scard
# 返回集合元素个数
sadd k1 v1 v2 v3
scard k1sismember
# 判断元素是否存在
sadd k1 v1 v2 v3
# 结果返回1
sismember k1 v2
# 结果返回0
sismember k1 888smembers
# 获取全部元素
sadd k1 v1 v2 v3
smembers k1spop
# 从集合中随机pop一个元素
sadd k1 v1 v2 v3
spop k1
smembers k1smove
# 把集合中的一个值移动到另外一个集合中
sadd k1 v1 v2 v3 v5
sadd k2 v1 v2
# 把k1中的v5移动到k2中
smove k1 k2 v5
smembers k1
smembers k2sinter
# 求集合的交集
sadd k1 v1 v2 v3
sadd k2 v3 v5 v6
# 结果返回v3
sinter k1 k2sunion
# 求集合的并集
sadd k1 v1 v2 v3
sadd k2 v3 v5 v6
# 结果返回 v1 v2 v3 v5 v6
sunion k1 k2sdiff
# 求集合 k1-k2 差集结果
sadd k1 v1 v2 v3
sadd k2 v3 v5 v6
# 结果返回 v1 v2
sdiff k1 k2zset 类型(有序集合)
介绍
Redis 的 ZSet(Sorted Set)是一种非常强大且灵活的数据结构,它结合了集合(Set)和有序列表(Sorted List)的特性。以下是关于 Redis ZSet 的详细解释:
一、基本特性
- 唯一性:ZSet 中的成员是唯一的,即不会有重复的成员存在。
- 有序性:每个成员都关联着一个分数(score),Redis 会根据这些分数对成员进行排序。默认情况下,分数较小的成员排在前面,但也可以通过相关命令来反转排序顺序。
- 高效性:ZSet 提供了多种高效的操作,如按分数范围查询、按排名范围查询、增加或减少成员的分数等。这些操作的时间复杂度通常较低,使得 ZSet 在处理大量数据时依然能够保持高性能。
二、底层实现
ZSet 的底层实现结合了哈希表(Hash Table)和跳表(Skip List)两种数据结构:
- 哈希表:用于存储成员和分数之间的映射关系。这使得 ZSet 能够在 O(1) 时间复杂度内快速查找成员的分数。
- 跳表:用于维护成员的有序性。跳表是一种概率型数据结构,它能够在 O(log N) 时间复杂度内完成成员的查找、插入和删除操作。同时,跳表还能够支持范围查询等复杂操作。
三、常用命令
Redis 提供了丰富的 ZSet 命令,以满足各种应用场景的需求。以下是一些常用的 ZSet 命令:
ZADD key score member:向 ZSet 中添加一个成员,并为其指定分数。如果成员已经存在,则更新其分数。ZRANGE key start stop [WITHSCORES]:按分数从小到大返回 ZSet 中指定范围的成员。可选地,可以返回成员的分数。ZREVRANGE key start stop [WITHSCORES]:按分数从大到小返回 ZSet 中指定范围的成员。可选地,可以返回成员的分数。ZSCORE key member:返回 ZSet 中指定成员的分数。ZRANK key member:返回 ZSet 中指定成员的排名(按分数从小到大排序)。排名从 0 开始。ZREVRANK key member:返回 ZSet 中指定成员的排名(按分数从大到小排序)。排名从 0 开始。ZINCRBY key increment member:增加 ZSet 中指定成员的分数。increment为增加的分数值。ZDECRBY key decrement member:减少 ZSet 中指定成员的分数。decrement为减少的分数值。ZREM key member [member ...]:移除 ZSet 中一个或多个成员。ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]:按分数范围返回 ZSet 中的成员。可选地,可以返回成员的分数,并使用 LIMIT 子句来限制返回的结果数量。ZREMRANGEBYRANK key start stop:移除 ZSet 中指定排名的成员。ZREMRANGEBYSCORE key min max:移除 ZSet 中指定分数范围的成员。
四、应用场景
由于 ZSet 的独特特性,它在许多应用场景中都表现出色。以下是一些典型的应用场景:
- 排行榜:ZSet 可以用来实现各种排行榜功能,如游戏排行榜、文章阅读量排行榜等。成员可以是用户ID或文章ID,分数可以是用户的得分或文章的阅读量。
- 延迟队列:可以将任务的执行时间作为分数,将任务ID作为成员存储在 ZSet 中。然后,通过定时检查 ZSet 中分数最小的成员(即最早应该执行的任务)来实现延迟执行的功能。
- 范围查询:ZSet 支持高效的范围查询操作,可以用来实现如价格筛选、时间筛选等功能。
总之,Redis 的 ZSet 是一种非常强大且灵活的数据结构,适用于许多不同的应用场景。通过合理利用 ZSet 的特性,可以大大提高应用程序的性能和可扩展性。
zadd
# 创建集合
zadd topn 100 java 400 c++ 200 php 300 mysqlzrange
# 根据索引烦范围升序排序返回结果 java php mysql c++
zadd topn 100 java 400 c++ 200 php 300 mysql
zrange topn 0 -1
# 获取结果包含分数
zadd topn 100 java 400 c++ 200 php 300 mysql
zrange topn 0 -1 withscoreszrangebyscore、zrevrangebyscore
# 返回200-300之间(包括200和300),升序排列
zadd topn 100 java 400 c++ 200 php 300 mysql
zrangebyscore topn 200 300 withscores
# 返回200-300之间(包括200和300),降序排序
zadd topn 100 java 400 c++ 200 php 300 mysql
zrevrangebyscore topn 300 200 withscoreszrangebyscore 和 zrevrangebyscore 区别
在 Redis 中,ZRANGEBYSCORE 和 ZREVRANGEBYSCORE 是两个用于操作有序集合(Sorted Set)的命令,它们允许用户根据分数范围来获取集合中的元素,但排序方式有所不同。
ZRANGEBYSCORE
ZRANGEBYSCORE 命令返回有序集合中指定分数区间的成员列表,成员按分数值递增(从小到大)次序排列。如果具有相同分数值的成员,它们会按字典序来排列。
命令格式:
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]key:有序集合的键名。min和max:分数的最小值和最大值,用于筛选范围内的元素。可以使用-inf和+inf分别表示负无穷和正无穷。WITHSCORES:可选参数,如果指定此选项,命令将返回元素及其对应的分数。LIMIT offset count:可选参数,用于分页。offset是结果集的起始位置(从 0 开始),count是要返回的元素数量。
ZREVRANGEBYSCORE
ZREVRANGEBYSCORE 命令与 ZRANGEBYSCORE 类似,但它返回的是有序集合中指定分数区间内的所有成员,但成员是按分数值递减(从大到小)的次序排列的。如果具有相同分数值的成员,它们会按字典序的逆序排列。
命令格式:
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]- 参数与
ZRANGEBYSCORE类似,但需要注意的是max和min的位置是相反的,这是因为ZREVRANGEBYSCORE是按分数递减排序的。 - 其他参数的含义与
ZRANGEBYSCORE相同。
主要区别
- 排序方式:
ZRANGEBYSCORE是按分数递增排序,而ZREVRANGEBYSCORE是按分数递减排序。 - 参数顺序:在
ZREVRANGEBYSCORE命令中,max和min的参数顺序与ZRANGEBYSCORE相反,以反映递减的排序方式。
示例
假设我们有一个有序集合 salary,其中包含以下元素及其分数:
- jack: 2500
- tom: 5000
- peter: 12000
使用 ZRANGEBYSCORE 获取分数在 2500 到 5000 之间的元素:
ZRANGEBYSCORE salary 2500 5000返回结果:
1) "jack"
2) "tom"使用 ZREVRANGEBYSCORE 获取分数在 2500 到 12000 之间的元素,但按分数递减排序:
ZREVRANGEBYSCORE salary 12000 2500返回结果:
1) "peter"
2) "tom"
3) "jack"注意,在 ZREVRANGEBYSCORE 的示例中,尽管我们指定了 12000 到 2500 的范围,但由于是按递减排序,所以返回的结果是从最高分到最低分的顺序。同时,由于我们没有使用 WITHSCORES 选项,所以返回的结果中只包含了元素名,没有包含对应的分数。如果需要包含分数,可以在命令中添加 WITHSCORES 选项。
zincrby
# 增加指定值的score
zadd topn 100 java 400 c++ 200 php 300 mysql
# java score增加50
zincrby topn 50 javazrem
# 删除指定member
zadd topn 100 java 400 c++ 200 php 300 mysql
zrem topn javazcount
# 统计指定分数区间内元素个数
zadd topn 100 java 400 c++ 200 php 300 mysql
zcount topn 200 300zrank
# 查询指定member排名
zadd topn 100 java 400 c++ 200 php 300 mysql
# 返回2,表示排名为第3位
zrank topn mysql发布和订阅
提醒:如果您的应用程序需要更强的投递保证,您可能需要了解 Redis Streams。流中的消息会被持久化,并且支持“最多一次”和“至少一次”的投递语义。
基于 Jedis 发布和订阅详细用法请参考 示例
基于 RedisTemplate 发布和订阅详细用法请参考 示例
官方参考
SUBSCRIBE、UNSUBSCRIBE 和 PUBLISH 实现了发布/订阅消息传递范式,其中(引用维基百科)发送者(发布者)无需编程将消息发送给特定的接收者(订阅者)。相反,已发布的消息会被划分到不同的频道,而无需知道有哪些订阅者(如果有)。订阅者会对一个或多个频道表示感兴趣,并且只接收感兴趣的消息,而无需知道有哪些发布者(如果有)。这种发布者和订阅者的解耦允许更高的可扩展性和更动态的网络拓扑。
例如,为了订阅频道“channel11”和“ch:00”,客户端发出 SUBSCRIBE 并提供频道名称:
SUBSCRIBE channel11 ch:00其他客户端发送到这些频道的消息将被 Redis 推送给所有订阅的客户端。订阅者按照消息发布的顺序接收消息。
订阅了一个或多个频道的客户端不应发出命令,但它可以向其他频道执行 SUBSCRIBE 和 UNSUBSCRIBE 操作。订阅和取消订阅操作的回复以消息的形式发送,以便客户端只需读取连贯的消息流,其中第一个元素指示消息的类型。在已订阅的 RESP2 客户端上下文中允许的命令包括:
但是,如果使用 RESP3(参见 HELLO),客户端可以在订阅状态下发出任何命令。
请注意,使用 redis-cli 时,在订阅模式下不能使用 UNSUBSCRIBE 和 PUNSUBSCRIBE 等命令,因为 redis-cli 不会接受任何命令,只能使用 Ctrl-C 退出该模式。
传递语义
Redis 的 Pub/Sub 协议实现了“最多一次”的消息传递语义。顾名思义,这意味着一条消息最多只会被传递一次。一旦 Redis 服务器发出消息,就不可能再次发送。如果订阅者无法处理该消息(例如,由于错误或网络断开),该消息将永远丢失。
如果您的应用程序需要更强的投递保证,您可能需要了解 Redis Streams。流中的消息会被持久化,并且支持“最多一次”和“至少一次”的投递语义。
推送消息格式
消息是一个包含三个元素的数组回复。
第一个元素是消息类型:
- subscribe:表示我们成功订阅了回复中第二个元素指定的频道。第三个参数表示我们当前订阅的频道数量。
- unsubscribe:表示我们已成功取消订阅回复中第二个元素指定的频道。第三个参数表示我们当前订阅的频道数量。当最后一个参数为零时,我们不再订阅任何频道,并且由于我们已脱离发布/订阅状态,客户端可以发出任何类型的 Redis 命令。
- message:它是另一个客户端发出 PUBLISH 命令后收到的消息。第二个元素是发起通道的名称,第三个参数是实际的消息负载。
数据库和范围
Pub/Sub 与键空间无关。它的设计初衷是在任何层面上都不会干扰键空间,包括数据库编号。
在 db 10 上发布的内容将被 db 1 上的订阅者听到。
如果您需要某种范围界定,请在渠道前加上环境名称(测试、暂存、生产……)。
Wire 协议示例
SUBSCRIBE first second
*3
$9
subscribe
$5
first
:1
*3
$9
subscribe
$6
second
:2此时,我们从另一个客户端向名为 second 的频道发出 PUBLISH 操作:
> PUBLISH second Hello这是第一个客户端收到的内容:
*3
$7
message
$6
second
$5
Hello现在客户端使用 UNSUBSCRIBE 命令取消订阅所有频道,无需任何附加参数:
UNSUBSCRIBE
*3
$11
unsubscribe
$6
second
:1
*3
$11
unsubscribe
$5
first
:0模式匹配订阅
Redis 的 Pub/Sub 实现支持模式匹配。客户端可以订阅全局模式,接收所有发送到与指定模式匹配的频道名称的消息。
例如:
PSUBSCRIBE news.*将接收发送到频道 news.art.figurative、news.music.jazz 等的所有消息。所有 glob 样式模式均有效,因此支持多个通配符。
PUNSUBSCRIBE news.*然后将取消客户端对该模式的订阅。此调用不会影响其他订阅。
通过模式匹配收到的消息以不同的格式发送:
- 消息类型为 pmessage:它是另一个客户端发出的 PUBLISH 命令后收到的消息,该命令与模式匹配订阅匹配。第二个元素是匹配的原始模式,第三个元素是原始频道的名称,最后一个元素是实际的消息负载。
与 SUBSCRIBE 和 UNSUBSCRIBE 类似,系统会通过发送 psubscribe 和 punsubscribe 类型的消息来确认 PSUBSCRIBE 和 PUNSUBSCRIBE 命令,其格式与订阅和取消订阅消息格式相同。
匹配模式和频道订阅的消息
如果客户端订阅了与已发布消息匹配的多个模式,或者同时订阅了与该消息匹配的模式和频道,则客户端可能会多次收到同一条消息。以下示例展示了这一点:
SUBSCRIBE foo
PSUBSCRIBE f*在上面的例子中,如果向通道 foo 发送一条消息,客户端将收到两条消息:一条是 message 类型,另一条是 pmessage 类型。
模式匹配的订阅计数的含义
在 subscribe、unsubscribe、psubscribe 和 punsubscribe 消息类型中,最后一个参数是仍在活动的订阅数量。此数字表示客户端仍在订阅的频道和模式的总数。因此,只有当此数量因取消订阅所有频道和模式而降至零时,客户端才会退出发布/订阅状态。
分片发布/订阅
从 Redis 7.0 开始,引入了分片发布/订阅 (Sharded Pub/Sub) 功能,其中分片通道的分配算法与将键分配给槽的算法相同。分片消息必须发送到拥有分片通道哈希到的槽的节点。集群确保已发布的分片消息转发到分片中的所有节点,因此客户端可以通过连接到负责该槽的主服务器或其任何副本服务器来订阅分片通道。SSUBSCRIBE、SUNSUBSCRIBE 和 SPUBLISH 用于实现分片发布/订阅。
分片发布/订阅有助于在集群模式下扩展发布/订阅的使用率。它将消息的传播限制在集群的分片内。因此,与全局发布/订阅(每条消息都会传播到集群中的每个节点)相比,通过集群总线传输的数据量受到限制。这允许用户通过添加更多分片来水平扩展发布/订阅的使用率。
编程示例
Pieter Noordhuis 提供了一个很好的示例,使用 EventMachine 和 Redis 创建多用户高性能网络聊天。
客户端库实现提示
因为所有收到的消息都包含导致消息传递的原始订阅(对于消息类型,为通道;对于 pmessage 类型,为原始模式),所以客户端库可以使用哈希表将原始订阅绑定到回调(可以是匿名函数、块、函数指针)。
当收到消息时,可以进行 O(1) 查找以将消息传递给已注册的回调。
CLI 实验
# 订阅channel1
subscribe channel1
# 向channel1发布消息
publish channel1 hellostream 类型
todo
- 消息已经处理但没有 ack 怎么后续处理呢?
- 性能测试
介绍
Redis Stream是Redis 5.0版本引入的一种专门为消息队列设计的数据结构。以下是对Redis Stream的详细介绍:
一、特点与功能
- 消息队列:Redis Stream提供了消息队列的功能,支持消息的持久化和主从复制,使得客户端可以访问任何时刻的数据。
- 消息有序性:Stream中的消息根据ID进行有序排列,这些ID默认由时间戳组成,保证了消息的有序性。
- 消费者模式:Redis Stream支持独立消费者模式和消费者组模式。在消费者组模式中,可以挂多个消费组,每个消费组内的消费者是竞争关系,不会重复消费同一条消息。
- ACK确认机制:消费者在处理完消息后,需要通过ACK确认机制通知Stream已经处理完消息,确保消息不会被重复处理。
- 持久化与复制:Stream数据存储在内存中,但可以通过RDB进行持久化到硬盘,同时支持主从复制,保证了数据的高可用性。
二、数据结构
Redis Stream的数据结构主要由以下几个部分组成:
- 消息链表:将所有加入的消息都串起来,形成一个链表结构。
- 消息ID:每个消息都有一个唯一的ID,用于标识消息的顺序和位置。
- 消息内容:消息内容是一个键值对的集合,类似于Redis Hash结构。
三、基础命令
Redis Stream提供了一系列基础命令用于操作和管理Stream,以下是一些常用的命令:
- XADD:向Stream末尾添加消息。如果指定的Stream不存在,则会自动创建。
- XRANGE:按范围获取Stream中的消息。支持指定开始和结束ID,以及返回的消息数量。
- XREVRANGE:反向按范围获取Stream中的消息。与XRANGE类似,但返回的消息顺序相反。
- XREAD:非阻塞地读取一个或多个Stream中的新消息。支持阻塞等待新消息的到来。
- XREADGROUP:阻塞读取消息并将其分配给消费者组中的消费者。支持指定消费者组和消费者名称。
- XACK:确认消息已被消费。消费者在处理完消息后,需要调用此命令通知Stream。
- XPENDING:获取待处理的消息列表。用于监控消费者的处理情况,以及获取尚未被确认的消息。
- XDEL:删除Stream中的消息。可以根据消息ID删除一个或多个消息。
- XTRIM:对流进行修剪,限制Stream的长度。可以精确指定长度或模糊指定长度阈值。
- XINFO:查看Stream和消费者组的相关信息。支持查看Stream的详细信息、消费者组的信息以及组内消费者的信息等。
四、使用场景
Redis Stream适用于以下场景:
- 实时消息系统:如聊天系统、即时通讯应用等,需要实时传递和处理消息。
- 日志收集与处理:将系统日志、应用日志等收集到Stream中,进行实时处理和分析。
- 任务队列:将需要异步处理的任务添加到Stream中,由消费者组内的消费者竞争处理。
- 消息分发系统:将消息分发到多个消费者组或独立消费者中,实现消息的多播和分组消费。
综上所述,Redis Stream是一种功能强大的消息队列数据结构,适用于多种实时消息处理场景。通过合理使用其基础命令和特性,可以实现高效、可靠的消息传递和处理。
官方参考
Redis 流简介
Redis 流是一种数据结构,其作用类似于仅追加日志,但它也实现了多种操作来克服典型仅追加日志的一些限制。这些限制包括 O(1) 时间复杂度的随机访问和复杂的消费策略,例如消费者组。您可以使用流来实时记录和同步事件。Redis 流用例示例包括:
- 事件源(例如,跟踪用户操作、点击等)
- 传感器监控(例如,现场设备的读数)
- 通知(例如,将每个用户的通知记录存储在单独的流中)
Redis 会为每个流条目生成一个唯一的 ID。您可以使用这些 ID 稍后检索与其关联的条目,或读取和处理流中的所有后续条目。请注意,由于这些 ID 与时间相关,因此此处显示的 ID 可能会有所不同,并且与您在自己的 Redis 实例中看到的 ID 不同。
Redis 流支持多种修剪策略(以防止流无限制增长)和多种消费策略(参见 XREAD、XREADGROUP 和 XRANGE)。
基本命令
- XADD 向流中添加新条目。
- XREAD 读取一个或多个条目,从给定位置开始并随时间向前移动。
- XRANGE 返回两个提供的条目 ID 之间的条目范围。
- XLEN 返回流的长度。
查看 流命令的完整列表。
示例
当我们的赛车手通过检查站时,我们会为每个赛车手添加一个流条目,其中包括赛车手的姓名、速度、位置和位置 ID:
bash> XADD race:france * rider Castilla speed 30.2 position 1 location_id 1 "1692632086370-0" > XADD race:france * rider Norem speed 28.8 position 3 location_id 1 "1692632094485-0" > XADD race:france * rider Prickett speed 29.7 position 2 location_id 1 "1692632102976-0"读取从 ID 1692632086370-0 开始的两个流条目:
bash> XRANGE race:france 1692632086370-0 + COUNT 2 1) 1) "1692632086370-0" 2) 1) "rider" 2) "Castilla" 3) "speed" 4) "30.2" 5) "position" 6) "1" 7) "location_id" 8) "1" 2) 1) "1692632094485-0" 2) 1) "rider" 2) "Norem" 3) "speed" 4) "28.8" 5) "position" 6) "3" 7) "location_id" 8) "1"从流末尾开始读取最多 100 个新流条目,如果没有写入条目,则阻塞最多 300 毫秒:
bash> XREAD COUNT 100 BLOCK 300 STREAMS race:france $ (nil)
性能
向流中添加条目的复杂度为 O(1)。访问任何单个条目的复杂度为 O(n),其中 n 是 ID 的长度。由于流 ID 通常较短且长度固定,因此这实际上简化为常数时间查找。有关原因的详细信息,请注意流是以基数树实现的。
简而言之,Redis 流提供高效的插入和读取操作。查看每个命令的时间复杂度以了解更多详情。
流基础知识
流是一种仅可追加的数据结构。其基本写入命令名为 XADD,用于将新条目追加到指定的流中。
每个流条目由一个或多个字段值对组成,有点像字典或 Redis 哈希:
> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"上述 XADD 命令调用将一个条目添加到流中,该条目名为 rider:Castilla,速度:29.9,位置:1,location_id:2,其键名为 race:france,使用自动生成的条目 ID(即命令返回的 ID),具体为 1692632147973-0。它的第一个参数是键名 race:france,第二个参数是用于标识流中每个条目的条目 ID。但是,在本例中,我们传递了 *,因为我们希望服务器为我们生成一个新的 ID。每个新的 ID 都会单调递增,因此,更简单地说,每个新添加的条目的 ID 都会比所有过去的条目更高。服务器自动生成 ID 几乎总是您想要的,而明确指定 ID 的情况非常少见。我们稍后会详细讨论这一点。每个流条目都有一个 ID 这一事实与日志文件的另一个相似之处是,日志文件可以使用行号或文件内的字节偏移量来标识给定的条目。回到我们的 XADD 示例,在键名和 ID 之后,下一个参数是组成流条目的字段值对。
只需使用 XLEN 命令即可获取 Stream 内的项目数量:
> XLEN race:france
(integer) 4条目 ID
XADD 命令返回的条目 ID 明确标识给定流中的每个条目,它由两部分组成:
<millisecondsTime>-<sequenceNumber>毫秒时间部分实际上是生成流 ID 的本地 Redis 节点的本地时间,但是,如果当前毫秒时间恰好小于前一个条目时间,则使用前一个条目时间。因此,即使时钟向后跳转,单调递增的 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号宽度为 64 位,因此实际上在同一毫秒内可以生成的条目数量没有限制。
这类 ID 的格式乍一看可能有点奇怪,读者可能会好奇为什么时间会包含在 ID 中。这是因为 Redis 流支持通过 ID 进行范围查询。由于 ID 与条目的生成时间相关,因此可以几乎不费吹灰之力地查询时间范围。我们稍后会在介绍 XRANGE 命令时看到这一点。
如果由于某种原因,用户需要与时间无关但实际上与另一个外部系统 ID 相关联的增量 ID,如前所述,XADD 命令可以采用显式 ID,而不是触发自动生成的 * 通配符 ID,如下例所示:
> XADD race:usa 0-1 racer Castilla
0-1
> XADD race:usa 0-2 racer Norem
0-2请注意,在这种情况下,最小 ID 为 0-1,并且该命令不会接受等于或小于前一个 ID 的 ID:
> XADD race:usa 0-1 racer Prickett
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item如果您运行的是 Redis 7 或更高版本,您还可以提供仅包含毫秒部分的显式 ID。在这种情况下,ID 的序列部分将自动生成。为此,请使用以下语法:
> XADD race:usa 0-* racer Prickett
0-3从 Streams 获取数据
现在我们终于可以通过 XADD 在流中追加条目了。然而,虽然将数据追加到流中显而易见,但查询流以提取数据的方式却不那么显而易见。如果我们继续以日志文件为例,一个显而易见的方法是模仿我们通常使用 Unix 命令 tail -f 的操作,也就是说,我们可以开始监听以获取追加到流中的新消息。需要注意的是,与 Redis 的阻塞列表操作不同,在阻塞列表操作中,给定元素会到达单个客户端,而该客户端在 BLPOP 等弹出操作中处于阻塞状态。对于流,我们希望多个消费者能够看到追加到流中的新消息(就像许多 tail -f 进程可以看到日志中添加的内容一样)。使用传统术语,我们希望流能够将消息扇出到多个客户端。
然而,这只是一种潜在的访问模式。我们也可以以完全不同的方式看待流:不再将其视为消息系统,而是将其视为时间序列存储。在这种情况下,获取附加的新消息可能也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以增量方式检查所有历史记录。这无疑是另一种有用的访问模式。
最后,如果我们从消费者的角度来看待流,我们可能希望以另一种方式访问它,即将其作为一条消息流,并划分给多个正在处理这些消息的消费者,这样消费者组就只能看到单个流中到达的消息子集。通过这种方式,可以跨不同的消费者扩展消息处理,而无需单个消费者处理所有消息:每个消费者只需处理不同的消息。这基本上就是 Kafka (TM) 对消费者组所做的工作。通过消费者组读取消息是从 Redis 流中读取消息的另一种有趣模式。
Redis Streams 通过不同的命令支持上述三种查询模式。下一节将从最简单、最直接的使用方式开始,逐一介绍它们:范围查询。
按范围查询:XRANGE 和 XREVRANGE
要按范围查询流,我们只需指定两个 ID:起始 ID 和结束 ID。返回的范围将包含以起始 ID 或结束 ID 为 ID 的元素,因此该范围包含起始 ID 和结束 ID。两个特殊 ID - 和 + 分别表示可能的最小 ID 和最大 ID。
> XRANGE race:france - +
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"
3) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
4) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"返回的每个条目都是一个包含两项的数组:ID 和字段值对列表。我们已经说过,条目 ID 与时间相关,因为 - 字符左侧的部分是创建流条目的本地节点在创建条目时的 Unix 时间(以毫秒为单位)(但请注意,流是使用完全指定的 XADD 命令复制的,因此副本的 ID 将与主节点相同)。这意味着我可以使用 XRANGE 查询一个时间范围。但是,为了做到这一点,我可能需要省略 ID 的序列部分:如果省略,则在范围的开头它将被假定为 0,而在结尾部分它将被假定为可用的最大序列号。这样,仅使用两毫秒的 Unix 时间进行查询,我们就能以包含的方式获得在该时间范围内生成的所有条目。例如,如果我想查询两毫秒的时间段,我可以使用:
> XRANGE race:france 1692632086369 1692632086371
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"我在这个范围内只有一个条目。然而,在实际数据集中,我可能会查询几小时的范围,或者在短短两毫秒内可能会有很多条目,返回的结果可能会非常庞大。因此,XRANGE 在末尾支持一个可选的 COUNT 选项。通过指定计数,我可以只获取前 N 个条目。如果我想要更多条目,我可以获取返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在以下示例中看一下。假设 race:france 流包含 4 个条目。为了开始迭代,每个命令获取 2 个条目,我从完整范围开始,但计数为 2。
> XRANGE race:france - + COUNT 2
1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"为了继续对接下来的两个项目进行迭代,我必须选择返回的最后一个 ID,即 1692632094485-0,并在其后添加前缀 (。得到的独占范围间隔,即本例中的 (1692632094485-0,现在可以用作下一个 XRANGE 调用的新起始参数:
> XRANGE race:france (1692632094485-0 + COUNT 2
1) 1) "1692632102976-0"
2) 1) "rider"
2) "Prickett"
3) "speed"
4) "29.7"
5) "position"
6) "2"
7) "location_id"
8) "1"
2) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"现在我们已经从只有 4 个条目的流中检索了 4 个项目,如果我们尝试检索更多项目,我们将得到一个空数组:
> XRANGE race:france (1692632147973-0 + COUNT 2
(empty array)由于 XRANGE 的查找复杂度为 O(log(N)),返回 M 个元素的复杂度为 O(M),因此,当元素数量较少时,该命令的时间复杂度为对数级,这意味着迭代的每一步都很快。因此,XRANGE 也是事实上的流迭代器,不需要 XSCAN 命令。
命令 XREVRANGE 相当于 XRANGE,但返回元素的顺序是相反的,因此 XREVRANGE 的实际用途是检查 Stream 中的最后一项是什么:
> XREVRANGE race:france + - COUNT 1
1) 1) "1692632147973-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "29.9"
5) "position"
6) "1"
7) "location_id"
8) "2"请注意,XREVRANGE 命令以相反的顺序采用开始和停止参数。
使用 XREAD 聆听新内容
当我们不想按范围访问流中的项目时,通常我们想要的是订阅到达流的新项目。这个概念可能看起来与 Redis 的发布/订阅(订阅频道)或 Redis 的阻塞列表(等待键获取要获取的新元素)相关,但在使用流的方式上存在根本区别:
- 一个流中可以有多个客户端(消费者)等待数据。默认情况下,每个新数据项都会被投递给给定流中每个等待数据的消费者。此行为不同于阻塞列表,阻塞列表中的每个消费者都会获得不同的数据项。但是,向多个消费者分发数据的能力类似于发布/订阅模式。
- 在发布/订阅中,消息是“发射后不管”的,永远不会被存储;在使用阻止列表时,当客户端收到一条消息时,它会被弹出(实际上是从列表中移除)。而流的工作方式则截然不同。所有消息都会无限期地添加到流中(除非用户明确要求删除条目):不同的消费者会通过记住最后一条收到的消息的 ID 来从自身的角度判断哪些是新消息。
- Streams Consumer Groups 提供了 Pub/Sub 或阻止列表无法实现的控制级别,对同一流有不同的组,明确确认已处理的项目,检查待处理的项目的能力,声明未处理的消息,以及每个客户端的连贯历史记录可见性,只能查看其私人的消息历史记录。
提供监听流中新消息功能的命令称为 XREAD。它比 XRANGE 稍微复杂一些,因此我们将先展示简单的形式,稍后再提供完整的命令布局。
> XREAD COUNT 2 STREAMS race:france 0
1) 1) "race:france"
2) 1) 1) "1692632086370-0"
2) 1) "rider"
2) "Castilla"
3) "speed"
4) "30.2"
5) "position"
6) "1"
7) "location_id"
8) "1"
2) 1) "1692632094485-0"
2) 1) "rider"
2) "Norem"
3) "speed"
4) "28.8"
5) "position"
6) "3"
7) "location_id"
8) "1"以上是 XREAD 的非阻塞形式。请注意,COUNT 选项并非强制选项,实际上该命令唯一强制的选项是 STREAMS 选项,它指定一个键列表以及调用方消费者已在每个流中看到的相应最大 ID,这样该命令将仅向客户端提供 ID 大于我们指定的 ID 的消息。
在上面的命令中,我们写入了 STREAMS race:france 0,这意味着我们希望“race:france”流中所有消息的 ID 都大于 0-0。正如您在上面的示例中所见,该命令返回的是键名,因为实际上可以使用多个键调用此命令来同时从不同的流中读取数据。例如,我可以这样写:STREAMS race:france race:italy 0 0。请注意,在 STREAMS 选项之后,我们需要提供键名,然后再提供 ID。因此,STREAMS 选项必须始终放在最后一个选项中。任何其他选项都必须放在 STREAMS 选项之前。
除了 XREAD 可以同时访问多个流,并且我们可以指定最后一个 ID 来获取更新的消息之外,在这个简单的形式下,该命令与 XRANGE 并没有什么不同。然而,有趣的是,我们可以通过指定 BLOCK 参数轻松地将 XREAD 转换为阻塞命令:
> XREAD BLOCK 0 STREAMS race:france $请注意,在上面的示例中,除了删除 COUNT 之外,我还指定了新的 BLOCK 选项,并将超时设置为 0 毫秒(即永不超时)。此外,我没有为流 mystream 传递普通的 ID,而是传递了特殊 ID $。这个特殊 ID 意味着 XREAD 应该使用流 mystream 中已存储的最大 ID 作为最后一个 ID,这样我们就会从开始监听时开始只接收新消息。这在某种程度上类似于 Unix 命令 tail -f。
请注意,使用 BLOCK 选项时,我们不必使用特殊 ID $。我们可以使用任何有效 ID。如果命令能够立即处理我们的请求而不会阻塞,它就会执行,否则就会阻塞。通常,如果我们想从新条目开始使用流,我们会从 ID $ 开始,然后继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。
XREAD 的阻塞形式也能够监听多个流,只需指定多个键名即可。如果请求可以同步处理,因为至少有一个流的元素大于我们指定的相应 ID,则返回结果。否则,该命令将阻塞并返回第一个获取新数据的流(根据指定的 ID)的项目。
与阻塞列表操作类似,阻塞流读取从等待数据的客户端的角度来看是公平的,因为它的语义遵循先进先出 (FIFO) 原则。当有新数据可用时,第一个被给定流阻塞的客户端将第一个解除阻塞。
XREAD 除了 COUNT 和 BLOCK 之外没有其他选项,因此它是一个非常基本的命令,其特定目的是将消费者连接到一个或多个流。使用消费者组 API 可以使用更强大的流消费功能,但是通过消费者组读取数据是由另一个名为 XREADGROUP 的命令实现的,本指南的下一节将介绍该命令。
消费者群体
当当前任务是从不同客户端消费同一条流时,XREAD 已经提供了一种扇出到 N 个客户端的方法,并且可能还会使用副本来提供更高的读取可扩展性。然而,在某些问题中,我们想要做的不是向多个客户端提供同一条消息流,而是向多个客户端提供来自同一条流的不同消息子集。一个明显的例子是处理速度较慢的消息时,这种方法非常有用:拥有 N 个不同的工作器来接收流的不同部分,使我们能够通过将不同的消息路由到准备好执行更多工作的不同工作器来扩展消息处理。
实际上,如果我们想象有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望根据下图提供消息:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1为了实现这一点,Redis 使用了一个称为消费者组的概念。需要注意的是,从实现的角度来看,Redis 消费者组与 Kafka™ 消费者组没有任何关联。但它们的功能相似,因此我决定保留 Kafka™ 的术语,因为它最初推广了这一概念。
消费者组就像一个从流中获取数据的伪消费者,实际上为多个消费者提供服务,并提供某些保证:
- 每条消息都会发送给不同的消费者,因此同一条消息不可能被发送给多个消费者。
- 在消费者组中,消费者通过名称进行标识,该名称是一个区分大小写的字符串,由实现消费者的客户端选择。这意味着即使断开连接后,流消费者组仍会保留所有状态,因为客户端将再次声明为同一个消费者。然而,这也意味着客户端需要提供唯一的标识符。
- 每个消费者组都有第一个从未消费过的 ID 的概念,这样,当消费者请求新消息时,它可以只提供以前未传递的消息。
- 然而,消费消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已被正确处理,因此可以将其从消费者组中移除。
- 消费者组会跟踪所有当前处于待处理状态的消息,即已投递给该消费者组中某个消费者但尚未被确认处理的消息。得益于此功能,当访问流的消息历史记录时,每个消费者只能看到已投递给它的消息。
从某种程度上来说,消费者群体可以被想象为关于流的一定量的状态:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+从这个角度来看,很容易理解消费者组的功能,它如何只向消费者提供其待处理消息的历史记录,以及如何只向请求新消息的消费者提供大于 last_delivered_id 的消息 ID。同时,如果将消费者组视为 Redis 流的辅助数据结构,则显然单个流可以包含多个消费者组,每个消费者组包含不同的消费者集合。实际上,同一个流甚至可以有客户端通过 XREAD 读取而不使用消费者组,而客户端则通过不同消费者组中的 XREADGROUP 读取。
现在我们来仔细看看基本的消费者组命令。它们如下:
- XGROUP 用于创建、销毁和管理消费者组。
- XREADGROUP 用于通过消费者组从流中读取。
- XACK 是允许消费者将待处理消息标记为已正确处理的命令。
创建消费者组
假设我已经有一个流类型的关键 race:france,为了创建一个消费者组,我只需要执行以下操作:
> XGROUP CREATE race:france france_riders $
OK正如您在上面的命令中看到的,在创建消费者组时,我们必须指定一个 ID,在示例中就是 $。这是必需的,因为除其他状态外,消费者组必须知道在第一个消费者连接时接下来要提供什么消息,也就是说,该组刚刚创建时的最后一个消息 ID 是什么。如果我们像之前一样提供 $,那么只有从现在开始到达流中的新消息才会提供给组中的消费者。如果我们指定 0,则消费者组将首先使用流历史记录中的所有消息。当然,您可以指定任何其他有效 ID。您知道的是,消费者组将开始传递大于您指定的 ID 的消息。因为 $ 表示流中当前最大的 ID,所以指定 $ 将具有仅使用新消息的效果。
如果流不存在,XGROUP CREATE 还支持自动创建流,使用可选的 MKSTREAM 子命令作为最后一个参数:
> XGROUP CREATE race:italy italy_riders $ MKSTREAM
OK现在消费者组已创建,我们可以立即尝试使用 XREADGROUP 命令通过消费者组读取消息。我们将从名为 Alice 和 Bob 的消费者那里读取消息,看看系统如何将不同的消息返回给 Alice 或 Bob。
XREADGROUP 与 XREAD 非常相似,也提供相同的 BLOCK 选项,除此之外,它是一个同步命令。但是,有一个必须始终指定的强制选项,即 GROUP,它包含两个参数:消费者组的名称和尝试读取的消费者的名称。此外,还支持 COUNT 选项,其功能与 XREAD 中的相同。
我们将把 riders 添加到 race:italy 流中,并尝试使用消费者组读取一些内容:注意:这里的 rider 是字段名称,name 是关联值。记住,流项是小型字典。
> XADD race:italy * rider Castilla
"1692632639151-0"
> XADD race:italy * rider Royce
"1692632647899-0"
> XADD race:italy * rider Sam-Bodden
"1692632662819-0"
> XADD race:italy * rider Prickett
"1692632670501-0"
> XADD race:italy * rider Norem
"1692632678249-0"
> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"XREADGROUP 回复与 XREAD 回复类似。但请注意上面提供的 GROUP <group-name> <consumer-name> 。它表示我想使用消费者组 mygroup 从流中读取数据,我是消费者 Alice。每次消费者对消费者组执行操作时,都必须指定其名称,以便在组中唯一地标识该消费者。
上面的命令行中还有一个非常重要的细节,在强制的 STREAMS 选项之后,为键 mystream 请求的 ID 是一个特殊 ID >。这个特殊 ID 仅在消费者组上下文中有效,其含义是:迄今为止从未传递给其他消费者的消息。
这几乎总是你想要的,但是也可以指定一个真实的 ID,例如 0 或任何其他有效的 ID。然而,在这种情况下,我们请求 XREADGROUP 只提供待处理消息的历史记录,在这种情况下,我们永远不会在组中看到新消息。因此,XREADGROUP 基本上会根据我们指定的 ID 做出以下行为:
- 如果 ID 是特殊 ID >,则该命令将仅返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的最后一个 ID。
- 如果 ID 是任何其他有效的数字 ID,该命令将允许我们访问待处理消息的历史记录。待处理消息是指已投递给指定消费者(由提供的名称标识)且迄今为止未通过 XACK 确认的消息集合。
我们可以立即测试此行为,指定 ID 为 0,不带任何 COUNT 选项:我们只会看到唯一待处理的消息,即有关 Castilla 的消息:
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) 1) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"但是,如果我们确认该消息已处理,它将不再是待处理消息历史记录的一部分,因此系统将不再报告任何内容:
> XACK race:italy italy_riders 1692632639151-0
(integer) 1
> XREADGROUP GROUP italy_riders Alice STREAMS race:italy 0
1) 1) "race:italy"
2) (empty array)如果您还不知道 XACK 如何工作,请不要担心,其原理就是已处理的消息不再是我们可以访问的历史记录的一部分。
现在轮到鲍勃读一些东西了:
> XREADGROUP GROUP italy_riders Bob COUNT 2 STREAMS race:italy >
1) 1) "race:italy"
2) 1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"
2) 1) "1692632662819-0"
2) 1) "rider"
2) "Sam-Bodden"Bob 请求最多两条消息,并且正在通过同一个群组 mygroup 阅读。因此,Redis 只会报告新消息。正如您所见,“Castilla”消息未送达,因为它已经发送给了 Alice,所以 Bob 收到了 Royce 和 Sam-Bodden 的消息,依此类推。
这样,Alice、Bob 以及群组中的任何其他消费者都可以从同一个流中读取不同的消息,读取尚未处理的消息历史记录,或者将消息标记为已处理。这允许创建不同的拓扑和语义来从流中消费消息。
需要记住以下几点:
- 消费者在第一次被提及时就会自动创建,无需明确创建。
- 即使使用 XREADGROUP,您也可以同时读取多个键,但要使其正常工作,您需要在每个流中创建一个同名的消费者组。这并非常见需求,但值得一提的是,该功能在技术上是可行的。
- XREADGROUP 是一个写入命令,因为即使它从流中读取,消费者组也会作为读取的副作用被修改,因此它只能在主实例上调用。
下面是一个使用 Ruby 语言编写的使用消费者组的消费者实现示例。Ruby 代码旨在让几乎所有有经验的程序员都能读懂,即使他们不懂 Ruby:
require 'redis'
if ARGV.length == 0
puts "Please specify a consumer name"
exit 1
end
ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new
def process_message(id,msg)
puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end
$lastid = '0-0'
puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
# Pick the ID based on the iteration: the first time we want to
# read our pending messages, in case we crashed and are recovering.
# Once we consumed our history, we can start getting new messages.
if check_backlog
myid = $lastid
else
myid = '>'
end
items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)
if items == nil
puts "Timeout!"
next
end
# If we receive an empty reply, it means we were consuming our history
# and that the history is now empty. Let's start to consume new messages.
check_backlog = false if items[0][1].length == 0
items[0][1].each{|i|
id,fields = i
# Process the message
process_message(id,fields)
# Acknowledge the message as processed
r.xack(:my_stream_key,GroupName,id)
$lastid = id
}
end如您所见,这里的想法是从消费历史记录(即待处理消息列表)开始。这很有用,因为消费者可能之前崩溃过,所以在重启的情况下,我们希望重新读取那些已发送给我们的、但尚未确认的消息。请注意,我们可能会多次处理一条消息,也可能只处理一次(至少在消费者故障的情况下是这样,但也涉及到 Redis 持久化和复制的限制,请参阅关于此主题的具体章节)。
一旦历史记录被使用,我们就会得到一个空的消息列表,我们可以切换到使用>特殊ID来使用新消息。
从永久性故障中恢复
上面的示例允许我们编写参与同一消费者组的消费者,每个消费者处理一部分消息,并在故障恢复时重新读取已发送给它们的待处理消息。然而,在现实世界中,消费者可能会永久失败,并且永远无法恢复。如果消费者因任何原因停止后永远无法恢复,那么这些待处理消息会怎样呢?
Redis 消费者组提供了一种功能,用于在这些情况下认领特定消费者的待处理消息,以便这些消息更改所有权并重新分配给其他消费者。该功能非常明确。消费者必须检查待处理消息列表,并使用特殊命令认领特定消息,否则服务器将永久保留待处理消息并分配给旧消费者。通过这种方式,不同的应用程序可以选择是否使用此功能,以及如何使用它。
此过程的第一步仅仅是一个命令,该命令提供对消费者组中待处理条目的可观察性,名为 XPENDING。这是一个只读命令,始终可以安全调用,并且不会更改任何消息的所有权。在最简单的形式中,该命令使用两个参数调用,分别是流的名称和消费者组的名称。
> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"
2) "2"以这种方式调用时,该命令会输出消费者组中待处理消息的总数(本例中为两条)、待处理消息中 ID 较低的和 ID 较高的消息,以及最终的消费者列表及其待处理消息数量。我们只看到 Bob 有两条待处理消息,因为 Alice 请求的那条消息已使用 XACK 确认。
我们可以通过向 XPENDING 提供更多参数来请求更多信息,因为完整的命令签名如下:
XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]通过提供起始 ID 和结束 ID(可以是 XRANGE 中的 - 和 +),以及一个计数来控制命令返回的信息量,我们可以了解更多有关待处理消息的信息。最后一个可选参数,即消费者名称,用于将输出限制为仅显示特定消费者的待处理消息,但在以下示例中不会使用此功能。
> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"
2) "Bob"
3) (integer) 74642
4) (integer) 1
2) 1) "1692632662819-0"
2) "Bob"
3) (integer) 74642
4) (integer) 1现在,我们得到了每条消息的详细信息:ID、消费者名称、空闲时间(以毫秒为单位)(即自上次将消息传递给某个消费者以来经过的毫秒数),以及该消息的投递次数。我们有两条来自 Bob 的消息,它们空闲了 60000 多毫秒,大约一分钟。
请注意,没有人阻止我们仅使用 XRANGE 检查第一条消息的内容。
> XRANGE race:italy 1692632647899-0 1692632647899-0
1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"我们只需在参数中重复两次相同的 ID。现在我们有了一些想法,Alice 可能会认为,如果 Bob 一分钟没有处理消息,他可能无法快速恢复,因此需要认领这些消息并代替 Bob 继续处理。为此,我们使用 XCLAIM 命令。
此命令非常复杂,完整形式包含许多选项,因为它用于复制消费者组更改,但我们仅使用通常需要的参数。在本例中,它非常简单:
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>简单来说,对于这个特定的键和组,我希望指定的消息 ID 能够更改所有权,并分配给指定的消费者名称 <consumer>。此外,我们还提供了一个最小空闲时间,这样只有当上述消息的空闲时间大于指定的空闲时间时,该操作才会生效。这很有用,因为可能有两个客户端同时重试认领一条消息:
Client 1: XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
Client 2: XCLAIM race:italy italy_riders Lora 60000 1692632647899-0然而,领取消息的副作用是,它会重置其空闲时间,并增加其投递次数计数器,因此第二个客户端将无法领取该消息。这样,我们避免了对消息进行重复处理(即使在一般情况下无法获得恰好一次的处理)。
这是命令执行的结果:
> XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"Alice 成功认领了该消息,她现在可以处理该消息并确认它,即使原始消费者没有恢复,她也可以继续处理该消息。
从上面的示例中可以清楚地看出,成功认领某条消息后,XCLAIM 命令也会返回该消息。但这并非强制要求。可以使用 JUSTID 选项仅返回成功认领的消息的 ID。如果您希望减少客户端和服务器之间的带宽占用(以及命令的性能),并且由于消费者的实现方式是会不时重新扫描待处理消息的历史记录,因此您对该消息不感兴趣,那么 JUSTID 选项非常有用。
认领也可以通过一个单独的进程来实现:该进程只检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者。可以使用 Redis 流的可观察性功能之一来获取活跃消费者。这是下一节的主题。
自动认领
Redis 6.2 中新增的 XAUTOCLAIM 命令实现了我们上文描述的声明流程。XPENDING 和 XCLAIM 为不同类型的恢复机制提供了基本构建块。此命令通过让 Redis 管理来优化通用流程,并为大多数恢复需求提供了一个简单的解决方案。
XAUTOCLAIM 识别空闲的待处理消息,并将其所有权转移给消费者。该命令的签名如下:
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]因此,在上面的例子中,我可以使用自动声明来声明一条消息,如下所示:
> XAUTOCLAIM race:italy italy_riders Alice 60000 0-0 COUNT 1
1) "0-0"
2) 1) 1) "1692632662819-0"
2) 1) "rider"
2) "Sam-Bodden"与 XCLAIM 类似,该命令会返回一个已认领消息的数组,但它还会返回一个流 ID,以便迭代待处理的条目。流 ID 是一个游标,我可以在下次调用时使用它来继续认领空闲的待处理消息:
> XAUTOCLAIM race:italy italy_riders Lora 60000 (1692632662819-0 COUNT 1
1) "1692632662819-0"
2) 1) 1) "1692632647899-0"
2) 1) "rider"
2) "Royce"当 XAUTOCLAIM 返回“0-0”流 ID 作为游标时,表示它已到达消费者组待处理条目列表的末尾。但这并不意味着没有新的空闲待处理消息,因此该过程将继续从流的开头调用 XAUTOCLAIM。
认领和送货柜台
您在 XPENDING 输出中看到的计数器是每条消息的投递次数。计数器以两种方式递增:当通过 XCLAIM 成功认领消息时,或者当使用 XREADGROUP 调用访问待处理消息的历史记录时。
发生故障时,消息通常会被多次投递,但最终通常会被处理和确认。然而,处理某些特定消息可能会出现问题,因为它可能被损坏或被篡改,从而触发了处理代码中的错误。在这种情况下,消费者将持续无法处理这条特定消息。因为我们有投递尝试次数的计数器,所以我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦投递计数器达到您选择的特定数值,将此类消息放入另一个流并向系统管理员发送通知可能是更明智的做法。这基本上就是 Redis Streams 实现死信概念的方式。
流可观察性
缺乏可观察性的消息系统很难使用。不知道谁在消费消息、哪些消息处于待处理状态、以及给定流中活跃的消费者组集合,这一切都让一切都变得不透明。因此,Redis 流和消费者组提供了不同的方法来观察正在发生的事情。我们已经介绍了 XPENDING,它允许我们检查给定时刻正在处理的消息列表,以及它们的空闲时间和投递次数。
然而,我们可能想做的不止这些,XINFO 命令是一个可观察性接口,可以与子命令一起使用以获取有关流或消费者组的信息。
此命令使用子命令来显示有关流及其消费者组状态的不同信息。例如,XINFO STREAM 报告有关流本身的信息。
> XINFO STREAM race:italy
1) "length"
2) (integer) 5
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1692632678249-0"
9) "groups"
10) (integer) 1
11) "first-entry"
12) 1) "1692632639151-0"
2) 1) "rider"
2) "Castilla"
13) "last-entry"
14) 1) "1692632678249-0"
2) 1) "rider"
2) "Norem"输出显示了有关流内部编码方式的信息,以及流中的第一条和最后一条消息。另一条可用信息是与此流关联的消费者组的数量。我们可以进一步挖掘以获取有关消费者组的更多信息。
> XINFO GROUPS race:italy
1) 1) "name"
2) "italy_riders"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1692632662819-0"正如您在该命令和上一个输出中看到的,XINFO 命令输出一系列字段值项。由于它是一个可观察性命令,这使得用户能够立即了解报告的内容,并且允许该命令将来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他需要更高效利用带宽的命令(例如 XPENDING)只会报告信息,而不包含字段名称。
上面使用 GROUPS 子命令的示例的输出,通过观察字段名称应该非常清晰。我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。
> XINFO CONSUMERS race:italy italy_riders
1) 1) "name"
2) "Alice"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 177546
2) 1) "name"
2) "Bob"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 424686
3) 1) "name"
2) "Lora"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 72241如果您不记得命令的语法,只需向命令本身寻求帮助:
> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3) Show consumers of <groupname>.
4) GROUPS <key>
5) Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7) Show information about the stream.
8) HELP
9) Prints this help.与 Kafka(TM)分区的区别
Redis 流中的消费者组可能在某种程度上类似于 Kafka (TM) 基于分区的消费者组,但请注意,Redis 流在实际操作上有很大不同。分区只是逻辑上的,消息也只是被放入一个 Redis 键中,因此不同客户端的服务方式取决于哪些客户端已准备好处理新消息,而不是客户端正在从哪个分区读取消息。例如,如果消费者 C3 在某个时刻永久故障,Redis 将继续为 C1 和 C2 提供所有到达的新消息,就好像现在只有两个逻辑分区一样。
类似地,如果某个消费者处理消息的速度比其他消费者快得多,那么该消费者在单位时间内收到的消息数量也会相应增加。这是因为 Redis 会明确跟踪所有未确认的消息,并记住谁收到了哪条消息,以及第一条从未传递给任何消费者的消息的 ID。
然而,这也意味着,在 Redis 中,如果您真的想将同一个流中的消息分区到多个 Redis 实例中,则必须使用多个键和一些分片系统,例如 Redis 集群或其他特定于应用程序的分片系统。单个 Redis 流不会自动分区到多个实例。
我们可以概括地说,以下说法是正确的:
- 如果您使用 1 个流 -> 1 个消费者,则您将按顺序处理消息。
- 如果您使用 N 个流和 N 个消费者,以便只有给定的消费者才能命中 N 个流的子集,那么您可以扩展上述 1 个流 -> 1 个消费者的模型。
- 如果您使用 1 个流 -> N 个消费者,那么您将对 N 个消费者进行负载平衡,然而在这种情况下,有关同一逻辑项的消息可能会无序使用,因为给定消费者处理消息 3 的速度可能比另一个消费者处理消息 4 的速度更快。
因此,基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费者组是从给定流到 N 个不同消费者的消息的服务器端负载平衡系统。
封顶溪流
许多应用程序不希望永远将数据收集到流中。有时,在流中最多包含给定数量的项目是有用的;有时,一旦达到给定的大小,将数据从 Redis 移动到非内存存储(速度虽然慢,但适合存储未来数十年的历史记录)也很有帮助。Redis 流对此提供了一些支持。其中一个是 XADD 命令的 MAXLEN 选项。此选项使用起来非常简单:
> XADD race:italy MAXLEN 2 * rider Jones
"1692633189161-0"
> XADD race:italy MAXLEN 2 * rider Wood
"1692633198206-0"
> XADD race:italy MAXLEN 2 * rider Henshaw
"1692633208557-0"
> XLEN race:italy
(integer) 2
> XRANGE race:italy - +
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"
2) 1) "1692633208557-0"
2) 1) "rider"
2) "Henshaw"使用 MAXLEN 时,当达到指定长度时,旧条目会被自动清除,从而使流保持恒定大小。目前没有选项可以指示流仅保留不早于给定时间的项目,因为为了保持一致运行,此类命令可能会阻塞很长时间以清除项目。想象一下,如果出现插入峰值,然后是长时间暂停,然后再次插入,并且所有插入操作的最大时间都相同,会发生什么情况。流将阻塞以清除暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解所需的最大流长度。此外,虽然流的长度与使用的内存成正比,但按时间修剪不太容易控制和预测:它取决于插入速率,而插入速率通常会随时间变化(当插入速率不变时,仅按大小修剪就很简单了)。
然而,使用 MAXLEN 进行修剪可能代价高昂:为了提高内存效率,流会用宏节点表示成基数树。修改由几十个元素组成的单个宏节点并非最佳选择。因此,可以使用以下特殊形式的命令:
XADD race:italy MAXLEN ~ 1000 * ... entry fields here ...MAXLEN 选项和实际计数之间的 ~ 参数意味着,我并不需要正好是 1000 个项目。它可以是 1000、1010 或 1030,只要确保至少保存 1000 个项目即可。使用此参数,只有当我们能够移除整个节点时才会执行修剪。这使得修剪过程更加高效,通常也是您想要的。您会注意到,客户端库对此有各种实现。例如,Python 客户端默认使用近似值,必须显式设置为真实长度。
还有 XTRIM 命令,它执行的操作与上面的 MAXLEN 选项非常相似,只是它可以单独运行:
> XTRIM race:italy MAXLEN 10
(integer) 0或者,对于 XADD 选项:
> XTRIM mystream MAXLEN ~ 10
(integer) 0然而,XTRIM 的设计初衷是接受不同的修剪策略。另一种修剪策略是 MINID,它会删除 ID 低于指定值的条目。
由于 XTRIM 是一个明确的命令,因此用户应该了解不同修剪策略可能存在的缺点。
将来可能会添加到 XTRIM 的另一个有用的驱逐策略是按一系列 ID 进行删除,以便在需要时轻松使用 XRANGE 和 XTRIM 将数据从 Redis 移动到其他存储系统。
流 API 中的特殊 ID
您可能已经注意到,Redis API 中可以使用几个特殊 ID。这里简单回顾一下,以便将来更好地理解它们。
前两个特殊 ID 是 - 和 +,用于 XRANGE 命令的范围查询。这两个 ID 分别表示可能的最小 ID(基本上是 0-1)和可能的最大值(即 18446744073709551615-18446744073709551615)。正如你所见,用 - 和 + 代替这些数字要简洁得多。
然后,有些 API 会要求我们获取流中 ID 最大的条目的 ID。这就是 $ 的含义。例如,如果我只想要 XREADGROUP 的新条目,我会使用这个 ID 来表示我已经拥有所有现有条目,但不包含将来会插入的新条目。同样,当我创建或设置消费者组的 ID 时,我可以将最后交付的条目设置为 $,以便只向组中的消费者交付新条目。
如您所见,$ 并不代表 +,它们是两个不同的东西,因为 + 表示所有可能的流中最大的 ID,而 $ 表示包含给定条目的给定流中最大的 ID。此外,API 通常只能理解 + 或 $,但避免加载具有多重含义的给定符号很有用。
另一个特殊 ID 是 >,它的特殊含义仅与消费者组相关,并且仅在使用 XREADGROUP 命令时才有效。这个特殊 ID 意味着我们只需要迄今为止从未交付给其他消费者的条目。因此,基本上, > ID 是消费者组的最后交付 ID。
最后,特殊 ID * 只能与 XADD 命令一起使用,表示为新条目自动选择一个 ID。
因此我们有 -、+、$、> 和 *,它们都有不同的含义,并且大多数时候可以在不同的上下文中使用。
持久性、复制和消息安全
与任何其他 Redis 数据结构一样,Stream 会被异步复制到副本服务器,并持久化到 AOF 和 RDB 文件中。然而,可能不太明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本服务器,因此,如果主服务器中有待处理的消息,副本服务器也会拥有相同的信息。同样,重启后,AOF 也会恢复消费者组的状态。
但请注意,Redis 流和消费者组使用 Redis 默认复制进行持久化和复制,因此:
- 如果消息持久性在您的应用程序中很重要,则必须将 AOF 与强大的 fsync 策略一起使用。
- 默认情况下,异步复制不会保证复制 XADD 命令或消费者组状态更改:故障转移后,可能会丢失某些内容,具体取决于副本从主服务器接收数据的能力。
- 可以使用 WAIT 命令强制将更改传播到一组副本。但请注意,虽然这可以降低数据丢失的可能性,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程只会尽力检查故障转移到最新的副本,并且在某些特定的故障情况下,可能会提升缺少部分数据的副本。
因此,在使用 Redis 流和消费者组设计应用程序时,请务必了解应用程序在发生故障时应具有的语义属性,并进行相应的配置,评估它是否足以满足您的用例的安全要求。
从流中删除单个项目
流还具有一个特殊的命令,用于仅通过 ID 从流中间删除项目。通常,对于仅附加的数据结构来说,这看起来可能是一个奇怪的功能,但它实际上对于涉及隐私法规等的应用非常有用。该命令名为 XDEL,其参数为流的名称以及要删除的 ID:
> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"
2) 1) "1692633208557-0"
2) 1) "rider"
2) "Henshaw"
> XDEL race:italy 1692633208557-0
(integer) 1
> XRANGE race:italy - + COUNT 2
1) 1) "1692633198206-0"
2) 1) "rider"
2) "Wood"但是在当前实现中,直到宏节点完全为空时内存才会真正被回收,因此您不应滥用此功能。
零长度流
流与其他 Redis 数据结构的区别在于,当其他数据结构不再包含任何元素时,调用移除元素的命令会产生副作用,即键本身会被移除。例如,当调用 ZREM 移除有序集合中的最后一个元素时,有序集合会被完全移除。而流则可以保留零个元素,这既可以是使用计数为零的 MAXLEN 选项(XADD 和 XTRIM 命令),也可以是调用了 XDEL 命令。
存在这种不对称的原因在于,Streams 可能关联有消费者组,而我们不希望仅仅因为 Stream 中不再有任何项目就丢失消费者组定义的状态。目前,即使 Streams 没有关联的消费者组,也不会被删除。
消费一条消息的总延迟
非阻塞流命令(例如不带 BLOCK 选项的 XRANGE 和 XREAD 或 XREADGROUP)与其他 Redis 命令一样,都是同步执行的,因此讨论此类命令的延迟毫无意义:查看 Redis 文档中这些命令的时间复杂度更有意义。只需说明,在提取范围时,流命令至少与有序集合命令一样快,而 XADD 非常快,如果使用流水线,在普通机器上每秒可以轻松插入 50 万到 100 万个项目。
然而,如果我们想要了解在消费者组中阻止消费者的背景下处理消息的延迟,从通过 XADD 生成消息的那一刻到消费者由于 XREADGROUP 返回消息而获得消息的那一刻,延迟就成为一个有趣的参数。
为被屏蔽的消费者提供服务的工作原理
在提供已执行测试的结果之前,有必要了解 Redis 使用什么模型来路由流消息(以及通常如何管理等待数据的任何阻塞操作)。
- 被阻塞的客户端会被一个哈希表引用,该哈希表将至少有一个阻塞消费者的键映射到正在等待该键的消费者列表。这样,只要给定一个接收到数据的键,我们就可以解析所有正在等待该数据的客户端。
- 当发生写入操作时,在本例中,当调用 XADD 命令时,它会调用 signalKeyAsReady() 函数。该函数会将键放入需要处理的键列表中,因为这些键可能包含新的数据,需要等待被阻塞的消费者处理。需要注意的是,这些就绪的键稍后会被处理,因此在同一事件循环中,该键可能会收到其他写入操作。
- 最后,在返回事件循环之前,会处理已就绪的键。对于每个键,都会扫描等待数据的客户端列表,如果适用,则这些客户端将接收新到达的数据。对于流,数据是消费者请求的适用范围内的消息。
如您所见,基本上,在返回事件循环之前,调用 XADD 的客户端和被阻塞以消费消息的客户端都会在输出缓冲区中收到它们的回复,因此 XADD 的调用者应该在消费者收到新消息的同时收到来自 Redis 的回复。
此模型基于推送,因为将数据添加到消费者缓冲区将直接通过调用 XADD 的操作执行,因此延迟往往是相当可预测的。
延迟测试结果
为了检查这些延迟特性,我们进行了一项测试,使用多个 Ruby 程序实例推送消息,并在消息中添加一个附加字段,即计算机毫秒时间,并由 Ruby 程序从消费者组读取消息并进行处理。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。
获得的结果:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%因此,99.9% 的请求延迟小于等于 2 毫秒,异常值仍然非常接近平均值。
向流中添加数百万条未确认的消息不会改变基准测试的要点,大多数查询仍然以非常短的延迟进行处理。
几点说明:
- 这里我们每次迭代最多处理 10,000 条消息,这意味着 XREADGROUP 的 COUNT 参数设置为 10,000。这会增加不少延迟,但为了让速度较慢的消费者能够跟上消息流,这样做是必要的。因此,实际延迟会小得多。
- 与目前的标准相比,本次基准测试所用的系统速度非常慢。
xadd
redis xadd 命令是用于向 Redis Stream 中添加新条目的命令。Redis Stream 是一种用于消息队列的系统,它允许生产者添加消息,消费者可以按需读取消息。xadd 命令的作用就是将一个新的条目(消息)添加到指定的流中。
命令格式
XADD key ID field value [field value ...]- key:流的名称。
- ID:条目的唯一标识符。通常,使用特殊值
*来自动生成一个唯一的 ID。这个 ID 是基于时间的,确保了条目的顺序性。 - field value:消息的字段和值对。你可以添加多个这样的对来构建复杂的消息。
示例
假设我们有一个名为 mystream 的流,我们想要添加一个包含用户登录信息的消息:
XADD mystream * user_id 123 username alice timestamp 1625145600这里,* 表示让 Redis 自动生成一个 ID。消息包含三个字段:user_id、username 和 timestamp。
自动生成的 ID
当使用 * 作为 ID 时,Redis 会基于当前的时间戳和自增的序列号生成一个 ID。这个 ID 是全局唯一的,并且可以保证添加的顺序。例如,可能会生成类似于 1625145600000-0 的 ID,其中 1625145600000 是时间戳部分,0 是序列号部分。
返回值
xadd 命令返回生成的 ID。这允许你知道新添加的条目的唯一标识符,以便后续可能需要引用或处理这个条目时使用。
使用场景
Redis Stream 非常适合用于实现消息队列、日志记录、事件处理等场景。通过使用 xadd 命令,你可以轻松地向流中添加消息,然后通过 xread、xgroup、xack 等命令来读取和处理这些消息。
注意事项
- 确保在添加消息时考虑到消息的大小,因为大的消息可能会影响性能和内存使用。
- 使用合适的字段和值结构来存储消息,以便于消费者解析和处理。
- 利用 Redis Stream 的消费者组功能来实现消息的分发和消费确认。
总之,xadd 命令是 Redis Stream 数据结构中的一个核心命令,用于向流中添加消息。通过合理使用这个命令,你可以构建出高效、可扩展的消息处理系统。
xread
Redis 的 XREAD 命令用于读取 Redis Streams 中的数据。Streams 是 Redis 5.0 版本引入的一种新的数据结构,它提供了一种高效的方式来处理实时数据流。 XREAD 命令允许你从一个或多个 Streams 中读取消息,并且可以根据需要进行阻塞或非阻塞操作。
命令语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]让我们逐个分析参数:
XREAD: 这是命令本身。[COUNT count]: 可选参数,指定最多读取多少条消息。如果省略,则默认读取所有可用的消息。[BLOCK milliseconds]: 可选参数,指定阻塞等待的时间(以毫秒为单位)。如果在指定的时间内没有新的消息到达,则命令返回空结果。如果省略此参数,则XREAD命令会立即返回,即使没有可用消息。BLOCK 0表示阻塞模式,等待直到有消息才返回。STREAMS: 关键字,表示接下来要指定的是 Streams 的键和 ID。key [key ...]: 一个或多个 Streams 的键名。 你可以指定多个 Streams,XREAD会同时从所有指定的 Streams 中读取消息。ID [ID ...]: 一个或多个消息 ID。 这指定了从哪个位置开始读取消息。 每个 Stream 必须对应一个 ID。 你可以使用以下几种 ID:$: 表示读取最新消息。这是最常用的选项。>: (Redis 6.2+) 与$等效,读取最新消息。0: 表示读取 Stream 中的第一条消息。- 具体的 ID: 如果你知道某个消息的 ID,你可以使用该 ID 来读取从该消息开始的所有后续消息。 消息 ID 是一个字符串,通常由时间戳和序列号组成,例如:
1678886400000-0
示例:
读取名为
mystream的 Stream 中最新的 10 条消息:bashXREAD COUNT 10 STREAMS mystream $阻塞 1 秒钟,读取名为
mystream和anotherstream的 Stream 中最新的消息:bashXREAD BLOCK 1000 STREAMS mystream $ anotherstream $从
mystream的 ID1678886400000-0开始读取消息:bashXREAD STREAMS mystream 1678886400000-0阻塞模式读取
mystream的最新消息 (Redis 6.2+)bashXREAD BLOCK 0 STREAMS mystream >
返回结果:
XREAD 命令的返回结果是一个数组,每个元素对应一个 Stream。 每个 Stream 元素包含 Stream 的键名和一个消息列表。 消息列表中的每个消息包含消息 ID 和消息体。
重要提示:
- 使用
$读取最新消息是最常见的用法。 BLOCK参数对于构建实时应用程序非常有用,它允许你的程序在没有新消息时休眠,而不是不断轮询。- 理解消息 ID 至关重要,它决定了从哪里开始读取消息。
希望这个解释能够帮助你理解 Redis XREAD 命令。 如果你还有其他问题,请随时提出。
消费者组
介绍
Redis Streams 的消费者组 (Consumer Groups) 是一种机制,允许多个消费者同时从同一个 Stream 读取消息,并且确保每个消息只被消费一次。 这对于构建高吞吐量的、可扩展的消息处理系统至关重要。 如果没有消费者组,多个消费者可能会读取相同的消息,导致数据处理的错误或不一致。
关键概念:
- Stream: Redis Streams 的数据结构,用于存储消息序列。
- Consumer Group: 一组消费者,它们共享同一个 Stream,并协同工作来消费消息。
- Consumer: 属于消费者组的个体进程或线程,负责从 Stream 中读取并处理消息。
- Pending Entries (待处理消息): 消费者组中未被任何消费者处理的消息。
- Last Delivered ID (最后交付的消息ID): 消费者组中每个消费者最后成功处理的消息的 ID。
消费者组的核心功能:
- 消息分发: 消费者组确保每个消息只被一个消费者处理一次。 即使有多个消费者同时运行,它们也不会读取相同的消息。
- 消息持久性: 即使消费者关闭或崩溃,未处理的消息仍然保留在 Stream 中,直到被其他消费者处理。
- 可扩展性: 通过添加更多的消费者,可以提高消息处理的吞吐量。
- 容错性: 如果一个消费者出现故障,其他消费者可以继续处理消息。
关键命令:
XGROUP CREATE <key> <groupname> <id>: 创建一个消费者组。<key>是 Stream 的键名,<groupname>是消费者组的名称,<id>是消费者组的起始消息 ID (例如0表示从 Stream 的开头开始,$表示从最新消息开始)。XGROUP SETID <key> <groupname> <id>: 设置消费者组的起始消息 ID。 这可以用于重置消费者组的消费进度。XGROUP DESTROY <key> <groupname>: 删除消费者组。XGROUP DELCONSUMER <key> <groupname> <consumername>: 从消费者组中删除一个消费者。XPENDING <key> <groupname> [start] [end] [count] [consumer]: 查看待处理的消息。XREADGROUP GROUP <groupname> <consumername> [COUNT count] [BLOCK milliseconds] STREAMS <key> <id>: 从消费者组中读取消息。<consumername>是消费者的名称。XACK <key> <groupname> <id> [id ... ]: 确认消息已被成功处理。 Redis 会将确认的消息从待处理消息列表中移除。
示例:
创建消费者组:
bashXGROUP CREATE mystream mygroup 0读取消息 (消费者名为
consumer1):bashXREADGROUP GROUP mygroup consumer1 COUNT 10 BLOCK 0 STREAMS mystream >确认消息已处理:
假设
XREADGROUP返回的消息 ID 为1678886400000-0,则确认消息:bashXACK mystream mygroup 1678886400000-0
需要注意的事项:
- 确保
XACK命令及时被调用以确认消息的处理。 否则,消息会一直保留在待处理消息列表中。 - 消费者组的名称必须唯一。
- 合理地选择
BLOCK参数,以平衡延迟和资源利用率。 - 监控
XPENDING命令的输出,以确保消息不会积压。
消费者组是 Redis Streams 的一个强大功能,它能够有效地处理大量消息并提高系统的可靠性和可扩展性。 理解这些概念和命令对于有效地使用 Redis Streams 至关重要。
Pending List
Redis Stream 消费者组的 Pending List (待处理列表) 是一个内部数据结构,它存储着尚未被任何消费者成功处理的消息。更准确地说,它记录了哪些消息已经被消费者获取但尚未被 XACK 命令确认处理。
Pending List 的作用:
确保消息至少被处理一次: 即使消费者在处理消息的过程中崩溃或出现故障,这些消息仍然保留在 Pending List 中,等待其他消费者处理,保证了消息不会丢失。
实现精确一次语义 (Exactly-Once Semantics): 通过 Pending List 和
XACK命令的配合,Redis Streams 的消费者组可以确保每个消息只被成功处理一次。 如果消费者处理消息成功,它会使用XACK命令确认,将消息从 Pending List 中移除;如果消费者处理失败或崩溃,消息仍然保留在 Pending List 中,直到被其他消费者处理。负载均衡和容错: Pending List 允许消费者组中的消费者在处理消息时出现不平衡的情况,也提供了容错性。如果一个消费者处理速度慢或宕机,其他消费者可以接管并处理 Pending List 中的消息。
Pending List 的内容:
Pending List 中的每个条目包含以下信息:
- 消息 ID: 待处理消息的唯一标识符。
- 消费者名称: 获取该消息的消费者的名称。
- 消息的创建时间: 该消息加入 Pending List 的时间。
- 消息内容 (通常不在 Pending List 中直接存储): 消息的内容本身并不存储在 Pending List 中,而是存储在 Stream 本身。 Pending List 只存储消息的 ID 和其他元数据信息。
如何查看 Pending List:
可以使用 XPENDING 命令查看 Pending List 中的消息:
XPENDING <key> <groupname> [start] [end] [count] [consumer]参数说明:
<key>: Stream 的键名。<groupname>: 消费者组的名称。[start]: 可选,起始消息 ID,默认为-(最小 ID)。[end]: 可选,结束消息 ID,默认为+(最大 ID)。[count]: 可选,最多返回多少条消息,默认为 100。[consumer]: 可选,指定要查看哪个消费者的待处理消息,默认为所有消费者。
总结:
Pending List 是 Redis Streams 消费者组的核心机制之一,它对于保证消息的可靠性和一次性处理至关重要。 通过 XPENDING 命令,你可以监控 Pending List 中的消息数量,及时发现并处理潜在的问题,例如消费者处理速度过慢或系统故障。 理解 Pending List 的工作原理对于正确使用 Redis Streams 的消费者组至关重要。
读取消息 ID 中 0、$、> 的区别
在 Redis Streams 中,ID 用于标识消息,0、$ 和 > 都是特殊的 ID,代表不同的位置或含义:
0: 表示 Stream 中的第一条消息的 ID。 这通常是 Stream 创建后添加的第一条消息的 ID。 使用0开始读取,会从 Stream 的起始位置开始读取所有消息。$: 表示 Stream 中的最新消息的 ID。 使用$开始读取,会读取 Stream 中当前最新的消息。 这是最常用的读取方式,尤其是在实时应用中,需要获取最新的数据。>: (Redis 6.2 及以上版本) 与$完全等效。 它也是表示 Stream 中最新消息的 ID。 Redis 引入>是为了使 ID 的表示更加清晰和直观。 在新的代码中,建议使用>来代替$。
总结表格:
| ID | 说明 | Redis 版本 |
|---|---|---|
0 | Stream 中第一条消息的 ID | 5.0+ |
$ | Stream 中最新消息的 ID (旧式写法) | 5.0-6.1 |
> | Stream 中最新消息的 ID (推荐写法) | 6.2+ |
使用示例 (假设你的 Stream 键名为 mystream):
XREAD STREAMS mystream 0: 从mystream的第一条消息开始读取所有消息。XREAD STREAMS mystream $: 从mystream的最新消息开始读取消息(Redis 6.1 及以下)。XREAD STREAMS mystream >: 从mystream的最新消息开始读取消息(Redis 6.2 及以上,推荐)。
选择哪个 ID 取决于你的需求:
- 如果你需要处理 Stream 中的所有历史消息,则使用
0。 - 如果你只需要处理最新的消息,则使用
>(Redis 6.2+) 或$(Redis 6.1 及以下)。
需要注意的是,0 和 $/> 只在 XREAD、XREADGROUP 等命令中作为起始 ID 使用。 它们本身不是实际的消息 ID,而是指向特定位置的特殊标识符。 实际的消息 ID 是由时间戳和序列号组成的更复杂的字符串。
不结合消费者组用法
注意事项:
- 因为 xread 使用 $ 时读取最新消息有漏读消息的风险(即无法读取没有 xread 期间新增的消息)
- 多个消费者会重复消费同一个消息
# 添加消息到 stream 中
# s1 是 stream 的名称
# * 表示不指定消息 ID
# k1 v1 k2 v2 是消息的键值对
xadd s1 * k1 v1 k2 v2
xadd s1 * k3 v3
xadd s1 * k4 v4
# 查看 stream 中所有消息
# - 对应的参数用于指定起始 ID,这里是无穷小
# + 对应的参数用于指定结束 ID,这里是无穷大
xrange s1 - +
# 查看 stream 中前 2 条消息
xrange s1 - + count 2
# 查看 stream 中消息数量
xlen s1
# 非阻塞读取 stream 中的消息
# count 1 表示读取一条消息
# streams s1 表示从 s1 中读取消息
# 0 表示读取 stream 中的第一条消息
xread count 1 streams s1 0
# 非阻塞读取 s2 中所有消息
xread streams s1 0
# 阻塞读取 s1,等待直到有最新消息
# count 1 表示读取一条消息
# block 0 表示阻塞读取,等待到有消息才返回
# streams s1 表示从 s1 中读取消息
# $ 表示最新的消息
# 注意:$ 读取最新消息有漏读消息的风险,即无法读取没有 xread 期间新增的消息
xread count 1 block 0 streams s1 $
# 从指定的消息 ID 开始读取其后面的所有消息
xread streams s1 1737170246150-0
# 删除指定消息
xdel s1 1737170246150-0模拟 RabbitMQ 工作模式队列
详细用法请参考本站 示例
// 模拟 RabbitMQ 的工作模式
@Test
public void testSimRabbitMQWorkerMode() throws InterruptedException {
String streamKey = UUID.randomUUID().toString();
String groupKey = UUID.randomUUID().toString();
// 创建消费者组
String result = this.redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupKey);
Assertions.assertEquals("OK", result);
int totalMessageCount = 5;
// 发送消息
for (int i = 0; i < totalMessageCount; i++) {
int finalI = i;
this.redisTemplate.opsForStream().add(MapRecord.create(streamKey, new HashMap<String, String>() {{
put("k" + finalI, "v" + finalI);
}}));
}
AtomicInteger readCounter = new AtomicInteger();
// 演示多个消费者也不会重复消费同一个消息
ExecutorService executor = Executors.newCachedThreadPool();
int concurrentThreads = 32;
for (int i = 0; i < concurrentThreads; i++) {
executor.submit(() -> {
String consumerKey = UUID.randomUUID().toString();
while (true) {
List<MapRecord<String, Object, Object>> recordList = this.redisTemplate.opsForStream().read(
Consumer.from(groupKey, consumerKey)
// block(Duration.ofSeconds(0)) 表示阻塞等待,直到有消息才返回
, StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1))
, StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
if (recordList != null && !recordList.isEmpty()) {
readCounter.incrementAndGet();
} else {
break;
}
}
});
}
executor.shutdown();
while (!executor.awaitTermination(10, TimeUnit.MILLISECONDS)) ;
Assertions.assertEquals(totalMessageCount, readCounter.get());
}RedisTemplate 通过消息监听器容器进行异步接收
详细用法请参考本站 示例
基本用法
注意:spring-boot-starter-parent 版本需要使用 2.7.18,否则在调用
this.redisTemplate.opsForStream().createGroup(Const.StreamName, Const.GroupNameB);时会报告 Stream 不存错误。
POM 配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
</parent>
<groupId>com.future.demo</groupId>
<artifactId>redistemplate-stream</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>application.properties 配置如下:
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=123456
# 需要调整连接池大小否则在注册多个 consumer 时报告连接失败错误
spring.redis.lettuce.pool.max-active=2048创建 StreamListener
@Slf4j
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
String groupName;
String consumerName;
StringRedisTemplate redisTemplate;
public MyStreamListener(String groupName, String consumerName, StringRedisTemplate redisTemplate) {
this.groupName = groupName;
this.consumerName = consumerName;
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
Const.RecordIdList.add(message.getId().getValue());
log.info("{} - {} - {}", groupName, consumerName, message);
this.redisTemplate.opsForStream().acknowledge(groupName, message);
}
}注册 Listener
@Configuration
@Slf4j
public class ConfigRedis {
@Autowired
StringRedisTemplate redisTemplate;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> configStreamListener(
RedisConnectionFactory connectionFactory) {
// region 创建多个消费组,每个消费组中有多个消费者
// 多个消费组会收到同一个消息,但是每个消费组内同一个消息不会被重复消费
try {
this.redisTemplate.opsForStream().createGroup(Const.StreamName, Const.GroupNameA);
} catch (RedisSystemException ex) {
if (ex.getRootCause() != null && !ex.getRootCause().getMessage().contains("Group name already exists")) {
throw ex;
}
}
try {
this.redisTemplate.opsForStream().createGroup(Const.StreamName, Const.GroupNameB);
} catch (RedisSystemException ex) {
if (ex.getRootCause() != null && !ex.getRootCause().getMessage().contains("Group name already exists")) {
throw ex;
}
}
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.errorHandler((e) -> {
log.error(e.getMessage(), e);
})
.pollTimeout(Duration.ofMillis(100))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, options);
container.receive(Consumer.from(Const.GroupNameA, Const.ConsumerNameA),
StreamOffset.create(Const.StreamName, ReadOffset.lastConsumed()),
new MyStreamListener(Const.GroupNameA, Const.ConsumerNameA, this.redisTemplate));
container.receive(Consumer.from(Const.GroupNameA, Const.ConsumerNameB),
StreamOffset.create(Const.StreamName, ReadOffset.lastConsumed()),
new MyStreamListener(Const.GroupNameA, Const.ConsumerNameB, this.redisTemplate));
container.receive(Consumer.from(Const.GroupNameB, Const.ConsumerNameA),
StreamOffset.create(Const.StreamName, ReadOffset.lastConsumed()),
new MyStreamListener(Const.GroupNameB, Const.ConsumerNameA, this.redisTemplate));
container.receive(Consumer.from(Const.GroupNameB, Const.ConsumerNameB),
StreamOffset.create(Const.StreamName, ReadOffset.lastConsumed()),
new MyStreamListener(Const.GroupNameB, Const.ConsumerNameB, this.redisTemplate));
// endregion
return container;
}
}测试配置是否正确
@Test
public void testMessageListenerContainer() throws InterruptedException {
Const.RecordIdList.clear();
StringRecord record = StringRecord.of(new HashMap<String, String>() {{
this.put("k1", "v1");
}}).withStreamKey(Const.StreamName);
RecordId recordId = this.redisTemplate.opsForStream().add(record);
TimeUnit.MILLISECONDS.sleep(500);
Assertions.assertEquals(2, Const.RecordIdList.size());
Const.RecordIdList.forEach(o -> Assertions.assertEquals(recordId.getValue(), o));
}同时监听多个 Stream
创建 StreamListener
@Slf4j
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
String groupName;
String consumerName;
StringRedisTemplate redisTemplate;
public MyStreamListener(String groupName, String consumerName, StringRedisTemplate redisTemplate) {
this.groupName = groupName;
this.consumerName = consumerName;
this.redisTemplate = redisTemplate;
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
Const.RecordIdList.add(message.getId().getValue());
log.info("{} - {} - {}", groupName, consumerName, message);
this.redisTemplate.opsForStream().acknowledge(groupName, message);
}
}注册 Listener
@Configuration
@Slf4j
public class ConfigRedis {
@Autowired
StringRedisTemplate redisTemplate;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> configStreamListener(
RedisConnectionFactory connectionFactory) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>>
options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.errorHandler((e) -> {
log.error(e.getMessage(), e);
})
.pollTimeout(Duration.ofMillis(100))
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(connectionFactory, options);
// region 创建多个 stream 并监听消息
for (int i = 0; i < Const.StreamCount; i++) {
String streamName = Const.StreamName + i;
String groupName = Const.GroupNameA;
String consumerName = Const.ConsumerNameA;
try {
this.redisTemplate.opsForStream().createGroup(streamName, groupName);
} catch (RedisSystemException ex) {
if (ex.getRootCause() != null && !ex.getRootCause().getMessage().contains("Group name already exists")) {
throw ex;
}
}
// 每个 stream 注册多次 listener 否则无法并发处理消息
/*for (int j = 0; j < 64; j++) {*/
container.receive(Consumer.from(groupName, consumerName),
StreamOffset.create(streamName, ReadOffset.lastConsumed()),
new MyStreamListener(groupName, consumerName, this.redisTemplate));
/*}*/
}
// endregion
return container;
}
}测试
@Test
public void testReadMultipleStreamConcurrently() throws InterruptedException {
List<String> recordIdList = new ArrayList<>();
for (int i = 0; i < Const.StreamCount; i++) {
String streamName = Const.StreamName + i;
StringRecord record = StringRecord.of(new HashMap<String, String>() {{
this.put("k1", "v1");
}}).withStreamKey(streamName);
RecordId recordId = this.redisTemplate.opsForStream().add(record);
recordIdList.add(recordId.getValue());
}
TimeUnit.SECONDS.sleep(1);
Assertions.assertEquals(recordIdList.size(), Const.RecordIdList.size());
recordIdList.forEach(o -> {
Assertions.assertTrue(Const.RecordIdList.contains(o));
});
}geo 类型
todo 暂时未遇到需求,所以不作出研究。
bitmap 类型
介绍
Redis Bitmap是Redis中的一种数据结构,它用于表示位图(bit array),是由0和1状态表现的二进制位的bit数组。以下是对Redis Bitmap的详细解释:
一、基本概念
- Bitmap不是Redis的一个独立数据类型,而是基于String类型实现的。Redis的String类型值最大能存储512MB的内容,每个String由多个字节组成,每个字节又由8个Bit位组成。
- Bitmap利用这些位来存储大量的二进制数据,这些位可以代表不同的状态或标志。
二、存储原理
- Bitmap利用Redis String类型的最大容量(512MB)来存储一个连续的二进制序列。
- 在这个序列中,每个字节的8位可以分别代表8个独立的状态。因此,可以用一个Bitmap来跟踪数百万甚至数十亿的状态。
三、常见操作
- SETBIT key offset value:设置指定偏移量处的位状态。其中,key是Redis键,offset是偏移量(从0开始),value是要设置的位状态(0或1)。
- GETBIT key offset:获取指定偏移量处的位状态。
- BITCOUNT key [start end]:统计给定范围内为1的位的数量。可以通过指定额外的start或end参数来限制统计范围。
- BITOP operation destkey key [key ...]:执行针对多个Bitmap的位操作,并将结果存储到指定的key中。operation参数可以是AND(逻辑与)、OR(逻辑或)、XOR(逻辑异或)、NOT(逻辑非)中的任意一种。
四、应用场景
- 用户在线状态记录:每位代表一个用户是否在线,可以用0表示离线,1表示在线。
- 用户行为标记:例如用户是否阅读过某篇文章、是否参与过某项活动等,可以用位图中的位来表示这些行为的状态。
- 大数据分析前的数据预处理:如用户画像构建,通过Bitmap记录用户属性,便于后续的批量统计和分析。
- 布隆过滤器的实现:虽然Redis内并未直接提供布隆过滤器,但可以使用Bitmap和其他数据结构组合来模拟布隆过滤器的功能。
- 统计活跃用户:每个用户对应一个Bitmap,每一位表示一天的活跃状态,可以用来统计一段时间内的活跃用户数。
- 其他应用场景:如商品浏览量统计、网站用户访问次数统计、排行榜和等级制度等。
五、优点
- 内存效率高:使用Bitmap可以比传统的哈希表或集合节省更多的内存空间。
- 操作高效:Redis提供了丰富的Bitmap操作命令,这些命令的执行效率非常高。
- 灵活性高:Bitmap可以与其他Redis数据结构结合使用,实现复杂的数据查询和处理逻辑。
综上所述,Redis Bitmap是一种非常灵活且高效的数据结构,适用于许多需要处理大量布尔值和空间优化的场景。
用户签到场景演示 bitmap 用法
# 第一天签到
# bm1 是 key,0 是 bitmap 位的偏移量,1 是指定位的偏移量的值
setbit bm1 0 1
# 第二天签到
setbit bm1 1 1
# 第六天签到,中间的 3、4、5 天没有签到
setbit bm1 5 1
# 查询第二天的签到情况
getbit bm1 1
# 查询第三天的签到情况
getbit bm1 2
# 查询共签到了多少次
bitcount bm1hyperloglog 类型
介绍
Redis HyperLogLog是一种高级数据结构,它是一种基数估计算法,能够在数据量很大的情况下,使用很小的空间近似地统计出所有数据的基数(即不重复元素的数量)。以下是对Redis HyperLogLog的详细介绍:
一、原理
HyperLogLog的原理基于伯努利试验和调和平均数。它通过随机映射函数将输入元素映射到一个固定大小的位图中,然后通过位图中零位的数量来估算基数。为了减小误差率,HyperLogLog使用了多个随机映射函数和稀疏位图等技术。
具体来说,HyperLogLog将数据通过哈希函数转换成比特串,然后分桶存储。每个桶中记录的是比特串中从低位到高位第一次出现1的位置(k_max)。最后,结合所有桶中的k_max值,代入估算公式,得出估算值。
二、Redis中的HyperLogLog命令
在Redis中,HyperLogLog提供了三个主要命令:
- PFADD:用于向HyperLogLog添加元素。如果添加成功返回1,如果元素已经存在则返回0。
- PFCOUNT:用于计算一个或多个HyperLogLog的独立总数(即基数)。对于单个key,它返回HyperLogLog中存储的基数统计结果;对于多个key,它返回多个key对应的HyperLogLog合并后的结果。
- PFMERGE:用于合并多个HyperLogLog,并将结果存储到指定的destkey中。
三、特点
- 空间效率高:HyperLogLog使用极小的内存空间就能完成独立总数的统计。在Redis中,每个HyperLogLog键只需要花费约12KB内存,就可以计算2^64的数据。
- 标准误差率低:Redis中HyperLogLog的标准误差率为0.81%,这意味着即使在非常大的数据集上,它也可以提供非常准确的结果。但需要注意的是,HyperLogLog是一种近似算法,存在一定的误差率。因此,在需要高精度统计的场景中,可能需要考虑其他算法或数据结构。
- 适用场景广泛:HyperLogLog适用于各种需要基数统计的场景,如独立访客统计、活跃用户数统计等。
四、使用示例
以下是一个简单的HyperLogLog使用示例:
# 向HyperLogLog中添加元素
127.0.0.1:6379> PFADD page user1
(integer) 1
# 计算HyperLogLog的基数
127.0.0.1:6379> PFCOUNT page
(integer) 1
# 向HyperLogLog中添加更多元素
127.0.0.1:6379> PFADD page user2 user3 user4
(integer) 1
# 再次计算HyperLogLog的基数
127.0.0.1:6379> PFCOUNT page
(integer) 4
# 合并多个HyperLogLog
127.0.0.1:6379> PFADD page1 user5 user6
(integer) 1
127.0.0.1:6379> PFADD page2 user7 user8
(integer) 1
127.0.0.1:6379> PFMERGE page_all page page1 page2
OK
# 计算合并后的HyperLogLog的基数
127.0.0.1:6379> PFCOUNT page_all五、注意事项
- HyperLogLog不支持单个元素的删除操作。如果需要删除元素,通常需要重新计算整个HyperLogLog。
- HyperLogLog的内存占用是固定的,与输入元素的数量无关。这使得它在处理大规模数据集时具有显著的优势。
综上所述,Redis HyperLogLog是一种非常有用的高级数据结构,它能够在保证一定精度的情况下,高效地统计大规模数据集的基数。
UV 统计场景演示 HyperLogLog 用法
详细用法请参考示例https://gitee.com/dexterleslie/demonstration/tree/master/demo-redis/redistemplate/redistemplate-hyperloglog