Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

netty源码解解析(4.0)-14 Channel NIO实现:读取数据

自带buff 2019-02-27 00:08:00 阅读数:176 评论数:0 点赞数:0 收藏数:0

 本章分析Nio Channel的数据读取功能的实现。

Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, EventLoop和ChannelHandler。Channel有个read方法,这个方法不会直接读取数据,它的作用是通知持有当前channel的eventLoop可以从这个这个channel读取数据了,这个方法被调用之后eventLoop会在channel有数据可读的时候从channel读出数据然后把数据放在channelRead事件中交给ChannelInboundHandler的channelRead方法处理,当eventLoop发现channel中暂时没时间可读会触发一个channelReadComplete事件。

 

read: Nio Channel通知eventLoop开始读数据

channel read方法的调用栈:1 io.netty.channel.AbstractChannel/#read2 io.netty.channel.DefaultChannelPipeline/#read3 io.netty.channel.AbstractChannelHandlerContext/#read4 io.netty.channel.AbstractChannelHandlerContext/#invokeRead5 io.netty.channel.DefaultChannelPipeline.HeadContext/#read6 io.netty.channel.AbstractChannel.AbstractUnsafe/#beginRead7 io.netty.channel.nio.AbstractNioChannel/#doBeginRead

 

  调用channel的read的方法,会触发read事件,通过pipeline调用AbstractChannel unsafe的beginRead方法,这个方法的语义是通知eventLoop可以从channel读数据了,但他没有实现具体功能,把具体功能留给doBeginRead实现。doBeginRead在AbstractChannel中定义,它是一个抽象方法。AbstractNioChannel实现了这个方法:1 @Override2 protected void doBeginRead() throwsException {3 //Channel.read() or ChannelHandlerContext.read() was called 4 if(inputShutdown) {5 return;6 }7 8 final SelectionKey selectionKey = this.selectionKey;9 if (!selectionKey.isValid()) {10 return;11 }12 13 readPending = true;14 15 final int interestOps =selectionKey.interestOps();16 if ((interestOps & readInterestOp) == 0) {17 selectionKey.interestOps(interestOps | readInterestOp);18 }19 }

这里的doBeginRead实现,只有第17行是核心代码:把readInterestOps保存是的read操作标志添加到SelectableChannel的SelectionKey中。这里的readInterestOps是一个类的属性,在AbstractNioChannel中,它没有明确的定义,只有一个抽象的定义:NIO中的一个可以可以当成read操作的的标志。在NIO中可以当成read的有SelectionKey.OPREAD和SelectionKey.OPACCEPT。readInterestOps在AbstractNioChannel的构造方法中使用传入的参数初始化,子类就可以根据需要确定interestOps的具体含义。

设置好beginRead之后,NioEventLoop就可以使用Selector得到检测到channel上的read事件了,下面是NioEventLoop中处理read事件的代码:1 //io.netty.channel.nio.NioEventLoop/#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 2 if ((readyOps & (SelectionKey.OPREAD | SelectionKey.OPACCEPT)) != 0 || readyOps == 0) {3 unsafe.read();4 }

这里调用了unsafe的read的方法,在Channel的Unsafe中并没有定义这个方法,它在io.netty.channel.nio.AbstractNioChannel.NioUnsafe中定义,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe和io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe中有两个不同的实现。这两个实现的区别是:NioMessageUnsafe.read是把从channel中读出的数据转换成Object, NioByteUnsafe.read是从channel中读出byte数据流。下面来详解分析这两种实现。

AbstractNioChannel.NioUnsafe.read实现:从channel读取数据

netty在NIO Channel的设计上,把读数据设计成独立的抽象层。之所以这样设计有两个方面的原因:

  1. 在NIO中,三中不同类型的Channel读取的数据类型是不一样的,NioServerSocketChannel读出的是一个新建的NioSockeChannel, NioSocketChannel读出的byte数据流,NioDatagramChannel读出是数据报。
  2. NIO三种Channel都运行在非阻塞模式下,相比于阻塞模式,非阻塞模式下读数据要处理的问题要复杂的多。使用Selector和非阻塞模式被动地读取数据,需要处理连接断开和socket异常,由于Selector使用的是边缘触发模式,一次read调用务必要把已经在socket recvbuffer中的数据全部读出来,否则可以导致数据丢失或数据接收不及时。把read独立出来处理读取数据的复杂性,代码结构会比较清晰。

接下来开始详细分析NioUnsafe read方法的两种不同的实现。

 

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read实现: 从channel中读出Object

这个实现是主要功能是调用doReadMessages方法,从channel中读出Object消息,具体的类型这里没有限制,doReadMessages是一个抽象方法,留给子类实现, 下面是read方法的实现:1 //io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe 2 @Override3 public voidread() {4 asserteventLoop().inEventLoop();5 final ChannelConfig config =config();6 if (!config.isAutoRead() && !isReadPending()) {7 //ChannelConfig.setAutoRead(false) was called in the meantime 8 removeReadOp();9 return;10 }11 12 final int maxMessagesPerRead =config.getMaxMessagesPerRead();13 final ChannelPipeline pipeline =pipeline();14 boolean closed = false;15 Throwable exception = null;16 try{17 try{18 for(;;) {19 int localRead =doReadMessages(readBuf);20 if (localRead == 0) {21 break;22 }23 if (localRead < 0) {24 closed = true;25 break;26 }27 28 //stop reading and remove op 29 if (!config.isAutoRead()) {30 break;31 }32 33 if (readBuf.size() >=maxMessagesPerRead) {34 break;35 }36 }37 } catch(Throwable t) {38 exception =t;39 }40 setReadPending(false);41 int size =readBuf.size();42 for (int i = 0; i < size; i ++) {43 pipeline.fireChannelRead(readBuf.get(i));44 }45 46 readBuf.clear();47 pipeline.fireChannelReadComplete();48 49 if (exception != null) {50 closed =closeOnReadError(exception);51 52 pipeline.fireExceptionCaught(exception);53 }54 55 if(closed) {56 if(isOpen()) {57 close(voidPromise());58 }59 }60 } finally{61 //Check if there is a readPending which was not processed yet.62 //This could be for two reasons:63 /// The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method64 /// The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method65 // 66 //Seehttps://github.com/netty/netty/issues/2254 67 if (!config.isAutoRead() && !isReadPending()) {68 removeReadOp();69 }70 }71 }

 

第12行,得到一次循环读取消息的最大数量maxMessagesPerRead,这个配置的默认值因不同的channel类型而不同,io.netty.channel.ChannelConfig提供了setMaxMessagesPerRead方法设置这个配置的值。调节这个值的大小可以影响I/O操作在eventLoop线程分配的执行时间,它的值越大,I/O操作站的时间越大。

18-36行,使用doReadMessages读取消息,并把消息放到readBuf中,readBuf是List类型。20,21行,没有可读的数据结束循环。23-25行,socket已经关闭。33,34行,readBuf中的消息数量已经超过限制,跳出循环。

41-47行,对readBuf中的每一个消息触发一次channelRead事件,然后清空readBuf, 触发channelReadComplete事件。

49-53行,处理异常。

55-59行,处理channel正常关闭。

doReadMessages方法有两个实现。一个是io.netty.channel.socket.nio.NioServerSocketChannel/#doReadMessages,这个实现中读出的消息是NioSocketChannel。另一个是io.netty.channel.socket.nio.NioDatagramChannel/#doReadMessages,这个实现中读出的消息时DatagramPacket。

io.netty.channel.socket.nio.NioServerSocketChannel/#doReadMessages实现代码:1 @Override2 protected int doReadMessages(List buf) throwsException {3 SocketChannel ch =SocketUtils.accept(javaChannel());4 5 try{6 if (ch != null) {7 buf.add(new NioSocketChannel(this, ch));8 return 1;9 }10 } catch(Throwable t) {11 logger.warn("Failed to create a new channel from an accepted socket.", t);12 13 try{14 ch.close();15 } catch(Throwable t2) {16 logger.warn("Failed to close a socket.", t2);17 }18 }19 20 return 0;21 }

第3行, 使用accept方法得到一个新的SocketChannel。

7,8行,使用新的SocketChannel创建NioSocketChannel,并把它放到buf中。

11-20行,出现异常,关闭这个socket, 最后返回0.

io.netty.channel.socket.nio.NioDatagramChannel/#doReadMessages实现代码:1 @Override2 protected int doReadMessages(List buf) throwsException {3 DatagramChannel ch =javaChannel();4 DatagramChannelConfig config =config();5 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;6 if (allocHandle == null) {7 this.allocHandle = allocHandle =config.getRecvByteBufAllocator().newHandle();8 }9 ByteBuf data =allocHandle.allocate(config.getAllocator());10 boolean free = true;11 try{12 ByteBuffer nioData =data.internalNioBuffer(data.writerIndex(), data.writableBytes());13 int pos =nioData.position();14 InetSocketAddress remoteAddress =(InetSocketAddress) ch.receive(nioData);15 if (remoteAddress == null) {16 return 0;17 }18 19 int readBytes = nioData.position() -pos;20 data.writerIndex(data.writerIndex() +readBytes);21 allocHandle.record(readBytes);22 23 buf.add(newDatagramPacket(data, localAddress(), remoteAddress));24 free = false;25 return 1;26 } catch(Throwable cause) {27 PlatformDependent.throwException(cause);28 return -1;29 } finally{30 if(free) {31 data.release();32 }33 }34 }

 

4-12行,得到接收数据的缓冲区data。

   13-21行,从socket收到一个数据包,这个数据报包含两部分: data中的二进制数据和发送端的地址remoteAddress(第14行)。然后设置data中的数据长度。

23-25行,把数据报转换成DatagramPacket类型放到buf中返回。

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe/#read实现:从channel中读byte流

这个实现的主要功能是调用doReadBytes读取byte流。doReadBytes是一个抽象方法,留给子类实现。下面是这个read的实现。1 @Override2 public final voidread() {3 final ChannelConfig config =config();4 if (!config.isAutoRead() && !isReadPending()) {5 //ChannelConfig.setAutoRead(false) was called in the meantime 6 removeReadOp();7 return;8 }9 10 final ChannelPipeline pipeline =pipeline();11 final ByteBufAllocator allocator =config.getAllocator();12 final int maxMessagesPerRead =config.getMaxMessagesPerRead();13 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;14 if (allocHandle == null) {15 this.allocHandle = allocHandle =config.getRecvByteBufAllocator().newHandle();16 }17 18 ByteBuf byteBuf = null;19 int messages = 0;20 boolean close = false;21 try{22 int totalReadAmount = 0;23 boolean readPendingReset = false;24 do{25 byteBuf =allocHandle.allocate(allocator);26 int writable =byteBuf.writableBytes();27 int localReadAmount =doReadBytes(byteBuf);28 if (localReadAmount <= 0) {29 //not was read release the buffer 30 byteBuf.release();31 byteBuf = null;32 close = localReadAmount < 0;33 if(close) {34 //There is nothing left to read as we received an EOF. 35 setReadPending(false);36 }37 break;38 }39 if (!readPendingReset) {40 readPendingReset = true;41 setReadPending(false);42 }43 pipeline.fireChannelRead(byteBuf);44 byteBuf = null;45 46 if (totalReadAmount >= Integer.MAXVALUE -localReadAmount) {47 //Avoid overflow. 48 totalReadAmount =Integer.MAXVALUE;49 break;50 }51 52 totalReadAmount +=localReadAmount;53 54 //stop reading 55 if (!config.isAutoRead()) {56 break;57 }58 59 if (localReadAmount The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method79 /// The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method80 // 81 //Seehttps://github.com/netty/netty/issues/2254 82 if (!config.isAutoRead() && !isReadPending()) {83 removeReadOp();84 }85 }86 }

10-16行,得到一个接受缓冲区的分配器和分配器的的专用handle。这两个东西的功能是高效的创建大量的接接收数据缓冲区,具体原理和实现会在后面buffer相关章节中详细分析,这里暂时略过。

24-64行,这是一个使用doReadBytes读取数据并触发channelRead事件的循环。25-27行,得到一个接受数据的缓冲区,然后从socket中读取数据。28-38行,没有数据可读了,或socket已经断开了。43行,正确收到了数据,触发channelRead事件。59-62行,读出的数据小于缓冲区的长度,表示没有socket中暂时没有数据可读了。 64行,读取次数大于上限配置,跳出。

66行,读循环完成,触发channelReadComplete事件。

69-72, 处理socket正常关闭。

74,83行,处理其他异常。

doReadBytes只有一个实现://io.netty.channel.socket.nio.NioSocketChannel/#doWriteBytes @Overrideprotected int doWriteBytes(ByteBuf buf) throwsException {final int expectedWrittenBytes =buf.readableBytes();returnbuf.readBytes(javaChannel(), expectedWrittenBytes); }

这个实现非常简单,使用ByteBuf的能力从SocketChannel中读取byte流。

 

 

版权声明
本文为[自带buff]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/brandonli/p/10278285.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;

支付宝红包,每日可领