快捷搜索:  汽车  科技

netty中文教程(netty入门到弹幕实战)

netty中文教程(netty入门到弹幕实战)零拷贝传统拷贝方式问题原因:TCP协议发送时造成Netty解决方案:4.Netty的零拷贝

一、IO模型
  • 阻塞式IO模型
  • 非阻塞式IO模型
  • IO复用
  • 信号驱动式IO
  • 异步IO
  1. Linux IO流程

netty中文教程(netty入门到弹幕实战)(1)


2. 各个IO模型的比较

netty中文教程(netty入门到弹幕实战)(2)

NIO的优势

  • 事件驱动模型 避免多线程 单线程处理多任务
  • 非阻塞,IO读写不再阻塞,而是返回0
  • 基于通道的传输,比基于流更有效率
  • 更高级的IO函数,零拷贝
  • IO多路复用大大提高了java网络应用的可伸缩性和实用性

NIO的缺点

  • 编程困难
  • 陷阱重重

3.TCP粘包,拆包问题

问题原因:TCP协议发送时造成

netty中文教程(netty入门到弹幕实战)(3)

Netty解决方案:

netty中文教程(netty入门到弹幕实战)(4)


4.Netty的零拷贝

传统拷贝方式

  1. 数据从磁盘读取到内核的read buffer
  2. 数据从内核缓冲区拷贝到用户缓冲区
  3. 数据从用户缓冲区拷贝到内核的socket buffer
  4. 数据从内核的socket buffer拷贝到网卡接口(硬件)的缓冲区

零拷贝

  1. 调用transferTo 数据从文件由DMA引擎拷贝到内核read buffer
  2. 接着DMA从内核read buffer将数据拷贝到网卡接口buffer

Netty中的零拷贝体现在这三个方面:

1.bytebuffer

Netty发送和接收消息主要使用bytebuffer,bytebuffer使用对外内存(DirectMemory)直接进行socket读写。

2.Composite Buffers

传统的byteBuffer,如果需要将两个ByteBuffer中的数据组合到一起,我们需要首先创建一个size=size1 size2大小的新的数组,然后将两个数组中的数据拷贝到新的数组中。但是使用Netty提供的组合ByteBuf,就可以避免这样的操作,因为CompositeByteBuf并没有真正将多个Buffer组合起来,而是保存了它们的引用,从而避免了数据的拷贝,实现了零拷贝。

3.对于FileChannel.transferTo的使用

Netty中使用了FileChannel的transferTo方法,该方法依赖于操作系统实现零拷贝。

二、Netty组件
  1. Channel – 对应NIO中的channel
  2. EventLoop --对应NIO中的while循环
  3. ChannelHandler和ChannelPipline --对应NIO客户逻辑实现handleRead和handleWrite
  4. ByteBuf --对应Nio中的ByteBuffer
  5. BootStrap和BootServerStrap --对应NIO中的Selecter、ServerSocketChannel等的创建、配置、启动

Reactor线程模型

Rector线程模型有三种形式:

1.单线程模型:

netty中文教程(netty入门到弹幕实战)(5)

2.多线程模型:

netty中文教程(netty入门到弹幕实战)(6)

3.mutiple模型

netty中文教程(netty入门到弹幕实战)(7)

Netty对这三种模式都有支持

三、简单的例子

1.引入pom包

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency>

2.服务器端

public class TimeServer { public void bind(int port) throws InterruptedException { EventLoopGroup boosGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ ServerBootstrap b=new ServerBootstrap(); b.group(boosGroup workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG 1024) .childHandler(new ChildChannelHandler()); //绑定端口,同步等待成功 ChannelFuture future = b.bind(port).sync(); //等待服务端监听关闭 future.channel().closeFuture().sync(); }finally { //优雅退出,释放线程池资源 boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new TimeServer().bind(8888); } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeServerHandler()); } } }

public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx Object msg) throws Exception { ByteBuf buf=(ByteBuf)msg; byte[] req=new byte[buf.readableBytes()]; buf.readBytes(req); String body=new String(req "UTF-8"); System.out.println(body); SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String currentTime=simpleDateFormat.format(new Date()); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } } 实战弹幕系统

java:

