快捷搜索:  汽车  科技

java io是什么模式(从操作系统层面分析)

java io是什么模式(从操作系统层面分析)recvfrom(6 "hello bio\n" 8192 0 NULL NULL) =关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。clone出来的线程socket(AF_INET SOCK_STREAM IPPROTO_IP) = 5 bind(5 {sa_family=AF_INET sin_port=htons(8090) sin_addr=inet_addr("0.0.0.0")} 16) = 0 listen(5 50) = 0 poll([{fd=5 events=POLLIN|POLLERR}] 1 -1) = 1 ([{fd=5 revents=POLLIN}])poll函数会阻塞直到其中任何一个fd发生事件。有客户端连接后acc

java io是什么模式(从操作系统层面分析)(1)

前言

本文从操作系统实际调用角度(以CentOS Linux release 7.5操作系统为示例),力求追根溯源看IO的每一步操作到底发生了什么。

关于如何查看系统调用,Linux可以使用 strace 来查看任何软件的系统调动(这是个很好的分析学习方法):strace -ff -o ./out java TestJava

一 bio

/** * Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved. */ package io; import java.io.*; import java.net.Serversocket; import java.net.Socket; /** * @author xiangyong.ding * @version $Id: TestSocket.java v 0.1 2020年08月02日 20:56 xiangyong.ding Exp $ */ public class BIOSocket { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8090); System.out.println("step1: new ServerSocket "); while (true) { Socket client = serverSocket.accept(); system.out.println("step2: client\t" client.getPort()); new Thread(() -> { try { InputStream in = client.getInputStream(); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); while (true) { System.out.println(reader.readLine()); } } catch (IOException e) { e.printStackTrace(); } }).start(); } } }

1 发生的系统调用

启动时

socket(AF_INET SOCK_STREAM IPPROTO_IP) = 5 bind(5 {sa_family=AF_INET sin_port=htons(8090) sin_addr=inet_addr("0.0.0.0")} 16) = 0 listen(5 50) = 0 poll([{fd=5 events=POLLIN|POLLERR}] 1 -1) = 1 ([{fd=5 revents=POLLIN}])

poll函数会阻塞直到其中任何一个fd发生事件。

有客户端连接后

