目錄
背景 CAS CAS 的標準模式 累加示例 寫著玩的 RingBuffer 備注
背景 返回目錄
大多數企業開發人員都理解數據庫樂觀并發控制,不過很少有人聽說過 CAS(我去年才聽說這個概念),CAS 是多線程樂觀并發控制策略的一種,一些無鎖的支持并發的數據結構都會使用到 CAS,本文對比 CAS 和 數據庫樂觀并發控制,以此達到強化記憶的目的。
CAS 返回目錄
CAS = Compare And Swap
多線程環境下 this.i = this.i + 1 是沒有辦法保證線程安全的,因此就有了 CAS,CAS 可以保證上面代碼的線程安全性,但是 CAS 并不會保證 Swap 的成功,只有 Compare 成功了才會 Swap,即:沒有并發發生,即:在我讀取和修改之間沒有別人修改。另外說一點,如果 i 是局部變量,即:i = i + 1,那么這段代碼是線程安全的,因為局部變量是線程獨享的。
不明白 CAS 沒關系,下面通過 CAS 的標準模式 和 一個簡單的示例來理解 CAS。
CAS 的標準模式 返回目錄
偽代碼
1
var
localValue, currentValue;
2
do
3
{
4
localValue =
this
.
5
6
var
newValue =
執行一些計算;
7
8
currentValue = Interlocked.CompareExchange(
ref
this
.value, newValue, localValue);
9
}
while
(localValue !=
currentValue);
說明
把 this.value 看成是數據庫數據,localValue 是某個用戶讀取的數據,newValue是用戶想修改的值,這里有必要解釋一下 CompareExchange 和 currentValue,它的內部實現代碼是這樣的(想想下面代碼是線程安全的):
1
var
currentValue =
this
.value
2
if
(currentValue ==
localValue){
3
this
.value =
newValue;
4
}
5
return
currentValue;
CompareExchange ?用 sql 來類比就是:update xxx set value = newValue where value = localValue,只不過返回的值不同。通過?CompareExchange 的返回結果我們知道 CAS 是否成功了,即:是否出現并發了,即:是否在我讀取和修改之間別人已經修改過了,如果是,可以選擇重試。
累加示例 返回目錄
CAS 代碼
1
using
System;
2
using
System.Collections.Generic;
3
using
System.Linq;
4
using
System.Text;
5
using
System.Threading.Tasks;
6
using
System.Threading;
7
8
namespace
InterlockStudy
9
{
10
class
ConcurrentIncrease
11
{
12
public
static
void
Test()
13
{
14
var
sum =
0
;
15
16
var
tasks =
new
Task[
10
];
17
for
(
var
i =
1
; i <=
10
; i++
)
18
{
19
tasks[i -
1
] = Task.Factory.StartNew((state) =>
20
{
21
int
localSum, currentSum;
22
do
23
{
24
localSum =
sum;
25
26
Thread.Sleep(
10
);
27
var
value = (
int
)state;
28
var
newValue = localSum +
value;
29
30
currentSum = Interlocked.CompareExchange(
ref
sum, newValue, localSum);
31
}
while
(localSum !=
currentSum);
32
}, i);
33
}
34
35
Task.WaitAll(tasks);
36
37
Console.WriteLine(sum);
38
}
39
}
40
}
數據庫并發代碼
1
public
static
void
Test13()
2
{
3
var
tasks =
new
Task[
10
];
4
for
(
var
i =
1
; i <=
10
; i++
)
5
{
6
tasks[i -
1
] = Task.Factory.StartNew((state) =>
7
{
8
int
localSum, result;
9
do
10
{
11
using
(
var
con =
new
SqlConnection(CONNECTION_STRING))
12
{
13
con.Open();
14
var
cmd =
new
SqlCommand(
"
select sum from Tests where Id = 1
"
, con);
15
var
reader =
cmd.ExecuteReader();
16
reader.Read();
17
localSum = reader.GetInt32(
0
);
18
19
System.Threading.Thread.Sleep(
10
);
20
var
value = (
int
)state;
21
var
newValue = localSum +
value;
22
23
cmd =
new
SqlCommand(
"
update Tests set sum =
"
+ newValue +
"
where sum =
"
+ localSum +
""
, con);
24
result =
cmd.ExecuteNonQuery();
25
}
26
}
while
(result ==
0
);
27
}, i);
28
}
29
30
Task.WaitAll(tasks);
31
}
32
}
說明
我們發現 CAS 版本的代碼和數據庫版本的代碼出奇的相似,數據庫的CAS操作是通過 update + where 來完成的。
寫著玩的 RingBuffer 返回目錄
代碼
1
using
System;
2
using
System.Collections.Generic;
3
using
System.Collections.Concurrent;
4
using
System.Linq;
5
using
System.Text;
6
using
System.Threading.Tasks;
7
using
System.Threading;
8
9
namespace
InterlockStudy
10
{
11
internal
class
Node<T>
12
{
13
public
T Data {
get
;
set
; }
14
15
public
bool
HasValue {
get
;
set
; }
16
}
17
18
class
RingBuffer<T>
19
{
20
private
readonly
Node<T>
[] _nodes;
21
private
long
_tailIndex = -
1
;
22
private
long
_headIndex = -
1
;
23
private
AutoResetEvent _readEvent =
new
AutoResetEvent(
false
);
24
private
AutoResetEvent _writeEvent =
new
AutoResetEvent(
false
);
25
26
public
RingBuffer(
int
maxSize)
27
{
28
_nodes =
new
Node<T>
[maxSize];
29
30
for
(
var
i =
0
; i < maxSize; i++
)
31
{
32
_nodes[i] =
new
Node<T>
();
33
}
34
}
35
36
public
void
EnQueue(T data)
37
{
38
while
(
true
)
39
{
40
if
(
this
.TryEnQueue(data))
41
{
42
_readEvent.Set();
43
return
;
44
}
45
46
_writeEvent.WaitOne();
47
}
48
49
}
50
51
public
T DeQueue()
52
{
53
while
(
true
)
54
{
55
T data;
56
if
(
this
.TryDeQueue(
out
data))
57
{
58
_writeEvent.Set();
59
return
data;
60
}
61
62
_readEvent.WaitOne();
63
}
64
65
}
66
67
public
bool
TryEnQueue(T data)
68
{
69
long
localTailIndex, newTailIndex, currentTailIndex;
70
do
71
{
72
localTailIndex =
_tailIndex;
73
74
if
(!
this
.CanWrite(localTailIndex))
75
{
76
return
false
;
77
}
78
79
newTailIndex = localTailIndex +
1
;
80
81
if
(_nodes[newTailIndex %
_nodes.Length].HasValue)
82
{
83
return
false
;
84
}
85
86
currentTailIndex = Interlocked.CompareExchange(
ref
_tailIndex, newTailIndex, localTailIndex);
87
}
88
while
(localTailIndex !=
currentTailIndex);
89
90
_nodes[newTailIndex % _nodes.Length].Data =
data;
91
_nodes[newTailIndex % _nodes.Length].HasValue =
true
;
92
93
return
true
;
94
}
95
96
public
bool
TryDeQueue(
out
T data)
97
{
98
long
localHeadIndex, newHeadIndex, currentHeadIndex;
99
do
100
{
101
localHeadIndex =
_headIndex;
102
103
if
(!
this
.CanRead(localHeadIndex))
104
{
105
data =
default
(T);
106
return
false
;
107
}
108
109
newHeadIndex = localHeadIndex +
1
;
110
if
(_nodes[newHeadIndex % _nodes.Length].HasValue ==
false
)
111
{
112
data =
default
(T);
113
return
false
;
114
}
115
116
currentHeadIndex = Interlocked.CompareExchange(
ref
_headIndex, newHeadIndex, localHeadIndex);
117
}
118
while
(localHeadIndex !=
currentHeadIndex);
119
120
data = _nodes[newHeadIndex %
_nodes.Length].Data;
121
_nodes[newHeadIndex % _nodes.Length].HasValue =
false
;
122
123
return
true
;
124
}
125
126
private
bool
CanWrite(
long
localTailIndex)
127
{
128
return
localTailIndex - _headIndex <
_nodes.Length;
129
}
130
131
private
bool
CanRead(
long
localHeadIndex)
132
{
133
return
_tailIndex - localHeadIndex >
0
;
134
}
135
}
136
}
備注 返回目錄
倉促成文,如果有必要可以再寫篇文章,希望大家多批評。
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

