交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第6 章 FsEditLog与FSImage
分享
未结
0
775
李延
LV6
2022-03-08
悬赏:20积分
# 0、作用 在上一章章节中,我们看到NameNode的数据都是存储在内存中,而当服务重启时,内存数据就会丢失。所以hdfs通过FsEditLog与Fsimage来实现目录树的持久化,其中: - FsEditLog为操作日志,对于任何修改目录树的操作,都会优先记录在这里。 - FsImage为内存数据的镜像,会定时将FsEditLog数据合并为快照形式进行存储,一般会比较大。 当服务启动时就会从FsEditLog与FsImage这些文件中恢复目录树。 # 1、文件存储结构  我们看到在这个目录下一共有一下几种文件 - seen_txid 记录这当前 image文件已经记录到的txid - edits_{start_txid}-{end_txid} 这类文件是以一个txid开始到一个txid结束,表示当前文件记录了哪些txid的日志操作。 - edits_improgress_{start_txid} 这里文件是正在打开的日志文件,其中txid表示当前文件开始的txid号码 - fsimage_{end_txid} 快照文件,表示已经记录到的txid日志。 其实根据这些文件命名我们也可以大概猜出它记录日志的逻辑 1. 系统中维护一个txid,每记录一条日志就增加1。 2. 打开一个edits 文件,其中文件名包括一个txid,表示这个文件最开始记录的日志id。 3. 当打开的edits文件达到设置的最大值后,就需要重新打开一个新的文件,步骤有:先关闭当前正在打开的文件,并重命名,包括两个txid,一个表示开始,一个表示结束的id。同时打开一个新的edit文件,命名同步骤2. 4. 当edit记录到一定量后,另开线程进行快照操作,合并多个edits。文件名以当前已经记录到的日志id命名。并修改seen_txid的值。 # 2、FsEditLog 我们具体来分析一下edit的实现过程。 ## 2.1 状态机制 对于FsEditLog 分为一下几种状态 ``` UNINITIALIZED 未初始化,文件刚创建时的状态 BETWEEN_LOG_SEGMENTS 表示上一个文件已经关闭,但下一个文件还没有准备好 IN_SEGMENT 可编辑状态 OPEN_FOR_READING 只读状态,对于非活跃节点时的状态 CLOSED 关闭状态 ``` # 2.2 initJournalsForWrite 这个方法就是日志目录的初始化操作。在程序启动时调用,初始化日志目录 ```java public synchronized void initJournalsForWrite() { //检查状态 Preconditions.checkState(state == State.UNINITIALIZED || state == State.CLOSED, "Unexpected state: %s", state); //初始化,这里的editsDirs 就是我们配置的日志目录,可以是多个 initJournals(this.editsDirs); //设置为BETWEEN_LOG_SEGMENTS 说明已经初始化完成,但还没有打开日志文件 state = State.BETWEEN_LOG_SEGMENTS; } ``` ```java private synchronized void initJournals(List<URI> dirs) { int minimumRedundantJournals = conf.getInt( DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT); synchronized(journalSetLock) { journalSet = new JournalSet(minimumRedundantJournals); //遍历所以的目录,依次创建journal对象 for (URI u : dirs) { boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf) .contains(u); if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { StorageDirectory sd = storage.getStorageDirectory(u); //将所有对象都放到journalSet里,方便批量操作 if (sd != null) { journalSet.add(new FileJournalManager(conf, sd, storage), required, sharedEditsDirs.contains(u)); } } else { journalSet.add(createJournal(u), required, sharedEditsDirs.contains(u)); } } } if (journalSet.isEmpty()) { LOG.error("No edits directories configured!"); } } ``` 对于具体journalSet的作用我们在单独章节说明 ## 2.3 rollEditLog rollEditLog 就是完成当前日志文件的记录,并打开一个新日志文件,具体如下: ```java synchronized long rollEditLog(int layoutVersion) throws IOException { LOG.info("Rolling edit logs"); //关闭当前文件 endCurrentLogSegment(true); long nextTxId = getLastWrittenTxId() + 1; //打开新的文件 startLogSegmentAndWriteHeaderTxn(nextTxId, layoutVersion); assert curSegmentTxId == nextTxId; return nextTxId; } ``` ```java public synchronized void endCurrentLogSegment(boolean writeEndTxn) { LOG.info("Ending log segment " + curSegmentTxId + ", " + getLastWrittenTxId()); Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); //记录结束日志 if (writeEndTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); } // always sync to ensure all edits are flushed. //立即持久化 logSyncAll(); printStatistics(true); final long lastTxId = getLastWrittenTxId(); final long lastSyncedTxId = getSyncTxId(); Preconditions.checkArgument(lastTxId == lastSyncedTxId, "LastWrittenTxId %s is expected to be the same as lastSyncedTxId %s", lastTxId, lastSyncedTxId); try { //关闭当前文件 journalSet.finalizeLogSegment(curSegmentTxId, lastTxId); editLogStream = null; } catch (IOException e) { //All journals have failed, it will be handled in logSync. } state = State.BETWEEN_LOG_SEGMENTS; } ``` ```java synchronized void startLogSegmentAndWriteHeaderTxn(final long segmentTxId, int layoutVersion) throws IOException { //打开新的文件 startLogSegment(segmentTxId, layoutVersion); //记录打开日志 logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_START_LOG_SEGMENT)); //;立即刷新 logSync(); } ``` 我们看到几乎所有的操作都是在Journal中完成的。 ## 2.4 logEdit logEdit是所有日志操作调用的方法,其中根据不同的FSEditLogOp对日志进行记录不同的日志。 ```java void logEdit(final FSEditLogOp op) { boolean needsSync = false; synchronized (this) { assert isOpenForWrite() : "bad state: " + state; // wait if an automatic sync is scheduled //如果当前正在持久化,则等待执行完成。 waitIfAutoSyncScheduled(); // check if it is time to schedule an automatic sync // 写日志操作,同时判断是否需要持久化 needsSync = doEditTransaction(op); if (needsSync) { isAutoSyncScheduled = true; } } // Sync the log if an automatic sync is required. //进行持久化 if (needsSync) { logSync(); } } ``` 对于具体写的操作,主要是在editLogStream这个对象中,具体如下: ``` synchronized boolean doEditTransaction(final FSEditLogOp op) { long start = beginTransaction(); op.setTransactionId(txid); try { editLogStream.write(op); } catch (IOException ex) { // All journals failed, it is handled in logSync. } finally { op.reset(); } endTransaction(start); return shouldForceSync(); } ``` ## 2.4 logSync 写的操作首先是在内存当中,当缓冲区满后,就需要持久化。具体如下: ``` protected void logSync(long mytxid) { long syncStart = 0; boolean sync = false; long editsBatchedInSync = 0; try { EditLogOutputStream logStream = null; synchronized (this) { try { printStatistics(false); // if somebody is already syncing, then wait while (mytxid > synctxid && isSyncRunning) { try { wait(1000); } catch (InterruptedException ie) { } } // // If this transaction was already flushed, then nothing to do // if (mytxid <= synctxid) { return; } // now, this thread will do the sync. track if other edits were // included in the sync - ie. batched. if this is the only edit // synced then the batched count is 0 editsBatchedInSync = txid - synctxid - 1; syncStart = txid; isSyncRunning = true; sync = true; // swap buffers try { if (journalSet.isEmpty()) { throw new IOException("No journals available to flush"); } //切换缓冲区的读写缓冲。 editLogStream.setReadyToFlush(); } catch (IOException e) { final String msg = "Could not sync enough journals to persistent storage " + "due to " + e.getMessage() + ". " + "Unsynced transactions: " + (txid - synctxid); LOG.error(msg, new Exception()); synchronized(journalSetLock) { IOUtils.cleanupWithLogger(LOG, journalSet); } terminate(1, msg); } } finally { // Prevent RuntimeException from blocking other log edit write doneWithAutoSyncScheduling(); } //editLogStream may become null, //so store a local variable for flush. logStream = editLogStream; } // do the sync long start = monotonicNow(); try { //持久化 if (logStream != null) { logStream.flush(); } } catch (IOException ex) { synchronized (this) { final String msg = "Could not sync enough journals to persistent storage. " + "Unsynced transactions: " + (txid - synctxid); LOG.error(msg, new Exception()); synchronized(journalSetLock) { IOUtils.cleanupWithLogger(LOG, journalSet); } terminate(1, msg); } } long elapsed = monotonicNow() - start; if (metrics != null) { // Metrics non-null only when used inside name node metrics.addSync(elapsed); metrics.incrTransactionsBatchedInSync(editsBatchedInSync); numTransactionsBatchedInSync.add(editsBatchedInSync); } } finally { // Prevent RuntimeException from blocking other log edit sync synchronized (this) { if (sync) { synctxid = syncStart; for (JournalManager jm : journalSet.getJournalManagers()) { /** * {@link FileJournalManager#lastReadableTxId} is only meaningful * for file-based journals. Therefore the interface is not added to * other types of {@link JournalManager}. */ if (jm instanceof FileJournalManager) { ((FileJournalManager)jm).setLastReadableTxId(syncStart); } } isSyncRunning = false; } this.notifyAll(); } } } ``` 这里主要就是调用两个EditLogOutputStream的方法,我们在具体章节说明 ## 3 FsImage //TODO 后期再看吧 # 4 检查点机制 一个正常大小的editlog文件往往在几十到几百个字节之间,但在某些极端的情况下,editlog文件会变得非常大,甚至将磁盘空间写满。通过上面小节的学习我们知道,在Namenode启动过程中一个很重要的部分就是逐条读取editlog文件中的记录,之后与Namenode命名空间合并。巨大的editlog文件会导致Namenode的启动时间过长,为了解决这个问题,HDFS引入了检查点机制(checkpointing)。 在以下两种情况下,Namenode会触发一次检查点操作:①超过了配置的检查点操作时长(dfs.namenode.checkpoint.period配置项配置);②从上一次检查点操作后,发生的事务(transaction)数超过了配置(dfs.namenode.checkpoint.txns配置项配置)。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号