Netty 教程

#Netty [更新日期 2022/08/01] [字体 ··]

Netty 简介

基于Netty 版本:4.1.70.Final

当我们打开 Netty 官网,会看到一个赫然的标题。

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty 是一个异步事件驱动网络编程框架,用于快速开发可维护的高性能协议服务器和客户端。

从中我们提取一些重点词汇:异步、事件驱动、协议服务器。

没错,这就是 Netty 的特点,基于 Reactor 线程模型异步能力,基于 epoll 处理 IO 的能力,内置了许多协议处理器和编解码器,只需要简单编码就能实现高性能的服务器-客户端应用。

Netty 发展十几年,Java 生态许多高性能的中间件都是用了它,例如:Apache Flink、Apache Spark、Elastic Search 等,这说明了 Netty 是优良网络编程框架。

P.S. 你可以点击这个链接了解使用 Netty 的开源项目 https://netty.io/wiki/related-projects.html

Netty 的优点

那么 Netty 有哪些优点呢?

  1. 基于 Reactor 模型
  2. 使用直接内存避免拷贝开销
  3. 提供了多种协议的编解码器,开箱即用
  4. 提供了测试工具、内存泄露检测工具等

Netty 支持的协议

单拿协议处理器来说,Netty 实现了以下协议的处理器,做到了开箱即用!

协议说明
HTTP/HTTP2/Websocket/
DNS域名解析服务
MQTT消息队列遥测传输协议,低功耗的消息传输协议,常用在移动设备的消息推送
SMTP简单邮件协议
SOCKS一种用于穿透内网防火墙的协议,常实现代理服务器
SCTP一种基于 TCP、UDP 优点的传输协议,适用于音视频传输,WebRTC 技术就应用了这种协议
STOMP一种简单消息传输协议
UDT基于 UDP 的可靠传输协议,现在已经废弃

相信到这里你已经有些心动了,当你使用 Netty 实现一个 CS 程序时你会不由地感叹 Netty 的强大与便捷。

Netty 的核心组件

  • Channel:对应于网络的 Socket,是一个双向管道,即可以写数据又可以读数据。
  • EventLoop:事件循环,每一个事件循环仅与一个线程绑定,用来处理 epoll 事件。
  • Handler:所有的数据读写、编解码、计算等都在这里进行,可以说它是 Netty 业务逻辑处理等单元。
  • Codec:编辑码器,对读取的数据进行解码,对将要发送的数据进行编码,另外它还负责数据压缩和协议转换。
  • Pipeline:是 Handler 链的容器,可以说是业务逻辑处理的大动脉,所有的 IO 事件都在这里流转。
  • ChannelFeaure:代表 channel 处理的一个结果。
  • ChannelHandlerContext:处理器 Handler 的上下文,从中可以获取 Channel,或者进行读写操作关闭链接等操作。

Netty 能用来做什么

世面上有很多中间件使用 Netty 做网络通信,Netty 很擅长这个。如果你想做一个 Websocket 长连接或者 HTTP 服务器,Netty 是一个选择;如果你想实现一个内网穿透工具或者代理工具,Netty 也是一个很好的选择,它支持 SOCKS 协议能进行字节级数据控制。

Netty echo 详解

程序清单

  • EchoServer
  • EchoServerHandler
  • EchoClient
  • EchoClientHandler

代码

EchoServer

 1import io.netty.bootstrap.ServerBootstrap;
 2import io.netty.channel.*;
 3import io.netty.channel.nio.NioEventLoopGroup;
 4import io.netty.channel.socket.SocketChannel;
 5import io.netty.channel.socket.nio.NioServerSocketChannel;
 6import io.netty.handler.logging.LogLevel;
 7import io.netty.handler.logging.LoggingHandler;
 8
 9import java.net.InetSocketAddress;
10
11/**
12 * echo server,接收打印client发送过来的数据,并把数据返回给client
13 */
14public final class EchoServer {
15
16    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
17
18    public static void main(String[] args) throws Exception {
19
20        // 配置服务器
21        // 主event loop,负责接受(accept)socket连接
22        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
23        // 从event loop,负责处理socket的事件
24        EventLoopGroup workerGroup = new NioEventLoopGroup();
25        // server 的channel处理器
26        final EchoServerHandler serverHandler = new EchoServerHandler();
27        try {
28            // Server引导器(配置器)
29            ServerBootstrap b = new ServerBootstrap();
30            b.group(bossGroup, workerGroup)
31                    .channel(NioServerSocketChannel.class)
32                    // 设置ip地址和端口,server默认使用本地ip,也可以通过bind()来设置
33                    .localAddress(new InetSocketAddress(PORT))
34                    // 配置网络传输参数,例如缓存大小
35                    .option(ChannelOption.SO_BACKLOG, 100)
36                    // 配置日志打印
37                    .handler(new LoggingHandler(LogLevel.INFO))
38                    // netty核心组件,pipeline 处理器
39                    .childHandler(new ChannelInitializer<SocketChannel>() {
40                        @Override
41                        public void initChannel(SocketChannel ch) throws Exception {
42                            // 通过channel获取管道对象
43                            ChannelPipeline pipeline = ch.pipeline();
44                            // 通过管道挂在handler
45                            pipeline.addLast(serverHandler);
46                        }
47                    });
48
49            // 绑定设置的断开(PORT),并阻塞(sync)到绑定完成,绑定完成后就开始监听close信号
50            ChannelFuture f = b.bind().sync();
51
52            // 阻塞到接收client发过来关闭(close)信号
53            f.channel().closeFuture().sync();
54            // 当client发过来close信号后,才会执行这里
55        } finally {
56            // 关闭所有event loop的线程,并关闭内部的线程
57            bossGroup.shutdownGracefully();
58            workerGroup.shutdownGracefully();
59        }
60    }
61
62}

EchoServerHandler

 1import io.netty.buffer.ByteBuf;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.ChannelHandler.Sharable;
 4import io.netty.channel.ChannelHandlerContext;
 5import io.netty.channel.ChannelInboundHandlerAdapter;
 6import java.nio.charset.StandardCharsets;
 7
 8/**
 9 * Handler implementation for the echo server.
10 */
11@Sharable
12public class EchoServerHandler extends ChannelInboundHandlerAdapter {
13
14    /**  第一次建立连接调用 */
15    @Override
16    public void channelActive(ChannelHandlerContext ctx) throws Exception {
17        final char[] chars = "hello client, I am Server.".toCharArray();
18        // 建立连接想client发送一句话
19        ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer(chars, StandardCharsets.UTF_8));
20    }
21
22    /** 客户端发送过来数据调用 */
23    @Override
24    public void channelRead(ChannelHandlerContext ctx, Object msg) {
25        final ByteBuf buf = (ByteBuf) msg;
26        String recv = "接收:" + buf.toString(StandardCharsets.UTF_8);
27        if(recv.indexOf("exit") != -1){
28            ctx.close();
29            return;
30        }
31        System.out.print(recv);
32        // 将数据传回客户端
33        ctx.write(Unpooled.copiedBuffer(recv.getBytes(StandardCharsets.UTF_8)));
34    }
35
36    /** 读取完毕,完成一次读事件 */
37    @Override
38    public void channelReadComplete(ChannelHandlerContext ctx) {
39        // flush - 冲刷数据,flush一次便会触发一次或多次client的read事件,反之server也是
40        ctx.flush();
41    }
42
43    /** 捕获异常 */
44    @Override
45    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
46        // 引发异常时关闭连接会
47        cause.printStackTrace();
48        ctx.close();
49    }
50
51}

EchoClient

 1import io.netty.bootstrap.Bootstrap;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.*;
 4import io.netty.channel.nio.NioEventLoopGroup;
 5import io.netty.channel.socket.SocketChannel;
 6import io.netty.channel.socket.nio.NioSocketChannel;
 7
 8import java.nio.charset.StandardCharsets;
 9import java.util.Scanner;
10
11/**
12 * 启动client,可以想server发送消息。server的console可以查看client发送的消息。
13 */
14public final class EchoClient {
15
16    static final String HOST = System.getProperty("host", "127.0.0.1");
17    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
18
19    public static void main(String[] args) throws Exception {
20        // 配置客户端
21        // client工作线程,每一个线程将和一个EventLoop绑定
22        EventLoopGroup group = new NioEventLoopGroup();
23        try {
24            // 客户端引导其
25            Bootstrap b = new Bootstrap();
26            //配置client启动参数
27            b.group(group)
28                    // 配置数据通道,不同的数据通道决定了IO的方式,例如Nio开头的是非阻塞IO、Oio的是阻塞IO
29                    .channel(NioSocketChannel.class)
30                    // 网络传输的参数,例如网络缓存缓存的大小、是否延迟
31                    .option(ChannelOption.TCP_NODELAY, true)
32                    // 入站、出站处理器
33                    // initializer是在入站只执行一次的处理器
34                    // 它用来与server建立连接、并在pipeline上挂载编解码处理器、自定义处理器
35                    .handler(new ChannelInitializer<SocketChannel>() {
36                        @Override
37                        public void initChannel(SocketChannel ch) throws Exception {
38                            ChannelPipeline p = ch.pipeline();
39                            //p.addLast(new LoggingHandler(LogLevel.INFO));
40                            p.addLast(new EchoClientHandler());
41                        }
42                    });
43
44            // 与server建立连接(connect),并阻塞(sync)到连接建立
45            ChannelFuture f = b.connect(HOST, PORT).sync();
46            // 获取建立的连接
47            final Channel clientChannel = f.channel();
48
49            System.out.println("input 'exit' to EXIT.");
50            Scanner sc = new Scanner(System.in);
51            String str;
52            // 读取并向server发送数据
53            while (!(str = sc.next()).startsWith("exit")) {
54                clientChannel.writeAndFlush(Unpooled.copiedBuffer(str.toCharArray(), StandardCharsets.UTF_8));
55            }
56            // 想server发送断开连接的信号
57            clientChannel.close();
58            // 获取关闭连接后的结果(future),并阻塞(sync)到接收到server发过来的断开连接信号
59            clientChannel.closeFuture().sync();
60        } finally {
61            // 关闭所有的EventLoop,并终止内部的线程
62            group.shutdownGracefully();
63        }
64    }
65}

EchoClientHandler

 1import io.netty.buffer.ByteBuf;
 2import io.netty.buffer.Unpooled;
 3import io.netty.channel.ChannelHandlerContext;
 4import io.netty.channel.ChannelInboundHandlerAdapter;
 5
 6import java.nio.charset.StandardCharsets;
 7
 8/**
 9 * client 端 handler
10 */
11public class EchoClientHandler extends ChannelInboundHandlerAdapter {
12
13    private final ByteBuf firstMessage;
14
15    public EchoClientHandler() {
16        firstMessage = Unpooled.copiedBuffer("start data transfer.\n".toCharArray(), StandardCharsets.UTF_8);
17    }
18
19    /**
20     * 建立连接向server发送一个消息
21     */
22    @Override
23    public void channelActive(ChannelHandlerContext ctx) {
24        ctx.writeAndFlush(firstMessage);
25    }
26
27    /**
28     * client的读事件触发会调用此方法
29     */
30    @Override
31    public void channelRead(ChannelHandlerContext ctx, Object msg) {
32        System.out.println("client recv : " + ((ByteBuf) msg).toString(StandardCharsets.UTF_8));
33    }
34
35    /**
36     * 读取完毕向网络发送数据,flush会触发server的read事件
37     */
38    @Override
39    public void channelReadComplete(ChannelHandlerContext ctx) {
40        ctx.flush();
41    }
42
43    @Override
44    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
45        cause.printStackTrace();
46        ctx.close();
47    }
48
49}

通过注释能了解 netty 运行的组件和开发模式,但可能还会有些疑问,看下面的问题。

问题

