Java高并发网络编程(四)Netty

摘要:
由于网络编程本身的复杂性,以及JDKAPI开发的使用难度较高,所以在开源社区中,涌现出来了很多对JDKNIO进行封装、增强的网络编程框架,比如Netty、Mina等。Netty重要的四个内容:Reactor线程模型:一种高性能的多线程程序设计思路Netty中自己定义的Channel概念:增强版的通道概念ChannelPipeline职责链设计模式:事件处理机制内存管理:增强的ByteBuf缓冲区整体结构图二、Netty线程模型为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型。

在网络应用开发的过程中,直接使用JDK提供的NIO的API,比较繁琐,而且想要进行性能提升,还需要结合多线程技术。

由于网络编程本身的复杂性,以及JDK API开发的使用难度较高,所以在开源社区中,涌现出来了很多对JDK NIO进行封装、增强的网络编程框架,比如Netty、Mina等。

一、Netty简介

https://netty.io/官网

Netty是一个高性能、高可扩展性的异步事件驱动的网络应用程序框架,它极大简化了TCP和UDP客户端和服务器开发等网络编程。

Netty重要的四个内容:

  • Reactor线程模型:一种高性能的多线程程序设计思路
  • Netty中自己定义的Channel概念:增强版的通道概念
  • ChannelPipeline职责链设计模式:事件处理机制
  • 内存管理:增强的ByteBuf缓冲区

整体结构图

Java高并发网络编程(四)Netty第1张

二、Netty线程模型

为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型。

Reactor模型中有四个核心概念:

  • Resources资源(请求/任务)
  • Synchronous Event Demultiplexer同步事件复用器
  • Dispatcher分配器
  • Request Handler请求处理器

Java高并发网络编程(四)Netty第2张

Netty启动时会构建多个Reactor

EventLoopGroup初始化过程

Java高并发网络编程(四)Netty第3张

Java高并发网络编程(四)Netty第4张

两组EventLoopGroup(Main&Sub)处理不同通道不同的事件

public final classEchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throwsException {
     //Configure the server.
// 创建EventLoopGroup accept线程组 NioEventLoop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建EventLoopGroup I/O线程组 EventLoopGroup workerGroup
= newNioEventLoopGroup(); final EchoServerHandler serverHandler = newEchoServerHandler(); try{
// 服务端启动引导工具类 ServerBootstrap b
= newServerBootstrap();
// 配置服务端处理的reactor线程组以及服务端的其他配置 b.group(bossGroup, workerGroup2) .channel(NioServerSocketChannel.
class) .option(ChannelOption.SO_BACKLOG, 100) .handler(newLoggingHandler(LogLevel.DEBUG)) .childHandler((ChannelInitializer)(ch)-> { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler); }); //通过bind启动服务器 ChannelFuture f =b.bind(PORT).sync(); //阻塞主线程,直到网络服务被关闭 f.channel().closeFuture().sync(); } finally{ //关闭线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

追踪NioEventLoopGroup源码,会发现是创造很多NioEventLoop

public class NioEventLoopGroup extendsMultithreadEventLoopGroup {
    ...
    public NioEventLoopGroup(intnThreads) {
            this(nThreads, (Executor) null);
     }
    ...

追踪到父类

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implementsEventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final intDEFAULT_EVENT_LOOP_THREADS;

