交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第4 章:hdfs 客户端上传文件 DFSOutputStream
分享
未结
0
765
李延
LV6
2022-03-24
悬赏:20积分
# 1. 代码示例 ```java public static void create() throws IOException { System.setProperty("HADOOP_USER_NAME", "root"); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); FSDataOutputStream fsDataOutputStream = fs.create( new Path("/run.jar")); InputStream inputStream = new FileInputStream("C:\\work\\idea_work\\yarn_test\\src\\main\\resources\\run.jar"); IOUtils.copyBytes(inputStream,fsDataOutputStream,1024); } ``` 我们看到首先通过fs.create 创建一个输出流,然后将数据写入到输出流中。我们分这两步初步分析 # 2 create ```java public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress ) throws IOException { return this.create(f, FsCreateModes.applyUMask( FsPermission.getFileDefault(), FsPermission.getUMask(getConf())), overwrite, bufferSize, replication, blockSize, progress); } ``` 我们看到这里添加了一些默认的参数,我们跳过中间方法,一直调用到DFSClient ```java public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes, String ecPolicyName, String storagePolicy) throws IOException { checkOpen(); final FsPermission masked = applyUMask(permission); LOG.debug("{}: masked={}", src, masked); final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, dfsClientConf.createChecksum(checksumOpt), getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy); beginFileLease(result.getFileId(), result); return result; } ``` 这里看到调用了DFSOutputStream.newStreamForCreate 方法 # 3 newStreamForCreate ```java static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, DataChecksum checksum, String[] favoredNodes, String ecPolicyName, String storagePolicy) throws IOException { try (TraceScope ignored = dfsClient.newPathTraceScope("newStreamForCreate", src)) { HdfsFileStatus stat = null; // Retry the create if we get a RetryStartFileException up to a maximum // number of times boolean shouldRetry = true; int retryCount = CREATE_RETRY_COUNT; while (shouldRetry) { shouldRetry = false; try { stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, new EnumSetWritable<>(flag), createParent, replication, blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName, storagePolicy); break; } catch (RemoteException re) { //............... } } //............... final DFSOutputStream out; if(stat.getErasureCodingPolicy() != null) { out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes); } else { out = new DFSOutputStream(dfsClient, src, stat, flag, progress, checksum, favoredNodes, true); } out.start(); return out; } } ``` 这里我们忽略异常处理主要分为以下几步: - namenode 创建一个空文件,此时是调用namenode的RPC 添加了目录与空文件。 - 创建DFSOutputStream 对象。 - 调用DFSOutputStream对象start 方法,一步上传数据。 这里首先通过namenode 创建一个文件,这一步直接调用的RPC。主要逻辑我们在服务端分析,而重点是DFSOutputStream 对象,与它异步start的内容。
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号