1、为什么 Server 和 Client 使用不同的引导(BootstrapServerBootstrap

实际上 server 和 client 有很大的不同,client 配置只需要一个连接一个线程就够了,而 server 需要配置多个;channel 也有区别,server 的 channel 一方面与 client 建立连接(accept),另一方面处理 client channel 的读写事件。因为功能的差异,所以配置存在差异,就导致需要不同的引导器配置。

2、为什么 server 有两个线程组(EventLoopGroup),而 client 只有一个

server 是可以只用一个线程组。这是 netty 线程模型的缘故,netty 使用的是 Reactor 线程模型。

3、Future

Future是异步编程常见词汇,它代表了一个结果,这个结果会在未来的某个时刻发生,而功过Future能拿到这个结果。常见的是ChannelFuture,它表示了一个操作(如bind、监听关闭closeFuture)的结果。

4、sync() 方法

这个方法是一个同步方法,会阻塞上个操作,直到满足条件,例如:closeFuture().sync() 会阻塞到接收到关闭连接到信号到来。

Netty ChannelHandler

Handler 是处理器,那它处理什么呢?网络中有什么它就做那些处理,常见的有连接的建立,响应读写事件,网络中数据的处理如编码、解码等,这些都输处理器做的,Netty 中的处理器是ChannelHandler接口,所有的处理器都是这个接口的实现。

首先说下 Netty 处理器的定位,它是 Netty 架构网络处理业务逻辑的解耦,处理器代表了与用户相关的业务逻辑,如数据读取后端计算和发送,而网络连接管理、线程控制、网络事件监听处理都由 Netty 接管,使我们可以更加专注业务逻辑,站在开发的角度使用 Netty 开发接触最多的就是ChannelHandler

处理器有许多种类,通过以下分类你可以先在脑海中建立一个简单的概念。

处理器分类

  • 入站处理器:处理 read 事件的数据,通常是解码操作
  • 出站处理器:处理 write 事件的数据,通常是编码操作
  • 编解码器:一种专用的数据转换器,例如字节转换为字符串,称之为“解码器”,反之把字符串转换为“字节”称之为“编码器”
  • 转换器:复合处理器,把一种数据转换为另一种数据,例如 HTTP1 转换为 HTTP2

ChannelHandler 层次结构

Handler结构.png

常用类使用

最常使用的是 ChannelInboundHandlerAdapter、ChannelDuplexHandler、ChannelOutboundHandlerAdapter 的实现类,这些类定义了作为不同的类型的方法,例如 InboundHandler 与 OutBoundHandler 定义的方法就有很大的差别。

以入站为代表的方法有 ByteToMessageDecoder、SimpleChannelInboundHandler。

例如ByteToMessageDecoder使用时重写decode方法即可,编码器重写 encode

1      public class SquareDecoder extends {@link ByteToMessageDecoder} {
2          @Override
3          public void decode(ChannelHandlerContext ctx,  ByteBuf in, List<Object> out)
4                  throws  Exception {
5              out.add(in.readBytes(in.readableBytes()));
6          }
7      }

SimpleChannelInboundHandler<T> 使用时重写 channelRead0 方法即可,Adapter 里已经帮我们做好了类型转换。

1       public class StringHandler extends SimpleChannelInboundHandler<String> {
2            @Override
3           protected void channelRead0(ChannelHandlerContext ctx, String message)
4                   throws Exception {
5               System.out.println(message);
6           }
7       }

ChannelDuplexHandler 的继承类实现多是进行数据转换,例如:ByteToMessageCodec 将入站的 byte 数据转化为对象,出站时将对象转换为 byte,类似的还有 MessageToMessageCodec,实现两个对象间的转化。

ChannelHandler 的几个注意点

1、执行顺序。按照被添加的顺序执行。

2、共享 Handler 与非共享 Handler。共享的 handler 会帮定到多个 Channel,当 Channel 并行执行时可能存在线程安全问题;非共享 handler 绑定时就会创建一个,Channel 独享不存在线程安全问题。

3、Handler 的 read 会执行多次。当 Channel 中的数据未处理完,上一个 handler 会多次触发下个 handler 的 read 事件。

Netty ChannelPipeline

ChannelPipeline 是 ChannelHandler 链的容器,可以说是业务逻辑处理的大动脉,所有的 IO 事件都在这里流转。

ChannelPipeline 负责 ChannelHandler 的编排,其次是传递 Channel 的事件通知。

通过图说明 ChannelPipeline 的要点

1、每一个 ChannelPipeline 和 Channel 唯一绑定

2、ChannelPipeline 是一个带有头和尾的双向管道,事件可以从头到尾流动,也可以从尾到头流动。

3、写 ChannelPipeline 的事件会是从 Pipeline 的头部开始流动事件

4、通常,如果事件从头到尾流动我们称为入站,从尾到头称为出站,入站的第一个 ChannelHandler 的序号是 1,出站的第一个 ChannelHandler 是序号 4。

Netty ChannelHandlerContext

ChannelHandlerContext 的主要功能是关联 ChannelPipeline 与 ChannelHandler 然后管理 ChannelHandler,另一方面是管理和同一个 Pipeline 中的其他 Handler 的交互,也就是 Handler 之间的事件传递。

ChannelHandlerContext 的能力

image

通过上图里的方法可以看出 ChannelHandlerContext 的能力:

  • 获取 ByteBuf 分配器
  • 获取绑定的 Channel
  • 获取绑定的 Pipeline
  • 进行事件通知,以 fire* 开头的方法
  • 连接管理(bindconnectdisconnect)
  • 数据读写(writewiriteAndFlushread)

ChannelHandlerContext 与 Channel、ChannelPipeline 的关系

image

说明:

  1. 每一个 ChannelHandlerContext 与一个 ChannelHandler 绑定,一旦关联不会改变,因此缓存 ChannelHandlerConntext 是安全的
  2. 每一个 ChannelHandlerContext 也与一个 ChannelPipeline 关联,关联后不会改变
  3. 使用 ChannelHandlerContext 的方法触发的事件从下一个节点流转,而使用 Pipeline、Channel 产生的事件从 pipeline 头部流转。

特殊情况:由于 ChannelHandler 可以是共享的,也就是说可以和多个 Pipeline 关联,此时 ChannelHandler 对于每一个关联的 Pipeline 都创建一个 ChannelHandlerContext。

ChannelHandlerContext 应用

  1. 缓存起来,在其他方法或线程中使用(因为 ChannelHandlerContext 的绑定是不变的可以放心使用)
  2. 控制 pipeline,实现协议的切换

Netty 异常处理

异常处理在任何系统中都是重要的组成部分,Netty 的异常处理是通过在 ChannelHandler 中重写 exceptionCaught 方法来实现,这篇文章聚焦于此。

异常处理方式

1、捕获异常处理

1    public static class InboundHandler extends ChannelInboundHandlerAdapter {
2        @Override
3        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
4            // 处理异常
5            System.err.println(this.getClass().getSimpleName() + " ---- " + cause.getMessage());
6            // 向下一个handler传递异常
7            ctx.fireExceptionCaught(cause);
8        }
9    }

