交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第4-3 章:DataStreamer
分享
未结
0
587
李延
LV6
2022-04-11
悬赏:20积分
# 1 说明 前面我们分析到数据包最后被添加到了DataStreamer的dataQueue 队列中,我们来具体分析一下,首先我们看到它继承了`Thread`,而且有一个run方法,在最上面的`DFSOutputStream.newStreamForCreate`中我们调用了out.start 方法。我们看一下DFSOutputStream实现 ```java protected synchronized void start() { getStreamer().start(); } ``` 这里是调用了DataStreamer的start,也就是说在创建输出流时,就启动了一个新的线程。我们来具体分析一下这个线程干了什么事情。 ```java public void run() { TraceScope scope = null; while (!streamerClosed && dfsClient.clientRunning) { // if the Responder encountered an error, shutdown Responder if (errorState.hasError()) { closeResponder(); } DFSPacket one; try { // process datanode IO errors if any boolean doSleep = processDatanodeOrExternalError(); synchronized (dataQueue) { // wait for a packet to be sent. while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) { long timeout = 1000; //发送心跳 if (stage == BlockConstructionStage.DATA_STREAMING) { timeout = sendHeartbeat(); } try { dataQueue.wait(timeout); } catch (InterruptedException e) { LOG.debug("Thread interrupted", e); } doSleep = false; } if (shouldStop()) { continue; } // get packet to be sent. try { backOffIfNecessary(); } catch (InterruptedException e) { LOG.debug("Thread interrupted", e); } //获取队列中的数据包 one = dataQueue.getFirst(); // regular data packet SpanId[] parents = one.getTraceParents(); if (parents.length > 0) { scope = dfsClient.getTracer(). newScope("dataStreamer", parents[0]); scope.getSpan().setParents(parents); } } // get new block from namenode. 
LOG.debug("stage={}, {}", stage, this); //判断是否需要打开一个新的数据块 if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { LOG.debug("Allocating new block: {}", this); setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { LOG.debug("Append to block {}", block); setupPipelineForAppendOrRecovery(); if (streamerClosed) { continue; } initDataStreaming(); } long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); if (lastByteOffsetInBlock > stat.getBlockSize()) { throw new IOException("BlockSize " + stat.getBlockSize() + " < lastByteOffsetInBlock, " + this + ", " + one); } if (one.isLastPacketInBlock()) { // wait for all data packets have been successfully acked waitForAllAcks(); if(shouldStop()) { continue; } stage = BlockConstructionStage.PIPELINE_CLOSE; } // send the packet SpanId spanId = SpanId.INVALID; synchronized (dataQueue) { // move packet from dataQueue to ackQueue if (!one.isHeartbeatPacket()) { if (scope != null) { spanId = scope.getSpanId(); scope.detach(); one.setTraceScope(scope); } scope = null; dataQueue.removeFirst(); ackQueue.addLast(one); packetSendTime.put(one.getSeqno(), Time.monotonicNow()); dataQueue.notifyAll(); } } LOG.debug("{} sending {}", this, one); // write out data to remote datanode try (TraceScope ignored = dfsClient.getTracer(). newScope("DataStreamer#writeTo", spanId)) { //发送数据 sendPacket(one); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to // write to primary DN. If a failed or restarting node has already // been recorded by the responder, the following call will have no // effect. Pipeline recovery can handle only one node error at a // time. If the primary node fails again during the recovery, it // will be taken out then. 
errorState.markFirstNodeIfNotMarked(); throw e; } // update bytesSent long tmpBytesSent = one.getLastByteOffsetBlock(); if (bytesSent < tmpBytesSent) { bytesSent = tmpBytesSent; } if (shouldStop()) { continue; } // Is this block full? if (one.isLastPacketInBlock()) { // wait for the close packet has been acked waitForAllAcks(); if (shouldStop()) { continue; } endBlock(); } if (progress != null) { progress.progress(); } // This is used by unit test to trigger race conditions. if (artificialSlowdown != 0 && dfsClient.clientRunning) { Thread.sleep(artificialSlowdown); } } catch (Throwable e) { // Log warning if there was a real error. if (!errorState.isRestartingNode()) { // Since their messages are descriptive enough, do not always // log a verbose stack-trace WARN for quota exceptions. if (e instanceof QuotaExceededException) { LOG.debug("DataStreamer Quota Exception", e); } else { LOG.warn("DataStreamer Exception", e); } } lastException.set(e); assert !(e instanceof NullPointerException); errorState.setInternalError(); if (!errorState.isNodeMarked()) { // Not a datanode issue streamerClosed = true; } } finally { if (scope != null) { scope.close(); scope = null; } } } closeInternal(); } ``` 我们这里主要关注的有 - setPipeline(nextBlockOutputStream()); 创建与datanode的连接 - sendPacket(one); 发送数据 而sendPacket 为 ```java private void sendPacket(DFSPacket packet) throws IOException { // write out data to remote datanode try { packet.writeTo(blockStream); blockStream.flush(); } catch (IOException e) { // HDFS-3398 treat primary DN is down since client is unable to // write to primary DN. If a failed or restarting node has already // been recorded by the responder, the following call will have no // effect. Pipeline recovery can handle only one node error at a // time. If the primary node fails again during the recovery, it // will be taken out then. 
errorState.markFirstNodeIfNotMarked(); throw e; } lastPacket = Time.monotonicNow(); } ``` 就是将数据刷新到blockStream 流里。这个流应该是在与datanode建立连接时创建的,我们回头看一下。 ```java protected LocatedBlock nextBlockOutputStream() throws IOException { LocatedBlock lb; DatanodeInfo[] nodes; StorageType[] nextStorageTypes; String[] nextStorageIDs; //副本数量 int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success; final ExtendedBlock oldBlock = block.getCurrentBlock(); do { errorState.resetInternalError(); lastException.clear(); DatanodeInfo[] excluded = getExcludedNodes(); //通过namenode接口创建block lb = locateFollowingBlock( excluded.length > 0 ? excluded : null, oldBlock); block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); nextStorageTypes = lb.getStorageTypes(); nextStorageIDs = lb.getStorageIDs(); // Connect to first DataNode in the list. //在lb中获取到连接信息,创建与datanode 的socket success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L, false); if (!success) { LOG.warn("Abandoning " + block); dfsClient.namenode.abandonBlock(block.getCurrentBlock(), stat.getFileId(), src, dfsClient.clientName); block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); } } while (!success && --count >= 0); if (!success) { throw new IOException("Unable to create new block."); } return lb; } ``` 这里我们看到主要是分为两步 - 调用namenode.addBlock 在namenode中创建Block数据块。 - 通过namenode.addBlock获取到的信息与datanode建立tcp网络连接
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号