交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
Redis 4-2、哨兵模式分布式(未完)
分享
未结
0
1015
李延
LV6
2021-07-24
悬赏:20积分
# 配置

# 哨兵节点功能

# 网络交互

# 源码解析

```c
/* Run the periodic handler on every instance stored in 'instances'.
 *
 * For each master the function recurses into the master's 'slaves' and
 * 'sentinels' dictionaries. A master whose failover_state is
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is remembered and handed to
 * sentinelFailoverSwitchToPromotedSlave() only after the iteration ends. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        /* Handle the current instance. */
        sentinelHandleRedisInstance(ri);
        /* If it is a master, also handle its slaves and its sentinels. */
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    /* The switch is deferred until the loop above has finished. */
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}
```

```c
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    /* (Re)create the connections to the instance if needed. */
    sentinelReconnectInstance(ri);
    /* Send the periodic INFO / PING / hello commands. */
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    /* Update the subjectively-down (SDOWN) state of this instance. */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        /* Update the objectively-down (ODOWN) state of the master. */
        sentinelCheckObjectivelyDown(ri);
        /* Check whether a failover has to be started. Only the forced ask
         * below is conditional on that check; the state machine step and
         * the SENTINEL_NO_FLAGS ask always run for a master. */
        if (sentinelStartFailoverIfNeeded(ri))
            /* Ask the other sentinels about the master state. Presumably this
             * sends SENTINEL is-master-down-by-addr; the helper is not shown
             * here -- TODO confirm. */
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        /* Advance the failover state machine. */
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
```

```c
/**
 * Establish the connections to the given instance: a commands connection
 * (link->cc) for every kind of instance, plus a Pub/Sub connection
 * (link->pc) when the instance is a master or a slave.
 *
 * Returns immediately if the link is not flagged as disconnected, if the
 * address port is 0, or if the last reconnection attempt is more recent
 * than SENTINEL_PING_PERIOD.
 *
 * @param ri the instance to (re)connect to
 */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    /* 'cc' is the commands connection; INFO is sent on it by
     * sentinelSendPeriodicCommands(). */
    if (link->cc == NULL) {
        /* Create the connection.
         * NOTE(review): the result is dereferenced right away without a NULL
         * check (same pattern for link->pc below) -- confirm that
         * redisAsyncConnectBind() cannot return NULL here. */
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (!link->cc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
            instanceLinkCloseConnection(link,link->cc);
        } else if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            /* Install the connect / disconnect callbacks. */
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    /* Create the connection used to subscribe to the hello channel
     * (masters and slaves only). */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (!link->pc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
            /* NOTE(review): unlike the command-link branch above, this path
             * does not call instanceLinkCloseConnection(), so nothing here
             * resets link->pc -- confirm this is intended. */
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
        } else if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;

            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            /* Install the connect / disconnect callbacks. */
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            /* SUBSCRIBE to SENTINEL_HELLO_CHANNEL; replies are delivered to
             * sentinelReceiveHelloMessages. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
```

```c
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 * the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->link->disconnected) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin.
     *
     * Similarly we monitor the INFO output more often if the slave reports
     * to be disconnected from the master, so that we can have a fresh
     * disconnection time figure. */
    /* Normal case: SENTINEL_INFO_PERIOD. Slave whose master is O_DOWN or
     * failing over, or whose master link is down: 1000 (milliseconds,
     * assuming mstime() units). */
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    /* Send INFO to masters and slaves, not sentinels. */
    /* INFO goes out on the commands connection when it was never refreshed
     * or the last refresh is older than info_period. */
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* Send PING to all the three kinds of instances. */
    /* A PING is sent when the last pong is older than ping_period and the
     * last ping is older than half of ping_period. */
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    /* PUBLISH hello messages to all the three kinds of instances. */
    /* Sent when more than SENTINEL_PUBLISH_PERIOD elapsed since the last
     * publish. The message presumably carries this sentinel's and the
     * master's information; sentinelSendHello() is not shown here. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}
```

```c
/* Is this instance down from our point of view? */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    /* 'elapsed' is measured from the pending ping if there is one,
     * otherwise from last_avail_time when the link is disconnected;
     * it stays 0 in every other case. */
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

    /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
     *    pending ping for more than half the timeout. */
    /* When the pending ping has been outstanding for too long, treat the
     * command link as broken and close it. */
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 && /* There is a pending ping... */
        /* The pending ping is delayed, and we did not receive
         * error replies as well. */
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        /* Close the connection. */
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    /* 2) Check if the pubsub link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
     *    activity in the Pub/Sub channel for more than
     *    SENTINEL_PUBLISH_PERIOD * 3. */
    /* Same check for the 'pc' (Pub/Sub) connection. */
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }

    /* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            /* Mark the instance as subjectively down. */
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Is subjectively up */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}
```

```c
/* Set or clear SRI_O_DOWN on 'master'.
 *
 * The master is considered objectively down when this sentinel already
 * flags it SRI_S_DOWN and the number of agreeing sentinels (this one plus
 * every entry of master->sentinels flagged SRI_MASTER_DOWN) reaches
 * master->quorum. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    /* Only a master that is subjectively down from our point of view can
     * become objectively down. */
    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        /* Iterate over the other sentinels known for this master. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            /* Count the sentinels flagged SRI_MASTER_DOWN (presumably set
             * when that sentinel reported the master as down -- the code
             * setting the flag is not shown here). */
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    /* Once the count reaches the configured quorum the master is
     * considered objectively down. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}
```

```c
/* Start a failover for 'master' if the conditions are met.
 *
 * Returns 1 when sentinelStartFailover() was called, 0 when the master is
 * not O_DOWN, a failover is already in progress, or the previous attempt
 * started less than failover_timeout*2 ago. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* We can't failover if the master is not in O_DOWN state. */
    /* i.e. the master must be objectively down. */
    if (!(master->flags & SRI_O_DOWN)) return 0;

    /* Failover already in progress? */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    /* Last failover attempt started too little time ago? */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        /* Log the delay only once per failover_start_time value. */
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            serverLog(LL_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    /* Start the failover. Presumably this sets SRI_FAILOVER_IN_PROGRESS and
     * the initial failover state; sentinelStartFailover() is not shown
     * here -- TODO confirm. */
    sentinelStartFailover(master);
    return 1;
}
```

```c
/* Run one step of the failover state machine for master 'ri'.
 *
 * Asserts that 'ri' is a master and does nothing unless
 * SRI_FAILOVER_IN_PROGRESS is set. The per-state notes below are
 * translated from the article's annotations; the helpers themselves are
 * not shown here, so treat them as unverified. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    /* Each state dispatches to its own handler. */
    switch(ri->failover_state) {
        /* Failover start: presumably moves on to the next state if this
         * sentinel is the leader. */
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        /* Promotion: presumably decides which node to promote to master. */
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        /* Presumably sends SLAVEOF NO ONE to the node being promoted so it
         * stops replicating. */
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        /* Presumably waits for INFO to report the node as a master, then
         * moves on to the next state. */
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        /* Presumably reconfigures the remaining slaves to replicate from
         * the new master. */
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}
```
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号