一、引言
鎖是解決并發(fā)問題的萬能鑰匙,可是并發(fā)問題只有鎖能解決嗎?當(dāng)然不是,CAS也可以解決并發(fā)問題
二、什么是CAS
比較并交換(compare and swap,CAS),是原子操作的一種,可用于在多線程編程中實現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時改寫某?數(shù)據(jù)時由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)?的數(shù)據(jù)不一致問題
有了CAS,我們就可以用它來實現(xiàn)各種無鎖(lock free)的數(shù)據(jù)結(jié)構(gòu)
實現(xiàn)原理
該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進行比較,當(dāng)數(shù)值?樣時將內(nèi)存中的數(shù)據(jù)替換為新的值
下面是兩種int類型操作的CAS偽代碼形式:
//輸入reg的地址,判斷reg的值與oldval是否相等//如果相等,那么就將newval賦值給reg;否則reg保持不變//最終將reg原先的值返回回去
int compare_and_swap(int *reg, int oldval, int newval){ int old_ref_val = *reg; if(old_reg_val == oldval)
*reg = newval; return old_reg_val;
}
//輸入一個pAddr的地址,在函數(shù)內(nèi)部判斷其的值是否與期望值nExpected相等//如果相等那么就將pAddr的值改為nNew并同時返回true;否則就返回false,什么都不做
bool compare_and_swap(int *pAddr, int nExpected, int nNew){ if(*pAddr == nExpected)
{
*pAddr = nNew; return true;
} else
return false;
}
在上面的兩種實現(xiàn)中第二種形式更好,因為它返回bool值讓調(diào)用者知道是否更新成功
三、CAS的應(yīng)用層實現(xiàn)
因為CAS是原子操作,所以在各種庫的原子庫中都有對應(yīng)的CAS實現(xiàn)方式
gcc/g++中的CAS
對于gcc、g++編譯器來講,其原子操作中包含下面兩個函數(shù),是專門用來做CAS的
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
Windows的CAS
在Windows下,你可以使用下面的Windows API來完成CAS:
InterlockedCompareExchange ( __inout LONG volatile *Target,
__in LONG Exchange,
__in LONG Comperand);
C++中的CAS
C++11標(biāo)準(zhǔn)庫引入了原子操作,包含在頭文件中,下面是專門用于CAS操作的接口
template< class T >bool atomic_compare_exchange_weak( std::atomic< T >* obj,
T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic< T >* obj,
T* expected, T desired );
四、無鎖隊列的實現(xiàn)
此處我們只考慮隊列出隊列和進隊列的并發(fā)問題:
出隊列: 出隊列時,要保證只有一個線程在對頭結(jié)點進行出隊列的操作,否則就會發(fā)生錯亂
入隊列: 入隊列時,也一樣,保證只有一個線程在對尾節(jié)點進行入隊列的操作,否則就會發(fā)生錯亂
無鎖隊列代碼實現(xiàn)
//queue_cas.h#include < iostream >
template< typename ElemType >class Queue{public:
Queue(); //構(gòu)造函數(shù)
~Queue(); //析構(gòu)函數(shù)public: void push(ElemType elem); //入隊列
bool pop(); //出隊列
void show(); //打印隊列的內(nèi)容private: struct _qNode //隊列節(jié)點
{
_qNode(): _next(nullptr) { }
_qNode(ElemType elem): _elem(elem), _next(nullptr) { }
ElemType _elem; struct _qNode *_next;
};private: struct _qNode *_head; //頭結(jié)點
struct _qNode *_tail; //尾節(jié)點};
template< typename ElemType >
Queue< ElemType >::Queue()
{
_head = _tail =new _qNode();
}
template< typename ElemType >
Queue< ElemType >::~Queue()
{ while(_head != nullptr)
{ struct _qNode *tempNode = _head;
_head = _head- >_next; delete tempNode;
}
}
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{ //創(chuàng)建一個新的節(jié)點
struct _qNode *newNode = new struct _qNode(elem);
struct _qNode *p = _tail;
struct _qNode *oldp = _tail;
do{ while(p- >_next != nullptr)
p = p- >_next;
} while(__sync_bool_compare_and_swap(&_tail- >_next, nullptr, newNode) != true);
__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}
template< typename ElemType >bool Queue< ElemType >::pop()
{ struct _qNode *p;
do {
p = _head; if(p- >_next == nullptr) return false;
} while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); delete p; return true;
}
template< typename ElemType >void Queue< ElemType >::show()
{ struct _qNode* tempNode = _head- >_next;
if(tempNode == nullptr){ std::cout < < "Empty" <
上面為無鎖隊列的實現(xiàn)代碼,我們假定此隊列中頭結(jié)點不存儲數(shù)據(jù)(當(dāng)做哨兵),尾節(jié)點存儲數(shù)據(jù)
其使用到CAS的核心函數(shù)就是push()和pop()函數(shù),在下面我們將
_sync_bool_compare_and_swap()函數(shù)調(diào)用稱之為CAS操作
push()如下:
假設(shè)線程T1和T2都執(zhí)行push()函數(shù),當(dāng)線程T1先執(zhí)行do-while中的CAS操作然后發(fā)現(xiàn)其尾節(jié)點后為空,于是就執(zhí)行do-while中的CAS操作將尾節(jié)點_tail的_next指針賦值為newNode,然后退出do-while循環(huán),調(diào)用第二個CAS操作將尾節(jié)點指針向后移動一位
由于CAS是一個原子操作,所以即使同時T2線程了也調(diào)用了do-while中的CAS操作,但是其判斷p->_next不為空,因為T1線程已經(jīng)將尾節(jié)點向后移動了,所以其只能繼續(xù)執(zhí)行do,將p向后移動,重新移動到尾節(jié)點繼續(xù)重新判斷,直到成功為止....
為什么push()函數(shù)的最后一個CAS操作不需要判斷是否執(zhí)行成功,因為:
1.如果有一個線程T1,它的while中的CAS如果成功的話,那么其它所有的隨后線程的CAS都會失敗,然后就會再循環(huán)
2.此時,如果T1線程還沒有更新tail指針,其它的線程繼續(xù)失敗,因為tail->next不是NULL了
3.直到T1線程更新完tail指針,于是其它的線程中的某個線程就可以得到新的tail指針,繼續(xù)往下走了
do作用域中為什么要使用while將p指針向后移動:
- 假設(shè)T1線程在調(diào)用第二個CAS操作更新_tail指針之前,T1線程停掉或者掛掉了,那么其它線程就會進入死循環(huán)
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{ //創(chuàng)建一個新的節(jié)點
struct _qNode *newNode = new struct _qNode(elem);
struct _qNode *p = _tail;
struct _qNode *oldp = _tail;
do{ //不斷的向后指,直到直到尾節(jié)點為止
while(p- >_next != nullptr)
p = p- >_next;
} while(__sync_bool_compare_and_swap(&p- >_next, nullptr, newNode) != true); //如果p沒有移動到真正的尾節(jié)點上,那么繼續(xù)執(zhí)行do
//當(dāng)CAS函數(shù)執(zhí)行成功之后,那么執(zhí)行這個CAS函數(shù),將尾節(jié)點指針向后移動1位
__sync_bool_compare_and_swap(&_tail, oldp, newNode);
}
pop()如下:
- 原理與push()同理,假設(shè)線程T1和線程T2都執(zhí)行pop()操作,假設(shè)T1先執(zhí)行CAS操作將_head向后移動了一位,并且刪除了原先的頭指針
- 那么當(dāng)T2再執(zhí)行時發(fā)現(xiàn)T1更新過后的_head指針(移動了)與一開始獲取的頭指針p不相等了,那么就繼續(xù)執(zhí)行do作用域重新獲取頭指針,然后重新進行CAS操作
template< typename ElemType >bool Queue< ElemType >::pop()
{ struct _qNode *p;
do { //獲取_head指針
p = _head; if(p- >_next == nullptr) return false;
} while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); //判斷頭結(jié)點是否被移動過,如果移動過那么就執(zhí)行do內(nèi)部重新獲取_head指針
//刪除頭指針
delete p; return true;
}
測試代碼
//queue_cas_test.cpp#include "queue_cas.h"
int main(){
Queue< int > queue; queue.push(1); queue.push(2); queue.push(3); queue.show();
queue.pop(); queue.show();
queue.pop(); queue.show();
queue.pop(); queue.show();
queue.push(1); queue.show();
queue.push(2); queue.show();
}
我們編寫下面的程序 測試一下無鎖隊列的各種操作是否有誤, 結(jié)果顯示無誤
五、無鎖隊列性能測試
下面我們將上面的無鎖隊列與C++ STL庫中的queue進行對比,查看一下性能
queue_stl.cpp
#include < stdio.h >#include < time.h >#include < pthread.h >#include < queue >#include < mutex >
using namespace std;
#define FOR_LOOP_NUM 10000000 //隊列push和pop操作函數(shù)中for循環(huán)的次數(shù)
static std::queue< int > _queue; //隊列static std::mutex _mutex; //隊列操作要用到的互斥鎖
static int push_count; //隊列總共push的次數(shù)static int pop_count; //隊列總共pop的次數(shù)
typedef void *(*thread_func_t)(void *arg);
void *queue_push(void *arg){ for(int i = 0; i < FOR_LOOP_NUM; ++i)
{
_mutex.lock();
_queue.push(i);
push_count++;
_mutex.unlock();
} return NULL;
}
void *queue_pop(void *arg){ while(true)
{
_mutex.lock(); if(_queue.size() > 0)
{
_queue.pop();
pop_count++;
}
_mutex.unlock();
if(pop_count >= FOR_LOOP_NUM) break;
} return NULL;
}
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){ clock_t start = clock();
pthread_t push_tid; if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
{
perror("pthread_create");
}
pthread_t pop_tid; if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
{
perror("pthread_create");
}
pthread_join(push_tid, NULL);
pthread_join(pop_tid, NULL);
clock_t end = clock();
printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
}
int main(){
push_count = 0;
pop_count = 0;
test_queue(queue_push, queue_pop, NULL);
printf("push_count:%d, pop_count:%dn", push_count, pop_count); return 0;
}
其結(jié)果顯示,執(zhí)行10000000萬次push和 10000000萬次pop操作大概要1秒多的時間
queue_cas.cpp
#include < stdio.h >#include < time.h >#include < pthread.h >#include "queue_cas.h"
using namespace std;
#define FOR_LOOP_NUM 10000000 //隊列push和pop操作函數(shù)中for循環(huán)的次數(shù)
static int push_count; //隊列總共push的次數(shù)static int pop_count; //隊列總共pop的次數(shù)
static Queue< int > _queue;
typedef void *(*thread_func_t)(void *arg);
void *queue_push(void *arg){ for(int i = 0; i < FOR_LOOP_NUM; ++i)
{
_queue.push(i);
push_count++;
} return NULL;
}
void *queue_pop(void *arg){ while(true)
{
_queue.pop();
pop_count++;
if(pop_count >= FOR_LOOP_NUM) break;
} return NULL;
}
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){ clock_t start = clock();
pthread_t push_tid; if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
{
perror("pthread_create");
}
pthread_t pop_tid; if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
{
perror("pthread_create");
}
pthread_join(push_tid, NULL);
pthread_join(pop_tid, NULL);
clock_t end = clock();
printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
}
int main(){
push_count = 0;
pop_count = 0;
test_queue(queue_push, queue_pop, NULL);
printf("push_count:%d, pop_count:%dn", push_count, pop_count); return 0;
}
其結(jié)果顯示,執(zhí)行10000000萬次push和 10000000萬次pop操作大概在1秒之內(nèi),沒有超過1秒中
因此,無鎖隊列比使用mutex的效率要高一些
-
編程
+關(guān)注
關(guān)注
88文章
3614瀏覽量
93686 -
多線程
+關(guān)注
關(guān)注
0文章
278瀏覽量
19943 -
數(shù)據(jù)結(jié)構(gòu)
+關(guān)注
關(guān)注
3文章
573瀏覽量
40123 -
CAS
+關(guān)注
關(guān)注
0文章
34瀏覽量
15201
發(fā)布評論請先 登錄
相關(guān)推薦
評論