当前位置: 首页 > news >正文

基层建设论文收录在哪个网站做整站优化

基层建设论文收录在哪个网站,做整站优化,安全联盟这种网站建设,深圳网络服务有限公司文章目录1 解析命令行参数和配置文件2 创建并启动 NamesrvController2.1 创建 NamesrvController 对象2.2 启动 NamesrvController 对象第一步:初始化 controller第二步:注册 JVM 钩子第二步:启动 controllerRocketMQ是一个分布式消息中间件&…

文章目录

  • 1 解析命令行参数和配置文件
  • 2 创建并启动 NamesrvController
    • 2.1 创建 NamesrvController 对象
    • 2.2 启动 NamesrvController 对象
      • 第一步:初始化 controller
      • 第二步:注册 JVM 钩子
      • 第二步:启动 controller

RocketMQ是一个分布式消息中间件,它的核心组件之一是namesrv,负责管理broker的路由信息和kv配置。本文将介绍RocketMQ5.1版本中namesrv的启动过程,包括如何解析命令行参数、加载配置文件、初始化和启动namesrv控制器等。

首先,我们需要在环境变量中设置ROCKETMQ_HOME,指向RocketMQ的安装目录。然后,我们可以使用如下命令启动namesrv:

nohup sh mqnamesrv &

这条命令执行运行的是 namesrv.NamesrvStartup#main 方法。

public static void main(String[] args) {main0(args);controllerManagerMain();
}

启动过程分为两部分

  1. main0(args):启动 NamesrvController
  2. controllerManagerMain():启动 ControllerManager

本文只分析 NamesrvController 的启动

namesrv.NamesrvStartup#main0 方法,代码如下:

public static NamesrvController main0(String[] args) {try {// 解析命令行参数和配置文件parseCommandlineAndConfigFile(args);// 创建并启动 NamesrvControllerNamesrvController controller = createAndStartNamesrvController();return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

namesrv的启动过程可以分为以下几个步骤:

  1. 解析命令行参数和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象
  2. 创建并启动 NamesrvController 对象,该对象是namesrv的核心控制器,它持有各种配置对象、网络通信对象、路由管理对象等

1 解析命令行参数和配置文件

解析命令行参数和配置文件的方法是 namesrv.NamesrvStartup#parseCommandlineAndConfigFile,代码如下:

public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());if (null == commandLine) {System.exit(-1);return;}// 创建配置类对象namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();// 设置namesrv的监听端口nettyServerConfig.setListenPort(9876);// 如果命令行中有-c参数,则从配置文件中加载namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 如果命令行中有-p参数,则打印namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的属性if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}

上述代码的执行流程如下:

  1. 创建一个NamesrvConfig对象,用于存储namesrv的配置信息,如rocketmqHome、kvConfigPath等。NamesrvConfig类的具体属性如下表所示

    属性名含义默认值
    rocketmqHomeRocketMQ的根目录环境变量ROCKETMQ_HOME
    kvConfigPathKV配置文件的路径${user.home}/namesrv/kvConfig.json
    configStorePath配置存储文件的路径${user.home}/namesrv/namesrv.properties
    productEnvName环境名称center
    clusterTest是否开启集群测试false
    orderMessageEnable是否支持顺序消息false
  2. 创建一个NettyServerConfig对象,用于存储netty服务端的配置信息,如listenPort、serverWorkerThreads等。NettyServerConfig类的具体属性如下表所示

    属性名含义默认值
    listenPort监听端口,用于接收客户端的连接请求9876
    serverWorkerThreads服务端工作线程数量,用于处理网络IO事件或执行业务任务8
    serverCallbackExecutorThreads服务端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池0
    serverSelectorThreads服务端选择器线程数量,用于接收和分发网络IO事件,建议不要超过3个3
    serverOnewaySemaphoreValue服务端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽256
    serverAsyncSemaphoreValue服务端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽64
    serverChannelMaxIdleTimeSeconds服务端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源120
    serverSocketSndBufSize服务端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值-1
    serverSocketRcvBufSize服务端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值-1
    serverPooledByteBufAllocatorEnable是否启用服务端池化ByteBuf分配器,可以提高内存利用率和性能true
    useEpollNativeSelector是否使用Epoll本地选择器,可以提高Linux平台下的网络IO效率,需要操作系统支持false
  3. 创建一个NettyClientConfig对象,用于存储netty客户端的配置信息,如clientWorkerThreads、clientCallbackExecutorThreads等。NettyClientConfig类的具体属性如下表所示

    属性名含义默认值
    clientWorkerThreads客户端工作线程数量,用于处理网络IO事件或执行业务任务4
    clientCallbackExecutorThreads客户端回调执行线程数量,用于执行异步回调任务,如果为0,则使用公共线程池可用处理器的数量
    connectTimeoutMillis连接超时时间(毫秒),超过该时间则放弃连接3000
    channelNotActiveInterval通道不活跃的间隔时间(毫秒),超过该时间则关闭通道,释放资源1000 * 60
    clientChannelMaxIdleTimeSeconds客户端通道最大空闲时间(秒),超过该时间则关闭连接,释放资源120
    clientSocketSndBufSize客户端Socket发送缓冲区大小(字节),如果为-1,则使用操作系统默认值-1
    clientSocketRcvBufSize客户端Socket接收缓冲区大小(字节),如果为-1,则使用操作系统默认值-1
    clientPooledByteBufAllocatorEnable是否启用客户端池化ByteBuf分配器,可以提高内存利用率和性能false
    clientCloseSocketIfTimeout是否在超时时关闭客户端Socket,可以避免资源泄漏false
    useTLS是否使用TLS协议进行加密通信,需要操作系统支持false
    clientOnewaySemaphoreValue客户端单向请求信号量值,用于限制单向请求的并发度,防止资源耗尽65535
    clientAsyncSemaphoreValue客户端异步请求信号量值,用于限制异步请求的并发度,防止资源耗尽65535
  4. 解析命令行参数,支持以下选项:

    • -c 指定配置文件路径,如果指定了配置文件,会从文件中加载namesrvConfig、nettyServerConfig和nettyClientConfig的属性。
    • -p 打印所有配置信息,并退出程序
    • 其他选项会被转换为properties对象,并覆盖namesrvConfig、nettyServerConfig和nettyClientConfig的属性

2 创建并启动 NamesrvController

创建并启动 NamesrvController 的方法是 namesrv.NamesrvStartup#createAndStartNamesrvController

public static NamesrvController createAndStartNamesrvController() throws Exception {// 创建 NamesrvControllerNamesrvController controller = createNamesrvController();// 启动 NamesrvControllerstart(controller);// 输出启动成功的日志NettyServerConfig serverConfig = controller.getNettyServerConfig();String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());log.info(tip);System.out.printf("%s%n", tip);return controller;
}

代码执行流程如下:

  1. 调用createNamesrvController方法,创建NamesrvController对象
  2. 调用start方法,传入NamesrvController对象,启动 NamesrvController 对象
  3. 格式化一个提示信息,包含序列化类型、绑定地址和监听端口信息
  4. 使用log对象记录提示信息到日志文件中
  5. 把提示信息打印到控制台中
  6. 返回NamesrvController对象

如果在执行中出现异常,则抛出异常并退出程序

2.1 创建 NamesrvController 对象

namesrv.NamesrvStartup#createAndStartNamesrvController 方法中调用了 namesrv.NamesrvStartup#createNamesrvController方法,用于创建 NamesrvController 对象,代码为:

public static NamesrvController createNamesrvController() {// 创建 NamesrvController,传入namesrvConfig、nettyServerConfig、nettyClientConfig配置final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;
}

创建 NamesrvController 对象的步骤如下:

  1. 创建一个NamesrvController对象,传入namesrvConfig、nettyServerConfig和nettyClientConfig对象,这些对象存储了namesrv的业务配置和网络配置
  2. 调用NamesrvController对象的getConfiguration方法,获取一个Configuration对象,该对象负责管理namesrv的配置信息
  3. 调用Configuration对象的registerConfig方法,传入properties对象,将properties对象中的配置信息注册到Configuration对象中
  4. 返回NamesrvController对象

这个函数的返回值是NamesrvController对象

2.2 启动 NamesrvController 对象

namesrv.NamesrvStartup#createAndStartNamesrvController 方法中调用了 namesrv.NamesrvStartup#start 方法,用于启动创建好的 NamesrvController 对象,代码为:

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化controllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。// 所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));// 启动controllercontroller.start();return controller;
}

启动 NamesrvController 对象的步骤如下:

第一步:初始化 controller

调用 namesrv.NamesrvController#initialize 方法,初始化controller。如果初始化失败,就调用controller的shutdown方法,关闭controller,并退出程序

public boolean initialize() {loadConfig();initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;
}

初始化操作包括:

  • 调用 namesrv.NamesrvController#loadConfig 方法从 NamesrvConfig 对象中保存的 kvConfigPath 指定的文件中加载kv配置,并创建一个KVConfigManager对象,用于管理和打印kv配置

    private void loadConfig() {this.kvConfigManager.load();
    }
    
  • 调用 namesrv.NamesrvController#initiateNetworkComponents 方法初始化网络组件,包括remotingClient和remotingServer

    remotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应。remotingServer是一个NettyRemotingServer对象,它用于接收和处理来自其他服务的请求或响应。BrokerHousekeepingService对象用于处理broker的连接和断开事件。

    private void initiateNetworkComponents() {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
    }
    
  • 调用 namesrv.NamesrvController#initiateThreadExecutors 方法初始化两个线程池,一个是defaultExecutor,用于处理默认的远程请求;另一个是clientRequestExecutor,用于处理客户端的路由信息请求。这两个线程池都使用了LinkedBlockingQueue作为任务队列,并且重写了newTaskFor方法,使用FutureTaskExt包装了Runnable任务。

    private void initiateThreadExecutors() {this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};
    }
    
  • 调用 namesrv.NamesrvController#registerProcessor 方法根据 namesrvConfig.isClusterTest() 的值,选择使用ClusterTestRequestProcessor或者DefaultRequestProcessor作为默认处理器

    • ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等
    • DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。
    • namesrvConfig.isClusterTest() = false 时如果收到请求的 requestCode 等于 RequestCode.GET_ROUTEINFO_BY_TOPIC 则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。
    private void registerProcessor() {if (namesrvConfig.isClusterTest()) {this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);} else {// Support get route info only temporarilyClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);}
    }
    

    此部分为 RIP 29 Optimize RocketMQ NameServer 中 Thread pool separation 改进。在改进之前nameserver使用同一个线程池和队列来处理所有的客户端路由请求、broker注册请求等,如果其中一种类型的请求爆发,会影响所有的请求。为了解决这个问题,RIP-29 将最重要的客户端路由请求单独隔离出来,使用不同的线程池和队列,队列大小和线程数都可以配置,这样可以保证不同类型的请求之间不会相互影响,如下图所示

    img
  • 调用 namesrv.NamesrvController#startScheduleService 方法启动定时服务,执行以下三个任务:

    • 每隔一段时间扫描不活跃的broker,并清理路由信息
    • 每隔 10 分钟打印所有的KV配置信息
    • 每隔 1 秒打印线程池的水位日志,即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间)
    private void startScheduleService() {// 周期性扫描不活跃的Broker,并从路由信息中移除this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);// 每隔 10 分钟打印KVConfig 信息this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 每隔 1 秒打印线程池的水位日志,// 即客户端请求线程池和默认线程池的队列大小和头部任务的慢时间(从创建到执行的时间)this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);
    }
    
  • 调用 namesrv.NamesrvController#initiateSslContext 方法初始化SSL上下文,即配置remotingServer使用TLS协议进行安全通信

  • 调用 namesrv.NamesrvController#initiateRpcHooks 方法注册RPC钩子,即在remotingServer处理请求之前或之后执行一些自定义的逻辑

    private void initiateRpcHooks() {this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
    }
    

    namesrv.route.ZoneRouteRPCHook 类中重写了 doAfterResponse 方法,它会在处理GET_ROUTEINFO_BY_TOPIC请求(即请求的 requestCode = RequestCode.GET_ROUTEINFO_BY_TOPIC)时,根据请求中的zoneName参数,过滤掉不属于该区域的broker和queue数据,从而实现区域隔离的功能。

    具体来说,doAfterResponse会设置 response 的 body 为 namesrv.route.ZoneRouteRPCHook#filterByZoneName 方法返回值的字节数组格式,filterByZoneName 方法作用是返回过滤掉不属于该区域的broker和queue数据后的TopicRouteData数据对象

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) {return;}// 省略部分代码TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class);response.setBody(filterByZoneName(topicRouteData, zoneName).encode());
    }
    