    static{
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if(logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /*** @seeMultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(intnThreads, Executor executor, Object... args) {
        super(nThreads == 0 ?DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // 默认是cpu核数*2
    }
...

追踪父类

public abstract class MultithreadEventExecutorGroup extendsAbstractEventExecutorGroup {
// 代码省略
    
    //多线程的事件执行器
    protected MultithreadEventExecutorGroup(intnThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {//Tony: 如果执行器为空,则创建一个
// EventLoop都是通过executor创建线程并执行它的 executor = newThreadPerTaskExecutor(newDefaultThreadFactory()); // 线程创建器,源码见下面 } // EventLoop是EventExecutor接口的具体实现 children = newEventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try{
// 有多个实现方法,见下面 返回NioEventLoop children[i]
=newChild(executor, args); success = true; } catch(Exception e) { //TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally{ if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e =children[j]; try{ while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch(InterruptedException interrupted) { //Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser =chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throwsException { if (terminatedChildren.incrementAndGet() ==children.length) { terminationFuture.setSuccess(null); } } }; for(EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren =Collections.unmodifiableSet(childrenSet); }

ThreadPerTaskExecutor创建线程

public final class ThreadPerTaskExecutor implementsExecutor {
    private finalThreadFactory threadFactory;

    publicThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory =threadFactory;
    }

    @Override
    public voidexecute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

Java高并发网络编程(四)Netty第5张

NioEventLoopGroup

public class NioEventLoopGroup extendsMultithreadEventLoopGroup {
  //省略代码
@Override
    protected EventLoop newChild(Executor executor, Object... args) throwsException {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

返回NioEventLoop,也传入了executor,用来帮助创建线程执行任务

看NioEventLoop的具体实现

public final class NioEventLoop extendsSingleThreadEventLoop {
    //代码省略
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider =selectorProvider;
        final SelectorTuple selectorTuple =openSelector();
        selector =selectorTuple.selector;
        unwrappedSelector =selectorTuple.unwrappedSelector;
        selectStrategy =strategy;
    }
// 代码省略

selector是NIO的selector

NioEventLoop将通道注册到EventLoop的selector上,进行事件轮询

不断追踪NioEventLoop

最顶层是

public interfaceExecutor {

    /*** Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@codeExecutor} implementation.
     *
     * @paramcommand the runnable task
     * @throwsRejectedExecutionException if this task cannot be
     * accepted for execution
     * @throwsNullPointerException if command is null
     */
    voidexecute(Runnable command);
}

execute由SingleThreadEventExecutor实现

提交任务

 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implementsOrderedEventExecutor {
    //省略代码  
@Override
    public voidexecute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //判断execute方法的调用者是不是EventLoop同一个线程
        boolean inEventLoop =inEventLoop();
        addTask(task);//增加到任务队列
        if (!inEventLoop) {//不是同一个线程,则调用启动方法
startThread();
            if(isShutdown()) {
                boolean reject = false;
                try{
                    if(removeTask(task)) {
                        reject = true;
                    }
                } catch(UnsupportedOperationException e) {
                    //The task queue does not support removal so the best thing we can do is to just move on and
                    //hope we will be able to pick-up the task before its completely terminated.
                    //In worst case we will log on termination.
}
                if(reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp &&wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

startThread

    private voidstartThread() {
        if (state ==ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try{
                    doStartThread();//Tony: 未启动,则触发启动
                } catch(Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }

    private voiddoStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {//这里的executor是初始化EventLoop的时候传进来的
@Override
            public voidrun() {
                thread =Thread.currentThread();
                if(interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {//创建线程开始执行run方法,所以,每个EventLoop都是执行run
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch(Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally{
                    for(;;) {
                        int oldState =state;
                        if (oldState >= ST_SHUTTING_DOWN ||STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    //Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if(logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try{
                        //Run all remaining tasks and shutdown hooks.
                        for(;;) {
                            if(confirmShutdown()) {
                                break;
                            }
                        }
                    } finally{
                        try{
                            cleanup();
                        } finally{
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                if(logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

Java高并发网络编程(四)Netty第6张

@Override
    protected void run() {//有任务提交后,被触发执行
        for (;;) {//执行两件事selector,select的事件 和 taskQueue里面的内容
            try{
                try{
                    switch(selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    caseSelectStrategy.CONTINUE:
                        continue;

                    caseSelectStrategy.BUSY_WAIT:
                        //fall-through to SELECT since the busy-wait is not supported with NIO

                    caseSelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        //'wakenUp.compareAndSet(false, true)' is always evaluated
                        //before calling 'selector.wakeup()' to reduce the wake-up
                        //overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        //However, there is a race condition in this approach.
                        //The race condition is triggered when 'wakenUp' is set to
                        //true too early.
                        //
                        //'wakenUp' is set to true too early if:
                        //1) Selector is waken up between 'wakenUp.set(false)' and
                        //'selector.select(...)'. (BAD)
                        //2) Selector is waken up between 'selector.select(...)' and
                        //'if (wakenUp.get()) { ... }'. (OK)
                        //
                        //In the first case, 'wakenUp' is set to true and the
                        //following 'selector.select(...)' will wake up immediately.
                        //Until 'wakenUp' is set to false again in the next round,
                        //'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        //any attempt to wake up the Selector will fail, too, causing
                        //the following 'selector.select(...)' call to block
                        //unnecessarily.
                        //
                        //To fix this problem, we wake up the selector again if wakenUp
                        //is true immediately after selector.select(...).
                        //It is inefficient in that it wakes up the selector for both
                        //the first case (BAD - wake-up required) and the second case
                        //(OK - no wake-up required).

                        if(wakenUp.get()) {
                            selector.wakeup();
                        }
                        //fall through
                    default:
                    }
                } catch(IOException e) {
                    //If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    //the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {//处理事件
processSelectedKeys();
                    } finally{
                        //Ensure we always run tasks.
runAllTasks();
                    }
                } else{
                    final long ioStartTime =System.nanoTime();
                    try{
                        processSelectedKeys();
                    } finally{
                        //Ensure we always run tasks.
                        final long ioTime = System.nanoTime() -ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) /ioRatio);
                    }
                }
            } catch(Throwable t) {
                handleLoopException(t);
            }
            //Always handle shutdown even if the loop processing threw an exception.
            try{
                if(isShuttingDown()) {
                    closeAll();
                    if(confirmShutdown()) {
                        return;
                    }
                }
            } catch(Throwable t) {
                handleLoopException(t);
            }
        }
    }

    private static voidhandleLoopException(Throwable t) {
        logger.warn("Unexpected exception in the selector loop.", t);

        //Prevent possible consecutive immediate failures that lead to
        //excessive CPU consumption.
        try{
            Thread.sleep(1000);
        } catch(InterruptedException e) {
            //Ignore.
}
    }

    private voidprocessSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {//处理事件
processSelectedKeysPlain(selector.selectedKeys());
        }
    }
private void processSelectedKeysPlain(Set<SelectionKey>selectedKeys) {
        //check if the set is empty and if so just return to not create garbage by
        //creating a new Iterator every time even if there is nothing to process.
        //See https://github.com/netty/netty/issues/597
        if(selectedKeys.isEmpty()) {
            return;
        }
        //获取selector所有选中的事件(ServerSocketChannel主要是OP_ACCEPT,SocketChannle主要是OP_READ)
        Iterator<SelectionKey> i =selectedKeys.iterator();
        for(;;) {
            final SelectionKey k =i.next();
            final Object a =k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {//处理niochannel事件
processSelectedKey(k, (AbstractNioChannel) a);
            } else{
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if(needsToSelectAgain) {
                selectAgain();
                selectedKeys =selector.selectedKeys();

                //Create the iterator again to avoid ConcurrentModificationException
                if(selectedKeys.isEmpty()) {
                    break;
                } else{
                    i =selectedKeys.iterator();
                }
            }
        }
    }

EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor创建新线程来触发run方法执行

Java高并发网络编程(四)Netty第7张

channel注册到selector上

请求

服务端启动的过程,服务端的启动就是Bind绑定端口的过程

Java高并发网络编程(四)Netty第8张

回到EchoServer

追踪bind源码

 //Start the server.
ChannelFuture f = b.bind(PORT).sync();

bind绑定端口并创建通道

 public ChannelFuture bind(intinetPort) {
        return bind(new InetSocketAddress(inetPort)); //绑定端口的入口代码
}

    /*** Create a new {@linkChannel} and bind it.
     */
    public ChannelFuture bind(String inetHost, intinetPort) {
        returnbind(SocketUtils.socketAddress(inetHost, inetPort));
    }

    /*** Create a new {@linkChannel} and bind it.
     */
    public ChannelFuture bind(InetAddress inetHost, intinetPort) {
        return bind(newInetSocketAddress(inetHost, inetPort));
    }

    /*** Create a new {@linkChannel} and bind it.
     */
    publicChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);//真正干事的代码
}

    private ChannelFuture doBind(finalSocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();//创建/初始化ServerSocketChannel对象,并注册到Selector
        final Channel channel =regFuture.channel();
        if (regFuture.cause() != null) {
            returnregFuture;
        }
        //等注册完成之后,再绑定端口。 防止端口开放了,却不能处理请求
        if(regFuture.isDone()) {
            //At this point we know that the registration was complete and successful.
            ChannelPromise promise =channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);//实际操作绑定端口的代码
            returnpromise;
        } else{
            //Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = newPendingRegistrationPromise(channel);
            regFuture.addListener(newChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throwsException {
                    Throwable cause =future.cause();
                    if (cause != null) {
                        //Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        //IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
                    } else{
                        //Registration was successful, so set the correct executor to use.
                        //See https://github.com/netty/netty/issues/2586
promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            returnpromise;
        }
    }

    finalChannelFutureinitAndRegister() {
        Channel channel = null;
        try{
            channel =channelFactory.newChannel(); // 通道
            init(channel); // 初始化通道
        } catch(Throwable t) {
            if (channel != null) {
                //channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
                //as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return newDefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            //as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(newFailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //(一开始初始化的group)MultithreadEventLoopGroup里面选择一个eventLoop进行绑定
        ChannelFuture regFuture =config().group().register(channel); // register见下面
        if (regFuture.cause() != null) {
            if(channel.isRegistered()) {
                channel.close();
            } else{
                channel.unsafe().closeForcibly();
            }
        }

        //If we are here and the promise is not failed, it's one of the following cases:
        //1) If we attempted registration from the event loop, the registration has been completed at this point.
        //i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        //2) If we attempted registration from the other thread, the registration request has been successfully
        //added to the event loop's task queue for later execution.
        //i.e. It's safe to attempt bind() or connect() now:
        //because bind() or connect() will be executed *after* the scheduled registration task is executed
        //because register(), bind(), and connect() are all bound to the same thread.

        returnregFuture;
    }

    abstract void init(Channel channel) throwsException;

    private static voiddoBind0(
            final ChannelFuture regFuture, finalChannel channel,
            final SocketAddress localAddress, finalChannelPromise promise) {

        //This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        //the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(newRunnable() {
            @Override
            public void run() {//这里向EventLoop提交任务,一旦有任务提交则会触发EventLoop的轮询
                if (regFuture.isSuccess()) {//本质又绕回到channel的bind方法上面。
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else{
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
@Override
publicChannelFuture register(Channel channel){
    return next().register(channel); //根据选择器,选择一个合适的NioEventLoop进行注册(SingleEventLoop)
}

追踪register代码...

netty中的Channel是一个抽象的概念,可以理解为对JDK NIO Channel的增强和拓展。增加了很多属性和方法,下面罗列几个常见的属性和方法:

Java高并发网络编程(四)Netty第9张

三、责任链设计模式

责任链(Chain of Responsibility Pattern)为请求创建了一个处理对象的链

发起请求具体处理请求的过程进行解耦:职责链上的处理者负责处理请求,客户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递。

Java高并发网络编程(四)Netty第10张

handler是具体处理请求的程序

实现责任链模式4个要素:处理器抽象类、具体的处理器实现类、保存处理器信息、处理执行

Java高并发网络编程(四)Netty第11张Java高并发网络编程(四)Netty第12张

责任链代码示例

//-----链表形式调用------netty就是类似的这种形式
public classPipelineDemo {
    /*** 初始化的时候造一个head,作为责任链的开始,但是并没有具体的处理
     */
    public HandlerChainContext head = new HandlerChainContext(newAbstractHandler() {
        @Override
        voiddoHandler(HandlerChainContext handlerChainContext, Object arg0) {
            handlerChainContext.runNext(arg0);
        }
    });

    public voidrequestProcess(Object arg0) {
        this.head.handler(arg0);
    }

    public voidaddLast(AbstractHandler handler) {
        HandlerChainContext context =head;
        while (context.next != null) {
            context =context.next;
        }
        context.next = newHandlerChainContext(handler);
    }


    public static voidmain(String[] args) {
        PipelineDemo pipelineChainDemo = newPipelineDemo();
        pipelineChainDemo.addLast(newHandler2());
        pipelineChainDemo.addLast(newHandler1());
        pipelineChainDemo.addLast(newHandler1());
        pipelineChainDemo.addLast(newHandler2());

        //发起请求
        pipelineChainDemo.requestProcess("火车呜呜呜~~");

    }
}

/*** handler上下文,我主要负责维护链,和链的执行
 */
classHandlerChainContext {
    HandlerChainContext next; //下一个节点
AbstractHandler handler;

    publicHandlerChainContext(AbstractHandler handler) {
        this.handler =handler;
    }

    voidhandler(Object arg0) {
        this.handler.doHandler(this, arg0);
    }

    /*** 继续执行下一个
     */
    voidrunNext(Object arg0) {
        if (this.next != null) {
            this.next.handler(arg0);
        }
    }
}

//处理器抽象类
abstract classAbstractHandler {
    /*** 处理器,这个处理器就做一件事情,在传入的字符串中增加一个尾巴..
     */
    abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); //handler方法
}

//处理器具体实现类
class Handler1 extendsAbstractHandler {
    @Override
    voiddoHandler(HandlerChainContext handlerChainContext, Object arg0) {
        arg0 = arg0.toString() + "..handler1的小尾巴.....";
        System.out.println("我是Handler1的实例,我在处理:" +arg0);
        //继续执行下一个
handlerChainContext.runNext(arg0);
    }
}

//处理器具体实现类
class Handler2 extendsAbstractHandler {
    @Override
    voiddoHandler(HandlerChainContext handlerChainContext, Object arg0) {
        arg0 = arg0.toString() + "..handler2的小尾巴.....";
        System.out.println("我是Handler2的实例,我在处理:" +arg0);
        //继续执行下一个
handlerChainContext.runNext(arg0);
    }
}

Netty中的ChannelPipeline责任链

Java高并发网络编程(四)Netty第13张

Java高并发网络编程(四)Netty第14张

Nettty中定义了很多事件

Java高并发网络编程(四)Netty第15张

Pipeline中的handler是什么?

Java高并发网络编程(四)Netty第16张

ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除。

例如,可以在即将交换敏感信息时插入加密处理程序,并在交换后删除。

一般操作,初始化的时候增加进去,较少删除。下面是Pipeline中管理handler的API:

Java高并发网络编程(四)Netty第17张

源码查看

handler执行分析

Java高并发网络编程(四)Netty第18张

分析registered入站事件的处理

Java高并发网络编程(四)Netty第19张

源码查看 从bind()进入

Java高并发网络编程(四)Netty第20张

bind出站事件分析

Java高并发网络编程(四)Netty第21张

源码查看

分析accept入站事件的处理

Java高并发网络编程(四)Netty第22张

源码查看

read入站事件的处理

Java高并发网络编程(四)Netty第23张

源码查看

Java高并发网络编程(四)Netty第24张

四、零拷贝机制

JDK ByteBuffer存在一些缺点

  • 无法动态扩容。长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
  • API使用复杂。读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些api,否则容易出错。

1.Netty自己的ByteBuf

ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。

ByteBuf的增强

  • API操作便捷性
  • 动态扩容
  • 多种ByteBuf实现
  • 高效的零拷贝机制

2.ByteBuf操作

ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writeIndex写入位置

提供了两个指针变量来支持顺序读和写操作,分别时readerIndex和写操作writeIndex

常用方法定义:

Java高并发网络编程(四)Netty第25张

下图显示了一个缓冲区是如何被两个指针分割成三个区域的:

Java高并发网络编程(四)Netty第26张

示例代码

/*** bytebuf的常规API操作示例
 */
public classByteBufDemo {
    @Test
    public voidapiTest() {
        //+-------------------+------------------+------------------+
        //| discardable bytes |  readable bytes  |  writable bytes  |
        //|                   |     (CONTENT)    |                  |
        //+-------------------+------------------+------------------+
        //|                   |                  |                  |
        //0      <=       readerIndex   <=   writerIndex    <=    capacity

        //1.创建一个非池化的ByteBuf,大小为10个字节
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("原始ByteBuf为====================>" +buf.toString());
        System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //2.写入一段内容
        byte[] bytes = {1, 2, 3, 4, 5};
        buf.writeBytes(bytes);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
        System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //3.读取一段内容
        byte b1 =buf.readByte();
        byte b2 =buf.readByte();
        System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
        System.out.println("读取一段内容后ByteBuf为===========>" +buf.toString());
        System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //4.将读取的内容丢弃
buf.discardReadBytes();
        System.out.println("将读取的内容丢弃后ByteBuf为========>" +buf.toString());
        System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //5.清空读写指针
buf.clear();
        System.out.println("将读写指针清空后ByteBuf为==========>" +buf.toString());
        System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //6.再次写入一段内容,比第一段内容少
        byte[] bytes2 = {1, 2, 3};
        buf.writeBytes(bytes2);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes2));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
        System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //7.将ByteBuf清零
        buf.setZero(0, buf.capacity());
        System.out.println("将内容清零后ByteBuf为==============>" +buf.toString());
        System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "
");

        //8.再次写入一段超过容量的内容
        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        buf.writeBytes(bytes3);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes3));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
        System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");
        //随机访问索引 getByte
        //顺序读 read*
        //顺序写 write*
        //清除已读内容 discardReadBytes
        //清除缓冲区 clear
        //搜索操作
        //标记和重置
        //完整代码示例:参考
        //搜索操作 读取指定位置 buf.getByte(1);
        //
}

}

Unpooled推荐的方式创建buf

可以动态扩容

运行结果

原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

写入的bytes为====================>[1, 2, 3, 4, 5]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

读取的bytes为====================>[1, 2]
读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]

写入的bytes为====================>[1, 2, 3]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]

将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

动态扩容

io.netty.buffer.AbstractByteBuf下:

  final void ensureWritable0(intminWritableBytes) {
        ensureAccessible();
        if (minWritableBytes <=writableBytes()) {
            return;
        }
        final int writerIndex =writerIndex();
        if(checkBounds) {
            if (minWritableBytes > maxCapacity -writerIndex) {
                throw newIndexOutOfBoundsException(String.format(
                        "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                        writerIndex, minWritableBytes, maxCapacity, this));
            }
        }

        //Normalize the current capacity to the power of 2.
        int minNewCapacity = writerIndex +minWritableBytes;
        int newCapacity =alloc().calculateNewCapacity(minNewCapacity, maxCapacity);

        int fastCapacity = writerIndex +maxFastWritableBytes();
        //Grow by a smaller amount if it will avoid reallocation
        if (newCapacity > fastCapacity && minNewCapacity <=fastCapacity) {
            newCapacity =fastCapacity;
        }

        //Adjust to the new capacity.
capacity(newCapacity);
    }

calculateNewCapacity

@Override
    public int calculateNewCapacity(int minNewCapacity, intmaxCapacity) { // minNewCapacity:14 maxCapacity:2147483647
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity >maxCapacity) { // minCapacity:14
            throw newIllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        } // 阈值4M,这个阈值的用意:容量要求4M以内,每次扩容以2的倍数进行计算,超过4M容量,另外的计算方式。
        final int threshold = CALCULATE_THRESHOLD; //4 MiB page

        if (minNewCapacity ==threshold) { // 新容量的最小要求,如果等于阈值,则立刻返回
            returnthreshold;
        }

        //If over threshold, do not double but just increase by threshold.
        if (minNewCapacity >threshold) {
            int newCapacity = minNewCapacity / threshold *threshold;
            if (newCapacity > maxCapacity -threshold) {
                newCapacity =maxCapacity;
            } else{
                newCapacity +=threshold;
            }
            returnnewCapacity;
        }
        // 如果容量要求没超过阈值,则从64字节开始,不断增加一倍,直至满足新容量最小要求
        //Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity <minNewCapacity) {
            newCapacity <<= 1;
        }

        returnMath.min(newCapacity, maxCapacity);
    }

Java高并发网络编程(四)Netty第27张

Java高并发网络编程(四)Netty第28张

选择合适的ByteBuf实现:

了解核心的3个维度的划分方式,8种具体实现

Java高并发网络编程(四)Netty第29张

在使用中,都是使用ByteBufAllocator分配器进行申请,同时分配器具有内存管理的功能

堆外内存示例

/*** 堆外内存的常规API操作示例
 */
public classDirectByteBufDemo {
    @Test
    public voidapiTest() {
        //+-------------------+------------------+------------------+
        //| discardable bytes |  readable bytes  |  writable bytes  |
        //|                   |     (CONTENT)    |                  |
        //+-------------------+------------------+------------------+
        //|                   |                  |                  |
        //0      <=       readerIndex   <=   writerIndex    <=    capacity

        //1.创建一个非池化的ByteBuf,大小为10个字节
        ByteBuf buf = Unpooled.directBuffer(10);
        System.out.println("原始ByteBuf为====================>" +buf.toString());
        //System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
"); 堆外内存不能用buf.array()

        //2.写入一段内容
        byte[] bytes = {1, 2, 3, 4, 5};
        buf.writeBytes(bytes);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
        //System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //3.读取一段内容
        byte b1 =buf.readByte();
        byte b2 =buf.readByte();
        System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
        System.out.println("读取一段内容后ByteBuf为===========>" +buf.toString());
       //System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //4.将读取的内容丢弃
buf.discardReadBytes();
        System.out.println("将读取的内容丢弃后ByteBuf为========>" +buf.toString());
        //System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //5.清空读写指针
buf.clear();
        System.out.println("将读写指针清空后ByteBuf为==========>" +buf.toString());
        //System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //6.再次写入一段内容,比第一段内容少
        byte[] bytes2 = {1, 2, 3};
        buf.writeBytes(bytes2);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes2));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
       //System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");

        //7.将ByteBuf清零
        buf.setZero(0, buf.capacity());
        System.out.println("将内容清零后ByteBuf为==============>" +buf.toString());
       //System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "
");

        //8.再次写入一段超过容量的内容
        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        buf.writeBytes(bytes3);
        System.out.println("写入的bytes为====================>" +Arrays.toString(bytes3));
        System.out.println("写入一段内容后ByteBuf为===========>" +buf.toString());
       //System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
");
        //随机访问索引 getByte
        //顺序读 read*
        //顺序写 write*
        //清除已读内容 discardReadBytes
        //清除缓冲区 clear
        //搜索操作
        //标记和重置
        //完整代码示例:参考
        //搜索操作 读取指定位置 buf.getByte(1);
        //
}

}

Unsafe的实现

Java高并发网络编程(四)Netty第30张

Java高并发网络编程(四)Netty第31张

内存复用

PooledByteBuf对象、内存复用

Java高并发网络编程(四)Netty第32张

Java高并发网络编程(四)Netty第33张

3.零拷贝机制

Netty的零拷贝机制,是一种应用层的实现。和底层的JVM、操作系统内存机制并无过多关联。

Java高并发网络编程(四)Netty第34张

代码示例

/*** 零拷贝示例
 */
public classZeroCopyTest {
    @org.junit.Test
    public voidwrapTest() {
        byte[] arr = {1, 2, 3, 4, 5};
        ByteBuf byteBuf =Unpooled.wrappedBuffer(arr);
        System.out.println(byteBuf.getByte(4));
        arr[4] = 6;
        System.out.println(byteBuf.getByte(4));
    } // java数组转为buf 5 arr修改为6后,byteBuf也变为6,说明两者用的是相同的数据,零拷贝

    @org.junit.Test
    public voidsliceTest() {
        ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
        ByteBuf newBuffer = buffer1.slice(1, 2);
        newBuffer.unwrap();
        System.out.println(newBuffer.toString());
    } // 拆分。不会动原来的buf,还保留原来buf的地址

    @org.junit.Test
    public voidcompositeTest() {
        ByteBuf buffer1 = Unpooled.buffer(3);
        buffer1.writeByte(1);
        ByteBuf buffer2 = Unpooled.buffer(3);
        buffer2.writeByte(4);
        CompositeByteBuf compositeByteBuf =Unpooled.compositeBuffer();
        CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
        System.out.println(newBuffer);
    } // 合并。还保留原来buf的信息

}

免责声明:文章转载自《Java高并发网络编程(四)Netty》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇wpf研究之道——datagrid控件分页如何将DevExpress的Gridcontrol导出到Excel下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

python中多线程,多进程,多协程概念及编程上的应用

1, 多线程  线程是进程的一个实体,是CPU进行调度的最小单位,他是比进程更小能独立运行的基本单位。  线程基本不拥有系统资源,只占用一点运行中的资源(如程序计数器,一组寄存器和栈),但是它可以与同属于一个进程的其他线程共享全部的资源。  提高程序的运行速率,上下文切换快,开销比较少,但是不够稳定,容易丢失数据,形成死锁。 直接上代码: impor...

python基础整理5——多进程多线程和协程

进程与线程 1.进程 我们电脑的应用程序,都是进程,假设我们用的电脑是单核的,cpu同时只能执行一个进程。当程序处于I/O阻塞的时候,CPU如果和程序一起等待,那就太浪费了,cpu会去执行其他的程序,此时就涉及到切换,切换前要保存上一个程序运行的状态,才能恢复,所以就需要有个东西来记录这个东西,就可以引出进程的概念了。 进程就是一个程序在一个数据集上的一次...

进程简述

简而言之,一个程序至少有一个进程,一个进程至少有一个线程. 线程的划分尺度小于进程,使得多线程程序的并发性高。另外,进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。线程在执行过程中与进程还是有区别的。每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程...

Java5 多线程实践

2006 年 1 月 18 日 Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Jav...

(五) Docker 安装 Nginx

参考并感谢 官方文档 https://hub.docker.com/_/nginx 下载nginx镜像(不带tag标签则表示下载latest版本) docker pull nginx 启动 nginxTmp 容器,目的是为了拷贝配置文件 docker run -d -p 80:80 --name nxtmp nginx:latest 登录...

【转】jstack命令的使用

jstack是java虚拟机自带的一种堆栈跟踪工具。 jstack用于生成java虚拟机当前时刻的线程快照。线程快照是当前java虚拟机内每一条线程正在执行的方法堆栈的集合,生成线程快照的主要目的是定位线程出现长时间停顿的原因, 如线程间死锁、死循环、请求外部资源导致的长时间等待等。 线程出现停顿的时候通过jstack来查看各个线程的调用堆栈,就可以知道没...