Netty之EventLoop-netty学习笔记(11)-20210813

摘要:
EventLop接口运行任务来处理在连接的生命周期内发生的事件是任何网络框架的基本功能。与之相应的编程上的构造通常被称为事件循环——一个Netty使用了interfaceio.netty.channel.EventLoop来适配的术语。根据配置和可用核心的不同,可能会创建多个EventLoop实例用以优化资源的使用,并且单个EventLoop可能会被指派用于服务多个Channel。Netty4中的I/O和事件处理由I/O操作触发的事件将流经安装了一个或者多个ChannelHandler的ChannelPipeline。因此,在Netty4中,所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。

线程模型概述

基本的线程池化模式可以描述为:

从池的空闲线程列表中选择一个 Thread,并且指派它去运行一个已提交的任务(一个Runnable 的实现);当任务完成时,将该 Thread 返回给该列表,使其可被重用。

虽然池化和重用线程相对于简单地为每个任务都创建和销毁线程是一种进步,但是它并不能 消除由上下文切换所带来的开销,其将随着线程数量的增加很快变得明显,并且在高负载下愈演 愈烈。此外,仅仅由于应用程序的整体复杂性或者并发需求,在项目的生命周期内也可能会出现 其他和线程相关的问题。

EventLop接口

运行任务来处理在连接的生命周期内发生的事件是任何网络框架的基本功能。与之相应的编程上的构造通常被称为事件循环——一个 Netty 使用了 interface io.netty.channel. EventLoop 来适配的术语。

下面的代码说明了事件循环的基本思想,其中每个任务都是一个 Runnable 的实例:

//阻塞,直到有事件已经就绪可被运行
while(!terminated) {
    List<Runnable> readyEvents =blockUntilEventsReady();
    for(Funnable ev : readyEvents){
        ev.run();//循环遍历,并处理所有的事件
}
}

Netty 的 EventLoop 是协同设计的一部分,它采用了两个基本的 API:并发和网络编程。 首先,io.netty.util.concurrent包构建在 JDK 的java.util.concurrent包上,用 来提供线程执行器。其次,io.netty.channel包中的类,为了与 Channel 的事件进行交互, 扩展了这些接口/类。

Netty之EventLoop-netty学习笔记(11)-20210813第1张

