交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
Redis 5-1、事件发布与订阅
分享
未结
0
1069
李延
LV6
2021-07-23
悬赏:20积分
# 功能说明 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 # 测试用例 我们启动一个redis服务器。并且连接3个客户端。 第一个客户端,执行命令: ```c SUBSCRIBE test1 ``` 意思为订阅test1频道 第二个客户端,执行命令: ```c PSUBSCRIBE test[12] ``` 意思为订阅test1、test2频道 我们使用第三个发布命令: ```c PUBLISH test1 hello-test1 ``` 意思为向test1频道发送hello-test1消息。这时我们看到第一个和第二个客户端都可以收到消息 ```c //第一个客户端 1) "message" 2) "test1" 3) "hello-test1" //第二个客户端 1) "pmessage" 2) "test[12]" 3) "test1" 4) "hello-test1" ``` 我们再用第三个客户向test2频道发送消息 ```c PUBLISH test2 hello-test2 ``` 这时,我们只有第二个客户端可以收到消息 ```c //第二个客户端 1) "pmessage" 2) "test[12]" 3) "test2" 4) "hello-test2" ``` # 源码解析 事件的订阅与发布源码主要在pubsub.c 文件中 ## 订阅频道时保存数据的结构 我们通过subscribe命令。数据会被保存在server.pubsub_channels中。 ```c dict *pubsub_channels; /* Map channels to list of subscribed clients */ ``` 他的结构是dict。其中key就是我们订阅的频道名称。而valeu是一个链表结构。每个元素就是我们已经订阅当前频道的连接。 当我们使用subscribe命令时,如果已经有这个频道,则直接在dict中获取当前频道的链表,并将我们的连接添加在队列的头部;如果当前频道不存在,则在dict中添加key。并创建链表。 当使用psubscribe命令时。数据会被保存在server.pubsub_patterns中。 ```c list *pubsub_patterns; ``` 它是一个链表结果。我们订阅的频道会直接添加在链表的头部,当我们发送数据时,会遍历队列的每个元素。判断是否有符合要求的连接。 ## 订阅 subscribe ```c /** * 订阅 * @param c */ void subscribeCommand(client *c) { int j; //多个频道,循环订阅 for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; } //订阅当前频道 int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ //如果当前连接还没有订阅此频道 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ //在server.pubsub_channels查找频道,如果没有就创建一个频道 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = dictGetVal(de); } //将当前连接添加到频道队列的头部 listAddNodeTail(clients,c); } /* Notify the client */ //回复消息 addReplyPubsubSubscribed(c,channel); return retval; } ``` ## 批量订阅 psubscribe ```c void psubscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; } int pubsubSubscribePattern(client *c, robj *pattern) { dictEntry *de; list *clients; int retval = 0; //在pubsub_patterns中查找,如果有。说明当前连接已经订阅了该频道 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; pubsubPattern *pat; //将该频道记录在当前连接中 listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; //将当前频道记录在server.pubsub_patterns中 listAddNodeTail(server.pubsub_patterns,pat); /* Add the client to the pattern -> list of clients hash table */ de = dictFind(server.pubsub_patterns_dict,pattern); if (de == NULL) { clients = listCreate(); dictAdd(server.pubsub_patterns_dict,pattern,clients); incrRefCount(pattern); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); return retval; } ``` ## 发送消息 ```c /** * 发送消息 * @param c */ void publishCommand(client *c) { //发送消息 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); else forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); } int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; dictIterator *di; listNode *ln; listIter li; /* Send to clients listening for that channel */ //通过频道名称,找到所以订阅当前频道的连接 de = dictFind(server.pubsub_channels,channel); if (de) { list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); //遍历所有连接。发送消息 while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message); receivers++; } } /* Send to clients listening to matching channels */ // 获取到pubsub_patterns_dict di = dictGetIterator(server.pubsub_patterns_dict); if (di) { channel = getDecodedObject(channel); //遍历所有pubsub_patterns_dict。匹配符合要求的频道。并发送消息 while((de = dictNext(di)) != NULL) { robj *pattern = dictGetKey(de); list *clients = dictGetVal(de); if (!stringmatchlen((char*)pattern->ptr, sdslen(pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) continue; listRewind(clients,&li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); dictReleaseIterator(di); } return receivers; } ```
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号