accept(5 {sa_family=AF_INET sin_port=htons(10253) sin_addr=inet_addr("42.120.74.252")} [16]) = 6 clone(child_stack=0x7f013e5c4fb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7f013e5c59d0 tls=0x7f013e5c5700 child_tidptr=0x7f013e5c59d0) = 13168 poll([{fd=5 events=POLLIN|POLLERR}] 1 -1

抛出线程(即我们代码里的 new Thread() )后,继续poll阻塞等待连接。

clone出来的线程

recvfrom(6 "hello bio\n" 8192 0 NULL NULL) =

关于对recvfrom函数的说明,其中第四个参数0 表示这是一个阻塞调用。

客户端发送数据后

recvfrom(6 "hello bio\n" 8192 0 NULL NULL) = 10

2 优缺点

优点

代码简单,逻辑清晰。

缺点

  • 由于stream的read操作是阻塞读,面对多个连接时 每个连接需要每线程。无法处理大量连接(C10K问题)。
  • 误区:可见JDK1.8中对于最初的BIO,在Linux OS下仍然使用的poll,poll本身也是相对比较高效的多路复用函数(支持非阻塞、多个socket同时检查event),只是限于JDK最初的stream API限制,无法支持非阻塞读取。
二 NIO(non block)

改进:使用NIO API,将阻塞变为非阻塞, 不需要大量线程。

/** * Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved. */ package io; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.LinkedList; /** * @author xiangyong.ding * @version $Id: NioSocket.java v 0.1 2020年08月09日 11:25 xiangyong.ding Exp $ */ public class NIOSocket { private static LinkedList< SocketChannel> clients = new LinkedList<>(); private static void startClientChannelHandleThread(){ new Thread(() -> { while (true){ ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //处理客户端连接 for (SocketChannel c : clients) { // 非阻塞 >0 表示读取到的字节数量 0或-1表示未读取到或读取异常 int num = 0; try { num = c.read(buffer); } catch (IOException e) { e.printStackTrace(); } if (num > 0) { buffer.flip(); byte[] clientBytes = new byte[buffer.limit()]; //从缓冲区 读取到内存中 buffer.get(clientBytes); System.out.println(c.socket().getPort() ":" new String(clientBytes)); //清空缓冲区 buffer.clear(); } } } }).start(); } public static void main(String[] args) throws IOException { //new socket 开启监听 ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.bind(new InetSocketAddress(9090)); //设置阻塞接受客户端连接 socketChannel.configureBlocking(true); //开始client处理线程 startClientChannelHandleThread(); while (true) { //接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1) SocketChannel client = socketChannel.accept(); if (client == null) { //System.out.println("no client"); } else { //设置读非阻塞 client.configureBlocking(false); int port = client.socket().getPort(); System.out.println("client port :" port); clients.add(client); } } } }

1 发生的系统调用

主线程

socket(AF_INET SOCK_STREAM IPPROTO_IP) = 4 bind(4 {sa_family=AF_INET sin_port=htons(9090) sin_addr=inet_addr("0.0.0.0")} 16) = 0 listen(4 50) = 0 fcntl(4 F_SETFL O_RDWR|O_NONBLOCK) = 0 accept(4 0x7fe26414e680 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable)

有连接后,子线程

read(6 0x7f3f415b1c50 4096) = -1 EAGAIN (Resource temporarily unavailable) read(6 0x7f3f415b1c50 4096) = -1 EAGAIN (Resource temporarily unavailable) ...

资源使用情况:

java io是什么模式(从操作系统层面分析)(2)

2 优缺点

优点

线程数大大减少。

缺点

需要程序自己扫描 每个连接read,需要 O(n)时间复杂度系统调用 (此时可能只有一个连接发送了数据),高频系统调用(导致CPU 用户态内核态切换)高。导致CPU消耗很高。

三 多路复用器(select、poll、EPOLL)

改进:不需要用户扫描所有连接,由kernel 给出哪些连接有数据,然后应用从有数据的连接读取数据。

1 epoll

import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.Set; /** * 多路复用socket * * @author xiangyong.ding * @version $Id: MultiplexingSocket.java v 0.1 2020年08月09日 12:19 xiangyong.ding Exp $ */ public class MultiplexingSocket { static ByteBuffer buffer = ByteBuffer.allocateDirect(4096); public static void main(String[] args) throws Exception { LinkedList< SocketChannel> clients = new LinkedList<>(); //1.启动server //new socket 开启监听 ServerSocketChannel socketChannel = ServerSocketChannel.open(); socketChannel.bind(new InetSocketAddress(9090)); //设置非阻塞,接受客户端 socketChannel.configureBlocking(false); //多路复用器(JDK包装的代理,select /poll/epoll/kqueue) Selector selector = Selector.open(); //java自动代理,默认为epoll //Selector selector = PollSelectorProvider.provider().openSelector();//指定为poll //将服务端socket 注册到 多路复用器 socketChannel.register(selector SelectionKey.OP_ACCEPT); //2. 轮训多路复用器 // 先询问有没有连接 如果有则返回数量以及对应的对象(fd) while (selector.select() > 0) { System.out.println(); Set< SelectionKey> selectionKeys = selector.selectedKeys(); Iterator< SelectionKey> iter = selectionKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); //2.1 处理新的连接 if (key.isAcceptable()) { //接受客户端连接; 非阻塞,无客户端返回null(操作系统返回-1) SocketChannel client = socketChannel.accept(); //设置读非阻塞 client.configureBlocking(false); //同样,把client也注册到selector client.register(selector SelectionKey.OP_READ); System.out.println("new client : " client.getRemoteAddress()); } //2.2 处理读取数据 else if (key.isReadable()) { readDataFromSocket(key); } } } } protected static void readDataFromSocket(SelectionKey key) throws Exception { SocketChannel socketChannel = (SocketChannel) key.channel(); // 非阻塞 >0 表示读取到的字节数量 0或-1表示未读取到或读取异常 // 请注意:这个例子降低复杂度,不考虑报文大于buffer size的情况 int num = socketChannel.read(buffer); if (num > 0) { buffer.flip(); byte[] clientBytes = new byte[buffer.limit()]; //从缓冲区 读取到内存中 buffer.get(clientBytes); System.out.println(socketChannel.socket().getPort() ":" new String(clientBytes)); //清空缓冲区 buffer.clear(); } } }

2 发生的系统调用

启动

socket(AF_INET SOCK_STREAM IPPROTO_IP) = 4 bind(4 {sa_family=AF_INET sin_port=htons(9090) sin_addr=inet_addr("0.0.0.0")} 16) = 0 listen(4 50) fcntl(4 F_SETFL O_RDWR|O_NONBLOCK) = 0 epoll_create(256) = 7 epoll_ctl(7 EPOLL_CTL_ADD 5 {EPOLLIN {u32=5 u64=4324783852322029573}}) = 0 epoll_ctl(7 EPOLL_CTL_ADD 4 {EPOLLIN {u32=4 u64=158913789956}}) = 0 epoll_wait(7

关于对epoll_create(对应着Java的 Selector selector = Selector.open()) 的说明,本质上是在内存的操作系统保留区,创建一个epoll数据结构。用于后面当有client连接时,向该epoll区中添加监听。

有连接

epoll_wait(7 [{EPOLLIN {u32=4 u64=158913789956}}] 8192 -1) = 1 accept(4 {sa_family=AF_INET sin_port=htons(29597) sin_addr=inet_addr("42.120.74.252")} [16]) = 8 fcntl(8 F_SETFL O_RDWR|O_NONBLOCK) = 0 epoll_ctl(7 EPOLL_CTL_ADD 8 {EPOLLIN {u32=8 u64=3212844375897800712}}) = 0

关于epoll_ctl (对应着Java的 client.register(selector SelectionKey.OP_READ) )。其中 EPOLLIN 恰好对应着Java的 SelectionKey.OP_READ 即监听数据到达读取事件。

客户端发送数据

epoll_wait(7 [{EPOLLIN {u32=8 u64=3212844375897800712}}] 8192 -1) = 1 read(8 "hello multiplex\n" 4096) = 16 epoll_wait(7

note:epoll_wait第四个参数-1表示block。

poll 和 epoll 对比

根据“1.BIO”中的poll函数调用和epoll函数对比如下:

java io是什么模式(从操作系统层面分析)(3)

java io是什么模式(从操作系统层面分析)(4)

poll和epoll本质上都是同步IO, 区别于BIO的是 多路复用充分降低了 system call,而epoll更进一步,再次降低了system call的时间复杂度。

3 优缺点

优点

  • 线程数同样很少,甚至可以把acceptor线程和worker线程使用同一个。
  • 时间复杂度低,Java实现的Selector(在Linux OS下使用的epoll函数)支持多个clientChannel事件的一次性获取,且时间复杂度维持在O(1)。
  • CPU使用低:得益于Selector,我们不用向 “2.NIO”中需要自己一个个ClientChannel手动去检查事件,因此使得CPU使用率大大降低。

缺点

  • 数据处理麻烦:目前socketChannel.read 读取数据完全是基于字节的,当我们需要需要作为HTTP服务网关时,对于HTTP协议的处理完全需要自己解析,这是个庞大、烦杂、容易出错的工作。
  • 性能现有socket数据的读取(socketChannel.read(buffer))全部通过一个buffer 缓冲区来接受,一旦连接多起来,这无疑是一个单线程读取,性能无疑是个问题。那么此时buffer我们每次读取都重新new出来呢?如果每次都new出来,这样的内存碎片对于GC无疑是一场灾难。如何平衡地协调好buffer的共享,既保证性能,又保证线程安全,这是个难题。
四 Netty

1 研究的目标源码(netty提供的入门example)

TelnetServer

package telnet; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; /** * Simplistic telnet server. */ public final class TelnetServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port" SSL? "8992" : "8023")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate() ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new TelnetServerInitializer(sslCtx)); b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

TelnetServerHandler

package telnet;

import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetAddress; import java.util.Date; /** * Handles a server-side channel. */ @Sharable public class TelnetServerHandler extends SimpleChannelInboundHandler< String> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Send greeting for a new connection. ctx.write("Welcome to " InetAddress.getLocalHost().getHostName() "!\r\n"); ctx.write("It is " new Date() " now.\r\n"); ctx.flush(); } @Override public void channelRead0(ChannelHandlerContext ctx String request) throws Exception { // Generate and write a response. String response; boolean close = false; if (request.isEmpty()) { response = "Please type something.\r\n"; } else if ("bye".equals(request.toLowerCase())) { response = "Have a good day!\r\n"; close = true; } else { response = "Did you say '" request "'?\r\n"; } // We do not need to write a ChannelBuffer here. // We know the encoder inserted at TelnetPipelineFactory will do the conversion. ChannelFuture future = ctx.write(response); // Close the connection after sending 'Have a good day!' // if the client has sent 'bye'. if (close) { future.addListener(ChannelFutureListener.CLOSE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx Throwable cause) { cause.printStackTrace(); ctx.close(); } }

TelnetServerInitializer

package telnet; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.ssl.SslContext; /** * Creates a newly configured {@link ChannelPipeline} for a new channel. */ public class TelnetServerInitializer extends ChannelInitializer< SocketChannel> { private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler(); private final SslContext sslCtx; public TelnetServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } // Add the text line codec combination first pipeline.addLast(new DelimiterBasedFrameDecoder(8192 Delimiters.lineDelimiter())); // the encoder and decoder are static as these are sharable pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // and then business logic. pipeline.addLast(SERVER_HANDLER); } }

2 启动后的系统调用

主线程(23109)

## 256无实际作用,这里只为了兼容旧版kernel api epoll_create(256) = 7epoll_ctl(7 EPOLL_CTL_ADD 5 {EPOLLIN {u32=5 u64=5477705356928876549}}) = 0 epoll_create(256) = 10epoll_ctl(10 EPOLL_CTL_ADD 8 {EPOLLIN {u32=8 u64=17041805914081853448}}) = 0 epoll_create(256) = 13 epoll_ctl(13 EPOLL_CTL_ADD 11 {EPOLLIN {u32=11 u64=17042151607409573899}}) = 0 epoll_create(256) = 16 epoll_ctl(16 EPOLL_CTL_ADD 14 {EPOLLIN {u32=14 u64=17042497300737294350}}) = 0 epoll_create(256) = 19 epoll_ctl(19 EPOLL_CTL_ADD 17 {EPOLLIN {u32=17 u64=17042561450368827409}}) = 0 epoll_create(256) = 10 socket(AF_INET SOCK_STREAM IPPROTO_IP) = 20 clone(child_stack=0x7fc3c509afb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7fc3c509b9d0 tls=0x7fc3c509b700 child_tidptr=0x7fc3c509b9d0) = 23130

概括为:

  • 向OS新建socket,并开启clone boss线程23130。
  • 为BOSS创建了一个epoll(论证参见下面“boss”),每个worker创建一个epoll数据结构(本质上是在kernel内存区创建了一个数据结构,用于后续监听)。
  • 创建boss线程监听的socket(本质上在kernel中创建一个数据结构)。

boss(23130)

bind(20 {sa_family=AF_INET sin_port=htons(8023) sin_addr=inet_addr("0.0.0.0")} 16) = 0 listen(20 128) = 0 getsockname(20 {sa_family=AF_INET sin_port=htons(8023) sin_addr=inet_addr("0.0.0.0")} [16]) = 0 getsockname(20 {sa_family=AF_INET sin_port=htons(8023) sin_addr=inet_addr("0.0.0.0")} [16]) = 0 ##将fd为7号epoll和fd为20号的socket绑定,事件:epoll_ctl_add和epoll_ctl_mod epoll_ctl(7 EPOLL_CTL_ADD 20 {EPOLLIN {u32=20 u64=14198059139132817428}}) = 0 epoll_ctl(7 EPOLL_CTL_MOD 20 {EPOLLIN {u32=20 u64=20}}) = 0 epoll_wait(7 [{EPOLLIN {u32=5 u64=17295150779149058053}}] 8192 1000) = 1 epoll_wait(7 [] 8192 1000) = 0(不断轮训,1S超时一次)

概括为:

  • 将上一步中main线程创建的fd:20绑定端口8023,并开启监听(网卡负责监听和接受连接和数据,kernel则负责路由到具体进程,具体参见:关于socket和bind和listen,TODO )。
  • 将7号socket对应的fd绑定到20号对应的epoll数据结构上去(都是操作kernel中的内存)。
  • 开始1S中一次阻塞等待epoll有任何连接或数据到达。

3 客户端连接

boss (23130)

accept(20 {sa_family=AF_INET sin_port=htons(11144) sin_addr=inet_addr("42.120.74.122")} [16]) = 24 getsockname(24 {sa_family=AF_INET sin_port=htons(8023) sin_addr=inet_addr("192.168.0.120")} [16]) = 0 getsockname(24 {sa_family=AF_INET sin_port=htons(8023) sin_addr=inet_addr("192.168.0.120")} [16]) = 0 setsockopt(24 SOL_TCP TCP_NODELAY [1] 4) = 0 getsockopt(24 SOL_SOCKET SO_SNDBUF [87040] [4]) = 0 getsockopt(24 SOL_SOCKET SO_SNDBUF [87040] [4]) = 0 ##抛出 work线程 clone(child_stack=0x7fc3c4c98fb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7fc3c4c999d0 tls=0x7fc3c4c99700 child_tidptr=0x7fc3c4c999d0) = 2301

worker (2301)

writev(24 [{"Welcome to iZbp14e1g9ztpshfrla9m"... 37} {"It is Sun Aug 23 15:44:14 CST 20"... 41}] 2) = 78 epoll_ctl(13 EPOLL_CTL_ADD 24 {EPOLLIN {u32=24 u64=24}}) = 0 epoll_ctl(13 EPOLL_CTL_MOD 24 {EPOLLIN {u32=24 u64=14180008216221450264}}) = 0 epoll_wait(13 [{EPOLLIN {u32=11 u64=17042151607409573899}}] 8192 1000) = 1 read(11 "\1" 128) = 1 ##开始无限loop epoll_wait(13 [] 8192 1000) = 0 epoll_wait(13 [{EPOLLIN {u32=24 u64=24}}] 8192 1000) = 1

概括:

  • 当BOSS轮训epoll_wait等到了连接后,首先accept得到该socket对应的fd。
  • 连接建立后 BOSS立马抛出一个线程(clone函数)。
  • worker(即新建的线程)写入了一段数据(这里是业务逻辑)。
  • worker将该client对应的fd绑定到了13号epoll上。
  • worker继续轮训监听13号epoll。

4 客户端主动发送数据

worker(2301)

read(24 "i am daojian\r\n" 1024) = 14 write(24 "Did you say 'i am daojian'?\r\n" 29) = 29 ##继续无限loop epoll_wait(13 [] 8192 1000) = 0

概括为:

  • wait到数据后,立即read到用户控件内存中(读取1024个字节到 用户控件某个buff中)。
  • 写入数据(业务逻辑,不必太关注)。
  • 继续轮训等待13号epoll。

5 客户端发送bye报文,服务器断开TCP连接

worker(2301)

read(24 "bye\r\n" 1024) = 5 write(24 "Have a good day!\r\n" 18) = 18 getsockopt(24 SOL_SOCKET SO_LINGER {onoff=0 linger=0} [8]) = 0 dup2(25 24) = 24 ##从epoll数据结构中(OS)中删除fd为24的socket epoll_ctl(13 EPOLL_CTL_DEL 24 0x7f702dd531e0) = -1 ENOENT ##关闭24 socket close(24) = 0 ##继续等待13 epoll数据 epoll_wait(13 [] 8192 1000) = 0

断开客户端连接概括为:

  • 从epoll中删除该客户端对应的fd(这里触发源头没找到,可能是boss)。
  • close关闭客户端24号fd。
  • 继续轮训epoll。

6 五个客户端同时连接

boss线程(23130)

accept(20 {sa_family=AF_INET sin_port=htons(1846) sin_addr=inet_addr("42.120.74.122")} [16]) = 24 clone(child_stack=0x7f702cc51fb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7f702cc529d0 tls=0x7f702cc52700 child_tidptr=0x7f702cc529d0) = 10035 accept(20 {sa_family=AF_INET sin_port=htons(42067) sin_addr=inet_addr("42.120.74.122")} [16]) = 26 clone(child_stack=0x7f702cb50fb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7f702cb519d0 tls=0x7f702cb51700 child_tidptr=0x7f702cb519d0) = 10067 ...

woker线程(10035,第一个连接)

epoll_ctl(13 EPOLL_CTL_ADD 24 {EPOLLIN {u32=24 u64=24}}) = 0 epoll_ctl(13 EPOLL_CTL_MOD 24 {EPOLLIN {u32=24 u64=3226004877247250456}}) = 0 epoll_wait(13 [{EPOLLIN {u32=11 u64=17042151607409573899}}] 8192 1000) = 1 = 1 epoll_wait(13 [] 8192 1000) = 0

worker线程(10067,第二个连接)

epoll_ctl(16 EPOLL_CTL_ADD 26 {EPOLLIN {u32=26 u64=26}}) = 0 epoll_ctl(16 EPOLL_CTL_MOD 26 {EPOLLIN {u32=26 u64=3221483685433835546}}) = 0 epoll_wait(16 [{EPOLLIN {u32=14 u64=17042497300737294350}}] 8192 1000) = 1 epoll_wait(16 [] 8192 1000) = 0 epoll_wait(16 [] 8192 1000) = 0

worker线程(10067,第二个连接)

epoll_ctl(19 EPOLL_CTL_ADD 27 {EPOLLIN {u32=27 u64=27}}) = 0 epoll_ctl(19 EPOLL_CTL_MOD 27 {EPOLLIN {u32=27 u64=3216966479350071323}}) = 0

worker线程(8055,第四个连接)

epoll_ctl(10 EPOLL_CTL_ADD 28 {EPOLLIN {u32=28 u64=28}}) = 0 epoll_ctl(10 EPOLL_CTL_MOD 28 {EPOLLIN {u32=28 u64=3302604828697427996}}) = 0

worker线程(10035,第五个连接,不在clone线程,而是复用了第一个epoll对应的worker)

epoll_ctl(13 EPOLL_CTL_ADD 29 {EPOLLIN {u32=29 u64=29}}) = 0 epoll_ctl(13 EPOLL_CTL_MOD 29 {EPOLLIN {u32=29 u64=29}}) = 0

概括为:

  • epoll和boss、worker之间的关系:一共有4个worker对应着4个epoll对象,boss和每个worker都有对应自己的epoll。
  • boss根据epoll数量,平衡分配连接到每个worker对应的epoll中。

7 总结

下图通过对系统调用的调查得出 netty 和 kernel 交互图:

java io是什么模式(从操作系统层面分析)(5)

初始化直接创建5个epoll,其中7号为boss使用,专门用于处理和客户端连接;其余4个用来给worker使用,用户处理和客户端的数据交互。

work的线程数量,取决于初始化时创建了几个epoll,worker的复用本质上是epoll的复用。

work之间为什么要独立使用epoll?为什么不共享?

  • 为了避免各个worker之间发生争抢连接处理,netty直接做了物理隔离,避免竞争。各个worker只负责处理自己管理的连接,并且后续该worker中的每个client的读写操作完全由 该线程单独处理,天然避免了资源竞争,避免了锁。
  • worker单线程,性能考虑:worker不仅仅要epoll_wait,还是处理read、write逻辑,加入worker处理了过多的连接,势必造成这部分消耗时间片过多,来不及处理更多连接,性能下降。

8 优缺点

优点

  • 数据处理:netty提供了大量成熟的数据处理组件(ENCODER、DECODER),HTTP、POP3拿来即用。
  • 编码复杂度、可维护性:netty充分使得业务逻辑与网络处理解耦,只需要少量的BootStrap配置即可,更多的集中在业务逻辑处理上。
  • 性能:netty提供了的ByteBuf(底层Java原生的ByteBuffer),提供了池化的ByteBuf,兼顾读取性能和ByteBuf内存分配(在后续文档中会再做详解)。

缺点

  • 入门有一定难度。
五 AIO

1 启动

main线程

epoll_create(256) = 5 epoll_ctl(5 EPOLL_CTL_ADD 6 {EPOLLIN {u32=6 u64=11590018039084482566}}) = 0 ##创建BOSS 线程(Proactor) clone(child_stack=0x7f340ac06fb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7f340ac079d0 tls=0x7f340ac07700 child_tidptr=0x7f340ac079d0) = 22704 socket(AF_INET6 SOCK_STREAM IPPROTO_IP) = 8 setsockopt(8 SOL_IPV6 IPV6_V6ONLY [0] 4) = 0 setsockopt(8 SOL_SOCKET SO_REUSEADDR [1] 4) = 0 bind(8 {sa_family=AF_INET6 sin6_port=htons(9090) inet_pton(AF_INET6 "::" &sin6_addr) sin6_flowinfo=0 sin6_scope_id=0} 28) = 0 listen(8 50) accept(8 0x7f67d01b3120 0x7f67d9246690) = -1 epoll_ctl(5 EPOLL_CTL_MOD 8 {EPOLLIN|EPOLLONESHOT {u32=8 u64=15380749440025362440}}) = -1 ENOENT (No such file or directory) epoll_ctl(5 EPOLL_CTL_ADD 8 {EPOLLIN|EPOLLONESHOT {u32=8 u64=15380749440025362440}}) = 0 read(0

22704(BOSS 线程(Proactor))

epoll_wait(5 < unfinished ...>

2 请求连接

**22704(BOSS 线程(Proactor))处理连接** epoll_wait(5 [{EPOLLIN {u32=9 u64=4294967305}}] 512 -1) = 1 accept(8 {sa_family=AF_INET6 sin6_port=htons(55320) inet_pton(AF_INET6 "::ffff:36.24.32.140" &sin6_addr) sin6_flowinfo=0 sin6_scope_id=0} [28]) = 9 clone(child_stack=0x7ff35c99ffb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7ff35c9a09d0 tls=0x7ff35c9a0700 child_tidptr=0x7ff35c9a09d0) = 26241 epoll_wait(5 < unfinished ...>

26241

#将client 连接的FD加入到BOSS的epoll中,以便BOSS线程监听网络事件 epoll_ctl(5 EPOLL_CTL_MOD 9 {EPOLLIN|EPOLLONESHOT {u32=9 u64=4398046511113}}) = -1 ENOENT (No such file or directory) epoll_ctl(5 EPOLL_CTL_ADD 9 {EPOLLIN|EPOLLONESHOT {u32=9 u64=4398046511113}}) = 0 accept(8 0x7ff3440008c0 0x7ff35c99f4d0) = -1 EAGAIN (Resource temporarily unavailable) epoll_ctl(5 EPOLL_CTL_MOD 8 {EPOLLIN|EPOLLONESHOT {u32=8 u64=8}}) = 0

3 客户端发送数据

22704(BOSS 线程(Proactor))处理连接

epoll_wait(5 [{EPOLLIN {u32=9 u64=4294967305}}] 512 -1) = 1 ##数据读出 read(9 "daojian111\r\n" 1024) = 12 ##数据处理交给其他线程,这里由于线程池为空,需要先clone线程 clone(child_stack=0x7ff35c99ffb0 flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID parent_tidptr=0x7ff35c9a09d0 tls=0x7ff35c9a0700 child_tidptr=0x7ff35c9a09d0) = 26532

复制线程处理,线程号26532

write(1 "pool-1-thread-2-10received : dao"... 41) = 41 write(1 "\n" 1) accept(8 0x7f11c400b5f0 0x7f11f42fd4d0) = -1 EAGAIN (Resource temporarily unavailable) epoll_ctl(5 EPOLL_CTL_MOD 8 {EPOLLIN|EPOLLONESHOT {u32=8 u64=8}}) = 0

4 总结

  • 从系统调用角度,Java的AIO事实上是以多路复用(Linux上为epoll)等同步IO为基础,自行实现了异步事件分发。
  • BOSS Thread负责处理连接,并分发事件。
  • WORKER Thread只负责从BOSS接收的事件执行,不负责任何网络事件监听。

java io是什么模式(从操作系统层面分析)(6)

5 优缺点

优点

相比于前面的BIO、NIO,AIO已经封装好了任务调度,使用时只需关心任务处理。

缺点

  • 事件处理完全由Thread Pool完成,对于同一个channel的多个事件可能会出现并发问题。
  • 相比netty,buffer API不友好容易出错;编解码工作复杂。

相关链接
https://man7.org/linux/man-pages/man2/poll.2.html
https://man7.org/linux/man-pages/man2/recvfrom.2.html
https://man7.org/linux/man-pages/man2/epoll_create.2.html
https://man7.org/linux/man-pages/man2/epoll_ctl.2.html

作者 | 道坚

原文链接:https://developer.aliyun.com/article/784933?utm_content=g_1000280864

本文为阿里云原创内容,未经允许不得转载。

猜您喜欢: