您好!欢迎来到优码网

优码网

热门搜索: 直播    短视频   

Netty服务端的新连接接入源码解析

分类:技术分享 时间:2023-12-03 21:15 浏览:111
概述
一、源码寻找我们上一章节学到了,当存在IO事件的时候,Netty的反应堆线程会监听这些事件,然后进行处理,忘记的,可以回顾一下上一章节,,我们这里直接进入到:io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)这里的代码,我们昨天只是做了一个
内容

一、源码寻找

我们上一章节学到了,当存在IO事件的时候,Netty的反应堆线程会监听这些事件,然后进行处理,忘记的,可以回顾一下上一章节,,我们这里直接进入到:

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

这里的代码,我们昨天只是做了一个大概的分析,并没有深入的讲解,这一章节具体分析一下新连接的接入和Channel数据的读取。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
   .........忽略有效性验证................

   try {
       int readyOps = k.readyOps();
       ...................忽略其他 事件的处理逻辑...............
       if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
           unsafe.read();
       }
   } catch (CancelledKeyException ignored) {
       unsafe.close(unsafe.voidPromise());
   }
}

我们重点关注当事件存在读事件或者新连接接入事件的时候,才会进入到这一判断逻辑,那么由此可见,我们兵丁是要关注unsafe.read() 这一行代码了!

二、新连接接入源码分析

首选,我们声明一下,我们现在一直是按照服务端启动逻辑进行分析的,那么服务端逻辑分析,对照通道就是NioServerSocketChannel, 我们在创建NioServerSocketChannel的时候初始化过一个Unsafe对象,他是NioMessageUnsafe类型的,如果有疑问的同学可以回顾一下NioServerSocketChannel的初始化过程!

所以,必然,我们这里的unsafe.read();  就必然进入的是NioMessageUnsafe的read方法:

640.png


image-20210504115544746


@Override
public void read() {
   ..................忽略不必要代码............
   try {
       try {
           do {
               //读取数据  可能是数据  也可能是新连接
               int localRead = doReadMessages(readBuf);
               //如果没数据就跳出
               if (localRead == 0) {
                   break;
               }
               //-1 就是连接被关闭
               if (localRead < 0) {
                   closed = true;
                   break;
               }
//读取的连接数增加
               allocHandle.incMessagesRead(localRead);
               //每次默认读取最大16个连接  剩余的后续去读
           } while (allocHandle.continueReading());
       } catch (Throwable t) {
           exception = t;
       }
//获取连接数量或者读取的数据的数量
       int size = readBuf.size();
       for (int i = 0; i < size; i ++) {
           readPending = false;
           //开始传播channelRead属性
           pipeline.fireChannelRead(readBuf.get(i));
       }
       //清空缓冲区
       readBuf.clear();
       allocHandle.readComplete();
       //传播读取完成事件
       pipeline.fireChannelReadComplete();
       ...................忽略不必要代码.......................
   } finally {
       ...................忽略不必要代码.......................
   }
}

1. 读取新连接

int localRead = doReadMessages(readBuf);

这行代码是读取新连接的主要逻辑:

640 (1).png


image-20210504181959983


@Override
protected int doReadMessages(List<Object> buf) throws Exception {
   //调用JDK ServerSocketChannel获取新连接  JDK SocketChannel
   SocketChannel ch = SocketUtils.accept(javaChannel());
   try {
       if (ch != null) {
           //将客户端连接直接包装为 Netty的管道包装对象 NioSocketChannel
           buf.add(new NioSocketChannel(this, ch));
           return1;
       }
   } catch (Throwable t) {
       ................忽略异常处理.............
   }
   return0;
}

可以看到这里的逻辑比较简单,首先,Netty会使用先前保存的JDK 的原生的SocketChannel调用accept方法进行获取JDK新连接的管道!

注意此时获取的管道是JDK NIO的原生的管道对象,和Netty还没有关系,然后再将JDK NIO原生的Channel包装为Netty的NioSocketChannel放到缓冲区里面,注意此时放到缓冲区里面的对象就是Netty的包装对象了!包装完成之后直接返回 ,此时我们的缓冲区就存在数据量,这个数据是NioSocketChannel对象!

我们回到主线 read方法,当调用完doMessage方法之后开始就要处理这个NioSocketChannel了呀!

2. 处理新连接的管道

pipeline.fireChannelRead(readBuf.get(i));

从代码上看,可以看到,他是把刚刚我们读到的NioSocketChannel出来往下传播,这个代码是在通道内传播,我们前几节课讲过,此时Pipeline的结构是如图所示的数据结构:

640 (2).png

我们看如下代码:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
   AbstractChannelHandlerContext.invokeChannelRead(head, msg);
   returnthis;
}

他是从头节点开始传播的,channelRead的传播是自上而下的,所以就势必会传播到 ServerBootstrapAcceptor的逻辑中,所以我们进入到ServerBootstrapAcceptor#channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
   //能进入都这一段逻辑的就必定是通道对象,因为只有服务端管道会存在该处理器
   final Channel child = (Channel) msg;
//向服务端管道追加childHandler   在构建ServerBootStrap的时候传入的
   child.pipeline().addLast(childHandler);
//在构建ServerBootStrap的时候传入的
   setChannelOptions(child, childOptions, logger);
//在构建ServerBootStrap的时候传入的
   setAttributes(child, childAttrs);
   try {
       //开始进行注册,注册逻辑同NioServerSocketChannel相同
       childGroup.register(child).addListener(new ChannelFutureListener() {
           @Override
           public void operationComplete(ChannelFuture future) throws Exception {
               if (!future.isSuccess()) {
                   forceClose(child, future.cause());
               }
           }
       });
   } catch (Throwable t) {
       forceClose(child, t);
   }
}

我们可以看到这个,先是向通道内注册一些客户端的参数,然后开始进行注册Channel, 注册的时候同NioServerSocketChannel的注册逻辑一样,只不过NioSocketChannel的关注事件是OP_READ事件,这里留一个作业,同学们可以自己分析一下NioSocketChannel的创建,分析一下它的注册逻辑与反应堆逻辑!

三、客户端数据读取源码解析

我们还是直接回到

io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
   .........忽略有效性验证................

   try {
       int readyOps = k.readyOps();
       ...................忽略其他 事件的处理逻辑...............
       if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
           unsafe.read();
       }
   } catch (CancelledKeyException ignored) {
       unsafe.close(unsafe.voidPromise());
   }
}

上面分析过,负责客户端新连接的通道是NioSocketChannel,大家自行分析一下内部逻辑,与NioServerSocketChannel的相似度90%!

1. 读取通道数据

NioSocketChannel的Unsafe是 NioByteUnsafe, 所以我们直接进入到:

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read

查看他是如何进行数据读取的:

@Override
public final void read() {
   ........................忽略........................
   //获取客户端通道管道
   final ChannelPipeline pipeline = pipeline();
   //获取一个内存分配器
   final ByteBufAllocator allocator = config.getAllocator();
   final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
   allocHandle.reset(config);

   ByteBuf byteBuf = null;
   boolean close = false;
   try {
       do {
           //分配一个ByteBuf缓冲区
           byteBuf = allocHandle.allocate(allocator);
           //开始向缓冲区内写入通道的数据
           allocHandle.lastBytesRead(doReadBytes(byteBuf));
           if (allocHandle.lastBytesRead() <= 0) {
               // 如果没有读取到缓冲区,就释放缓冲区.
               byteBuf.release();
               byteBuf = null;
               //设置关闭标志
               close = allocHandle.lastBytesRead() < 0;
               if (close) {
                   // There is nothing left to read as we received an EOF.
                   readPending = false;
               }
               break;
           }

           allocHandle.incMessagesRead(1);
           readPending = false;
           //传播一次readChnnel事件
           pipeline.fireChannelRead(byteBuf);
           byteBuf = null;
       } while (allocHandle.continueReading());

       allocHandle.readComplete();
       //传播一次readChnnelComplete事件
       pipeline.fireChannelReadComplete();
//关闭通道
       if (close) {
           closeOnRead(pipeline);
       }
   } catch (Throwable t) {
       ....................忽略不必要代码.....................
   }
}

我们整体将逻辑分为以下几个步骤:

  1. 获取一个内存分配器,Netty中存在一个专门用于分配ByteBuf的内存分配器。这里是将它获取出来!
  2. 使用上一步获取的内存分配器分配一块缓冲区,用域后续的使用!
  3. 开始读取通道内的数据写入预先分配好的缓冲区!
  4. 读取数据完毕后,将带有数据的缓冲区调用pieline的传播方法进行数据的传播 readChannel方法!
  5. 当通道内的数据被处理完后,传播一次 channelReadComplete方法

四、总结

  1. 在Netty中NioServerSocketChannel与NioSocketChannel的处理中,对于数据的读取拥有不同的处理方法,NioServerSockerChannel主要用于处理新连接的,在初始化的时候就会在通道内加入一个新连接接入器ServerBootstrapAcceptor

    NioServerSocketChannel对象在读取到数据后将之包装为NioSocketChannel对象,然后使用ServerBootstrapAcceptor进行NioSocketChannel的注册与启动反应堆线程!

  2. 当通道内存在数据的时候,被NioSockerChannel探测到后,就会先分配一块缓冲区,将数据读取进预先分配好的缓冲区,然后进行数据的向下通道流转(事件触发)!

才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎入驻优码网,一起进步,一起学习!


评论
联系我们
全国客服热线: 400-8866-759 投诉建议 youmaserve@163.com 工作时间:10:00-22:00
联系客服
售前咨询 售后咨询 联系客服
400-8866-759
手机版

扫一扫进手机版
返回顶部