交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第4-2 章:DFSPacket
分享
未结
0
608
李延
LV6
2022-04-11
悬赏:20积分
# 1. 说明 在前面DFSOutputStream分析中,DFSPacket就是DFSPOutputStream与DFSStreamer的桥梁, # 2 成员变量 ```java private byte[] buf; private final boolean lastPacketInBlock; private int checksumStart; private int checksumPos; private final int dataStart; private int dataPos; ``` 这里主要是buf 数组,最后发送的也是这个数组的内容。它主要由3组内容组成,而它主要由下面4个变量记录各自位置的下标 ```java [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] ^ ^ ^ ^ | checksumPos dataStart dataPos checksumStart ``` 数据最开始是 数据头,它是从buf的0开始到checksumStart结束。 后面跟着的是校验和,它是从checksumStart到checksumPos。 最后是我们传送的数据从dataStart到dataPos。 lastPacketInBlock 表示是否为当前Block的最后一块数据。 # 3 初始化 构造函数也是我们上面描述的几个变量的初始化 ```java public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, int checksumSize, boolean lastPacketInBlock) { this.lastPacketInBlock = lastPacketInBlock; this.numChunks = 0; this.offsetInBlock = offsetInBlock; this.seqno = seqno; this.buf = buf; checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumPos = checksumStart; dataStart = checksumStart + (chunksPerPkt * checksumSize); dataPos = dataStart; maxChunks = chunksPerPkt; } ``` #4 写入数据 ```java synchronized void writeData(byte[] inarray, int off, int len) throws ClosedChannelException { checkBuffer(); if (dataPos + len > buf.length) { throw new BufferOverflowException(); } System.arraycopy(inarray, off, buf, dataPos, len); dataPos += len; } ``` ```java public synchronized void writeChecksum(byte[] inarray, int off, int len) throws ClosedChannelException { checkBuffer(); if (len == 0) { return; } if (checksumPos + len > dataStart) { throw new BufferOverflowException(); } System.arraycopy(inarray, off, buf, checksumPos, len); checksumPos += len; } ``` # 5 读取数据 这一步就是将buf 数据发送到DataOutputStream中 ```java public synchronized void writeTo(DataOutputStream stm) throws IOException { checkBuffer(); final int dataLen = dataPos - dataStart; final int checksumLen = checksumPos - checksumStart; final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; PacketHeader header = new PacketHeader( pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); if (checksumPos != dataStart) { // Move the checksum to cover the gap. This can happen for the last // packet or during an hflush/hsync call. System.arraycopy(buf, checksumStart, buf, dataStart - checksumLen , checksumLen); checksumPos = dataStart; checksumStart = checksumPos - checksumLen; } final int headerStart = checksumStart - header.getSerializedSize(); assert checksumStart + 1 >= header.getSerializedSize(); assert headerStart >= 0; assert headerStart + header.getSerializedSize() == checksumStart; // Copy the header data into the buffer immediately preceding the checksum // data. System.arraycopy(header.getBytes(), 0, buf, headerStart, header.getSerializedSize()); // corrupt the data for testing. if (DFSClientFaultInjector.get().corruptPacket()) { buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; } // Write the now contiguous full packet to the output stream. stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); // undo corruption. if (DFSClientFaultInjector.get().uncorruptPacket()) { buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; } } ```
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号