交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第4-1 章:DataTree
分享
未结
0
708
李延
LV6
2021-09-08
悬赏:20积分
# 1. 功能说明 保存所有节点数据信息,包括节点路径、节点内容、acl权限、ttl过期时间、当前节点的监听事件 # 2. 主要成员变量 - NodeHashMap nodes 所有节点集合 - IWatchManager dataWatches 数据监听器 - IWatchManager childWatches 子节点监听器 - AtomicLong nodeDataSize 存储内容大小 - Map<Long, HashSet<String>> ephemerals 零时节点集合,key 为seesionid。value为当前连接的所有零时节点集合 - ReferenceCountedACLCache aclCache 权限控制缓存器,所有节点的权限都在这里进行缓存 ## 2.1 NodeHashMap nodes 所有的节点与节点数据都保存在这里。我们先看NodeHashMap接口 ![7ce40f22c4eaf41c55881aee1607ede1.png](//cdn.hiboot.cn/3876d8e2d55f450f9f2744fba69582df.png) 我们发现他定义的接口和我们java中 Map的接口非常相似,其中 key 为 节点的路径,而值 则是DataNode结构的节点 我们再看一下它的实现NodeHashMapImpl ![8887efd833a3371e4429e4f31a293440.png](//cdn.hiboot.cn/37baf8122d6b4103af04fbc5a4caf9b5.png) 我们看到他其实就是一个ConcurrentHashMap。 到这里,我们知道zookeeper的数据在内存中通过一个map进行存储数据,map中:key为全路径的path。值为当前节点的全部信息 ## 2.2 DataNode 在上一节中我们看到数据的值是DataNode结构。我们详细看看DataNode内部都保存了哪些数据 - long digest 校验码 - byte[] data 节点数据 - Long acl 权限验证,这里保存的是id。我们可以用ReferenceCountedACLCache对象通过这个id获取真正的权限 - StatPersisted stat 节点状态,也就是我们通过get命令获取到的一些信息 - Set<String> children 子节点 ## 2.3 digest 数据校验码,用与数据的校验,判断两个节点是否相同。 ### 2.3.1 作用 首先在每个节点中有这个校验码,用于判断每个节点内容。 其次在NodeHashMapImpl的成员变量中我们也看到了一个`AdHash hash`对象。 它的方法就是将每个节点的digest求和。我们看到在add和remove中有下面的方法 ![8887efd833a3371e4429e4f31a293440.png](//cdn.hiboot.cn/a7928e9338f544258da4a6f17a189ac0.png) 也就是说AdHash hash保存的是对于整个DataTree是否相同的判断。 这个将在我们集群计算多个节点数据是否同步、当前内存数据与快照数据是否相同的时候进行使用。 ### 2.3.2 生成 ![eef611128bfce4df5ce92d8a21dbd760.png](//cdn.hiboot.cn/5612bcd5d7a0480ca89bbdfef38ec369.png) 这个都是通过DigestCalculator对象进行生成,其具体通过CRC32算法进行计算。 ## 2.4 Map<Long, HashSet<String>> ephemerals 保存零时节点。我们知道临时节点再其连接断开后,节点就会自动删除,而这个map就是保存的所有零时节点路径。 key是连接sessionid。当某个节点断开后。我们就可以通过seessionid获取到这个零时节点的所有路径集合,就可以进行删除 ## 2.5 IWatchManager 保存监听事件,单独说明 ## 2.6 ReferenceCountedACLCache 权限控制缓存器 在每个节点都可以设置自己的权限,并且一个节点可以设置多个权限。 而ReferenceCountedACLCache对象就是通过一个map,将权限进行缓存,返回给我们一个id。 我们在datanode中值保存这个id。如果需要具体的权限,就通过当前对象进行获取。 下面就是保存acl的代码 ```java public synchronized Long convertAcls(List<ACL> acls) { if (acls == null) { return OPEN_UNSAFE_ACL_ID; } // get the value from the map Long ret = aclKeyMap.get(acls); if (ret == null) { ret = incrementIndex(); longKeyMap.put(ret, acls); aclKeyMap.put(acls, ret); } addUsage(ret); return ret; } private long incrementIndex() { return ++aclIndex;} ``` # 3. DataTree主要方法 上一章中。我们大致说明了几个成员变量具体的作用。下面我们看看DataTree主要几个方法 ## 3.1 初始化 ```JAVA DataTree(DigestCalculator digestCalculator) { this.digestCalculator = digestCalculator; nodes = new NodeHashMapImpl(digestCalculator); /* Rather than fight it, let root have an alias */ //初始化根路径 nodes.put("", root); nodes.putWithoutDigest(rootZookeeper, root); /** add the proc node and quota node */ //初始化zookeeper路径 root.addChild(procChildZookeeper); nodes.put(procZookeeper, procDataNode); //初始化/zookeeper/quota路径 procDataNode.addChild(quotaChildZookeeper); nodes.put(quotaZookeeper, quotaDataNode); //初始化/zookeeper/config路径,其中保存了集群信息 addConfigNode(); nodeDataSize.set(approximateDataSize()); try { dataWatches = WatchManagerFactory.createWatchManager(); childWatches = WatchManagerFactory.createWatchManager(); } catch (Exception e) { LOG.error("Unexpected exception when creating WatchManager, exiting abnormally", e); ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } } ``` ## 3.2 create创建节点 ```java public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException { int lastSlash = path.lastIndexOf('/'); String parentName = path.substring(0, lastSlash); String childName = path.substring(lastSlash + 1); //创建节点的基本信息 StatPersisted stat = createStat(zxid, time, ephemeralOwner); //获取父节点 DataNode parent = nodes.get(parentName); if (parent == null) { throw new KeeperException.NoNodeException(); } synchronized (parent) { // Add the ACL to ACL cache first, to avoid the ACL not being // created race condition during fuzzy snapshot sync. // // This is the simplest fix, which may add ACL reference count // again if it's already counted in in the ACL map of fuzzy // snapshot, which might also happen for deleteNode txn, but // at least it won't cause the ACL not exist issue. // // Later we can audit and delete all non-referenced ACLs from // ACL map when loading the snapshot/txns from disk, like what // we did for the global sessions. //获取acl的id编号 Long longval = aclCache.convertAcls(acl); //判断当前父节点是否已经有了同名节点 Set<String> children = parent.getChildren(); if (children.contains(childName)) { throw new KeeperException.NodeExistsException(); } //判断是否需要跟新父节点版本号,一般添加的节点是零时节点的时候不改变 nodes.preChange(parentName, parent); if (parentCVersion == -1) { parentCVersion = parent.stat.getCversion(); parentCVersion++; } // There is possibility that we'll replay txns for a node which // was created and then deleted in the fuzzy range, and it's not // exist in the snapshot, so replay the creation might revert the // cversion and pzxid, need to check and only update when it's // larger. //跟新父节点 if (parentCVersion > parent.stat.getCversion()) { parent.stat.setCversion(parentCVersion); parent.stat.setPzxid(zxid); } //组装DatNode对象 DataNode child = new DataNode(data, longval, stat); //添加 parent.addChild(childName); nodes.postChange(parentName, parent); //跟新数据大小 nodeDataSize.addAndGet(getNodeSize(path, child.data)); nodes.put(path, child); //如果是零时节点,进行添加到ephemerals对象中 EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (ephemeralOwner != 0) { HashSet<String> list = ephemerals.get(ephemeralOwner); if (list == null) { list = new HashSet<String>(); ephemerals.put(ephemeralOwner, list); } synchronized (list) { list.add(path); } } if (outputStat != null) { child.copyStat(outputStat); } } // now check if its one of the zookeeper node child //zookeeper节点特殊处理 if (parentName.startsWith(quotaZookeeper)) { // now check if its the limit node if (Quotas.limitNode.equals(childName)) { // this is the limit node // get the parent and add it to the trie pTrie.addPath(Quotas.trimQuotaPath(parentName)); } if (Quotas.statNode.equals(childName)) { updateQuotaForPath(Quotas.trimQuotaPath(parentName)); } } String lastPrefix = getMaxPrefixWithQuota(path); long bytes = data == null ? 0 : data.length; // also check to update the quotas for this node if (lastPrefix != null) { // ok we have some match and need to update updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); //触发事件 dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); } public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws KeeperException.NoNodeException, KeeperException.NodeExistsException { int lastSlash = path.lastIndexOf('/'); String parentName = path.substring(0, lastSlash); String childName = path.substring(lastSlash + 1); //创建节点的基本信息 StatPersisted stat = createStat(zxid, time, ephemeralOwner); //获取父节点 DataNode parent = nodes.get(parentName); if (parent == null) { throw new KeeperException.NoNodeException(); } synchronized (parent) { // Add the ACL to ACL cache first, to avoid the ACL not being // created race condition during fuzzy snapshot sync. // // This is the simplest fix, which may add ACL reference count // again if it's already counted in in the ACL map of fuzzy // snapshot, which might also happen for deleteNode txn, but // at least it won't cause the ACL not exist issue. // // Later we can audit and delete all non-referenced ACLs from // ACL map when loading the snapshot/txns from disk, like what // we did for the global sessions. //获取acl的id编号 Long longval = aclCache.convertAcls(acl); //判断当前父节点是否已经有了同名节点 Set<String> children = parent.getChildren(); if (children.contains(childName)) { throw new KeeperException.NodeExistsException(); } //判断是否需要跟新父节点版本号,一般添加的节点是零时节点的时候不改变 nodes.preChange(parentName, parent); if (parentCVersion == -1) { parentCVersion = parent.stat.getCversion(); parentCVersion++; } // There is possibility that we'll replay txns for a node which // was created and then deleted in the fuzzy range, and it's not // exist in the snapshot, so replay the creation might revert the // cversion and pzxid, need to check and only update when it's // larger. //跟新父节点 if (parentCVersion > parent.stat.getCversion()) { parent.stat.setCversion(parentCVersion); parent.stat.setPzxid(zxid); } //组装DatNode对象 DataNode child = new DataNode(data, longval, stat); //添加 parent.addChild(childName); nodes.postChange(parentName, parent); //跟新数据大小 nodeDataSize.addAndGet(getNodeSize(path, child.data)); nodes.put(path, child); //如果是零时节点,进行添加到ephemerals对象中 EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (ephemeralOwner != 0) { HashSet<String> list = ephemerals.get(ephemeralOwner); if (list == null) { list = new HashSet<String>(); ephemerals.put(ephemeralOwner, list); } synchronized (list) { list.add(path); } } if (outputStat != null) { child.copyStat(outputStat); } } // now check if its one of the zookeeper node child //zookeeper节点特殊处理 if (parentName.startsWith(quotaZookeeper)) { // now check if its the limit node if (Quotas.limitNode.equals(childName)) { // this is the limit node // get the parent and add it to the trie pTrie.addPath(Quotas.trimQuotaPath(parentName)); } if (Quotas.statNode.equals(childName)) { updateQuotaForPath(Quotas.trimQuotaPath(parentName)); } } String lastPrefix = getMaxPrefixWithQuota(path); long bytes = data == null ? 0 : data.length; // also check to update the quotas for this node if (lastPrefix != null) { // ok we have some match and need to update updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); //触发事件 dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); } ```
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号