Chapter 4-1: DFSOutputStream
李延
2022-04-11
# 1. Overview

In the previous chapters we saw that a file upload ultimately goes through the DFSOutputStream class. In this chapter we analyze it step by step.

DFSOutputStream has a parent class, FSOutputSummer:

- FSOutputSummer provides a local buffer; data is temporarily cached in a byte array.
- DFSOutputStream implements how that buffer is actually flushed to the underlying layer.

However, DFSOutputStream does not hand the data to HDFS directly. Instead, it packs the data into DFSPacket objects, which a DataStreamer then transmits to HDFS.

# 2. The parent class FSOutputSummer

Let's start with the class comment:

```java
 * This is a generic output stream for generating checksums for
 * data before it is written to the underlying stream
```

That is, a generic output stream that generates checksums for data before the data is written to the underlying stream.

## 2.1 Constructor

```java
protected FSOutputSummer(DataChecksum sum) {
  this.sum = sum;
  // buffer for the data itself
  this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
  // buffer for the checksums
  this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
  // number of bytes currently buffered
  this.count = 0;
}
```

The constructor initializes a few important member variables.

## 2.2 write

```java
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
  checkClosed();
  // validate the requested range
  if (off < 0 || len < 0 || off > b.length - len) {
    throw new ArrayIndexOutOfBoundsException();
  }
  // write the data
  for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
  }
}
```

`write1` is called in a loop because the local buffer can hold at most buf.length bytes per pass; a single call may not be able to buffer all of the requested data, so the loop repeats until every byte has been written. Let's look at the real write method, write1:

```java
private int write1(byte b[], int off, int len) throws IOException {
  // If the local buffer is empty and the data to write is at least one full
  // buffer, checksum and send it directly via writeChecksumChunks.
  if (count == 0 && len >= buf.length) {
    // local buffer is empty and user buffer size >= local buffer size, so
    // simply checksum the user buffer and send it directly to the underlying
    // stream
    final int length = buf.length;
    writeChecksumChunks(b, off, length);
    return length;
  }
  // copy user data to local buffer
  // how many bytes can still be buffered
  int bytesToCopy = buf.length - count;
  bytesToCopy = (len < bytesToCopy) ? len : bytesToCopy;
  // copy the data into buf
  System.arraycopy(b, off, buf, count, bytesToCopy);
  count += bytesToCopy;
  // if the buffer is now full, flush it
  if (count == buf.length) {
    // local buffer is full
    flushBuffer();
  }
  return bytesToCopy;
}
```

So the data ends up in the buf buffer, and once the buffer is full, flushBuffer is called to flush it.
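To make the looping behaviour of write/write1 concrete, here is a minimal, self-contained sketch of the same pattern. This is my own illustration, not Hadoop code: the 512-byte chunk size and 9-chunk buffer are assumptions standing in for bytesPerChecksum and BUFFER_NUM_CHUNKS, and flushDirect is a hypothetical stand-in for writeChecksumChunks.

```java
// Sketch of the write/write1 loop: a fixed-size local buffer absorbs data
// chunk by chunk, and a single write() may need several passes when len
// exceeds the buffer size.
public class BufferLoopSketch {
  private final byte[] buf = new byte[512 * 9]; // assumed buffer size
  private int count = 0;                        // bytes currently buffered

  public void write(byte[] b, int off, int len) {
    // Each pass buffers at most (buf.length - count) bytes, so loop until done.
    for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
    }
  }

  private int write1(byte[] b, int off, int len) {
    if (count == 0 && len >= buf.length) {
      // Buffer is empty and the caller's data is at least one full buffer:
      // flush one buffer's worth directly, skipping the copy.
      flushDirect(b, off, buf.length);
      return buf.length;
    }
    int bytesToCopy = Math.min(len, buf.length - count);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {   // buffer full: flush and reset
      flushDirect(buf, 0, count);
      count = 0;
    }
    return bytesToCopy;
  }

  private void flushDirect(byte[] b, int off, int len) {
    // Placeholder for checksumming and handing data to the underlying stream.
    System.out.println("flushing " + len + " bytes");
  }
}
```

The key point is the return value of write1: the outer loop advances by exactly as many bytes as each pass managed to buffer or flush.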
## 2.3 flushBuffer

```java
protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

/* Forces buffered output bytes to be checksummed and written out to
 * the underlying output stream. If there is a trailing partial chunk in the
 * buffer,
 * 1) flushPartial tells us whether to flush that chunk
 * 2) if flushPartial is true, keep tells us whether to keep that chunk in the
 * buffer (if flushPartial is false, it is always kept in the buffer)
 *
 * Returns the number of bytes that were flushed but are still left in the
 * buffer (can only be non-zero if keep is true).
 */
protected synchronized int flushBuffer(boolean keep, boolean flushPartial)
    throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
  if (lenToFlush != 0) {
    writeChecksumChunks(buf, 0, lenToFlush);
    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }
  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
```

There are two parameters:

- flushPartial: checksums are normally computed over fixed-size chunks (512 bytes by default). flushPartial controls whether a trailing partial chunk of fewer than 512 bytes at the end of buf should also be checksummed and sent; true means the partial chunk is flushed as well.
- keep: when flushPartial is true, keep controls whether that flushed partial chunk is also retained in the buffer (when flushPartial is false, the partial chunk always stays in the buffer).

Note that the flush ends up calling writeChecksumChunks, the same method write1 uses when the user buffer is large enough to bypass the local buffer.

## 2.4 writeChecksumChunks

```java
private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {
  // compute the checksums and store them in checksum
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  TraceScope scope = createWriteTraceScope();
  try {
    // send the data chunk by chunk, each data chunk paired with its
    // checksum, via writeChunk
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
      writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
    }
  } finally {
    if (scope != null) {
      scope.close();
    }
  }
}
```

Here the checksums are computed and the data is sent together with them. writeChunk itself is implemented by the subclass.

# 3 DFSOutputStream

In the parent class we saw that, after the checksums are computed, the data and checksums are ultimately sent by calling writeChunk. Let's now look at how writeChunk is implemented.

## 3.1 Constructor

```java
protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
  this(dfsClient, src, flag, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
  computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
      bytesPerChecksum);
  if (createStreamer) {
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
  }
}

private DFSOutputStream(DFSClient dfsClient, String src,
    EnumSet<CreateFlag> flag, Progressable progress, HdfsFileStatus stat,
    DataChecksum checksum) {
  super(getChecksum4Compute(checksum, stat));
  this.dfsClient = dfsClient;
  this.src = src;
  this.fileId = stat.getFileId();
  this.blockSize = stat.getBlockSize();
  this.blockReplication = stat.getReplication();
  this.fileEncryptionInfo = stat.getFileEncryptionInfo();
  this.cachingStrategy = new AtomicReference<>(
      dfsClient.getDefaultWriteCachingStrategy());
  this.addBlockFlags = EnumSet.noneOf(AddBlockFlag.class);
  if (flag.contains(CreateFlag.NO_LOCAL_WRITE)) {
    this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_WRITE);
  }
  if (flag.contains(CreateFlag.NO_LOCAL_RACK)) {
    this.addBlockFlags.add(AddBlockFlag.NO_LOCAL_RACK);
  }
  if (flag.contains(CreateFlag.IGNORE_CLIENT_LOCALITY)) {
    this.addBlockFlags.add(AddBlockFlag.IGNORE_CLIENT_LOCALITY);
  }
  if (progress != null) {
    DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
        + "{}", src);
  }
  initWritePacketSize();
  this.bytesPerChecksum = checksum.getBytesPerChecksum();
  if (bytesPerChecksum <= 0) {
    throw new HadoopIllegalArgumentException(
        "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
  }
  if (blockSize % bytesPerChecksum != 0) {
    throw new HadoopIllegalArgumentException("Invalid values: "
        + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY
        + " (=" + bytesPerChecksum + ") must divide block size (="
        + blockSize + ").");
  }
  this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
}
```

A lot is initialized here; the two fields we care most about are:

- currentPacket: the current data packet. Data and checksums are eventually packed into this object and sent.
- streamer: a DataStreamer object that maintains the connections to the datanodes and sends the packets to them over TCP.
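For context, application code never constructs DFSOutputStream directly; it is created behind the scenes when a file is opened for writing through the FileSystem API and handed back wrapped in an FSDataOutputStream. Below is a hedged usage sketch; the fs.defaultFS address and the file path are made-up placeholders.

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode address; replace with your cluster's fs.defaultFS.
    conf.set("fs.defaultFS", "hdfs://localhost:9000");
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      // Each write() below lands in FSOutputSummer's local buffer, is
      // checksummed chunk by chunk, and eventually ends up in DFSPackets
      // queued to the DataStreamer.
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
      out.hflush(); // push buffered packets out to the datanode pipeline
    }
  }
}
```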
## 3.2 writeChunk

In the parent class we saw that the data ultimately reaches this method, so let's analyze it.

```java
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  // make sure currentPacket has been created
  writeChunkPrepare(len, ckoff, cklen);

  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);

  // If packet is full, enqueue it for transmission
  // Decide whether the current packet is finished and needs to be flushed.
  // Two conditions: 1) the packet has reached its chunk capacity,
  // 2) the current block on the datanode is full.
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}
```

Here the data and its checksum are packed together into the currentPacket object, and when either condition is met, enqueueCurrentPacketFull is called.

## 3.3 enqueueCurrentPacketFull

```java
synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
      + " appendChunk={}, {}", currentPacket, src, getStreamer()
      .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}
```

The two methods to focus on are enqueueCurrentPacket and endBlock.

### 3.3.1 enqueueCurrentPacket

```java
void enqueueCurrentPacket() throws IOException {
  getStreamer().waitAndQueuePacket(currentPacket);
  currentPacket = null;
}
```

The packet is handed to the streamer; let's follow it:

```java
void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    try {
      // If queue is full, then wait till we have enough space
      boolean firstWait = true;
      try {
        while (!streamerClosed && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().getWriteMaxPackets()) {
          if (firstWait) {
            Span span = Tracer.getCurrentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            dataQueue.wait();
          } catch (InterruptedException e) {
            // If we get interrupted while waiting to queue data, we still need to get rid
            // of the current packet. This is because we have an invariant that if
            // currentPacket gets full, it will get queued before the next writeChunk.
            //
            // Rather than wait around for space in the queue, we should instead try to
            // return to the caller as soon as possible, even though we slightly overrun
            // the MAX_PACKETS length.
            Thread.currentThread().interrupt();
            break;
          }
        }
      } finally {
        Span span = Tracer.getCurrentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      queuePacket(packet);
    } catch (ClosedChannelException cce) {
      LOG.debug("Closed channel exception", cce);
    }
  }
}

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    dataQueue.notifyAll();
  }
}
```

So the packet is finally appended to the dataQueue. Before that, however, dataQueue.wait() may be called to block the writer thread; it is woken up from the DataStreamer's run method once queued packets have been consumed. New packets are only added after the queue has drained below the limit, which keeps the queue from growing without bound.

### 3.3.2 endBlock

```java
void endBlock() throws IOException {
  if (getStreamer().getBytesCurBlock() == blockSize) {
    // create an empty packet marking the end of the block
    setCurrentPacketToEmpty();
    // send that packet
    enqueueCurrentPacket();
    getStreamer().setBytesCurBlock(0);
    lastFlushOffset = 0;
  }
}
```

Here a special packet is sent: it carries no data, but its lastPacketInBlock field is true, marking the end of the current block on the datanode.

```java
void setCurrentPacketToEmpty() throws InterruptedIOException {
  currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
      getStreamer().getAndIncCurrentSeqno(), true);
  currentPacket.setSyncBlock(shouldSyncBlock);
}
```
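The wait/notify dance in waitAndQueuePacket is the classic bounded producer-consumer pattern. Below is a minimal, self-contained sketch of that pattern, not Hadoop code: the 80-packet limit is an assumption playing the role of getWriteMaxPackets(), and takePacket stands in for the consuming side that DataStreamer's run loop performs.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the back-pressure pattern behind waitAndQueuePacket: the producer
// blocks on the queue monitor while the queue is over its limit, and the
// consumer wakes it up after taking a packet off the queue.
public class PacketQueueSketch {
  private final Deque<String> dataQueue = new ArrayDeque<>();
  private final int maxPackets = 80; // assumed limit, like getWriteMaxPackets()

  public void waitAndQueue(String packet) throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.size() >= maxPackets) {
        dataQueue.wait();            // producer waits until the consumer drains the queue
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll();         // wake the consumer waiting for new packets
    }
  }

  public String takePacket() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait();            // consumer waits for a packet
      }
      String packet = dataQueue.pollFirst();
      dataQueue.notifyAll();         // wake a producer blocked on a full queue
      return packet;
    }
  }
}
```

The important property is the same as in HDFS: the producer never queues past the limit, and each side wakes the other whenever the queue state changes.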