P.S. 传递异常将从下个 handler 一直往后传递,不会跳过 OutboundHandler。


2、通过监听 Funture、Promise 处理

1)在调研writewriteAndFlush可以获得得到 ChannelFeature,进而可以监听执行结果是否成功、异常。通常在 InboundHandler 中使用。

 1            ctx.executor().schedule(()->{
 2                channelFuture.addListener(new ChannelFutureListener() {
 3                    @Override
 4                    public void operationComplete(ChannelFuture future) throws Exception {
 5                        System.out.println("writeAndFlush done.");
 6                        if(future.isSuccess()){
 7                            System.out.println("send success.");
 8                        }else{
 9                       		// 处理异常
10                            System.out.println("send fail!");
11                        }
12                    }
13                });
14            }, 0, TimeUnit.SECONDS);

2)在 OutboundHandler 中,write 方法的入参会带有个Promise参数,通过 Promise 对象可以处理异常,处理后将通知之前的 handler。使用时通过setSuccesssetFailure设置。

源码分析

1、是谁触发 exceptionCaught 方法

主要是 AbstractChannelhandlerContext#invokeExceptionCaught 方法。

 1    private void invokeExceptionCaught(final Throwable cause) {
 2        if (invokeHandler()) {
 3            try {
 4                //
 5                handler().exceptionCaught(this, cause);
 6            } catch (Throwable error) {
 7                if (logger.isDebugEnabled()) {
 8                    logger.debug(
 9                        "An exception {}" +
10                        "was thrown by a user handler's exceptionCaught() " +
11                        "method while handling the following exception:",
12                        ThrowableUtil.stackTraceToString(error), cause);
13                } else if (logger.isWarnEnabled()) {
14                    logger.warn(
15                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
16                        "was thrown by a user handler's exceptionCaught() " +
17                        "method while handling the following exception:", error, cause);
18                }
19            }
20        } else {
21            fireExceptionCaught(cause);
22        }
23    }

