(转)Netty : writeAndFlush的线程安全及并发问题

摘要:
Rocketmq使用netty实现网络连接。发现多个线程无法使用通道连接,所以这是线程安全的吗?使用Netty编程时,我们通常从用户线程而不是Netty线程池启动写操作,因为我们无法在Netty事件回调中执行大量耗时的操作。那么问题来了——1.writeAndFlush线程是否安全?

rocketmq用netty实现的网络连接,发现它多个线程掉用一个channel连接,所以这个是线程安全的?

使用Netty编程时,我们经常会从用户线程,而不是Netty线程池发起write操作,因为我们不能在netty的事件回调中做大量耗时操作。那么问题来了 –

1, writeAndFlush是线程安全的吗?

2, 是否使用了锁,导致并发性能下降呢

我们来看代码 – 在DefaultChannelHandlerContext中

@Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next;
        next = findContextOutbound(MASK_WRITE);
        ReferenceCountUtil.touch(msg, next);
        next.invoker.invokeWrite(next, msg, promise);
        next = findContextOutbound(MASK_FLUSH);
        next.invoker.invokeFlush(next);
        return promise;
}

在DefaultChannelHandlerInvoker.java中

@Override
     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
         if (msg == null) {
             throw new NullPointerException("msg");
         }
         if (!validatePromise(ctx, promise, true)) {
             // promise cancelled
             ReferenceCountUtil.release(msg);
             return;
         }

         if (executor.inEventLoop()) {
             invokeWriteNow(ctx, msg, promise);
         } else {
             AbstractChannel channel = (AbstractChannel) ctx.channel();
             int size = channel.estimatorHandle().size(msg);
             if (size > 0) {
                 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                 // Check for null as it may be set to null if the channel is closed already
                 if (buffer != null) {
                     buffer.incrementPendingOutboundBytes(size);
                 }
             }
             safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
         }
     }
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
         try {
             executor.execute(task);
         } catch (Throwable cause) {
             try {
                 promise.setFailure(cause);
             } finally {
                 ReferenceCountUtil.release(msg);
             }
         }
     }

可见,writeAndFlush如果在Netty线程池内执行,则是直接write;否则,将作为一个task插入到Netty线程池执行。

《Netty权威指南》写到
通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建他们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化。

免责声明:文章转载自《(转)Netty : writeAndFlush的线程安全及并发问题》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇推荐两款远程管理Linux工具(基于Windows系统)ubuntu下FTP文件目录共享下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

浅析.Net下的多线程编程(1)

作者:peter 出处:天极论坛 多线程是许多操作系统所具有的特性,它能大大提高程序的运行效率,所以多线程编程技术为编程者广泛关注。 目前微软的.Net战略正进一步推进,各种相关的技术正为广大编程者所接受,同样在.Net中多线程编程技术具有相当重要的地位。本文我就向大家介绍在.Net下进行多线程编程的基本方法和步骤。 开始新线程 在.Net下创建一个新线...

【Chromium中文文档】跨进程通信 (IPC)

跨进程通信 (IPC) 转载请注明出处:https://ahangchen.gitbooks.io/chromium_doc_zh/content/zh//General_Architecture/Inter-process_Communication.html 全书地址 Chromium中文文档 for https://www.chromium.org/...

delphi 多线程 数据库

// 线程类unit Unit2; interface uses Classes; type TMyThread = class(TThread) private FUserName: string; FPassWord: string; FFlag: Boolean; procedure GetUserName(const Value: string);...

C#Win32API编程之PostMessage

  由于C#屏蔽了很多操作系统内核级的操作,将保护机制进行了加强,通过普通方法是无法完成如后台键鼠模拟、进程内存读写、网络封包拦截等操作的。   而C#又提供了调用非托管代码的DllImport,使得我们可以调用操作系统较为底层的API来完善程序功能。   本文就C#调用Win32API函数PostMessage完成指定窗体后台键鼠模拟作为示例,粗略讲解一...

主线程中同步的 XMLHttpRequest 已不推荐使用,因其对终端用户的用户体验存在负面影响。

最近做实训项目,做着做着突然就崩溃了,我打开chrome的检查元素,一步一步跟踪,给了我这样一个提示信息: 主线程中同步的 XMLHttpRequest 已不推荐使用,因其对终端用户的用户体验存在负面影响。更多帮助请见 http://xhr.spec.whatwg.org/ 我百度了一下发现这是我ajax请求数据时出的错。 从提示中,可以知道,建议不要我们...

异步 HttpContext.Current实现取值的方法(解决异步Application,Session,Cache...等失效的问题)

在一个项目中,为了系统执行效率更快,把一个经常用到的数据库表通过dataset放到Application中,发现在异步实现中每一次都会出现HttpContext.Current为null的异常,后来在网上查了好多资料,发现问这个问题的人多,回答的少,回答的也多数都是:引用System.Web,不要用HttpContext.Current.Applicati...