交流
商城
MCN
登入
注册
首页
提问
分享
讨论
建议
公告
动态
发表新帖
发表新帖
第6 章:服务端-单机-启动流程
分享
未结
0
962
李延
LV6
2021-09-10
悬赏:20积分
# 1. 背景 前面的章节我们说明了zookeeper数据存储和网络相关内容下面我们主要分析一下单机模式下主函数启动过程 # 2. 启动流程   # 源码分析 ```JAVA public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { main.initializeAndRun(args); } catch (IllegalArgumentException e) { //...................... } LOG.info("Exiting normally"); ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue()); } ``` ```JAVA protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task //异步清除历史快照 DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } } ``` 首先我们看到最开始一步就是清除历史的快照数据,之后判断是否为集群启动,分别使用不同的方法。我们这里分析单机模式,进入`ZooKeeperServerMain.main(args);` ```java public static void main(String[] args) { ZooKeeperServerMain main = new ZooKeeperServerMain(); try { main.initializeAndRun(args); } catch (IllegalArgumentException e) { //................................. } LOG.info("Exiting normally"); ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue()); } protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } //根据参数指定的目录,解析配置文件 ServerConfig config = new ServerConfig(); if (args.length == 1) { config.parse(args[0]); } else { config.parse(args); } runFromConfig(config); } ** public void runFromConfig(ServerConfig config) throws IOException, AdminServerException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { try { metricsProvider = MetricsProviderBootstrap.startMetricsProvider( config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration()); } catch (MetricsProviderLifeCycleException error) { throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error); } ServerMetrics.metricsProviderInitialized(metricsProvider); ProviderRegistry.initialize(); // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call // run() in this thread. // create a file logger url from the command line args txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); JvmPauseMonitor jvmPauseMonitor = null; if (config.jvmPauseMonitorToRun) { jvmPauseMonitor = new JvmPauseMonitor(config); } //构建ZooKeeperServer服务,zookeeper的数据和网络接收到的请求都交给它来处理 final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig); txnLog.setServerStats(zkServer.serverStats()); // Registers shutdown handler which will be used to know the // server error or shutdown state changes. final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch)); // Start Admin server 这里会默认开启一个8080的端口,是一个页面。如果被占用不影响正常启动 adminServer = AdminServerFactory.createAdminServer(); adminServer.setZooKeeperServer(zkServer); adminServer.start(); boolean needStartZKServer = true; if (config.getClientPortAddress() != null) { //构建网络监听,监听我们使用的2181端口 cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false); //启动服务。包括:网络、数据加载、service服务 cnxnFactory.startup(zkServer); // zkServer has been started. So we don't need to start it again in secureCnxnFactory. needStartZKServer = false; } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true); secureCnxnFactory.startup(zkServer, needStartZKServer); } containerManager = new ContainerManager( zkServer.getZKDatabase(), zkServer.firstProcessor, Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), Integer.getInteger("znode.container.maxPerMinute", 10000), Long.getLong("znode.container.maxNeverUsedIntervalMs", 0) ); containerManager.start(); ZKAuditProvider.addZKStartStopAuditLog(); serverStarted(); // Watch status of ZooKeeper server. It will do a graceful shutdown // if the server is not running or hits an internal error. shutdownLatch.await(); shutdown(); if (cnxnFactory != null) { cnxnFactory.join(); } if (secureCnxnFactory != null) { secureCnxnFactory.join(); } if (zkServer.canShutdown()) { zkServer.shutdown(true); } } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Server interrupted", e); } finally { if (txnLog != null) { txnLog.close(); } if (metricsProvider != null) { try { metricsProvider.stop(); } catch (Throwable error) { LOG.warn("Error while stopping metrics", error); } } } }** ``` 这里我们主要关注两个对象ZooKeeperServer、ServerCnxnFactory。其中ServerCnxnFactory我们在之前的代码中已经分析过了。而ZooKeeperServer中进行所有关于zookeeper的逻辑处理。 我们根据ServerCnxnFactory的startup方法看一下启动过程。 ```java @Override public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException { //启动网络 start(); setZooKeeperServer(zks); //启动服务 if (startServer) { //从磁盘中加载数据 zks.startdata(); //启动服务 zks.startup(); } } ``` 其中 start(); 与 zks.startdata(); 分别为网络和数据的加载,之前已经分析过了,不再赘述,这里主要看 zks.startup(); ```java private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); /** * 设置请求处理器 * 它是一个责任链的形式 。我们的请求会调用第一个请求 * 其中 几个责任链是异步的 */ setupRequestProcessors(); /** * * 开启RequestThrottler 请求 * 这个线程以队列接受到请求,并在run方法中进行处理 * */ startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); setState(state); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } ``` 这里我们看到有两个主要内容RequestProcessors与RequestThrottler我们下面详细说明
回帖
消灭零回复
提交回复
热议榜
java 相关知识分享
8
好的程序员与不好的程序员
6
写给工程师的十条精进原则
5
spring boot以jar包运行配置的logback日志文件没生成
5
一步一步分析SpringBoot启动源码(一)
5
MockMvc测试
5
【吐槽向】是不是有个吐槽的板块比较好玩
4
logstash jdbc同步mysql多表数据到elasticsearch
3
IntelliJ IDEA 优质License Server
3
.gitignore忽略规则
3
SpringBoot启动源码分析
3
一步一步分析SpringBoot启动源码(三)
3
2
一步一步分析SpringBoot启动源码(二)
2
积分不够将无法发表新帖
2
官方产品
Meta-Boot - 基于MCN
MCN - 快速构建SpringBoot应用
微信扫码关注公众号