交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
Redis 3-1、rdb实现
分享
未结
0
971
李延
LV6
2021-07-19
悬赏:20积分
# rdb持久化说明 redis是一个内存数据库,如果不想办法将数据保存在磁盘中,服务重启时,数据就会丢失。所以redis提供了rdb功能保存当前数据。 rdb可以手动保存或者自动触发。这是就会将redis的全量数据保存在磁盘中。(每次保存都是全量),当服务重启时,就会加载这个文件。将之前数据重新加载到内存中 # rdb 创建与载入 ## 命令创建 - save 阻塞创建 - bgsave 异步创建 ## 自动创建 我们可以通过配置文件设置,在满足条件后redis自动通过bgsave保存rdb文件。默认情况下配置如下: ``` save 900 1 save 300 10 save 60 10000 ``` 意思为: - 在900秒内,对数据库进行至少1次修改 - 在300秒内,对数据库进行至少10次修改 - 在60秒内,对数据库进行至少10000次修改 ## 文件的载入 在redis启动时,redis会自动加载rdb文件 # rdb文件结构 [https://zhuanlan.zhihu.com/p/311523487](https://zhuanlan.zhihu.com/p/311523487)  # rdb 实现代码 所有代码都在rdb.c 中实现 ## save ```c void saveCommand(client *c) { //子类正在执行。报错 if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); return; } rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); //执行rdb操作 if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { //返回执行结果 addReply(c,shared.ok); } else { addReply(c,shared.err); } } int rdbSave(char *filename, rdbSaveInfo *rsi) { char tmpfile[256]; char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */ FILE *fp; rio rdb; int error = 0; //生成零时文件 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); //打开文件,以w权限 fp = fopen(tmpfile,"w"); if (!fp) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Failed opening the RDB file %s (in server root dir %s) " "for saving: %s", filename, cwdp ? cwdp : "unknown", strerror(errno)); return C_ERR; } //初始化rdb对象, rioInitWithFile(&rdb,fp); startSaving(RDBFLAGS_NONE); if (server.rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); //保存文件 if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { errno = error; goto werr; } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp) == EOF) goto werr; if (fsync(fileno(fp)) == -1) goto werr; if (fclose(fp) == EOF) goto werr; /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { char *cwdp = getcwd(cwd,MAXPATHLEN); serverLog(LL_WARNING, "Error moving temp DB file %s on the final " "destination %s (in server root dir %s): %s", tmpfile, filename, cwdp ? cwdp : "unknown", strerror(errno)); unlink(tmpfile); stopSaving(0); return C_ERR; } serverLog(LL_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); server.lastbgsave_status = C_OK; stopSaving(1); return C_OK; werr: serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno)); fclose(fp); unlink(tmpfile); stopSaving(0); return C_ERR; } ``` 这里我们看到最后生成的文件是根据我们在配置文件中指定文件名来确定的。并且在保存时,会首先生成一个零时文件进行保存,之后才会覆盖最终的文件。 文件保存的过程在rdbSaveRio方法中。我们继续跟进 ```c int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; int j; uint64_t cksum; size_t processed = 0; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); //保存固定的文件头:REDIS%04d if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; //保存固定的AUX字段 if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; //保存modules相关数据,这块在具体分析后说明 if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; //遍历每个库 for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; //获取到每个库的dict dict *d = db->dict; //为空则跳过 if (dictSize(d) == 0) continue; //获取到当前库的迭代器 di = dictGetSafeIterator(d); /* Write the SELECT DB opcode */ //保存RDB_OPCODE_SELECTDB。 254,一个字节。说明后面根的是redis具体库的编号 if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; //保存库编号 if (rdbSaveLen(rdb,j) == -1) goto werr; /* Write the RESIZE DB opcode. */ uint64_t db_size, expires_size; //dict存储数据数量 db_size = dictSize(db->dict); //expires存储数据数量 expires_size = dictSize(db->expires); //保存,241。说明下面数据开始就是我们保存的数据了 if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; //将dict和expires数据进行保存 if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; /* Iterate this DB writing every entry */ //遍历所有k-v数据 while((de = dictNext(di)) != NULL) { sds keystr = dictGetKey(de); robj key, *o = dictGetVal(de); long long expire; //将key 也转换为robj结构 initStaticStringObject(key,keystr); //获取当前key的过期时间 expire = getExpire(db,&key); //保存当前k-v数据 if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; /* When this RDB is produced as part of an AOF rewrite, move * accumulated diff from parent to child while rewriting in * order to have a smaller final write. */ if (rdbflags & RDBFLAGS_AOF_PREAMBLE && rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { processed = rdb->processed_bytes; aofReadDiffFromParent(); } } //执行迭代,如果迭代完成,是否迭代器空间 dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ } /* If we are storing the replication information on disk, persist * the script cache as well: on successful PSYNC after a restart, we need * to be able to process any EVALSHA inside the replication backlog the * master will send us. */ if (rsi && dictSize(server.lua_scripts)) { di = dictGetIterator(server.lua_scripts); while((de = dictNext(di)) != NULL) { robj *body = dictGetVal(de); if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1) goto werr; } dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ } if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; /* EOF opcode */ //保存文件结束信息 if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; /* CRC64 checksum. It will be zero if checksum computation is disabled, the * loading code skips the check in this case. */ //保存校验核 cksum = rdb->cksum; memrev64ifbe(&cksum); if (rioWrite(rdb,&cksum,8) == 0) goto werr; return C_OK; werr: if (error) *error = errno; if (di) dictReleaseIterator(di); return C_ERR; } ``` 在上面代码中,我们可以参考rdb的结构,理解整个解析过程。 首先会遍历每个库,之后时遍历每个可以,通过rdbSaveKeyValuePair方法保存每个数据 ## bgsave 代码与save相同,只是开启一个新的线程来出来逻辑 ```c int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { pid_t childpid; if (hasActiveChildProcess()) return C_ERR; server.dirty_before_bgsave = server.dirty; server.lastbgsave_try = time(NULL); openChildInfoPipe(); //创建子线程,创建rdb文件。if里是子线程执行的代码 if ((childpid = redisFork()) == 0) { int retval; /* Child */ redisSetProcTitle("redis-rdb-bgsave"); redisSetCpuAffinity(server.bgsave_cpulist); //与save相同,调用rdbSave retval = rdbSave(filename,rsi); if (retval == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); } //结束子线程 exitFromChild((retval == C_OK) ? 0 : 1); //下面是父线程执行的代码 } else { /* Parent */ //设置rdb_child_pid属性,并返回ok给客户端 if (childpid == -1) { closeChildInfoPipe(); server.lastbgsave_status = C_ERR; serverLog(LL_WARNING,"Can't save in background: fork: %s", strerror(errno)); return C_ERR; } serverLog(LL_NOTICE,"Background saving started by pid %d",childpid); server.rdb_save_time_start = time(NULL); server.rdb_child_pid = childpid; server.rdb_child_type = RDB_CHILD_TYPE_DISK; return C_OK; } return C_OK; /* unreached */ } ``` ## rdb载入 rdb的载入在rdbLoad方法中,这里就不再具体分析每个步骤 # rdb 定时触发机制 我们在前面知道rdb可以通过条件设置自动触发。这部分代码是如何现在的,我们具体看一下 ## 条件保存 在redis启动时,我们设置的条件会被保存在service.h文件的redisServer中 ```c redisServer{ //... struct saveparam *saveparams; //... } ``` 我们debug代码看一下  我们看到这是一个数组,其中有3个元素,就是我们配置文件中的设置 ## dirty计数与lastsave属性 同样在redisServer中有dirty和lastsave两个属性。 - dirty表示上次save数据库更新次数 - lastsave 表示上次save时间 我们在rdbsave方法中可以看到这两个属性的初始化 ```c server.dirty = 0; server.lastsave = time(NULL); ``` ## 判断是否满足条件 在serverCron方法中我们可以找到具体逻辑,其中这个方法时循环事件,每1毫秒会被调用一次。 ```c for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, * the given amount of seconds, and if the latest bgsave was * successful or if, in case of an error, at least * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY || server.lastbgsave_status == C_OK)) { serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); rdbSaveBackground(server.rdb_filename,rsiptr); break; } } ``` 这里我们看到就是对我们之前的几个条件的判断。满足条件后调用rdbSaveBackground,也就是我们之前bgsave的方法。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号