当 channel 触发事件调用 invokeChannelActive、invokeChannelRead 过程中出现异常调用 invokeExceptionCaught。

 1    private void invokeChannelActive() {
 2        if (invokeHandler()) {
 3            try {
 4                ((ChannelInboundHandler) handler()).channelInactive(this);
 5            } catch (Throwable t) {
 6            	// 处理异常
 7                invokeExceptionCaught(t);
 8            }
 9        } else {
10            fireChannelInactive();
11        }
12    }
13
14    private void invokeChannelRead(Object msg) {
15        if (invokeHandler()) {
16            try {
17                ((ChannelInboundHandler) handler()).channelRead(this, msg);
18            } catch (Throwable t) {
19            	// 处理异常
20                invokeExceptionCaught(t);
21            }
22        } else {
23            fireChannelRead(msg);
24        }
25    }

2、如果我们创建的 Handler 不处理异常,那么会由 Pipeline 中名为tail 的 ChannelHandlerContext 来捕获异常并打印。

可以看到 DefaultChannelPipeline 中有两个内部类:TailContext、HeadContext,最后的异常就是被 TailContext 的实力tail处理的,可以看到 onUnhandledInboundException 方法中使用 warn 级别输出了异常信息。

 1   // A special catch-all handler that handles both bytes and messages.
 2    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
 3
 4        TailContext(DefaultChannelPipeline pipeline) {
 5            super(pipeline, null, TAIL_NAME, TailContext.class);
 6            setAddComplete();
 7        }
 8
 9        @Override
10        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
11            onUnhandledInboundUserEventTriggered(evt);
12        }
13
14        @Override
15        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
16            // 处理最后的异常,方法如下
17            onUnhandledInboundException(cause);
18        }
19        // 省略...
20    }
21
22    /**
23     * 处理最后的异常
24     * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
25     * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
26     */
27    protected void onUnhandledInboundException(Throwable cause) {
28        try {
29            logger.warn(
30                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
31                            "It usually means the last handler in the pipeline did not handle the exception.",
32                    cause);
33        } finally {
34            ReferenceCountUtil.release(cause);
35        }
36    }
37
38    final class HeadContext extends AbstractChannelHandlerContext
39            implements ChannelOutboundHandler, ChannelInboundHandler {
40
41        private final Unsafe unsafe;
42
43        HeadContext(DefaultChannelPipeline pipeline) {
44            super(pipeline, null, HEAD_NAME, HeadContext.class);
45            unsafe = pipeline.channel().unsafe();
46            setAddComplete();
47        }
48
49        @Override
50        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
51            ctx.fireExceptionCaught(cause);
52        }
53        // 省略...
54    }

Netty EventLoop

EventLoop 是 Netty 的工作线程,EventLoop 是单线程的,每一个 EventLoop 永远只和一个 Java 线程绑定,这使得 EventLoop 处理读写事件是线程安全的(非共享 Handler)。

Netty 版本:4.1.70.Final

EventLoop 类结构

EventLoop类结构

无论是 EpollEventLoop、NioEventLoop、KQueueEventLoop 都是直接接触 SingleThreadEventLoop 实现的,这个类也再次验证了 EventLoop 是单线程的。

类、接口主要作用

