Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

netty源码解解析(4.0)-12 Channel NIO实现:channel初始化

自带buff 2019-01-22 15:47:00 阅读数:188 评论数:0 点赞数:0 收藏数:0

创建一个channel实例,并把它register到eventLoopGroup中之后,这个channel然后处于inactive状态,仍然是不可用的。只有在bind或connect方法调用成功之后才能正常。因此bind或connect算是channel初始化的最后一步,本章这就重点分析这两个功能的实现。

接下来的代码分析如果没有特别说明,都是以NioSocketChannel为例。

 

bind实现

bind方法的调用栈如下:

io.netty.channel.AbstractChannel#bind(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress) 
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) io.netty.channel.AbstractChannelHandlerContext#invokeBind io.netty.channel.DefaultChannelPipeline.HeadContext#bind io.netty.channel.AbstractChannel.AbstractUnsafe#bind io.netty.channel.socket.nio.NioSocketChannel#doBind io.netty.channel.socket.nio.NioSocketChannel#doBind0

为了能简单明了地展示调用关系,这个调用栈忽略了一些调用。可能有多个AbstractChannelHandlerContext的方法在不同的线程中被调用。以后在描述调用栈时也会忽略这一点,不再赘述。

io.netty.channel.AbstractChannel.AbstractUnsafe#bind执行了主要的bind逻辑,它会调用doBind, 然后在channel的状态从inactive变成active,就调用pipline的fireChannelActive方法触发channelActives事件。doBind是io.netty.channel.AbstractChannel定义的抽象方法。NioSocketChannel只需要实现这个方法,整个bind功能就完整了。

 1  @Override
2 protected void doBind(SocketAddress localAddress) throws Exception { 3  doBind0(localAddress); 4  } 5 private void doBind0(SocketAddress localAddress) throws Exception { 6 if (PlatformDependent.javaVersion() >= 7) { 7  SocketUtils.bind(javaChannel(), localAddress); 8 } else { 9  SocketUtils.bind(javaChannel().socket(), localAddress); 10  } 11 }

  SocketUtils封装了通过AccessController调用JDK的socket API接口,事实上还是调用Socket或SocketChannel的bind方法。Nio的三个Channel类实现doBind的代码几乎一样。

 

connect实现

connect的调用栈如下:

io.netty.channel.AbstractChannel#connect(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeConnect
io.netty.channel.DefaultChannelPipeline.HeadContext#connect
io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
io.netty.channel.socket.nio.NioSocketChannel#doConnect

connect的主要逻辑在io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect中实现,它的流程是:

1. 调用doConnect方法,这个方法是AbstractNioChanne定义的抽象方法。

2. 如果doConnect成功,且channel的状态从inactive变成active,则调用pipeline的fireChannelActive方法触发channelActive事件。

3. 如果doConnection失败,调用close关闭channel。

io.netty.channel.socket.nio.NioSocketChannel#doConnect中是socket connect API的调用。下面是connect的关键代码。

 1 @Override
