Redis 4-1: Master-Replica Replication
李延
2021-07-20
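# Overview

In Redis, a client can issue the SLAVEOF command to make the current server replicate the data of another server. The server that executes SLAVEOF is called the replica (slave), and the server being replicated is called the master.

In this mode the two servers are kept in sync (replication is asynchronous, so the replica may lag briefly). Any CRUD operation can be performed on the master, which then propagates the data to the replica. The replica can still serve queries, but by default it cannot modify data.

Master-replica mode gives us a backup of the data, and queries can go to either the master or the replica. However, when the master goes down the replica still cannot accept writes, so Redis becomes unavailable for writes.

# Setting up master-replica mode

## From the command line

From a client connected to the replica, use the SLAVEOF command to set its master.

```text
SLAVEOF 127.0.0.1 6379
```

This command turns the current server into a replica that continuously syncs data from the master on port 6379.

The ROLE command shows each server's role. With two local Redis instances running, the result is:

```text
127.0.0.1:6378> role
1) "slave"
2) "127.0.0.1"
3) (integer) 6379
4) "connected"
5) (integer) 544
127.0.0.1:6379> role
1) "master"
2) (integer) 572
3) 1) 1) "127.0.0.1"
      2) "6378"
      3) "572"
```

## From the configuration file

The master can also be set directly in the current server's configuration file.

```text
slaveof 127.0.0.1 6379
```

# Data synchronization mechanism

Here I use the table from the book directly; I think it describes the mechanism very clearly.

[Image: data synchronization table from the book]

# Some characteristics of master-replica mode

When I first started working with master-replica mode I had the following questions. They are recorded and analyzed here.

1. Which CRUD operations can the master and the replica each perform?
   The master can perform any operation; the replica can only serve queries.
2. What happens if the replica suddenly disconnects?
   The master keeps pinging the replica and drops it after repeated failed attempts.
3. What happens when a disconnected replica reconnects?
   It resynchronizes its data.
4. What happens if the master disconnects?
   The replica keeps trying to reconnect to the master indefinitely. It still only serves queries and cannot modify data.
5. What happens when a disconnected master comes back?
   The data is resynchronized.
6. What happens if a replica already holds data before it is attached to a master?
   Nothing is synced from the replica to the master. On a full resynchronization the replica flushes its old data set and loads the master's RDB, so the replica's own keys are discarded and, for keys that exist on both sides, the master's values win.

# Data exchange at each stage

This section simulates the network traffic between a master and a replica. The code discussed later implements exactly these steps, so this should make the code easier to follow.

## Basic steps

1. After SLAVEOF is entered, the replica first sends a PING command to try to reach the master. The master replies PONG to show that it is online.
2. The replica sends its basic information to the master: its IP and port, and the synchronization capabilities it supports. The commands are:

   ```text
   REPLCONF listening-port 6378
   REPLCONF ip-address 127.0.0.1
   REPLCONF capa eof capa psync2
   ```

3. The replica sends the PSYNC command to start synchronizing.
   On the first sync the master sends its RDB file to the replica.
   After that, data is kept in sync through command propagation.
4.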
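   When the master receives the PSYNC command, it registers the sender as a replica. It then runs periodic heartbeat checks and disconnects the replica after several failed attempts.

## Simulating with telnet

The steps above outline the whole exchange. Now let's act as the replica ourselves, using telnet (or the nc command on a Mac) to open a TCP connection to the master.

Step 1: send PING and our own basic information

[Screenshot: telnet session sending PING and REPLCONF]

After each command we send, the master replies +OK, which means success.

Step 2: start synchronizing

[Screenshot: telnet session after sending PSYNC]

We first receive "+FULLRESYNC 4dc87334742813146f0e9a7eca21ab379dbde204 2644", which marks the start of the sync. What follows is the content of the master's RDB file.
A real replica would load its data from this content.

Step 3: command propagation

Add some data on the master, for example a key named test. The same commands then arrive on our connection.

[Screenshot: propagated commands]

We first see a SELECT of database 0, followed by the commands we just ran on the master.

Step 4: heartbeat check

After sending PSYNC we periodically receive PING commands from the master. In this test, after 5 of them went unanswered, the connection was closed.

[Screenshot: PING commands and disconnect]

# Code walkthrough

All of the code is in replication.c. We follow the whole process starting from the SLAVEOF command.

## replicaofCommand

This is the function that runs when we execute SLAVEOF. Let's look at it in detail.

```c
void replicaofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    // Only allowed when cluster mode is disabled
    if (server.cluster_enabled) {
        addReplyError(c,"REPLICAOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set.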
     */
    // For SLAVEOF NO ONE, detach this server from master-replica replication
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        // A replica client may not run this command. (I do not yet know
        // exactly when a client counts as a replica client.)
        if (c->flags & CLIENT_SLAVE) {
            /* If a client is already a replica they cannot run this command,
             * because it involves flushing all replicas (including this
             * client) */
            addReplyError(c, "Command is not valid when client is a replica.");
            return;
        }

        // Parse the port argument as a long and store it in port
        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* Check if we are already attached to the specified slave */
        // Check whether the requested master is the one we already have
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
                                "with the master we are already connected "
                                "with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified "
                                 "master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        // Set up the master
        replicationSetMaster(c->argv[1]->ptr, port);
        // Render the client connection info as a string
        sds client = catClientInfoString(sdsempty(),c);
        // Log it
        serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        // Free the string
        sdsfree(client);
    }
    // Reply OK
    addReply(c,shared.ok);
}
```

Most of this function checks the server's own state. The real work is done by replicationSetMaster, so let's follow it.

```c
void replicationSetMaster(char *ip, int port) {
    //....
    // Set the master IP
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    // Set the master port
    server.masterport = port;
    if (server.master) {
        freeClient(server.master);
    }
    //...
    // Switch the replication state to "waiting to connect"
    server.repl_state = REPL_STATE_CONNECT;
}
```

Only the main code is shown here: it sets the master's IP and port and changes repl_state. At this point we have only recorded that a sync is needed; no connection has actually been made.

## replicationCron: checking the replication state

replicationCron is called from serverCron. The Redis server is single-threaded, and serverCron runs periodically as a timer event in its event loop.

This is how it is called in serverCron:

```c
/* Replication cron function -- used to reconnect to master,
 * detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
```

In other words, it is called once per second.

Here we only look at the code related to repl_state:

```c
/* Check if we should connect to a MASTER */
// A sync is needed, so create the connection
if (server.repl_state == REPL_STATE_CONNECT) {
    serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
        server.masterhost, server.masterport);
    if (connectWithMaster() == C_OK) {
        serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
    }
}
```

Because we set repl_state to REPL_STATE_CONNECT earlier, connectWithMaster is executed.

```c
int connectWithMaster(void) {
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) {
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
                connGetLastError(server.repl_transfer_s));
        connClose(server.repl_transfer_s);
        server.repl_transfer_s = NULL;
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
```

This function mainly creates the connection to the master and registers syncWithMaster as the callback. In other words, as long as the connection to the master stays up, syncWithMaster is invoked again each time the handshake can make progress: first when the connection is established, and then whenever a reply from the master becomes readable.

## syncWithMaster

```c
void syncWithMaster(connection *conn) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int psync_result;

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        connClose(conn);
        return;
    }

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state.
     */
    if (connGetState(conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
                connGetLastError(conn));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    // PING the master
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        connSetReadHandler(conn, syncWithMaster);
        connSetWriteHandler(conn, NULL);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    // After the PING, wait for the master's reply
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required.
     */
    // Authenticate
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masteruser && server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
                                         server.masteruser,server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    // Announce our listening port
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        int port;
        if (server.slave_announce_port) port = server.slave_announce_port;
        else if (server.tls_replication && server.tls_port) port = server.tls_port;
        else port = server.port;
        sds portstr = sdsfromlonglong(port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "listening-port",portstr, NULL);
        sdsfree(portstr);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    // Wait for the reply to the port announcement
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set.
     */
    // Announce our IP
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our (slave) capabilities.
     *
     * EOF: supports EOF-style RDB transfer for diskless replication.
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
    // Send our capabilities
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
                "capa","eof","capa","psync2",NULL);
        if (err) goto write_error;
        sdsfree(err);
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    // Wait for the reply to the capabilities
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchonization.
     * If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    // Send PSYNC
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    // Read the PSYNC reply
    psync_result = slaveTryPartialResynchronization(conn,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in an transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
            redisCommunicateSystemd("READY=1\n");
        }
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any sub-slaves. The master may transfer us an
     * entirely different data set and we have no way to incrementally feed
     * our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well.
                         */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.master_replid and master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer */
    if (!useDisklessLoad()) {
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
            goto error;
        }
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        server.repl_transfer_fd = dfd;
    }

    /* Setup the non blocking download of the bulk file. */
    // Install readSyncBulkPayload as the read handler and wait for the RDB file
    if (connSetReadHandler(conn, readSyncBulkPayload)
            == C_ERR)
    {
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (%s)",
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;

error:
    if (dfd != -1) close(dfd);
    connClose(conn);
    server.repl_transfer_s = NULL;
    if (server.repl_transfer_fd != -1)
        close(server.repl_transfer_fd);
    if (server.repl_transfer_tmpfile)
        zfree(server.repl_transfer_tmpfile);
    server.repl_transfer_tmpfile = NULL;
    server.repl_transfer_fd = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors.
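              */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}
```

This function is essentially the network handshake with the master, from the PING through sending the PSYNC command. The actual receiving and loading of the RDB file is handed off to readSyncBulkPayload, which is not covered here.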
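To make the flow of syncWithMaster easier to see at a glance, here is a minimal sketch of the same handshake as a tiny standalone state machine. It is not the Redis implementation: the `hs_state` enum, `hs_step`, and `starts_with` are hypothetical names invented for illustration, the AUTH and REPLCONF ip-address steps are left out, and it assumes the replica has no cached master, so it always sends `PSYNC ? -1`.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified handshake states. The names are illustrative;
 * they only loosely mirror the REPL_STATE_* values in replication.c. */
typedef enum {
    HS_CONNECTING,     /* TCP connection established, nothing sent yet */
    HS_RECEIVE_PONG,   /* PING sent, waiting for the reply */
    HS_RECEIVE_PORT,   /* REPLCONF listening-port sent */
    HS_RECEIVE_CAPA,   /* REPLCONF capa sent */
    HS_RECEIVE_PSYNC,  /* PSYNC sent */
    HS_TRANSFER,       /* +FULLRESYNC received, RDB payload follows */
    HS_CONTINUE,       /* +CONTINUE received, partial resync accepted */
    HS_ERROR           /* handshake failed, caller should reconnect */
} hs_state;

/* Returns 1 if `s` starts with `prefix`, 0 otherwise. */
static int starts_with(const char *s, const char *prefix) {
    return strncmp(s, prefix, strlen(prefix)) == 0;
}

/* Advance the handshake by one step.
 * `reply` is the master's last reply line (not read in HS_CONNECTING).
 * The next command to send is written to `out` (empty string if none). */
hs_state hs_step(hs_state s, const char *reply, int port,
                 char *out, size_t outlen) {
    assert(out != NULL && outlen > 0);
    out[0] = '\0';

    switch (s) {
    case HS_CONNECTING:
        snprintf(out, outlen, "PING");
        return HS_RECEIVE_PONG;
    case HS_RECEIVE_PONG:
        /* Accept any "+" reply or an auth error, as syncWithMaster does. */
        if (reply[0] != '+' && !starts_with(reply, "-NOAUTH") &&
            !starts_with(reply, "-ERR operation not permitted"))
            return HS_ERROR;
        snprintf(out, outlen, "REPLCONF listening-port %d", port);
        return HS_RECEIVE_PORT;
    case HS_RECEIVE_PORT:
        /* An error reply is non-critical here, so the reply is not checked. */
        snprintf(out, outlen, "REPLCONF capa eof capa psync2");
        return HS_RECEIVE_CAPA;
    case HS_RECEIVE_CAPA:
        /* Assumption: no cached master, so request a full resync. */
        snprintf(out, outlen, "PSYNC ? -1");
        return HS_RECEIVE_PSYNC;
    case HS_RECEIVE_PSYNC:
        if (starts_with(reply, "+FULLRESYNC")) return HS_TRANSFER;
        if (starts_with(reply, "+CONTINUE")) return HS_CONTINUE;
        return HS_ERROR;
    default:
        return HS_ERROR;
    }
}
```

A caller would start from HS_CONNECTING with a NULL reply, send the contents of `out` to the master, read one reply line, and call `hs_step` again until it reaches HS_TRANSFER, HS_CONTINUE, or HS_ERROR. Keeping each step as "check the last reply, emit the next command" matches the way syncWithMaster returns after every write and is re-entered when the reply becomes readable.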