Redis的PubSub 模式即:
基于事件的系统中,Pub/Sub
是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。
消息发布者,即 publish
客户端,无需独占链接,你可以在 publish
消息的同时,使用同一个 redis-client
链接进行其他操作(例如:INCR
等)
消息订阅者,即 subscribe
客户端,需要独占链接,即进行 subscribe
期间,redis-client
无法穿插其他操作,此时 client
以阻塞的方式等待 publish
端的消息;这一点很好理解,因此 subscribe
端需要使用单独的链接,甚至需要在额外的线程中使用。
基本使用:
主要是订阅某频道,接收消息,取消订阅,额外的功能是:正则模式订阅,类似订阅整个频道,还有相关的基础统计功能
消费者收到的消息是:
{'pattern': None, 'type': 'subscribe','channel': 'codehole', 'data': 1L} {'pattern': None, 'type': 'message','channel': 'codehole', 'data': 'python comes'}
其中 data
是消息的内容,一个字符串。channel
这个也很明显,它表示当前订阅的主题名称。 type
它表示消息的类型,如果是一个普通的消息,那么类型就是 message
,如果是控制消息,比如订阅指令的反馈,它的类型就是 subscribe
,如果是模式订阅的反馈,它的类型就是 psubscribe
,还有取消订阅指令的反馈 unsubscribe
和 punsubscribe
。pattern
它表示当前消息是使用哪种模式订阅到的,如果是通过 subscribe
指令订阅的,那么这个字段就是空。
获取消息可以:
消费者可以通过轮询来获取消息,没有消息/接收不到则休眠一秒,也可以使用 listen 来阻塞监听消息来进行处理,官方文档:pubsub
缺陷:
PubSub 是一种即发即弃的模式,生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
如果 Redis 停机重启,PubSub 的消息是不会持久化的,Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。简单来说是没有消息的备份和恢复机制。如果大量消息同时涌入,redis没有对应的性能提升方案和慢消息方案,如果消费者来不及消费,可能造成消息堆积。
如果你期望消息是持久的:
① subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”
② subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”
③ publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。
④ 到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。
⑤ subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。
⑥ subscribe端启动时,如果发现自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。
简单总结是:在额外的 set
中记录所有订阅者,并创建以订阅者 id 为 key 的list
数据结构,发布者将每一条消息发布成功之后,记录进订阅者 ID 为 key 的value
中,订阅者消费成功消息后,删除此消息,此外订阅者断线重连的情况下,读取并消费自身 ID 为 key 的 list 的数据,然后再重新订阅发布者,这样保证数据顺序不会乱,也能保证消息被存储和正确消费。
顺带推荐 质量很高的课程, 欢迎扫码购买
部分内容参考自:cnblog 文章 及 掘金小册 Redis 深度历险 作者:老钱
最后恰饭 阿里云全系列产品/短信包特惠购买 中小企业上云最佳选择 阿里云内部优惠券