基于无锁的C#并发队列实现

摘要:
最近,我开始学习非锁编程。与传统的基于锁的算法相比,非锁编程有其独特的优势,这在AngelLucifer关于非锁编程的文章中有详细描述。无锁编程的目标是在不使用锁的情况下确保并发期间共享数据的一致性。其主要实现基础是CAS操作,即compare_和_Swap通过处理器提供的指令,可以自动更新共享数据,同时监视其他线程的干扰。中的相应实现。网络已互锁


最近开始学习无锁编程,和传统的基于Lock的算法相比,无锁编程具有其独特的优点,Angel Lucifer的关于无锁编程一文对此有详细的描述。

无锁编程的目标是在不使用Lock的前提下保证并发过程中共享数据的一致性,其主要的实现基础是CAS操作,也就是compare_and_swap,通过处理器提供的指令,可以原子地更新共享数据,并同时监测其他线程的干扰,.Net中的对应实现是InterLocked.CompareExchange函数。

既然不使用Lock,那在无锁编程中要时刻注意的是,代码可能在任意语句中被中断。如果是单个变量,我们可以使用 InterLocked.XXX 保证操作的原子性,但是如果有多个操作要完成的话,简单地组合 InterLocked.XXX 是远远不够的。通常的原则是对函数中用到的共享变量,先在代码开始处用局部变量保存它的内容,在后面更新共享变量时,使用前述变量来判断其是否发生了改变,如果共享变量发生了改变,那么我们可能需要重试,或者在某些可能的情况下,当前线程可以"帮助"其他更新中的线程完成更新。

从上面可以总结出无锁算法的两个基本特征:

1. 无锁算法总是包含一个循环结构,以保证更新失败后重试

2. 无锁算法在更新共享变量时,总是使用CAS和原始值进行比较,以保证没有冲突

下面是按照Michael-Scott算法实现的并发队列,其中的Dequeue算法在IBM的非阻塞算法一文中有详细介绍。代码如下:

基于无锁的C#并发队列实现第1张基于无锁的C#并发队列实现第2张Code
 1基于无锁的C#并发队列实现第3张public class ConcurrentLinkedQueue<T> 
 2基于无锁的C#并发队列实现第4张基于无锁的C#并发队列实现第5张基于无锁的C#并发队列实现第6张{
 3基于无锁的C#并发队列实现第7张    private class Node<K>
 4基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张    基于无锁的C#并发队列实现第6张{
 5基于无锁的C#并发队列实现第7张        internal K Item;
 6基于无锁的C#并发队列实现第7张        internal Node<K> Next;
 7基于无锁的C#并发队列实现第7张
 8基于无锁的C#并发队列实现第7张        public Node(K item, Node<K> next)
 9基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张        基于无锁的C#并发队列实现第6张{
10基于无锁的C#并发队列实现第7张            this.Item = item;
11基于无锁的C#并发队列实现第7张            this.Next = next;
12基于无锁的C#并发队列实现第20张        }

13基于无锁的C#并发队列实现第20张    }

14基于无锁的C#并发队列实现第7张
15基于无锁的C#并发队列实现第7张    private Node<T> _head;
16基于无锁的C#并发队列实现第7张    private Node<T> _tail;
17基于无锁的C#并发队列实现第7张
18基于无锁的C#并发队列实现第7张    public ConcurrentLinkedQueue()
19基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张    基于无锁的C#并发队列实现第6张{
20基于无锁的C#并发队列实现第7张        _head = new Node<T>(default(T), null);
21基于无锁的C#并发队列实现第7张        _tail = _head;
22基于无锁的C#并发队列实现第20张    }

23基于无锁的C#并发队列实现第7张
24基于无锁的C#并发队列实现第7张    public bool IsEmpty
25基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张    基于无锁的C#并发队列实现第6张{
26基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张        get 基于无锁的C#并发队列实现第6张return (_head.Next == null); }
27基于无锁的C#并发队列实现第20张    }

28基于无锁的C#并发队列实现第7张
29基于无锁的C#并发队列实现第7张    public void Enqueue(T item)
30基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张    基于无锁的C#并发队列实现第6张{
31基于无锁的C#并发队列实现第7张        Node<T> newNode = new Node<T>(item, null);
32基于无锁的C#并发队列实现第7张        while (true)
33基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张        基于无锁的C#并发队列实现第6张{
34基于无锁的C#并发队列实现第7张            Node<T> curTail = _tail;
35基于无锁的C#并发队列实现第7张            Node<T> residue = curTail.Next;
36基于无锁的C#并发队列实现第7张
37基于无锁的C#并发队列实现第7张            //判断_tail是否被其他process改变
38基于无锁的C#并发队列实现第7张            if (curTail == _tail)
39基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张            基于无锁的C#并发队列实现第6张{
40基于无锁的C#并发队列实现第7张                //A 有其他process执行C成功,_tail应该指向新的节点
41基于无锁的C#并发队列实现第7张                if (residue == null
42基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                基于无锁的C#并发队列实现第6张{
43基于无锁的C#并发队列实现第7张                    //C 如果其他process改变了tail.next节点,需要重新取新的tail节点
44基于无锁的C#并发队列实现第7张                    if (Interlocked.CompareExchange<Node<T>>(
45基于无锁的C#并发队列实现第7张                        ref curTail.Next, newNode, residue) == residue) 
46基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                    基于无锁的C#并发队列实现第6张{
47基于无锁的C#并发队列实现第7张                        //D 尝试修改tail
48基于无锁的C#并发队列实现第7张                        Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); 
49基于无锁的C#并发队列实现第7张                        return;
50基于无锁的C#并发队列实现第20张                    }

51基于无锁的C#并发队列实现第20张                }

52基于无锁的C#并发队列实现第7张                else
53基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                基于无锁的C#并发队列实现第6张{
54基于无锁的C#并发队列实现第7张                    //B 帮助其他线程完成D操作
55基于无锁的C#并发队列实现第7张                    Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); 
56基于无锁的C#并发队列实现第20张                }

57基于无锁的C#并发队列实现第20张            }

58基于无锁的C#并发队列实现第20张        }

59基于无锁的C#并发队列实现第20张    }

60基于无锁的C#并发队列实现第7张
61基于无锁的C#并发队列实现第7张    public bool TryDequeue(out T result)
62基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张    基于无锁的C#并发队列实现第6张{
63基于无锁的C#并发队列实现第7张        Node<T> curHead;
64基于无锁的C#并发队列实现第7张        Node<T> curTail;
65基于无锁的C#并发队列实现第7张        Node<T> next;
66基于无锁的C#并发队列实现第7张        do
67基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张        基于无锁的C#并发队列实现第6张{
68基于无锁的C#并发队列实现第7张            curHead = _head;
69基于无锁的C#并发队列实现第7张            curTail = _tail;
70基于无锁的C#并发队列实现第7张            next = curHead.Next;
71基于无锁的C#并发队列实现第7张            if (curHead == _head)
72基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张            基于无锁的C#并发队列实现第6张{
73基于无锁的C#并发队列实现第7张                if (next == null)  //Queue为空
74基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                基于无锁的C#并发队列实现第6张{
75基于无锁的C#并发队列实现第7张                    result = default(T);
76基于无锁的C#并发队列实现第7张                    return false;
77基于无锁的C#并发队列实现第20张                }

78基于无锁的C#并发队列实现第7张                if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
79基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                基于无锁的C#并发队列实现第6张{
80基于无锁的C#并发队列实现第7张                    //尝试帮助其他Process完成操作
81基于无锁的C#并发队列实现第7张                    Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); 
82基于无锁的C#并发队列实现第20张                }