在这个模型中,一个 EventLoop 将由一个永远都不会改变的 Thread 驱动,同时任务 (Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。
根据配置和可用核心的不同,可能会创建多个 EventLoop 实例用以优化资源的使用,并且单个 EventLoop 可能会被指派用于服务多个 Channel。

Netty4中的I/O和事件处理
由 I/O 操作触发的事件将流经安装了一个或者多个 ChannelHandler 的 ChannelPipeline。传播这些事件的方法调用可以随后被 Channel- Handler 所拦截,并且可以按需地处理事件。

事件的性质通常决定了它将被如何处理;它可能将数据从网络栈中传递到你的应用程序中, 或者进行逆向操作,或者 执行一些截然不同的操作。但是事件的处理逻辑必须足够的通用和灵活, 以处理所有可能的用例。因此,在Netty 4 中,所有的I/O操作和事件都由已经被分配给了 EventLoop的那个Thread来处理。

NioEventLoopGroup 类层次结构

Netty之EventLoop-netty学习笔记(11)-20210813第2张

NioEventLoopGroup 实例化过程

NioEventLoopGroup 的初始化过程:

Netty之EventLoop-netty学习笔记(11)-20210813第3张

即:

  • EventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children 数组, 其大小是 nThreads, 这样就构成了一个线程池

  • 如果我们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器核心数 * 2

  • MultithreadEventExecutorGroup 中会调用 newChild 抽象方法来初始化 children 数组

  • 抽象方法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.

  • NioEventLoop 属性:

    • SelectorProvider provider 属性: NioEventLoopGroup 构造器中通过 SelectorProvider.provider() 获取一个 SelectorProvider

    • Selector selector 属性: NioEventLoop 构造器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

NioEventLoop

NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 因此我们可以认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变。

NioEventLoop 类层次结构

Netty之EventLoop-netty学习笔记(11)-20210813第4张

NioEventLoop 的类层次结构图还是比较复杂的, 不过我们只需要关注几个重要的点即可. 首先 NioEventLoop 的继承链如下:

NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行。

通常来说, NioEventLoop 肩负着两种任务, 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等; 而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的。

Netty之EventLoop-netty学习笔记(11)-20210813第5张

从上图可以看到, SingleThreadEventExecutor 有一个名为thread的 Thread 类型字段, 这个字段就代表了与 SingleThreadEventExecutor 关联的本地线程。

下面是这个构造器的代码:

protectedSingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, booleanaddTaskWakesUp) {
    this.parent =parent;
    this.addTaskWakesUp =addTaskWakesUp;

    thread = threadFactory.newThread(newRunnable() {
        @Override
        public voidrun() {
            boolean success = false;
            updateLastExecutionTime();
            try{
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch(Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally{
                //省略清理代码
...
            }
        }
    });
    threadProperties = newDefaultThreadProperties(thread);
    taskQueue =newTaskQueue();
}

在 SingleThreadEventExecutor 构造器中, 通过threadFactory.newThread创建了一个新的 Java 线程. 在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法, 而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是NioEventLoop.run()方法。

EventLoop 与 Channel 的关联

Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联, 它们的关联过程如下:

Netty之EventLoop-netty学习笔记(11)-20210813第6张

从上图中我们可以看到, 当调用了AbstractChannel#AbstractUnsafe.register后, 就完成了 Channel 和 EventLoop 的关联. register 实现如下:

@Override
public final void register(EventLoop eventLoop, finalChannelPromise promise) {
    //删除条件检查.
...
    AbstractChannel.this.eventLoop =eventLoop;

    if(eventLoop.inEventLoop()) {
        register0(promise);
    } else{
        try{
            eventLoop.execute(newOneTimeTask() {
                @Override
                public voidrun() {
                    register0(promise);
                }
            });
        } catch(Throwable t) {
            ...
        }
    }
}

在AbstractChannel#AbstractUnsafe.register中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程。

EventLoop 的启动

在前面我们已经知道了, NioEventLoop 本身就是一个 SingleThreadEventExecutor, 因此 NioEventLoop 的启动, 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动。

依照这个思想, 我们只要找到在哪里调用了 SingleThreadEventExecutor 的 thread 字段的start()方法就可以知道是在哪里启动的这个线程了。

从代码中搜索, thread.start() 被封装到SingleThreadEventExecutor.startThread()方法中了:

private voidstartThread() {
    if (STATE_UPDATER.get(this) ==ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }

STATE_UPDATER是 SingleThreadEventExecutor 内部维护的一个属性, 它的作用是标识当前的 thread 的状态. 在初始的时候,STATE_UPDATER == ST_NOT_STARTED, 因此第一次调用 startThread() 方法时, 就会进入到 if 语句内, 进而调用到 thread.start()。
而这个关键的startThread()方法又是在哪里调用的呢? 经过方法调用关系搜索, 我们发现, startThread 是在 SingleThreadEventExecutor.execute 方法中调用的:

@Override
public voidexecute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop =inEventLoop();
    if(inEventLoop) {
        addTask(task);
    } else{
        startThread(); //调用 startThread 方法, 启动EventLoop 线程.
addTask(task);
        if (isShutdown() &&removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp &&wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

既然如此, 那现在我们的工作就变为了寻找在哪里第一次调用了 SingleThreadEventExecutor.execute() 方法.

在注册 channel 的过程中, 会在AbstractChannel#AbstractUnsafe.register中调用 eventLoop.execute 方法, 在 EventLoop 中进行 Channel 注册代码的执行, AbstractChannel#AbstractUnsafe.register 部分代码如下:

if(eventLoop.inEventLoop()) {
    register0(promise);
} else{
    try{
        eventLoop.execute(newOneTimeTask() {
            @Override
            public voidrun() {
                register0(promise);
            }
        });
    } catch(Throwable t) {
        ...
    }
}

很显然, 一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 因此上面的eventLoop.inEventLoop()就为 false, 于是进入到 else 分支, 在这个分支中调用了eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是SingleThreadEventExecutor.execute:

@Override
public voidexecute(Runnable task) {
    ...
    boolean inEventLoop =inEventLoop();
    if(inEventLoop) {
        addTask(task);
    } else{
        startThread();
        addTask(task);
        if (isShutdown() &&removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp &&wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

我们已经分析过了,inEventLoop == false, 因此执行到 else 分支, 在这里就调用了startThread()方法来启动 SingleThreadEventExecutor 内部关联的 Java 本地线程了。
总结一句话, 当 EventLoop.execute第一次被调用时, 就会触发startThread()的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动。
我们将EventLoop 与 Channel 的关联小节中的时序图补全后, 就得到了 EventLoop 启动过程的时序图:

Netty之EventLoop-netty学习笔记(11)-20210813第7张

Netty 的 IO 处理循环

在 Netty 中, 一个 EventLoop 需要负责两个工作, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务。接下来我们先从 IO 操纵方面入手, 看一下 TCP 数据是如何从 Java NIO Socket 传递到我们的 handler 中的.

Netty 是 Reactor 模型的一个实现, 并且是基于 Java NIO 的, Netty 中必然有一个 Selector 线程, 用于不断调用 Java NIO 的 Selector.select 方法, 查询当前是否有就绪的 IO 事件. 回顾一下在 Java NIO 中所讲述的 Selector 的使用流程:

  1. 通过 Selector.open() 打开一个 Selector.

  2. 将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)

  3. 不断重复:

    • 调用 select() 方法

    • 调用 selector.selectedKeys() 获取 selected keys

    • 迭代每个 selected key:

      • 1) 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)

      • 2) 判断是哪些 IO 事件已经就绪了, 然后处理它们.如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.

      • 3) 根据需要更改 selected key 的监听事件.

      • 4) 将已经处理过的 key 从 selected keys 集合中删除.

上面的使用流程用代码来体现就是:

/*** @authorxiongyongshun
 * @Email yongshun1228@gmail.com
 * @version1.0
 * @created 16/8/1 13:13
 */
public classNioEchoServer {
    private static final int BUF_SIZE = 256;
    private static final int TIMEOUT = 3000;

    public static void main(String args[]) throwsException {
        //打开服务端 Socket
        ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();

        //打开 Selector
        Selector selector =Selector.open();

        //服务端 Socket 监听8080端口, 并配置为非阻塞模式
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);

        //将 channel 注册到 selector 中.
        //通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
        //注册到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //通过调用 select 方法, 阻塞地等待 channel I/O 可操作
            if (selector.select(TIMEOUT) == 0) {
                System.out.print(".");
                continue;
            }

            //获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
            Iterator<SelectionKey> keyIterator =selector.selectedKeys().iterator();

            while(keyIterator.hasNext()) {

                SelectionKey key =keyIterator.next();

                //当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
keyIterator.remove();

                if(key.isAcceptable()) {
                    //当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
                    //代表客户端的连接
                    //注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
                    //而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
                    SocketChannel clientChannel =((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    //在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
                    //注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
                }

                if(key.isReadable()) {
                    SocketChannel clientChannel =(SocketChannel) key.channel();
                    ByteBuffer buf =(ByteBuffer) key.attachment();
                    long bytesRead =clientChannel.read(buf);
                    if (bytesRead == -1) {
                        clientChannel.close();
                    } else if (bytesRead > 0) {
                        key.interestOps(OP_READ |SelectionKey.OP_WRITE);
                        System.out.println("Get data length: " +bytesRead);
                    }
                }

                if (key.isValid() &&key.isWritable()) {
                    ByteBuffer buf =(ByteBuffer) key.attachment();
                    buf.flip();
                    SocketChannel clientChannel =(SocketChannel) key.channel();

                    clientChannel.write(buf);

                    if (!buf.hasRemaining()) {
                        key.interestOps(OP_READ);
                    }
                    buf.compact();
                }
            }
        }
    }
}

操作的第一步通过 Selector.open() 打开一个 Selector, Netty 中是通过调用 SelectorProvider.openSocketChannel() 来打开一个新的 Java NIO SocketChannel:

private staticSocketChannel newSocket(SelectorProvider provider) {
    ...
    returnprovider.openSocketChannel();
}

第二步将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)的操作我们在第一章channel 的注册过程中也分析过了, 我们在来回顾一下, 在客户端的 Channel 注册过程中, 会有如下调用链:

Bootstrap.initAndRegister ->AbstractBootstrap.initAndRegister ->MultithreadEventLoopGroup.register ->SingleThreadEventLoop.register ->AbstractUnsafe.register ->AbstractUnsafe.register0 ->AbstractNioChannel.doRegister

在 AbstractUnsafe.register 方法中调用了 register0 方法:

@Override
public final void register(EventLoop eventLoop, finalChannelPromise promise) {
    //省略条件判断和错误处理
    AbstractChannel.this.eventLoop =eventLoop;
    register0(promise);
}

register0 方法代码如下:

private voidregister0(ChannelPromise promise) {
    boolean firstRegistration =neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    //Only fire a channelActive if the channel has never been registered. This prevents firing
    //multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration &&isActive()) {
        pipeline.fireChannelActive();
    }
}

register0 又调用了 AbstractNioChannel.doRegister:

@Override
protected void doRegister() throwsException {
    //省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

在这里 javaChannel() 返回的是一个 Java NIO SocketChannel 对象, 我们将此 SocketChannel 注册到前面第一步获取的 Selector 中。

thread 的 run 循环

下面是此线程的 run() 方法, 我已经把一些异常处理和收尾工作的代码都去掉了. 这个 run 方法可以说是十分简单, 主要就是调用了SingleThreadEventExecutor.this.run()方法. 而 SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中。

thread = threadFactory.newThread(newRunnable() {
        @Override
        public voidrun() {
            boolean success = false;
            updateLastExecutionTime();
            try{
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch(Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally{
                ...
            }
        }
    });

继续跟踪到 NioEventLoop.run() 方法, 其源码如下:

@Override
protected voidrun() {
    for(;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try{
            if(hasTasks()) {
                selectNow();
            } else{
                select(oldWakenUp);
                if(wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else{
                final long ioStartTime =System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() -ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) /ioRatio);
            }

            if(isShuttingDown()) {
                closeAll();
                if(confirmShutdown()) {
                    break;
                }
            }
        } catch(Throwable t) {
            ...
        }
    }
}

NioEventLoop 事件循环的核心就是上面代码的for(;;)所构成的死循环。

这个 run 方法可以说是 Netty NIO 的核心, 属于重中之重, 把它分析明白了, 那么对 Netty 的事件循环机制也就了解了大部分了。

IO 事件的轮询

首先, 在 run 方法中, 第一步是调用hasTasks()方法来判断当前任务队列中是否有任务:

protected booleanhasTasks() {
    assertinEventLoop();
    return !taskQueue.isEmpty();
}

这个方法很简单, 仅仅是检查了一下taskQueue是否为空. 至于 taskQueue 是什么呢, 其实它就是存放一系列的需要由此 EventLoop 所执行的任务列表。

当 taskQueue 不为空时, 就执行到了 if 分支中的 selectNow() 方法. 然而当 taskQueue 为空时, 执行的是 select(oldWakenUp) 方法. 那么selectNow()和select(oldWakenUp)之间有什么区别呢? 来看一下, selectNow() 的源码如下:

void selectNow() throwsIOException {
    try{
        selector.selectNow();
    } finally{
        //restore wakup state if needed
        if(wakenUp.get()) {
            selector.wakeup();
        }
    }
}

首先调用了selector.selectNow()方法, 这个selector字段正是 Java NIO 中的多路复用器Selector. 那么这里selector.selectNow()就很好理解了, selectNow() 方法会检查当前是否有就绪的 IO 事件, 如果有, 则返回就绪 IO 事件的个数; 如果没有, 则返回0.注意, selectNow() 是立即返回的, 不会阻塞当前线程.当 selectNow() 调用后, finally 语句块中会检查 wakenUp 变量是否为 true, 当为 true 时, 调用 selector.wakeup() 唤醒 select() 的阻塞调用。

看了 if 分支的 selectNow 方法后, 我们再来看一下 else 分支的select(oldWakenUp)方法。

其实 else 分支的select(oldWakenUp)方法的处理逻辑比较复杂, 而我们这里的目的暂时不是分析这个方法调用的具体工作, 因此我这里长话短说, 只列出我们我们关注的内如:

private void select(boolean oldWakenUp) throwsIOException {
    Selector selector = this.selector;
    try{
        ...
        int selectedKeys =selector.select(timeoutMillis);
        ...
    } catch(CancelledKeyException e) {
        ...
    }
}

在这个 select 方法中, 调用了selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的, timeoutMillis 是阻塞的超时时间。

到来这里, 我们可以看到, 当hasTasks()为真时, 调用的的selectNow()方法是不会阻塞当前线程的, 而当hasTasks()为假时, 调用的select(oldWakenUp)是会阻塞当前线程的。

这其实也很好理解: 当 taskQueue 中没有任务时, 那么 Netty 可以阻塞地等待 IO 就绪事件; 而当 taskQueue 中有任务时, 我们自然地希望所提交的任务可以尽快地执行, 因此 Netty 会调用非阻塞的 selectNow() 方法, 以保证 taskQueue 中的任务尽快可以执行。

IO 事件的处理

在 NioEventLoop.run() 方法中, 第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步自然就是处理这些 IO 事件啦。

首先让我们来看一下 NioEventLoop.run 中循环的剩余部分:

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    processSelectedKeys();
    runAllTasks();
} else{
    final long ioStartTime =System.nanoTime();

    processSelectedKeys();

    final long ioTime = System.nanoTime() -ioStartTime;
    runAllTasks(ioTime * (100 - ioRatio) /ioRatio);
}

上面列出的代码中, 有两个关键的调用, 第一个是processSelectedKeys()调用, 根据字面意思, 我们可以猜出这个方法肯定是查询就绪的 IO 事件, 然后处理它; 第二个调用是runAllTasks(), 这个方法我们也可以一眼就看出来它的功能就是运行 taskQueue 中的任务。

这里的代码还有一个十分有意思的地方, 即ioRatio. 那什么是ioRatio呢? 它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间). 例如 ioRatio 默认是 50, 则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1。

当知道了 IO 操作耗时和它所占用的时间比, 那么执行 task 的时间就可以很方便的计算出来了:

设 IO 操作耗时为 ioTime, ioTime 占的时间比例为 ioRatio, 则:
    ioTime / ioRatio = taskTime /taskRatio
    taskRatio = 100 -ioRatio
    => taskTime = ioTime * (100 - ioRatio) / ioRatio

根据上面的公式, 当我们设置 ioRate = 70 时, 则表示 IO 运行耗时占比为70%, 即假设某次循环一共耗时为 100ms, 那么根据公式, 我们知道processSelectedKeys()方法调用所耗时大概为70ms(即 IO 耗时), 而runAllTasks()耗时大概为 30ms(即执行 task 耗时)。
当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用processSelectedKeys()、runAllTasks(); 而当 ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下processSelectedKeys()所执行的时间(即 IO 操作的耗时), 然后根据公式, 计算出执行 task 所占用的时间, 然后以此为参数, 调用runAllTasks()。

我们这里先分析一下processSelectedKeys()方法调用,runAllTasks()我们留到下一节再分析。
processSelectedKeys()方法的源码如下:

private voidprocessSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else{
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

这个方法中, 会根据selectedKeys字段是否为空, 而分别调用processSelectedKeysOptimized或processSelectedKeysPlain.selectedKeys字段是在调用 openSelector() 方法时, 根据 JVM 平台的不同, 而有设置不同的值, 在我所调试这个值是不为 null 的. 其实processSelectedKeysOptimized方法processSelectedKeysPlain没有太大的区别, 为了简单起见, 我们以processSelectedKeysOptimized为例分析一下源码的工作流程。

private voidprocessSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k =selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;

        final Object a =k.attachment();

        if (a instanceofAbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else{
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        ...
    }
}

两个关键的点:迭代selectedKeys获取就绪的 IO 事件, 然后为每个事件都调用processSelectedKey来处理它。
这里正好完美对应上了我们提到的 Selector 的使用流程中的第三步里操作。
还有一点需要注意的是, 我们可以调用selectionKey.attach(object)给一个 selectionKey 设置一个附加的字段, 然后可以通过Object attachedObj = selectionKey.attachment()获取它. 上面代代码正是通过了k.attachment()来获取一个附加在 selectionKey 中的对象, 那么这个对象是什么呢? 它又是在哪里设置的呢? 我们再来回忆一下 SocketChannel 是如何注册到 Selector 中的:
在客户端的 Channel 注册过程中, 会有如下调用链:

Bootstrap.initAndRegister ->AbstractBootstrap.initAndRegister ->MultithreadEventLoopGroup.register ->SingleThreadEventLoop.register ->AbstractUnsafe.register ->AbstractUnsafe.register0 ->AbstractNioChannel.doRegister

最后的 AbstractNioChannel.doRegister 方法会调用SocketChannel.register方法注册一个 SocketChannel 到指定的 Selector:

@Override
protected void doRegister() throwsException {
    //省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

特别注意一下register的第三个参数, 这个参数是设置 selectionKey 的附加对象的, 和调用selectionKey.attach(object)的效果一样. 而调用register所传递的第三个参数是this, 它其实就是一个NioSocketChannel的实例. 那么这里就很清楚了, 我们在将 SocketChannel 注册到 Selector 中时, 将 SocketChannel 所对应的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中。
再回到processSelectedKeysOptimized方法中, 当我们获取到附加的对象后, 我们就调用processSelectedKey来处理这个 IO 事件:

final Object a =k.attachment();

if (a instanceofAbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else{
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

processSelectedKey方法源码如下:

private static voidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe =ch.unsafe();
    ...
    try{
        int readyOps =k.readyOps();
        
        //可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                //Connection already closed - no need to handle write.
                return;
            }
        }
        
        //可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
        }
        
        //连接建立事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            //See https://github.com/netty/netty/issues/924
            int ops =k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch(CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

processSelectedKey中处理了三个事件, 分别是:

  • OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.

  • OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.

  • OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.

下面我们分别根据这三个事件来看一下 Netty 是怎么处理的吧.

OP_READ 处理

当就绪的 IO 事件是OP_READ, 代码会调用unsafe.read()方法, 即:

//可读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        //Connection already closed - no need to handle write.
        return;
    }
}

unsafe 这个字段, 我们已经和它打了太多的交道了, 在第一章Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)中我们已经对它进行过浓墨重彩地分析了, 最后我们确定了它是一个NioSocketChannelUnsafe实例, 负责的是 Channel 的底层 IO 操作.
我们可以利用 Intellij IDEA 提供的Go To Implementations功能, 寻找到这个方法的实现. 最后我们发现这个方法没有在NioSocketChannelUnsafe中实现, 而是在它的父类AbstractNioByteChannel实现的, 它的实现源码如下:

@Override
public final voidread() {
    ...
    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try{
        int totalReadAmount = 0;
        boolean readPendingReset = false;
        do{
            byteBuf =allocHandle.allocate(allocator);
            int writable =byteBuf.writableBytes();
            int localReadAmount =doReadBytes(byteBuf);

            //检查读取结果.
...

            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;

            ...

            totalReadAmount +=localReadAmount;
        
            //检查是否是配置了自动读取, 如果不是, 则立即退出循环.
...
        } while (++ messages <maxMessagesPerRead);

        pipeline.fireChannelReadComplete();
        allocHandle.record(totalReadAmount);

        if(close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch(Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally{
    }
}

read()源码比较长, 我为了篇幅起见, 删除了部分代码, 只留下了主干,上面read方法其实归纳起来, 可以认为做了如下工作:

  1. 分配 ByteBuf

  2. 从 SocketChannel 中读取数据

  3. 调用pipeline.fireChannelRead发送一个 inbound 事件.

pipeline.fireChannelRead正好就是ChannelPipeline中分析的inbound事件起点. 当调用了pipeline.fireIN_EVT()后, 那么就产生了一个inbound事件, 此事件会以head -> customContext -> tail的方向依次流经 ChannelPipeline 中的各个 handler。
调用了pipeline.fireChannelRead后, 就是 ChannelPipeline 中所需要做的工作了。

OP_WRITE 处理

OP_WRITE可写事件代码如下:

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

OP_CONNECT 处理

最后一个事件是OP_CONNECT, 即 TCP 连接已建立事件:

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    //See https://github.com/netty/netty/issues/924
    int ops =k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

OP_CONNECT事件的处理中, 只做了两件事情:

  1. 正如代码中的注释所言, 我们需要将OP_CONNECT从就绪事件集中清除, 不然会一直有OP_CONNECT事件.

  2. 调用 unsafe.finishConnect() 通知上层连接已建立

unsafe.finishConnect() 调用最后会调用到pipeline().fireChannelActive(), 产生一个inbound事件, 通知 pipeline 中的各个 handler TCP 通道已建立(即ChannelInboundHandler.channelActive方法会被调用)

到了这里, 我们整个 NioEventLoop 的 IO 操作部分已经了解完了, 接下来的一节我们要重点分析一下Netty 的任务队列机制。

Netty 的任务队列机制

我们已经提到过, 在Netty 中, 一个 NioEventLoop 通常需要肩负起两种任务, 第一个是作为 IO 线程, 处理 IO 操作; 第二个就是作为任务线程, 处理 taskQueue 中的任务. 这一节的重点就是分析一下 NioEventLoop 的任务队列机制。

Task 的添加

普通 Runnable 任务

NioEventLoop 继承于 SingleThreadEventExecutor, 而SingleThreadEventExecutor中有一个Queue<Runnable> taskQueue字段, 用于存放添加的 Task. 在 Netty 中, 每个 Task 都使用一个实现了 Runnable 接口的实例来表示。

例如当我们需要将一个 Runnable 添加到 taskQueue 中时, 我们可以进行如下操作:

EventLoop eventLoop =channel.eventLoop();
eventLoop.execute(newRunnable() {
    @Override
    public voidrun() {
        System.out.println("Hello, Netty!");
    }
});

当调用 execute 后, 实际上是调用到了 SingleThreadEventExecutor.execute() 方法, 它的实现如下:

@Override
public voidexecute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop =inEventLoop();
    if(inEventLoop) {
        addTask(task);
    } else{
        startThread();
        addTask(task);
        if (isShutdown() &&removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp &&wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

而添加任务的addTask方法的源码如下:

protected voidaddTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if(isShutdown()) {
        reject();
    }
    taskQueue.add(task);
}

因此实际上,taskQueue是存放着待执行的任务的队列。

schedule 任务

除了通过 execute 添加普通的 Runnable 任务外, 我们还可以通过调用 eventLoop.scheduleXXX 之类的方法来添加一个定时任务。

EventLoop 中实现任务队列的功能在超类SingleThreadEventExecutor实现的, 而 schedule 功能的实现是在SingleThreadEventExecutor的父类, 即AbstractScheduledEventExecutor中实现的。

AbstractScheduledEventExecutor中, 有以 scheduledTaskQueue 字段:

Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

scheduledTaskQueue 是一个队列(Queue), 其中存放的元素是ScheduledFutureTask. 而ScheduledFutureTask我们很容易猜到, 它是对 Schedule 任务的一个抽象。

我们来看一下AbstractScheduledEventExecutor所实现的schedule方法吧:

@Override
public  ScheduledFuture<?> schedule(Runnable command, longdelay, TimeUnit unit) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(unit, "unit");
    if (delay < 0) {
        throw newIllegalArgumentException(
                String.format("delay: %d (expected: >= 0)", delay));
    }
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

这是其中一个重载的 schedule, 当一个 Runnable 传递进来后, 会被封装为一个ScheduledFutureTask对象, 这个对象会记录下这个 Runnable 在何时运行、已何种频率运行等信息。

当构建了ScheduledFutureTask后, 会继续调用 另一个重载的 schedule 方法:

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V>task) {
    if(inEventLoop()) {
        scheduledTaskQueue().add(task);
    } else{
        execute(newOneTimeTask() {
            @Override
            public voidrun() {
                scheduledTaskQueue().add(task);
            }
        });
    }

    returntask;
}

在这个方法中, ScheduledFutureTask 对象就会被添加到scheduledTaskQueue中了.

任务的执行

当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?

让我们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用processSelectedKeys()和runAllTasks()方法, 来进行 IO 事件的处理和 task 的处理.processSelectedKeys()方法我们已经分析过了, 下面我们来看一下runAllTasks()。

runAllTasks 方法有两个重载的方法, 一个是无参数的, 另一个有一个参数的. 首先来看一下无参数的 runAllTasks:

protected booleanrunAllTasks() {
    fetchFromScheduledTaskQueue();
    Runnable task =pollTask();
    if (task == null) {
        return false;
    }

    for(;;) {
        try{
            task.run();
        } catch(Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        task =pollTask();
        if (task == null) {
            lastExecutionTime =ScheduledFutureTask.nanoTime();
            return true;
        }
    }
}

我们前面已经提到过, EventLoop 可以通过调用EventLoop.execute来将一个 Runnable 提交到 taskQueue 中, 也可以通过调用EventLoop.schedule来提交一个 schedule 任务到scheduledTaskQueue中. 在此方法的一开始调用的fetchFromScheduledTaskQueue()其实就是将scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 作为可执行的 task 等待被调度执行。

它的源码如下:

private voidfetchFromScheduledTaskQueue() {
    if(hasScheduledTasks()) {
        long nanoTime =AbstractScheduledEventExecutor.nanoTime();
        for(;;) {
            Runnable scheduledTask =pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                break;
            }
            taskQueue.add(scheduledTask);
        }
    }
}

接下来runAllTasks()方法就会不断调用task = pollTask()从taskQueue中获取一个可执行的 task, 然后调用它的run()方法来运行此 task。

注意, 因为 EventLoop 既需要执行 IO 操作, 又需要执行 task, 因此我们在调用 EventLoop.execute 方法提交任务时, 不要提交耗时任务, 更不能提交一些会造成阻塞的任务, 不然会导致我们的 IO 线程得不到调度, 影响整个程序的并发量。

参考:

https://segmentfault.com/a/1190000007403873

https://blog.csdn.net/cold___play/article/details/106764576

https://segmentfault.com/a/1190000007403937

免责声明:文章转载自《Netty之EventLoop-netty学习笔记(11)-20210813》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇linux命令行下修改系统时间、时区Python 函数传递list,传递dict 以及*args和**kargs下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Go:创建新进程(os.StartProcess源码解读)

关于如何使用go语言实现新进程的创建和进程间通信,我在网上找了不少的资料,但是始终未能发现让自己满意的答案,因此我打算自己来分析这部分源代码,然后善加利用,并且分享给大家,期望大家能从中获得启发。 首先我们来看一段代码 proc, _ := os.StartProcess(name, args, attr) if err != nil { fmt.Prin...

Mysql 合并结果接横向拼接字段

近日在做一个报表功能里面有一个这样的需求是统计各部门在某一月入职和离职的人数 我的步骤是这样先查出入职的人数关键sql如下: SELECT dept ,COUNT(1) rcNumber FROM 员工表 WHERE ( 入职时间 != '' OR 入职时间 IS NOT NULL) and DATE_FORMAT(入职时间...

.net 分布式架构之分布式锁实现

分布式锁 经常用于在解决分布式环境下的业务一致性和协调分布式环境。 实际业务场景中,比如说解决并发一瞬间的重复下单,重复确认收货,重复发现金券等。 使用分布式锁的场景一般不能太多。 开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock 开源相关群: .net 开源基础服...

10条技巧优化数据库速度

  大多数网站的内容都存在数据库里,用户通过请求来访问内容。数据库非常的快,有许多技巧能让你优化数据库的速度,使你不浪费服务器的资源。在这篇文章中,我收录了十个优化数据库速度的技巧。   1、小心设计数据库   第一个技巧也许看来理所当然,但事实上大部分数据库的问题都来自于设计不好的数据库结构。   譬如我曾经遇见过将客户端信息和支付信息储存在同一个数据库...

nginx源码分析:module机制

 根据nginx官方文档,添加一个module的介绍,当我们需要添加一个module时,需要以下操作: 1、为该module新建一个目录。 2、添加一个config文件,一个module核心代码源文件。 3、为configure添加参数--add-module=/path/to/module,然后重新编译。 这个操作步骤我们都很熟悉,那么为什么这样操作后就...

Java多线程并发系列之闭锁(Latch)和栅栏(CyclicBarrier)

   JAVA并发包中有三个类用于同步一批线程的行为,分别是闭锁(Latch),信号灯(Semaphore)和栅栏(CyclicBarrier)。本贴主要说明闭锁(Latch)和栅栏(CyclicBarrier)。 1. 闭锁(Latch) 闭锁(Latch)  —— 确保多个线程在完成各自事务后,才会打开继续执行后面的内容,否则一直等待。 计数器闭锁(...