我们上一章节学到了,当存在IO事件的时候,Netty的反应堆线程会监听这些事件,然后进行处理,忘记的,可以回顾一下上一章节,,我们这里直接进入到:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
这里的代码,我们昨天只是做了一个大概的分析,并没有深入的讲解,这一章节具体分析一下新连接的接入和Channel数据的读取。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.........忽略有效性验证................
try {
int readyOps = k.readyOps();
...................忽略其他 事件的处理逻辑...............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我们重点关注当事件存在读事件或者新连接接入事件的时候,才会进入到这一判断逻辑,那么由此可见,我们兵丁是要关注unsafe.read() 这一行代码了!
首选,我们声明一下,我们现在一直是按照服务端启动逻辑进行分析的,那么服务端逻辑分析,对照通道就是NioServerSocketChannel, 我们在创建NioServerSocketChannel的时候初始化过一个Unsafe对象,他是NioMessageUnsafe
类型的,如果有疑问的同学可以回顾一下NioServerSocketChannel的初始化过程!
所以,必然,我们这里的unsafe.read();
就必然进入的是NioMessageUnsafe
的read方法:
@Override
public void read() {
..................忽略不必要代码............
try {
try {
do {
//读取数据 可能是数据 也可能是新连接
int localRead = doReadMessages(readBuf);
//如果没数据就跳出
if (localRead == 0) {
break;
}
//-1 就是连接被关闭
if (localRead < 0) {
closed = true;
break;
}
//读取的连接数增加
allocHandle.incMessagesRead(localRead);
//每次默认读取最大16个连接 剩余的后续去读
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
//获取连接数量或者读取的数据的数量
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//开始传播channelRead属性
pipeline.fireChannelRead(readBuf.get(i));
}
//清空缓冲区
readBuf.clear();
allocHandle.readComplete();
//传播读取完成事件
pipeline.fireChannelReadComplete();
...................忽略不必要代码.......................
} finally {
...................忽略不必要代码.......................
}
}
int localRead = doReadMessages(readBuf);
这行代码是读取新连接的主要逻辑:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//调用JDK ServerSocketChannel获取新连接 JDK SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//将客户端连接直接包装为 Netty的管道包装对象 NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return1;
}
} catch (Throwable t) {
................忽略异常处理.............
}
return0;
}
可以看到这里的逻辑比较简单,首先,Netty会使用先前保存的JDK 的原生的SocketChannel调用accept方法进行获取JDK新连接的管道!
注意此时获取的管道是JDK NIO的原生的管道对象,和Netty还没有关系,然后再将JDK NIO原生的Channel包装为Netty的NioSocketChannel放到缓冲区里面,注意此时放到缓冲区里面的对象就是Netty的包装对象了!包装完成之后直接返回 ,此时我们的缓冲区就存在数据量,这个数据是NioSocketChannel对象!
我们回到主线 read方法,当调用完doMessage方法之后开始就要处理这个NioSocketChannel了呀!
pipeline.fireChannelRead(readBuf.get(i));
从代码上看,可以看到,他是把刚刚我们读到的NioSocketChannel出来往下传播,这个代码是在通道内传播,我们前几节课讲过,此时Pipeline的结构是如图所示的数据结构:
我们看如下代码:
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
returnthis;
}
他是从头节点开始传播的,channelRead的传播是自上而下的,所以就势必会传播到 ServerBootstrapAcceptor的逻辑中,所以我们进入到ServerBootstrapAcceptor#channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//能进入都这一段逻辑的就必定是通道对象,因为只有服务端管道会存在该处理器
final Channel child = (Channel) msg;
//向服务端管道追加childHandler 在构建ServerBootStrap的时候传入的
child.pipeline().addLast(childHandler);
//在构建ServerBootStrap的时候传入的
setChannelOptions(child, childOptions, logger);
//在构建ServerBootStrap的时候传入的
setAttributes(child, childAttrs);
try {
//开始进行注册,注册逻辑同NioServerSocketChannel相同
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我们可以看到这个,先是向通道内注册一些客户端的参数,然后开始进行注册Channel, 注册的时候同NioServerSocketChannel的注册逻辑一样,只不过NioSocketChannel的关注事件是OP_READ事件,这里留一个作业,同学们可以自己分析一下NioSocketChannel的创建,分析一下它的注册逻辑与反应堆逻辑!
我们还是直接回到
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.........忽略有效性验证................
try {
int readyOps = k.readyOps();
...................忽略其他 事件的处理逻辑...............
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
上面分析过,负责客户端新连接的通道是NioSocketChannel,大家自行分析一下内部逻辑,与NioServerSocketChannel的相似度90%!
NioSocketChannel的Unsafe是 NioByteUnsafe
, 所以我们直接进入到:
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
查看他是如何进行数据读取的:
@Override
public final void read() {
........................忽略........................
//获取客户端通道管道
final ChannelPipeline pipeline = pipeline();
//获取一个内存分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
//分配一个ByteBuf缓冲区
byteBuf = allocHandle.allocate(allocator);
//开始向缓冲区内写入通道的数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// 如果没有读取到缓冲区,就释放缓冲区.
byteBuf.release();
byteBuf = null;
//设置关闭标志
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//传播一次readChnnel事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//传播一次readChnnelComplete事件
pipeline.fireChannelReadComplete();
//关闭通道
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
....................忽略不必要代码.....................
}
}
我们整体将逻辑分为以下几个步骤:
在Netty中NioServerSocketChannel与NioSocketChannel的处理中,对于数据的读取拥有不同的处理方法,NioServerSockerChannel主要用于处理新连接的,在初始化的时候就会在通道内加入一个新连接接入器ServerBootstrapAcceptor
!
NioServerSocketChannel对象在读取到数据后将之包装为NioSocketChannel对象,然后使用ServerBootstrapAcceptor进行NioSocketChannel的注册与启动反应堆线程!
当通道内存在数据的时候,被NioSockerChannel探测到后,就会先分配一块缓冲区,将数据读取进预先分配好的缓冲区,然后进行数据的向下通道流转(事件触发)!
才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎入驻优码网,一起进步,一起学习!