83基于无锁的C#并发队列实现第7张                else
84基于无锁的C#并发队列实现第8张基于无锁的C#并发队列实现第9张                基于无锁的C#并发队列实现第6张{
85基于无锁的C#并发队列实现第7张                    //取next.Item必须放到CAS之前
86基于无锁的C#并发队列实现第7张                    result = next.Item; 
87基于无锁的C#并发队列实现第7张                    //如果_head没有发生改变,则将_head指向next并退出
88基于无锁的C#并发队列实现第7张                    if (Interlocked.CompareExchange<Node<T>>(ref _head, 
89基于无锁的C#并发队列实现第7张                        next, curHead) == curHead)
90基于无锁的C#并发队列实现第7张                        break;
91基于无锁的C#并发队列实现第20张                }

92基于无锁的C#并发队列实现第20张            }

93基于无锁的C#并发队列实现第20张        }

94基于无锁的C#并发队列实现第7张        while (true);
95基于无锁的C#并发队列实现第7张        return true;
96基于无锁的C#并发队列实现第20张    }

97基于无锁的C#并发队列实现第135张}

98基于无锁的C#并发队列实现第3张
根据自己的测试(双核CPU),在轻度和中度争用情况下,无锁算法比基于锁的算法性能好很多,在争用非常严重的情况下(100个并发线程以上/每CPU),基于锁的算法性能开始显示出优势,因为一旦发生争用,基于锁的算法会立刻切换到其他线程,而无锁算法会进入下一次循环,导致CPU的占用。但是如此严重的争用在实际中并不多见,并且可以采用SpinWait的方法加以改进。基于锁的算法在测试中曾经出现过类似死锁的现象,无锁算法则完全没有出过类似问题,另外,处理器核心越多,基于锁的算法效率越差。
 
从上面的算法实现中,可以体会到无锁算法的优势:在并发的多个线程中,总是有线程能够推进,算法总能在有限的循环次数内完成,并且在某些冲突的情况下,一个线程可以“帮助”其他线程完成被中断的工作,这些对提高吞吐量都有很大的作用。
 

免责声明:文章转载自《基于无锁的C#并发队列实现》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇jquery插件:点击拉出的右侧滑动菜单Oracle系列之存储过程下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

RedisTemplate常用集合使用说明-opsForSet(五)

 1、add(K key, V... values)   向变量中批量添加值。 Java代码   redisTemplate.opsForSet().add("setValue","A","B","C","B","D","E","F");     2、members(K key)   获取变量中的值。 Java代码   Set set = redis...

进程之间的通信

进程/线程同步的方式和机制,进程间通信 一、进程/线程间同步机制。 临界区、互斥区、事件、信号量四种方式临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件(Event)的区别1、临界区:通过对多线程的串行化来访问公共资源或一段代码,速度快,适合控制数据访问。在任意时刻只允许一个线程对共享资源进行访问,如果...

[转]QT子线程与主线程的信号槽通信-亲测可用!

近用QT做一个服务器,众所周知,QT的主线程必须保持畅通,才能刷新UI。所以,网络通信端采用新开线程的方式。在涉及到使用子线程更新Ui上的控件时遇到了点儿麻烦。网上提供了很多同一线程不同类间采用信号槽通信的方式,但是并不完全适合线程间的信号槽通信,这主要体现在自定义消息的传递上。 首先我们看看一般的方式:利用信号-槽发送Qt内置的元数据类型testthre...

Android Runnable 运行在那个线程

Runnable 并不一定是新开一个线程,比如下面的调用方法就是运行在UI主线程中的: Handler mHandler=new Handler(); mHandler.post(new Runnable(){ @Override public void run() { // TODO Auto-generated method stub }...

Thinkphp的Volist标签

Volist标签主要用于在模板中循环输出数据集或者多维数组。 volist标签(循环输出数据) 闭合 非闭合标签 属性 name(必须):要输出的数据模板变量 id(必须):循环变量 offset(可选):要输出数据的offset length(可选):输出数据的长度 key(可选):循环的key变量,默认值为i mod(可选):对key值取模,...

【转】C++标准转换运算符static_cast

static_cast<new_type> (expression) 虽然const_cast是用来去除变量的const限定,但是static_cast却不是用来去除变量的static引用。其实这是很容易理解的,static决定的是一个变量的作用域和生命周期,比如:在一个文件中将变量定义为static,则说明这个变量只能在本Package中使用...