快捷搜索:  汽车  科技

nettyrpc框架线程模型分析(Java核心知识Netty与RPC一)

nettyrpc框架线程模型分析(Java核心知识Netty与RPC一)EventLoopGroup bossGroup =newNioEventLoopGroup();创建处理所有事件的线程池:workerGroupNetty服务器启动流程:1、创建线程池创建处理连接的线程池:bossgroup

nettyrpc框架线程模型分析(Java核心知识Netty与RPC一)(1)

netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API 实现。它提供了对

TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞

的,通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。

本文仅适用与Netty4.0.32版本,其他版本是否适用表示并不清楚...

Netty服务器启动流程:

1、创建线程池

创建处理连接的线程池:bossgroup

创建处理所有事件的线程池:workerGroup

EventLoopGroup bossGroup =newNioEventLoopGroup();

EventLoopGroup workerGroup=newNioEventLoopGroup();

2、设定辅助启动类。ServerBootStrap

传入1中开辟的线程池

指定连接该服务器的channel类型

指定需要执行的childHandler

设置部分参数,如AdaptiveRecvByteBufAllocator缓存大小

.Option用于设置bossGroup相关参数

.childOption用于设置workerGroup相关参数

2.5、此处可处理一个问题:超长字符串在服务端handler无法被一次接收完

可通过此句进行设置:.childOption(ChannelOption.RCVBUF_ALLOCATOR new AdaptiveRecvByteBufAllocator(64 MAX_LENGTH_OF_MSG 65536))

nettyrpc框架线程模型分析(Java核心知识Netty与RPC一)(2)

ServerBootstrap serverBootstrap =newServerBootstrap();

serverBootstrap.group(bossGroup workerGroup)

.channel(NioServersocketChannel.class)//设置channel类型.childOption(ChannelOption.RCVBUF_ALLOCATOR newAdaptiveRecvByteBufAllocator(64 MAX_LENGTH_OF_MSG 65536))

.childHandler(newchildChannelHandler());//选择执行handler

此处的MAX_LENGTH_OF_MSG必须为2的次幂,不然肯定不会是你设置的那个值,具体会变成什么,源码还没看,等看了再补充...

2.75、构建Handler处理流程

样例如下:

publicclasschildChannelHandlerextendsChannelInitializer{

@OverrideprotectedvoidinitChannel(SocketChannel ch)throwsException {//TODO 添加各种功能handler 消息加解密,消息规范检测,构建返回码ch.pipeline().addLast(newNettyServerHandler());

}

}

当要添加多个handler时,就必须注意添加的顺序。

这里的handler分为两种类型:

一种继承ChannelInboundHandler,用于处理来自客户端的消息,比如对客户端的消息进行解码,读取等等。该类型在pipeline中的执行顺序与添加顺序一致。

一种继承ChannelOutboundHandler,用于处理即将发往客户端的消息,比如对该消息进行编辑,编码等等。该类型在pipeline中的执行顺序与添加顺序相反。

而且ChannelOutboundHandler的所有handler,放在ChannelInboundHandler下面是执行不到的。

比如:

publicclasschildChannelHandlerextendsChannelInitializer{

@OverridepublicvoidinitChannel(SocketChannel ch)throwsException {

ch.pipeline().addLast(newOutboundHandler1());//handler1ch.pipeline().addLast(newOutboundHandler2());//handler2ch.pipeline().addLast(newInboundHandler1());//handler3ch.pipeline().addLast(newInboundHandler2());//handler4}

}

以上4个handler的实际执行顺序分别为handler3 -> handler4 -> handler2 ->handler1

如果在handler4下方加上OutboundHandler3,那么这个handler是不会被执行到的。

3、同步等待绑定指定端口

此处可多次执行bind语句绑定多个端口

ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();

channelFuture= serverBootstrap.bind(8081).sync();

...

4、同步等待服务器关闭信息

channelFuture.channel().closeFuture().sync();

5、最后关闭此前开辟的两个线程池

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

最后整段服务器代码如下:

packageNetty;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;importio.netty.channel.AdaptiveRecvByteBufAllocator;publicclassNettyServer {publicvoidstartServerInPort(intport)throwsException{

EventLoopGroup bossGroup=newNioEventLoopGroup();

EventLoopGroup workerGroup=newNioEventLoopGroup();try{//设置启动辅助类ServerBootstrap serverBootstrap =newServerBootstrap();

serverBootstrap.group(bossGroup workerGroup)

.channel(NioServerSocketChannel.class)//设置channel类型.childOption(ChannelOption.RCVBUF_ALLOCATOR newAdaptiveRecvByteBufAllocator(64 2048 65536))

.childHandler(newchildChannelHandler());//选择执行handler//阻塞等待服务器完全启动ChannelFuture channelFuture =serverBootstrap.bind(port).sync();

channelFuture.channel().closeFuture().sync();

}finally{

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}publicclasschildChannelHandlerextendsChannelInitializer{

@OverrideprotectedvoidinitChannel(SocketChannel ch)throwsException {//TODO 添加各种功能handler 消息加解密,消息规范检测,构建返回码ch.pipeline().addLast(newNettyServerHandler());

}

}

}

客户端的这部分代码和服务器端差不多,就不另开一文啰嗦了。之间贴代码:

importio.netty.bootstrap.Bootstrap;importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;publicclassNettyClient {publicvoidsendMsgToServer()throwsException{

EventLoopGroup group=newNioEventLoopGroup();try{//设置辅助启动类信息Bootstrap bootstrap =newBootstrap();

bootstrap.group(group)

.channel(NioSocketChannel.class)//选择channel类型.option(ChannelOption.TCP_NODELAY true)

.handler(newchildChannelHandler());//阻塞等待成功连接服务器ChannelFuture channelFuture = bootstrap.connect(localhost 8000).sync();//阻塞等待来自服务器的处理结果channelFuture.channel().closeFuture().sync();

}finally{

group.shutdownGracefully();

}

}privateclasschildChannelHandlerextendsChannelInitializer{

@OverrideprotectedvoidinitChannel(SocketChannel ch)throwsException {//TODO 添加其他功能处理Handler,如消息加解密ch.pipeline().addLast(newNettyClientHandler());

}

}

}

猜您喜欢: