java网络编程实战 - 原生NIO非阻塞式通讯网络编程实战

  |   0浏览

前言

上次提到要改进我们的RPC框架,这周花时间研究一下JDK提供给我们的原生NIO非阻塞式网络编程思想。NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。

image.png

BIO与NIO的主要区别

1. 面向流和面向缓冲

java NIO和BIO之间第一个最大的区别是,BIO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

2. 阻塞与非阻塞

Java BIO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

3. NIO特有的Selector选择器机制

Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

今天我们就基于以上的理解,实现一个端对端的非阻塞式IO的网络编程。

实战设计

客户端部分

/**

  • @author andychen https://blog.51cto.com/14815984

  • @description:NIO客户端核心处理器
    */
    public class NioClientHandler implements Runnable {
    //服务端主机
    private final String host;
    //服务端口
    private final int port;
    /**定义NIO选择器:用于注册和监听事件

    • 选择监听的事件类型: OP_READ 读事件 / OP_WRITE 写事件
    • OP_CONNECT 客户端连接事件 / OP_ACCEPT 服务端接收通道连接事件
      */
      private Selector selector = null;
      //定义客户端连接通道
      private SocketChannel channel = null;
      //运行状态是否被激活
      private volatile boolean activated=false;
      public NioClientHandler(String host, int port) {
      this.port = port;
      this.host = host;
      this.init();
      }

    /**

    • 处理器初始化
    • 负责建立连接准备工作
      /
      private void init(){
      try {
      //创建并打开选择器
      this.selector = Selector.open();
      //建立并打开监听通道
      this.channel = SocketChannel.open();
      /
      *
      • 设置通道通讯模式为非阻塞,NIO默认为阻塞式的
        */
        this.channel.configureBlocking(false);
        //激活运行状态
        this.activated = true;
        } catch (IOException e) {
        e.printStackTrace();
        this.stop();
        }
        }

    /**

    • 连接服务器
      /
      private void connect(){
      try {
      /
      *
      • 连接服务端:因为之前设置了通讯模式为非阻塞
      • 这里会立即返回TCP握手是否已建立
        */
        if(this.channel.connect(new InetSocketAddress(this.host, this.port))){
        //连接建立后,在通道上注册读事件关注,客户端一接收到数据立即触发处理
        this.channel.register(this.selector, SelectionKey.OP_READ);
        }
        else{
        //若连接握手未建立,则在通道上继续关注连接事件,一旦连接建立继续进行后续的处理逻辑
        this.channel.register(this.selector, SelectionKey.OP_CONNECT);
        }
        } catch (IOException e) {
        e.printStackTrace();
        this.stop();
        }
        }

    /**

    • 选择器事件迭代处理
    • @param keys 选择器事件KEY
      */
      private void eventIterator(Set keys){
      SelectionKey key = null;
      //这里采用迭代器,因为需要迭代时对key进行移除操作
      Iterator it = keys.iterator();
      while (it.hasNext()){
      key = it.next();
      //这里先移除事件key,避免多次处理
      it.remove();
      //处理迭代事件
      this.proccessEvent(key);
      }
      }

    /**

    • 处理发生的事件
    • @param key 选择器事件KEY
      */
      private void proccessEvent(SelectionKey key){
      //只对有效的事件类型进行处理
      if(key.isValid()){
      try {
      //在事件通道上处理
      SocketChannel socketChannel = (SocketChannel) key.channel();
      /**处理连接就绪事件
      • /
        if(key.isConnectable()){
        //检测连接是否完成,避免发生导致NotYetConnectedException异常
        if(socketChannel.finishConnect()){
        System.out.println("Has completed connection with server..");
        /
        *
      • 在通道上关注读事件,NO的写事件一般不特别关注,
      • 原因:写缓冲区大部分时间被认为是空闲的,会频繁被选择器选择(会浪费CPU资源),
      •   所以不应该频繁被注册;
        
      • 只有在写的数据超过写缓冲区可用空间时,把一部分数据刷出缓冲区后,
      • 有空间时再通知应用程序进行写;
      • 且应用程序写完后,应立即关闭写事件
        /
        socketChannel.register(this.selector, SelectionKey.OP_READ);
        }else{//这里若连接仍未建立一般视为网络或其他原因,暂时退出
        this.stop();
        }
        }
        /
        *
      • 处理读事件
        /
        if(key.isReadable()){
        //开辟内存缓冲区,这里用JVM堆内存
        ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
        //将通道中的数据读到缓冲区
        int length = socketChannel.read(buffer);
        if(0 < length){
        /
        *
      • 进行读写转换,NIO固定范式
        */
        buffer.flip();
        //获取buffer可用空间
        int size = buffer.remaining();
        byte[] bytes = new byte[size];
        //读Buffer
        buffer.get(bytes);
        //获取缓冲区数据
        String result = new String(bytes,"utf-8");
        System.out.println("Recevied server message: "+result);
        }else if(0 > length){
        //取消关注当前事件,关闭通道
        key.cancel();
        socketChannel.close();
        }
        }
        } catch (Exception e) {
        key.cancel();
        if(null != key.channel()){
        try {
        key.channel().close();
        } catch (IOException ex) {
        ex.printStackTrace();
        }
        }
        e.printStackTrace();
        }
        }
        }

    /**

    • 写数据到对端
    • @param data
      */
      public void write(String data){
      try {
      byte[] bytes = data.getBytes();
      ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
      //将数据放入写缓冲区
      buffer.put(bytes);
      buffer.flip();
      this.channel.write(buffer);
      } catch (IOException e) {
      e.printStackTrace();
      }
      }

    /**

    • 停止运行
      */
      public void stop(){
      this.activated = false;
      System.exit(-1);
      }

    /**

    • 客户端通讯业务核实现
      /
      @Override
      public void run() {
      //建立服务器连接
      this.connect();
      //持续监听各种事件的发生
      while (this.activated){
      try {
      //监听事件是否发生,若发生直接返回;反之阻塞至事件发生
      this.selector.select();
      } catch (IOException e) {
      e.printStackTrace();
      this.stop();
      }
      //获取发生事件的类型
      Set keys = this.selector.selectedKeys();
      //迭代处理事件
      this.eventIterator(keys);
      }
      //关闭选择器
      if(null != this.selector){
      try {
      this.selector.close();
      } catch (IOException e) {
      e.printStackTrace();
      }
      }
      this.stop();
      }
      }
      /
      *
  • @author andychen https://blog.51cto.com/14815984

  • @description:NIO客户端启动器
    */
    public class NioClientStarter {
    private static NioClientHandler clientHandler = null;

    /启动运行客户端/
    public static void main(String[] args) {
    try {
    clientHandler = new NioClientHandler(Constant.SERV_HOST, Constant.SERV_PORT);
    new Thread(clientHandler).start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    /**

    • 在控制台发实时数据到对端
      */
      Scanner scanner = new Scanner(System.in);
      while (true){
      String data = scanner.next();
      if(null != data && !"".equals(data)){
      clientHandler.write(data);
      }
      }
      }
      }

服务端部分

/**

  • @author andychen https://blog.51cto.com/14815984

  • @description:NIO服务端核心处理器
    /
    public class NioServerHandler implements Runnable{
    private final int port;
    //定义选择器
    private Selector selector = null;
    /
    *

    • 定义服务端通道: 与客户端类似的思路
      */
      private ServerSocketChannel channel = null;
      //服务器运行是否被激活
      private volatile boolean activated = false;
      public NioServerHandler(int port) {
      this.port = port;
      this.init();
      }

    /**

    • 初始化处理器
    • 负责做好运行监听和接收之前的准备
      /
      private void init(){
      try {
      //创建并打开选择器
      this.selector = Selector.open();
      //创建并打开监听通道
      this.channel = ServerSocketChannel.open();
      /
      *
      • 设置通道通讯模式为非阻塞(NIO默认为阻塞)
        /
        this.channel.configureBlocking(false);
        //绑定监听的服务端口
        this.channel.socket().bind(new InetSocketAddress(this.port));
        /
        *
      • 注册在服务端通道上,首先关注的事件
        */
        this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
        //设置运行状态激活
        this.activated = true;
        } catch (IOException e) {
        e.printStackTrace();
        this.stop();
        }
        }

    /**

    • 停止服务
      */
      public void stop(){
      this.activated = false;
      try {
      //关闭选择器
      if(null != this.selector){
      if(this.selector.isOpen()){
      this.selector.close();
      }
      this.selector = null;
      }
      //关闭通道
      if(null != this.channel){
      if(this.channel.isOpen()){
      this.channel.close();
      }
      this.channel = null;
      }
      } catch (IOException e) {
      e.printStackTrace();
      }
      System.exit(-1);
      }

    /**

    • 在迭代处理发生的事件
    • @param keys 发生的事件类型
      /
      private void eventIterator(Set keys){
      //SelectionKey key = null;
      Iterator it = keys.iterator();
      while (it.hasNext()){
      SelectionKey key = it.next();
      /
      *
      • 这里先从迭代器移除,避免后面重复执行
        */
        it.remove();
        //处理事件
        this.proccessEvent(key);
        }
        }

    /**
    *

    • @param key 选择执行的事件KEY
      /
      private void proccessEvent(SelectionKey key){
      //只对有效的事件KEY执行处理
      if(key.isValid()){
      try {
      /
      *
      • 处理通道接收数据事件
        /
        if(key.isAcceptable()){
        /
        *
      • 注意这里接收事件的通道是服务端通道
        /
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        //接收客户端Socket
        SocketChannel channel = serverChannel.accept();
        //设置其为非阻塞
        channel.configureBlocking(false);
        //然后注册此通道的读事件
        channel.register(this.selector, SelectionKey.OP_READ);
        System.out.println("Build connection with client..");
        }
        /
        *
      • 处理读事件
        /
        if(key.isReadable()){
        System.out.println("Reading client data...");
        SocketChannel channel = (SocketChannel) key.channel();
        //开辟内存空间,接收数据
        ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
        //将数据读入缓冲区
        int length = channel.read(buffer);
        if(0 < length){
        //读写切换
        buffer.flip();
        //更具缓冲区数据建立转换的字节数组
        byte[] bytes = new byte[buffer.remaining()];
        //从缓冲区读取字节数据
        buffer.get(bytes);
        //解码数据
        String data = new String(bytes, "utf-8");
        System.out.println("Recevied data: "+data);
        //向对端发送接收应答
        String answer = "Server has recevied data:"+data;
        this.reply(channel, answer);
        }else if(0 > length){
        //取消处理的事件
        key.cancel();
        channel.close();
        }
        }
        /
        *
      • 处理写事件
        */
        if(key.isWritable()){
        SocketChannel channel = (SocketChannel) key.channel();
        //拿到写事件的buffer
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        //若buffer中有数据,则刷到对端
        if(buffer.hasRemaining()){
        int length = channel.write(buffer);
        System.out.println("Write data "+length+" byte to client.");
        }else{
        //若没有数据,则继续监听读事件
        key.interestOps(SelectionKey.OP_READ);
        }
        }
        } catch (IOException e) {
        key.cancel();
        e.printStackTrace();
        }
        }
        }

    /**

    • 应答对端
    • @param msg 应答消息
      /
      private void reply(SocketChannel channel, String msg){
      //消息编码
      byte[] bytes = msg.getBytes();
      //开启写缓冲区
      ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
      //将数据写入缓冲区
      buffer.put(bytes);
      //切换到读事件
      buffer.flip();
      /
      *
      • 这里为了不出现写空或写溢出缓冲区情况,建立写事件监听同时保留之前的读监听
      • 作为监听的附件传入写操作的buffer
        /
        try {
        channel.register(this.selector, SelectionKey.OP_WRITE |SelectionKey.OP_READ, buffer);
        } catch (ClosedChannelException e) {
        e.printStackTrace();
        }
        }
        /
        *
    • 服务端监听运行核心业务实现
      /
      @Override
      public void run() {
      while (this.activated){
      try {
      /
      *
      • 运行到此方法阻塞,直到有事件发生再返回
      • /
        this.selector.select();
        //获取被监听的事件
        Set keys = this.selector.selectedKeys();
        //在迭代器中,处理不同的事件
        this.eventIterator(keys);
        } catch (IOException e) {
        e.printStackTrace();
        this.stop();
        }
        }
        }
        }
        /
        *
  • @author andychen https://blog.51cto.com/14815984

  • @description:NIO网络编程服务端启动类
    */
    public class NioServerStart {

    /**

    • 运行服务端监听
    • @param args
      */
      public static void main(String[] args) {
      String serverTag = "server: "+Constant.SERV_PORT;
      NioServerHandler serverHandler = null;
      try {
      serverHandler = new NioServerHandler(Constant.SERV_PORT);
      new Thread(serverHandler, serverTag).start();
      System.out.println("Starting "+serverTag+" listening...");
      } catch (Exception e) {
      e.printStackTrace();
      if(null != serverHandler){
      serverHandler.stop();
      }
      }
      }
      }

多次验证结果

image.png

image.png

总结

通过以上的实战,我们看到NIO网络编程实现比BIO稍微要复杂一些。面向缓冲的机制确实比面向流的机制要灵活很多;服务运行的体验也比阻塞式IO更加流畅;独有的选择器机制也让NIO可以支撑较大并发数,但学习和开发的成本稍微高一些,项目当中可以有选择地使用。

目前网络编程这块用得比较多的优秀IO框架非Netty莫属了,很多优秀的RPC框架的底层也基于Netty扩展和开发。下次我们就顺带给大家展示一下Netty的网络编程之美。