目錄
背景 CAS CAS 的標準模式 累加示例 寫著玩的 RingBuffer 備注
背景 返回目錄
大多數企業開發人員都理解數據庫樂觀并發控制,不過很少有人聽說過 CAS(我去年才聽說這個概念),CAS 是多線程樂觀并發控制策略的一種,一些無鎖的支持并發的數據結構都會使用到 CAS,本文對比 CAS 和 數據庫樂觀并發控制,以此達到強化記憶的目的。
CAS 返回目錄
CAS = Compare And Swap
多線程環境下 this.i = this.i + 1 是沒有辦法保證線程安全的,因此就有了 CAS,CAS 可以保證上面代碼的線程安全性,但是 CAS 并不會保證 Swap 的成功,只有 Compare 成功了才會 Swap,即:沒有并發發生,即:在我讀取和修改之間沒有別人修改。另外說一點,如果 i 是局部變量,即:i = i + 1,那么這段代碼是線程安全的,因為局部變量是線程獨享的。
不明白 CAS 沒關系,下面通過 CAS 的標準模式 和 一個簡單的示例來理解 CAS。
CAS 的標準模式 返回目錄
偽代碼
1 var localValue, currentValue; 2 do 3 { 4 localValue = this . 5 6 var newValue = 執行一些計算; 7 8 currentValue = Interlocked.CompareExchange( ref this .value, newValue, localValue); 9 } while (localValue != currentValue);
說明
把 this.value 看成是數據庫數據,localValue 是某個用戶讀取的數據,newValue是用戶想修改的值,這里有必要解釋一下 CompareExchange 和 currentValue,它的內部實現代碼是這樣的(想想下面代碼是線程安全的):
1 var currentValue = this .value 2 if (currentValue == localValue){ 3 this .value = newValue; 4 } 5 return currentValue;
CompareExchange ?用 sql 來類比就是:update xxx set value = newValue where value = localValue,只不過返回的值不同。通過?CompareExchange 的返回結果我們知道 CAS 是否成功了,即:是否出現并發了,即:是否在我讀取和修改之間別人已經修改過了,如果是,可以選擇重試。
累加示例 返回目錄
CAS 代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Threading; 7 8 namespace InterlockStudy 9 { 10 class ConcurrentIncrease 11 { 12 public static void Test() 13 { 14 var sum = 0 ; 15 16 var tasks = new Task[ 10 ]; 17 for ( var i = 1 ; i <= 10 ; i++ ) 18 { 19 tasks[i - 1 ] = Task.Factory.StartNew((state) => 20 { 21 int localSum, currentSum; 22 do 23 { 24 localSum = sum; 25 26 Thread.Sleep( 10 ); 27 var value = ( int )state; 28 var newValue = localSum + value; 29 30 currentSum = Interlocked.CompareExchange( ref sum, newValue, localSum); 31 } while (localSum != currentSum); 32 }, i); 33 } 34 35 Task.WaitAll(tasks); 36 37 Console.WriteLine(sum); 38 } 39 } 40 }
數據庫并發代碼
1 public static void Test13() 2 { 3 var tasks = new Task[ 10 ]; 4 for ( var i = 1 ; i <= 10 ; i++ ) 5 { 6 tasks[i - 1 ] = Task.Factory.StartNew((state) => 7 { 8 int localSum, result; 9 do 10 { 11 using ( var con = new SqlConnection(CONNECTION_STRING)) 12 { 13 con.Open(); 14 var cmd = new SqlCommand( " select sum from Tests where Id = 1 " , con); 15 var reader = cmd.ExecuteReader(); 16 reader.Read(); 17 localSum = reader.GetInt32( 0 ); 18 19 System.Threading.Thread.Sleep( 10 ); 20 var value = ( int )state; 21 var newValue = localSum + value; 22 23 cmd = new SqlCommand( " update Tests set sum = " + newValue + " where sum = " + localSum + "" , con); 24 result = cmd.ExecuteNonQuery(); 25 } 26 } while (result == 0 ); 27 }, i); 28 } 29 30 Task.WaitAll(tasks); 31 } 32 }
說明
我們發現 CAS 版本的代碼和數據庫版本的代碼出奇的相似,數據庫的CAS操作是通過 update + where 來完成的。
寫著玩的 RingBuffer 返回目錄
代碼
1 using System; 2 using System.Collections.Generic; 3 using System.Collections.Concurrent; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using System.Threading; 8 9 namespace InterlockStudy 10 { 11 internal class Node<T> 12 { 13 public T Data { get ; set ; } 14 15 public bool HasValue { get ; set ; } 16 } 17 18 class RingBuffer<T> 19 { 20 private readonly Node<T> [] _nodes; 21 private long _tailIndex = - 1 ; 22 private long _headIndex = - 1 ; 23 private AutoResetEvent _readEvent = new AutoResetEvent( false ); 24 private AutoResetEvent _writeEvent = new AutoResetEvent( false ); 25 26 public RingBuffer( int maxSize) 27 { 28 _nodes = new Node<T> [maxSize]; 29 30 for ( var i = 0 ; i < maxSize; i++ ) 31 { 32 _nodes[i] = new Node<T> (); 33 } 34 } 35 36 public void EnQueue(T data) 37 { 38 while ( true ) 39 { 40 if ( this .TryEnQueue(data)) 41 { 42 _readEvent.Set(); 43 return ; 44 } 45 46 _writeEvent.WaitOne(); 47 } 48 49 } 50 51 public T DeQueue() 52 { 53 while ( true ) 54 { 55 T data; 56 if ( this .TryDeQueue( out data)) 57 { 58 _writeEvent.Set(); 59 return data; 60 } 61 62 _readEvent.WaitOne(); 63 } 64 65 } 66 67 public bool TryEnQueue(T data) 68 { 69 long localTailIndex, newTailIndex, currentTailIndex; 70 do 71 { 72 localTailIndex = _tailIndex; 73 74 if (! this .CanWrite(localTailIndex)) 75 { 76 return false ; 77 } 78 79 newTailIndex = localTailIndex + 1 ; 80 81 if (_nodes[newTailIndex % _nodes.Length].HasValue) 82 { 83 return false ; 84 } 85 86 currentTailIndex = Interlocked.CompareExchange( ref _tailIndex, newTailIndex, localTailIndex); 87 } 88 while (localTailIndex != currentTailIndex); 89 90 _nodes[newTailIndex % _nodes.Length].Data = data; 91 _nodes[newTailIndex % _nodes.Length].HasValue = true ; 92 93 return true ; 94 } 95 96 public bool TryDeQueue( out T data) 97 { 98 long localHeadIndex, newHeadIndex, currentHeadIndex; 99 do 100 { 101 localHeadIndex = _headIndex; 102 103 if (! this .CanRead(localHeadIndex)) 104 { 105 data = default (T); 106 return false ; 107 } 108 109 newHeadIndex = localHeadIndex + 1 ; 110 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false ) 111 { 112 data = default (T); 113 return false ; 114 } 115 116 currentHeadIndex = Interlocked.CompareExchange( ref _headIndex, newHeadIndex, localHeadIndex); 117 } 118 while (localHeadIndex != currentHeadIndex); 119 120 data = _nodes[newHeadIndex % _nodes.Length].Data; 121 _nodes[newHeadIndex % _nodes.Length].HasValue = false ; 122 123 return true ; 124 } 125 126 private bool CanWrite( long localTailIndex) 127 { 128 return localTailIndex - _headIndex < _nodes.Length; 129 } 130 131 private bool CanRead( long localHeadIndex) 132 { 133 return _tailIndex - localHeadIndex > 0 ; 134 } 135 } 136 }
備注 返回目錄
倉促成文,如果有必要可以再寫篇文章,希望大家多批評。
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