名字类型作用
Executor接口主要定义一个可执行的 execute 方法
ExecutorService接口定义了对任务(Task)的控制方法,如提交(submit)、结束(shutdown)和状态方法(isShutdown)等
ScheduledExecutorService接口定义了定时类任务提交方法,特点是方法入参须传入时间和时间单位
AbstractExecutorService抽象类默认实现了 ExecutorService 的方法
EventExecutorGroup接口定义了组 Executor 的操作方法,核心方法是 next 用于返回一个组管理的 Executor。
EventLoopGroup接口特殊的 EventExecutorGroup,允许注册 Channel,用于选择 Channel
EventExecutor接口定义了一些方法,检测线程是否在 EventLoop 中运行
AbstractEventExecutor抽象类默认 EventExecutor 接口的实现
AbstractScheduledEventExecutor抽象类支持定时任务的 EventExecutor 接口的默认实现
SingleThreadEventExecutor抽象类单线程 EventExecutor 的默认实现,单线程 Executor 基类
EventLoop接口定义获取父 EventLoop 的方法 parent
SingleThreadEventLoop抽象类EventLoop 的抽象基类,它在单个线程中执行所有提交的任务。
NioEventLoop聚合 Nio Selector 对象的类
EpollEventLoop聚合 Epoll fd、Epoll event 的类

EventLoop 核心源码分析

AbstractScheduledEventExecutor

ScheduledEventExecutor 的核心功能是创建执行执行定时任务,这些功能对应的方法是:

1public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
2public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
3public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
4public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

上边的这些方法最后都会调用一个私有的schedule方法,这个方法是 ScheduledEventExecutor 的核心。

 1    /**
 2     * 核心方法,所有暴露的schedule方法最后都调用此方法
 3     * @param task 可执行的对象(实现Runnable)且具有返回值(实现Future)
 4     * @return task执行结果,可获取执行结果(success/failure),可以监听状态
 5     */
 6    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
 7        // 判断在哪里创建(schedule)定时任务
 8        if (inEventLoop()) {
 9            // 在EventLoop线程中
10            // 如果是在EventLoop(handler执行上下文)中创建的定时任务,就放到任务执行队列中
11            scheduleFromEventLoop(task);
12        } else {
13            // 在外部线程 e.g. main线程调用schedule创建定时任务
14
15            // 截止时间 (当前 - ScheduleFutureTask创建时间)
16            final long deadlineNanos = task.deadlineNanos();
17            // 任务提交前回调判断,true 立即执行任务,false 延迟执行
18            if (beforeScheduledTaskSubmitted(deadlineNanos)) {
19                // execute表示调用执行(立即run),是个未实现的方法,未来由子类实现,由父类调用
20                execute(task);
21            } else {
22                // 在不重写父类lazyExecute时默认仍然使用当前EventLoop线程执行
23                lazyExecute(task);
24                // 任务提交之后回调,方法返回true唤醒当前EventLoop线程,false不唤醒
25                if (afterScheduledTaskSubmitted(deadlineNanos)) {
26                    // 官方解释是为了避免线程竞争
27                    // WAKEUP_TASK这个任务run方法没有执行的语句,使线程保持活跃(等待->可执行,持有CPU资源),避免线程竞争CPU开销
28                    execute(WAKEUP_TASK);
29                }
30            }
31        }
32        return task;
33    }
1    // 在EventLoop中创建的定时任务放在taskQueue, 方便EventGroup调度
2	final void scheduleFromEventLoop(final ScheduledFutureTask<?> task) {
3        // nextTaskId a long and so there is no chance it will overflow back to 0
4        scheduledTaskQueue().add(task.setId(++nextTaskId));
5    }

SingleThreadEventExecutor

这个类主要实现了任务的启动、执行、结束

1private void doStartThread();
2// 执行一个任务
3public void execute(Runnable task);
4// 关闭
5public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

SingleThreadEventLoop

SingleThreadEventLoop 是 EventLoop 的抽象基类,且继承了 SingleThreadEventExecutor,这个类的主要作用是通过构造器组装父类需要的属性。

NioEventLoop

这个类继承了 SingleThreadEventLoop,内部聚合 Nio Selector,作用是将 Channel 注册到 Selector 并且在事件循环中对这些进行多路复用。

1    /**
2     * The NIO {@link Selector}.
3     */
4    private Selector selector;
5    private Selector unwrappedSelector;
6    private SelectedSelectionKeySet selectedKeys;

EOF


博客没有评论系统,可以通过 邮件 评论和交流。 Top↑