Netty Nio启动全流程

摘要:
NettyNio启动全流程1.各组件之间的关系说明:EventLoopGroup类似线程池,EventLoop为单线程,每个EventLoop关联一个NioSelector,用于注册Channel,形成一个EventLoop被多个channel公用。在EventLoop会执行通道Io选择操作,以及非Io任务。在Channel初始化后会创建pipeline,是handler的链表结构。=null){//RegistrationontheEventLoopfailedsofailtheChannelPromisedirectlytonotcausean//IllegalStateExceptiononcewetrytoaccesstheEventLoopoftheChannel.promise.setFailure;}else{//Registrationwassuccessful,sosetthecorrectexecutortouse.//Seehttps://github.com/netty/netty/issues/2586promise.registered();doResolveAndConnect0;}}});returnpromise;}}一言以蔽之,首先做初始化channel和channel注册操作,然后服务器启动做绑定操作,客户端启动做连接操作。最大化重用代码。

Netty Nio启动全流程

1. 各组件之间的关系

netty
说明:EventLoopGroup类似线程池,EventLoop为单线程,每个EventLoop关联一个Nio Selector,用于注册Channel,形成一个EventLoop被多个channel公用。在EventLoop会执行通道Io选择操作,以及非Io任务。在Channel初始化后会创建pipeline,是handler的链表结构。

2. 服务端vs客户端启动

// 服务端启动
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
//客户端启动
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

一言以蔽之,首先做初始化channel和channel注册操作,然后服务器启动做绑定操作,客户端启动做连接操作。而初始化channel和channel注册都是通过initAndRegister()实现。最大化重用代码。

3. 初始化创建通道以及通道注册

3.1 模板方法的创建通道->初始化通道->通道注册

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
      // 创建通道
        channel = channelFactory.newChannel();
     // 初始化通道
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 通道注册
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

3.2 创建通道

创建通道

  1. 构造channel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

NioChannel将java SelectableChannel包装了一把,并添加了pipeline和unsafe操作,默认的pipeline是一个双向链表结构,只包含head和tail两个节点。
2. 初始化channel
对于客户端而言,直接向pipeline中添加builder方法的handler,以及一些nio操作的通用属性,对于服务端创建而言,除了一些基本nio属性外,只添加了一个初始化的handler

// 客户端创建
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
//服务端创建
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

注:ChannelInitializer的initChannnel会在注册成功之后调用,以此实现动态扩展。
客户端创建时候pipeline中没有ChannelInitializer,需要自己添加。
3. 通道注册
主要将channel绑定到EventLoop上面,然后在eventLoop单线程中执行注册操作

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    // 此时在主线程中,不知eventLoop线程池中
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

register0主要干三件事,注册->调用ChannelInitializer的initChannnel完成添加handler->注册channel关心的操作
3.1 java channel注册,0表示只注册,不执行任何操作

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

3.2 pipeline.fireChannelRegistered()
此时,pipeline中包含三个handler,其中一个是ChannelInitializer。

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    if (initChannel(ctx)) {
        ctx.pipeline().fireChannelRegistered();
    } else {
        ctx.fireChannelRegistered();
    }
}

3.2 beginRead();

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

注意,此时才会真实注册关系的事件,对服务端而言为Accept,对客户端创建,就是connect

public NioServerSocketChannel(ServerSocketChannel channel) {
     super(null, channel, SelectionKey.OP_ACCEPT);
     config = new NioServerSocketChannelConfig(this, javaChannel().socket());
 }
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
     super(parent, ch, SelectionKey.OP_READ);
 }

至此,客户端与服务端完成了初始化channel以及注册channel操作。

4. 服务端绑定到指定端口

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在eventLoop中执行绑定端口操作

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

最后都是会调用unsafe的bind方法完成端口绑定操作。

5. 客户端连接远程服务端

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

连接服务端最终也是在eventLoop中执行,最终调用unsafe的connect方法。

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

connect有三种结果,成功,直接返回true,失败则暂时不知道结果,检测OP_CONNECT,异常直接关闭链路。
值得说明的是jdk默认不支持连接超时,netty添加了超时机制:在EventLoop中添加超时任务,触发超时时间后会关闭连接,连接成功会删除该超时任务。

// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
            ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                close(voidPromise());
            }
        }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}

promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
            close(voidPromise());
        }
    }
});

6.EventLooop 处理IO事件

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

免责声明:文章转载自《Netty Nio启动全流程》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇EFLAGS标志寄存器加深理解关于Promise,你必须知道的几点。下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

netty作为基础通信组件

阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。其中,服务提供者和服务消费者之间,服务提供者、服务消费者和性能统计节点之间使用 Netty 进行异步/同步通信。     除了 Dubbo 之外,淘宝的消息中间件 RocketMQ...

Netty入门1之----认识Netty

Netty 什么是Netty? Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukescho...

Netty之Bootstrapping-netty学习笔记(10)-20210810

Bootstrap类 引导类的层次结构包括一个抽象的父类和两个具体的引导子类: 相对于将具体的引导类分别看作用于服务器和客户端的引导来说,记住它们的本意是用来支撑不同的应用程序的功能的将有所裨益。也就是说,服务器致力于使用一个父 Channel 来接受来自客户端的连接,并创建子 Channel 以用于它们之间的通信;而客户端将最可能只需要一个 单独的、没...

MongoDB查询报错:class com.mongodb.MongoSecurityException: Exception authenticating MongoCredential

异常日志: 2019-05-30 10:10:24,252 [http-nio-8080-exec-1] DEBUG [java.sql.Connection] -ooo Connection Opened 2019-05-30 10:10:24,258 [http-nio-8080-exec-1] DEBUG [java.sql.PreparedSta...

Java NIO 学习笔记(七)----NIO/IO 的对比和总结

目录:Java NIO 学习笔记(一)----概述,Channel/BufferJava NIO 学习笔记(二)----聚集和分散,通道到通道Java NIO 学习笔记(三)----SelectorJava NIO 学习笔记(四)----文件通道和网络通道Java NIO 学习笔记(五)----路径、文件和管道 Path/Files/PipeJava NI...

Java NIO 学习笔记

为了防止无良网站的爬虫抓取文章,特此标识,转载请注明文章出处。LaplaceDemon/SJQ。 http://www.cnblogs.com/shijiaqi1066/p/3344148.html 0 概述 0.1 Socket的问题 传统socket由于需要等待资源,所以会出现阻塞现象。服务器端一般只能使用一个客户端socket对应一个处理线程。 但是...