2 public final void connect( 3 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 4 if (!promise.setUncancellable() || !ensureOpen(promise)) { 5 return; 6  } 7 8 try { 9 if (connectPromise != null) { 10 // Already a connect in process. 11 throw new ConnectionPendingException(); 12  } 13 14 boolean wasActive = isActive(); 15 if (doConnect(remoteAddress, localAddress)) { 16  fulfillConnectPromise(promise, wasActive); 17 } else { 18 connectPromise = promise; 19 requestedRemoteAddress = remoteAddress; 20 21 // Schedule connect timeout. 22 int connectTimeoutMillis = config().getConnectTimeoutMillis(); 23 if (connectTimeoutMillis > 0) { 24 connectTimeoutFuture = eventLoop().schedule(new Runnable() { 25  @Override 26 public void run() { 27 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; 28 ConnectTimeoutException cause = 29 new ConnectTimeoutException("connection timed out: " + remoteAddress); 30 if (connectPromise != null && connectPromise.tryFailure(cause)) { 31 close(voidPromise()); 32  } 33  } 34  }, connectTimeoutMillis, TimeUnit.MILLISECONDS); 35  } 36 37 promise.addListener(new ChannelFutureListener() { 38  @Override 39 public void operationComplete(ChannelFuture future) throws Exception { 40 if (future.isCancelled()) { 41 if (connectTimeoutFuture != null) { 42 connectTimeoutFuture.cancel(false); 43  } 44 connectPromise = null; 45 close(voidPromise()); 46  } 47  } 48  }); 49  } 50 } catch (Throwable t) { 51  promise.tryFailure(annotateConnectException(t, remoteAddress)); 52 closeIfClosed(); 53  } 54 } 55 56 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 57 if (promise == null) { 58 return; 59  } 60 boolean active = isActive(); 61 boolean promiseSet = promise.trySuccess(); 62 63 if (!wasActive && active) { 64 pipeline().fireChannelActive(); 65  } 66 if (!promiseSet) { 67  close(voidPromise()); 68  } 69 }

第14,15行和整个fulfillConnectPromise方法处理正常流程。

第18-52行处理异常流程。代码虽然多,但总结起来就一句话: 设置promis返回错误,确保能够调用close方法

io.netty.channel.socket.nio.NioSocketChannel#doConnect实现和doBind实现类似:

 1 @Override
2 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 3 if (localAddress != null) { 4  doBind0(localAddress); 5  } 6 7 boolean success = false; 8 try { 9 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); 10 if (!connected) { 11  selectionKey().interestOps(SelectionKey.OP_CONNECT); 12  } 13 success = true; 14 return connected; 15 } finally { 16 if (!success) { 17  doClose(); 18  } 19  } 20 }

在第11行,注册OP_CONNECT事件。由于channel在初始化是被设置成非阻塞模式,connect方法可能返回false, 如果返回false表示connect操作没有完成,需要通过selector关注OP_CONNECT事件,把connect变成一个异步过程。只有异步调用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect之后,connect才算完成。finishConnect在eventLoop中被调用:

1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
2 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
3 int ops = k.interestOps();
4 ops &= ~SelectionKey.OP_CONNECT;
5  k.interestOps(ops);
6  unsafe.finishConnect();
7 }

 finishConnection的实现如下:

 1 //io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect
 2 @Override
 3 public final void finishConnect() {
 4 // Note this method is invoked by the event loop only if the connection attempt was
 5 // neither cancelled nor timed out.
 6
 7 assert eventLoop().inEventLoop();
 8 try {
 9 boolean wasActive = isActive();
10 doFinishConnect();
11 fulfillConnectPromise(connectPromise, wasActive);
12 } catch (Throwable t) {
13  fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
14 } finally {
15 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
16 // See https://github.com/netty/netty/issues/1770
17 if (connectTimeoutFuture != null) {
18 connectTimeoutFuture.cancel(false);
19  }
20 connectPromise = null;
21  }
22 }
23
24 //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect
25 @Override
26 protected void doFinishConnect() throws Exception {
27 if (!javaChannel().finishConnect()) {
28 throw new Error();
29  }
30 }

9-11行是finishConnection的关键代码, 先调用doFinishConnect执行完成连接之后的操作,NioSocketChannel实现是检查连接是否真的已经完成(27-29行),然后调用fulfillConnectPromise触发事件,设置promise返回值。在前面分析netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect代码时,可以看到在doConnect调用成功以后会立即调用这个方法。这个方法被调用两次是为了确保channelActive事件一定会被触发一次。

 

localAddress,remoteAddress实现:得到channel的本地和远程地址

这个两个方法的实现几乎一样,这里只分析localAddress,它的调用栈如下:

1 io.netty.channel.AbstractChannel#localAddress
2 io.netty.channel.AbstractChannel.AbstractUnsafe#localAddress 3 io.netty.channel.socket.nio.NioSocketChannel#localAddress0

这个方法不会触发任何事件,因此没有通过pipline调用unsafe,它直接调用unsafe的方法:

 1 //io.netty.channel.AbstractChannel#localAddress
2 @Override
3 public SocketAddress localAddress() { 4 SocketAddress localAddress = this.localAddress; 5 if (localAddress == null) { 6 try { 7 this.localAddress = localAddress = unsafe().localAddress(); 8 } catch (Throwable t) { 9 // Sometimes fails on a closed socket in Windows. 10 return null; 11  } 12  } 13 return localAddress; 14 }

在第7行直接调用unsafe的locallAddress方法,这个方法在AbstractUnsafe中实现,它调用了localAddress0,这一个protected的抽象方法,在NioSocketChannel中的实现是:

1 @Override
2 protected SocketAddress localAddress0() { 3 return javaChannel().socket().getLocalSocketAddress(); 4 }

 

版权声明
本文为[自带buff]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/brandonli/p/10277841.html

编程之旅,人生之路,不止于编程,还有诗和远方。
阅代码原理,看框架知识,学企业实践;
赏诗词,读日记,踏人生之路,观世界之行;