第二步:注册 JVM 钩子

通过addShutdownHook方法注册一个ShutdownHookThread对象,即 JVM 钩子,用来在程序终止时调用controller的shutdown方法,释放资源

public void shutdown() {this.remotingClient.shutdown();this.remotingServer.shutdown();this.defaultExecutor.shutdown();this.clientRequestExecutor.shutdown();this.scheduledExecutorService.shutdown();this.scanExecutorService.shutdown();this.routeInfoManager.shutdown();if (this.fileWatchService != null) {this.fileWatchService.shutdown();}
}

第二步:启动 controller

调用 namesrv.NamesrvController#start 方法,启动 controller

public void start() throws Exception {this.remotingServer.start();// In test scenarios where it is up to OS to pick up an available port, set the listening port back to configif (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));this.remotingClient.start();if (this.fileWatchService != null) {this.fileWatchService.start();}this.routeInfoManager.start();
}

启动操作包括:

  • 调用remotingServer对象的start方法,启动一个NettyRemotingServer,用于接收和处理客户端的请求。
  • 如果nettyServerConfig对象的listenPort属性为0,说明是由操作系统自动分配一个可用端口,那么将remotingServer对象的localListenPort属性赋值给nettyServerConfig对象的listenPort属性,保持一致。
  • 调用remotingClient对象的updateNameServerAddressList方法,更新本地地址列表,只包含当前机器的IP地址和端口号。
  • 调用remotingClient对象的start方法,启动一个NettyRemotingClient,用于向其他服务发送请求。
  • 如果fileWatchService对象不为空,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件。
  • 调用routeInfoManager对象的start方法,启动一个路由信息管理器,用于维护Broker和Topic的路由关系。

至此,NamesrvController 的启动流程结束

http://www.dinnco.com/news/43456.html

相关文章:

  • 网站优化提升速度百度竞价优化排名
  • 光谷做网站推广多少钱成都seo优化
  • 天津网站建设制作开发公司网站推广seo方法
  • 北京网站seo服务最新小组排名
  • 杭州 平台 公司 网站建设深圳做网站的公司
  • 在家建设一个网站需要什么云南今日头条新闻
  • 用凡科做网站好吗企业网站源码
  • 网站开发的广告词江阴网站制作公司
  • 删除wordpress修订版本泰州seo
  • 现在有哪家建筑公司招人石家庄百度推广排名优化
  • 阿里巴巴免费建网站移动慧生活app下载
  • 网站数据怎么更新搜索引擎免费登录入口
  • 第三方网站做appquark搜索引擎入口
  • 网站设计外包合同网络营销有哪些模式
  • 重庆网站建设培训机构网站排名优化需要多久
  • 狠狠做新网站网站流量排名查询工具
  • 深圳建网建网站代发软文
  • 找人做网站 优帮云怎么做app推广
  • 儒枫网网站建设免费手游推广平台
  • 三门网站建设购买一个网站域名需要多少钱
  • 公司如何建设网站首页大数据培训
  • 卡盟网站制作百度舆情
  • 怎么让搜索引擎收录网站独立站seo
  • 做h网站公司推广渠道有哪些
  • 网站后台插入程序代码用什么用手机百度seo怎么优化
  • 做网站网站内容怎么找石狮seo
  • 做网站赚钱 优帮云东莞优化网站制作
  • 公司网页制作哪家强网站搜索排名优化
  • 定制化网站b站推广链接
  • 有没有做鸭子的网站磁力链接搜索引擎2021