public class WebSocketDanmuServer { private int port; public WebSocketDanmuServer(int port) { this.port = port; } public void run(){ EventLoopGroup bossGroup=new NioEventLoopGroup(1); EventLoopGroup workGroup=new NioEventLoopGroup(8); try { ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup workGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketDanmuServerInitializer()) .option(ChannelOption.SO_BACKLOG 128) .childOption(ChannelOption.SO_KEEPALIVE true); System.out.println("弹幕系统启动了 " port); ChannelFuture future = b.bind(port).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { new WebSocketDanmuServer(8080).run(); } }

public class WebsocketDanmuServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("http-decodec" new HttprequestDecoder()); pipeline.addLast("http-aggregator" new HttpObjectAggregator(65536)); pipeline.addLast("http-encodec" new HttpResponseEncoder()); pipeline.addLast("http-chunked" new ChunkedWriteHandler()); /* pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(64*1024)); pipeline.addLast(new ChunkedWriteHandler()); */ pipeline.addLast("http-request" new HttpRequestHandler("/ws")); pipeline.addLast("WebSocket-Protocol" new WebSocketServerProtocolHandler("/ws")); pipeline.addLast("WebSocket-request" new TextWebSocketFrameHandler()); } }

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String wsUri; private static final File INDEX; static { URL location=HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation(); try { String path = location.toURI() "WebsocketDanMu.html"; path = !path.contains("file:") ? path : path.substring(5); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to locate WebsocketChatClient.html" e); } } public HttpRequestHandler(String wsUri) { this.wsUri = wsUri; } private static void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1 HttpResponseStatus.CONTINUE); ctx.writeAndFlush(response); } @Override protected void channelRead0(ChannelHandlerContext ctx FullHttpRequest request) throws Exception { if(wsUri.equalsIgnoreCase(request.getUri())){ ctx.fireChannelRead(request.retain()); }else { if(HttpHeaders.is100ContinueExpected(request)){ send100Continue(ctx); } RandomAccessFile file = new RandomAccessFile(INDEX "r");//4 HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion() HttpResponseStatus.OK); response.headers().set(HttpHeaders.Names.CONTENT_TYPE "text/html; charset=UTF-8"); boolean keepAlive = HttpHeaders.isKeepAlive(request); if (keepAlive) { //5 response.headers().set(HttpHeaders.Names.CONTENT_LENGTH file.length()); response.headers().set(HttpHeaders.Names.CONNECTION HttpHeaders.Values.KEEP_ALIVE); } ctx.write(response); //6 if (ctx.pipeline().get(SslHandler.class) == null) { //7 ctx.write(new DefaultFileRegion(file.getChannel() 0 file.length())); } else { ctx.write(new ChunkedNioFile(file.getChannel())); } ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //8 if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); //9 } file.close(); } } }

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx TextWebSocketFrame msg) throws Exception { Channel incoming=ctx.channel(); for (Channel channel:channels){ if(channel!=incoming){ channel.writeAndFlush(new TextWebSocketFrame(msg.text())); }else { channel.writeAndFlush(new TextWebSocketFrame("我发送的 " msg.text())); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " incoming.remoteAddress() " 加入")); channels.add(incoming); System.out.println("Client:" incoming.remoteAddress() "加入"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " incoming.remoteAddress() " 离开")); System.err.println("Client:" incoming.remoteAddress() "离开"); // A closed Channel is automatically removed from ChannelGroup // so there is no need to do "channels.remove(ctx.channel());" } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("Client:" incoming.remoteAddress() "在线"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.err.println("Client:" incoming.remoteAddress() "掉线"); } @Override public void exceptionCaught(ChannelHandlerContext ctx Throwable cause) // (7) throws Exception { Channel incoming = ctx.channel(); System.err.println("Client:" incoming.remoteAddress() "异常"); // 当出现异常就关闭连接 cause.printStackTrace(); ctx.close(); } }

html:

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta name="Keywords" content="danmu"> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>弹幕网站</title> <style type="text/css"> body { background: url(http://ot0ak6eri.bkt.clouddn.com/01.jpg); no-repeat:top center; font-size: 12px; font-family: "微软雅黑"; } * { margin: 0; padding: 0; } /* screen start*/ .screen { width: 300px; height: 100px; background: #669900; } .dm { width: 100%; height: 100%; position: absolute; top: 0; left: 0; display: none; } .dm .d_screen .d_del { width: 38px; height: 38px; background: #600; display: block; text-align: center; line-height: 38px; text-decoration: none; font-size: 20px; color: #fff; border-radius: 19px; border: 1px solid #fff; z-index: 2; position: absolute; right: 20px; top: 20px; outline: none; } .dm .d_screen .d_del:hover { background: #F00; } .dm .d_screen .d_mask { width: 100%; height: 100%; background: #000; position: absolute; top: 0; left: 0; opacity: 0.6; filter: alpha(opacity = 60); z-index: 1; } .dm .d_screen .d_show { position: relative; z-index: 2; } .dm .d_screen .d_show div { font-size: 26px; line-height: 36px; font-weight: 500; position: absolute; top: 76px; left: 10; color: #fff; } /*end screen*/ /*send start*/ .send { width: 100%; height: 76px; position: absolute; bottom: 0; left: 0; border: 1px solid red; } .send .s_filter { width: 100%; height: 76px; background: #000; position: absolute; bottom: 0; left: 0; opacity: 0.6; filter: alpha(opacity = 60); } .send .s_con { width: 100%; height: 76px; position: absolute; top: 0; left: 0; z-index: 2; text-align: center; line-height: 76px; } .send .s_con .s_text { width: 800px; height: 36px; border: 0; border-radius: 6px 0 0 6px; outline: none; } .send .s_con .s_submit { width: 100px; height: 36px; border-radius: 0 6px 6px 0; outline: none; font-size: 14px; color: #fff; background: #65c33d; font-family: "微软雅黑"; cursor: pointer; border: 1px solid #5bba32; } .send .s_con .s_submit:hover { background: #3eaf0e; } /*end send*/ </style> </head> <body> <a href="#" id="startDm">开启弹幕</a> <!-- dm start --> <div class="dm"> <!-- d_screen start --> <div class="d_screen"> <a href="#" class="d_del">X</a> <div class="d_mask"></div> <div class="d_show"> </div> </div> <!-- end d_screen --> <!-- send start --> <div class="send"> <div class="s_filter"></div> <div class="s_con"> <input type="text" class="s_text" /> <input type="button" value="发表评论" class="s_submit" id="btn"/> </div> </div> <!-- end send --> </div> <!-- end dm--> <script type="text/javascript" src="http://ajax.aspnetcdn.com/ajax/jQuery/jquery-1.8.0.js"></script> <script type="text/javascript" > String.prototype.endWith=function(str){ if(str==null||str==""||this.length==0||str.length>this.length) return false; if(this.substring(this.length-str.length)==str) return true; else return false; return true; } String.prototype.startWith=function(str){ if(str==null||str==""||this.length==0||str.length>this.length) return false; if(this.substr(0 str.length)==str) return true; else return false; return true; } </script> <!--<script type="text/javascript" src="websocket.js"></script>--> <script type="text/javascript"> $(function() { $("#startDm .d_del").click(function() { $("#startDm .dm").toggle(1000); //init_screen(); }); $("#btn").click(function(){ send(); }); $(".s_text").keydown(function() { var code = window.event.keyCode; if (code == 13)//回车键按下时,输出到弹幕 { send(); } }); }); function launch() { var _height = $(window).height(); var _left = $(window).width() - $("#" index).width(); var time=10000; if(index%2==0) time=20000; _top =80; if(_top>_height-100) _top=80; $("#" index).css({ left:_left top:_top color:getRandomColor() }); $("#" index).animate({ left:"-" _left "px"} time function(){}); index ; } /* //初始化弹幕 function init_screen() { var _top = 0; var _height = $(window).height(); $(".d_show").find("div").show().each(function() { var _left = $(window).width() - $(this).width(); var time=10000; if($(this).index()%2==0) time=20000; _top =80; if(_top>_height-100) _top=80; $(this).css({ left:_left top:_top color:getRandomColor() }); $(this).animate({ left:"-" _left "px"} time function(){}); }); } */ //随机获取颜色值 function getRandomColor() { return '#' (function(h) { return new Array(7 - h.length).join("0") h })((Math.random() * 0x1000000 << 0).toString(16)) } </script> <script type="text/javascript"> var websocket=null; var _top=80; var index=0; var host=window.location.host; //判断当前浏览器是否支持WebSocket if('WebSocket' in window){ websocket=new WebSocket("ws://" host "/ws"); } else{ alert("Not Support WebSocket!"); } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回调方法 // 收到服务器发送的消息 websocket.onmessage = function(){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ //修改背景图 var imgurl; if (innerHTML.startWith("~background ")) { var cmd = innerHTML; imgurl = cmd.split(" ")[1]; document.body.style.background = "url(" imgurl ")"; }else{ $(".d_show").append("<div id='" index "'>" innerHTML "</div>"); } launch(); } //发送消息 function send(){ //var message = document.getElementById('text').value; var message = $(".s_text").val(); $(".s_text").val(""); websocket.send(message); } </script> </body> </html>

猜您喜欢: