RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

無鎖CAS如何實現(xiàn)各種無鎖的數(shù)據(jù)結(jié)構(gòu)

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-13 15:38 ? 次閱讀

一、引言

鎖是解決并發(fā)問題的萬能鑰匙,可是并發(fā)問題只有鎖能解決嗎?當(dāng)然不是,CAS也可以解決并發(fā)問題

二、什么是CAS

比較并交換(compare and swap,CAS),是原子操作的一種,可用于在多線程編程中實現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時改寫某?數(shù)據(jù)時由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)?的數(shù)據(jù)不一致問題

有了CAS,我們就可以用它來實現(xiàn)各種無鎖(lock free)的數(shù)據(jù)結(jié)構(gòu)

實現(xiàn)原理

該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進行比較,當(dāng)數(shù)值?樣時將內(nèi)存中的數(shù)據(jù)替換為新的值

下面是兩種int類型操作的CAS偽代碼形式:

//輸入reg的地址,判斷reg的值與oldval是否相等//如果相等,那么就將newval賦值給reg;否則reg保持不變//最終將reg原先的值返回回去
 int compare_and_swap(int *reg, int oldval, int newval){    int old_ref_val = *reg;    if(old_reg_val == oldval)
        *reg = newval;    return old_reg_val;
}
//輸入一個pAddr的地址,在函數(shù)內(nèi)部判斷其的值是否與期望值nExpected相等//如果相等那么就將pAddr的值改為nNew并同時返回true;否則就返回false,什么都不做
 bool compare_and_swap(int *pAddr, int nExpected, int nNew){    if(*pAddr == nExpected)
    {
        *pAddr = nNew;        return true;
    }    else
        return false;
}

在上面的兩種實現(xiàn)中第二種形式更好,因為它返回bool值讓調(diào)用者知道是否更新成功

三、CAS的應(yīng)用層實現(xiàn)

因為CAS是原子操作,所以在各種庫的原子庫中都有對應(yīng)的CAS實現(xiàn)方式

gcc/g++中的CAS

對于gcc、g++編譯器來講,其原子操作中包含下面兩個函數(shù),是專門用來做CAS的

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

Windows的CAS

在Windows下,你可以使用下面的Windows API來完成CAS:

InterlockedCompareExchange ( __inout LONG volatile  *Target,
                                __in LONG Exchange,
                                __in LONG Comperand);

C++中的CAS

C++11標(biāo)準(zhǔn)庫引入了原子操作,包含在頭文件中,下面是專門用于CAS操作的接口

template< class T >bool atomic_compare_exchange_weak( std::atomic< T >* obj,
                                   T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic< T >* obj,
                                   T* expected, T desired );

四、無鎖隊列的實現(xiàn)

此處我們只考慮隊列出隊列和進隊列的并發(fā)問題:

出隊列: 出隊列時,要保證只有一個線程在對頭結(jié)點進行出隊列的操作,否則就會發(fā)生錯亂

入隊列: 入隊列時,也一樣,保證只有一個線程在對尾節(jié)點進行入隊列的操作,否則就會發(fā)生錯亂

無鎖隊列代碼實現(xiàn)

//queue_cas.h#include < iostream >

 template< typename ElemType >class Queue{public:
    Queue();                  //構(gòu)造函數(shù)
    ~Queue();                 //析構(gòu)函數(shù)public:    void push(ElemType elem); //入隊列
    bool pop();               //出隊列
    void show();              //打印隊列的內(nèi)容private:    struct _qNode             //隊列節(jié)點
    {
        _qNode(): _next(nullptr) { } 
        _qNode(ElemType elem): _elem(elem), _next(nullptr) { } 
        ElemType       _elem;        struct _qNode *_next;
    };private:     struct _qNode *_head;    //頭結(jié)點
     struct _qNode *_tail;    //尾節(jié)點}; 
template< typename ElemType >
Queue< ElemType >::Queue()
{
    _head = _tail =new _qNode();
} 
template< typename ElemType >
Queue< ElemType >::~Queue()
{    while(_head != nullptr)
    {        struct _qNode *tempNode = _head;
        _head = _head- >_next;        delete tempNode;
    }
} 

template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{   //創(chuàng)建一個新的節(jié)點
   struct _qNode *newNode = new struct _qNode(elem);

   struct _qNode *p = _tail;
   struct _qNode *oldp = _tail;

   do{        while(p- >_next != nullptr)
            p = p- >_next;
   } while(__sync_bool_compare_and_swap(&_tail- >_next, nullptr, newNode) != true);
   __sync_bool_compare_and_swap(&_tail, oldp, newNode);
} 
template< typename ElemType >bool Queue< ElemType >::pop()
{   struct _qNode *p;
   do {
        p = _head;        if(p- >_next == nullptr)            return false;
   } while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true);   delete p;   return true;
} 
template< typename ElemType >void Queue< ElemType >::show()
{    struct _qNode* tempNode = _head- >_next;

    if(tempNode == nullptr){        std::cout < < "Empty" <

上面為無鎖隊列的實現(xiàn)代碼,我們假定此隊列中頭結(jié)點不存儲數(shù)據(jù)(當(dāng)做哨兵),尾節(jié)點存儲數(shù)據(jù)

圖片

其使用到CAS的核心函數(shù)就是push()和pop()函數(shù),在下面我們將

_sync_bool_compare_and_swap()函數(shù)調(diào)用稱之為CAS操作

push()如下:

假設(shè)線程T1和T2都執(zhí)行push()函數(shù),當(dāng)線程T1先執(zhí)行do-while中的CAS操作然后發(fā)現(xiàn)其尾節(jié)點后為空,于是就執(zhí)行do-while中的CAS操作將尾節(jié)點_tail的_next指針賦值為newNode,然后退出do-while循環(huán),調(diào)用第二個CAS操作將尾節(jié)點指針向后移動一位

由于CAS是一個原子操作,所以即使同時T2線程了也調(diào)用了do-while中的CAS操作,但是其判斷p->_next不為空,因為T1線程已經(jīng)將尾節(jié)點向后移動了,所以其只能繼續(xù)執(zhí)行do,將p向后移動,重新移動到尾節(jié)點繼續(xù)重新判斷,直到成功為止....

為什么push()函數(shù)的最后一個CAS操作不需要判斷是否執(zhí)行成功,因為:

1.如果有一個線程T1,它的while中的CAS如果成功的話,那么其它所有的隨后線程的CAS都會失敗,然后就會再循環(huán)

2.此時,如果T1線程還沒有更新tail指針,其它的線程繼續(xù)失敗,因為tail->next不是NULL了

3.直到T1線程更新完tail指針,于是其它的線程中的某個線程就可以得到新的tail指針,繼續(xù)往下走了

do作用域中為什么要使用while將p指針向后移動:

  • 假設(shè)T1線程在調(diào)用第二個CAS操作更新_tail指針之前,T1線程停掉或者掛掉了,那么其它線程就會進入死循環(huán)
template< typename ElemType >void Queue< ElemType >::push(ElemType elem)
{   //創(chuàng)建一個新的節(jié)點
   struct _qNode *newNode = new struct _qNode(elem);

   struct _qNode *p = _tail;
   struct _qNode *oldp = _tail;

   do{        //不斷的向后指,直到直到尾節(jié)點為止
        while(p- >_next != nullptr)
            p = p- >_next;
   } while(__sync_bool_compare_and_swap(&p- >_next, nullptr, newNode) != true); //如果p沒有移動到真正的尾節(jié)點上,那么繼續(xù)執(zhí)行do

    //當(dāng)CAS函數(shù)執(zhí)行成功之后,那么執(zhí)行這個CAS函數(shù),將尾節(jié)點指針向后移動1位
    __sync_bool_compare_and_swap(&_tail, oldp, newNode);
}

pop()如下:

  • 原理與push()同理,假設(shè)線程T1和線程T2都執(zhí)行pop()操作,假設(shè)T1先執(zhí)行CAS操作將_head向后移動了一位,并且刪除了原先的頭指針
  • 那么當(dāng)T2再執(zhí)行時發(fā)現(xiàn)T1更新過后的_head指針(移動了)與一開始獲取的頭指針p不相等了,那么就繼續(xù)執(zhí)行do作用域重新獲取頭指針,然后重新進行CAS操作
template< typename ElemType >bool Queue< ElemType >::pop()
{   struct _qNode *p;
   do {        //獲取_head指針
        p = _head;        if(p- >_next == nullptr)            return false;
   } while(__sync_bool_compare_and_swap(&_head, p , p- >_next) != true); //判斷頭結(jié)點是否被移動過,如果移動過那么就執(zhí)行do內(nèi)部重新獲取_head指針

   //刪除頭指針
   delete p;   return true;

}

測試代碼

//queue_cas_test.cpp#include "queue_cas.h"
 int main(){
    Queue< int > queue;    queue.push(1);    queue.push(2);    queue.push(3);    queue.show(); 
    queue.pop();    queue.show(); 
    queue.pop();    queue.show(); 
    queue.pop();    queue.show(); 
    queue.push(1);    queue.show(); 
    queue.push(2);    queue.show();
}

我們編寫下面的程序 測試一下無鎖隊列的各種操作是否有誤, 結(jié)果顯示無誤

圖片

五、無鎖隊列性能測試

下面我們將上面的無鎖隊列與C++ STL庫中的queue進行對比,查看一下性能

queue_stl.cpp

#include < stdio.h >#include < time.h >#include < pthread.h >#include < queue >#include < mutex >
 using namespace std; 
#define FOR_LOOP_NUM 10000000  //隊列push和pop操作函數(shù)中for循環(huán)的次數(shù)
 static std::queue< int > _queue; //隊列static std::mutex      _mutex; //隊列操作要用到的互斥鎖
 static int push_count;         //隊列總共push的次數(shù)static int pop_count;          //隊列總共pop的次數(shù)
 typedef void *(*thread_func_t)(void *arg); 

void *queue_push(void *arg){    for(int i = 0; i < FOR_LOOP_NUM; ++i)
    {
        _mutex.lock();
        _queue.push(i);
        push_count++;
        _mutex.unlock();
    }    return NULL;
} 
void *queue_pop(void *arg){    while(true)
    {
        _mutex.lock();        if(_queue.size() > 0)
        {
            _queue.pop();
            pop_count++;
        }
        _mutex.unlock();        
        if(pop_count >= FOR_LOOP_NUM)            break;
    }    return NULL;
} 
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){    clock_t start = clock();    
    pthread_t push_tid;    if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
    {
        perror("pthread_create");
    } 
    pthread_t pop_tid;    if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
    {
        perror("pthread_create");
    }

    pthread_join(push_tid, NULL);
    pthread_join(pop_tid, NULL); 
    clock_t end = clock(); 
    printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
} 
int main(){
    push_count = 0;
    pop_count = 0;

    test_queue(queue_push, queue_pop, NULL); 
    printf("push_count:%d, pop_count:%dn", push_count, pop_count);    return 0;
}

其結(jié)果顯示,執(zhí)行10000000萬次push和 10000000萬次pop操作大概要1秒多的時間

圖片

queue_cas.cpp

#include < stdio.h >#include < time.h >#include < pthread.h >#include "queue_cas.h"
 using namespace std; 
#define FOR_LOOP_NUM 10000000  //隊列push和pop操作函數(shù)中for循環(huán)的次數(shù)
 static int push_count;         //隊列總共push的次數(shù)static int pop_count;          //隊列總共pop的次數(shù)
 static Queue< int > _queue; 
typedef void *(*thread_func_t)(void *arg); 
void *queue_push(void *arg){    for(int i = 0; i < FOR_LOOP_NUM; ++i)
    {
        _queue.push(i);
        push_count++;
    }    return NULL;
} 
void *queue_pop(void *arg){    while(true)
    {
        _queue.pop();
        pop_count++;        
        if(pop_count >= FOR_LOOP_NUM)            break;
    }    return NULL;
} 
void test_queue(thread_func_t push_func, thread_func_t pop_func, void *arg){    clock_t start = clock();    
    pthread_t push_tid;    if(pthread_create(&push_tid, NULL, push_func, arg) != 0)
    {
        perror("pthread_create");
    } 
    pthread_t pop_tid;    if(pthread_create(&pop_tid, NULL, pop_func, arg) != 0)
    {
        perror("pthread_create");
    }

    pthread_join(push_tid, NULL);
    pthread_join(pop_tid, NULL); 
    clock_t end = clock(); 
    printf("spend clock: %ldn", (end - start) / CLOCKS_PER_SEC);
} 
int main(){
    push_count = 0;
    pop_count = 0;

    test_queue(queue_push, queue_pop, NULL); 
    printf("push_count:%d, pop_count:%dn", push_count, pop_count);    return 0;
}

其結(jié)果顯示,執(zhí)行10000000萬次push和 10000000萬次pop操作大概在1秒之內(nèi),沒有超過1秒中

圖片

因此,無鎖隊列比使用mutex的效率要高一些

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 編程
    +關(guān)注

    關(guān)注

    88

    文章

    3614

    瀏覽量

    93686
  • 多線程
    +關(guān)注

    關(guān)注

    0

    文章

    278

    瀏覽量

    19943
  • 數(shù)據(jù)結(jié)構(gòu)

    關(guān)注

    3

    文章

    573

    瀏覽量

    40123
  • CAS
    CAS
    +關(guān)注

    關(guān)注

    0

    文章

    34

    瀏覽量

    15201
收藏 人收藏

    評論

    相關(guān)推薦

    MCU上的原子讀操作

    變量的操作(必須單核),大家可以仔細(xì)想想。單讀單寫隊列是MCU上經(jīng)常用的,對中斷通信接口的緩沖非常方便可靠。以此為基礎(chǔ),可跨平臺實現(xiàn)。
    發(fā)表于 03-06 09:39

    OpenHarmony——內(nèi)核IPC機制數(shù)據(jù)結(jié)構(gòu)解析

    可以調(diào)用LOS_EventClear來清除事件。四、數(shù)據(jù)結(jié)構(gòu)--互斥互斥又稱互斥型信號量,是一種特殊的二值性信號量,用于實現(xiàn)對共享資源的獨占式處理。任意時刻互斥
    發(fā)表于 09-05 11:02

    OpenHarmony——內(nèi)核IPC機制數(shù)據(jù)結(jié)構(gòu)解析

    、數(shù)據(jù)結(jié)構(gòu)--互斥互斥又稱互斥型信號量,是一種特殊的二值性信號量,用于實現(xiàn)對共享資源的獨占式處理。任意時刻互斥的狀態(tài)只有開鎖或閉鎖,當(dāng)
    發(fā)表于 09-08 11:44

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專

    《有》/《》/《簽約》/《解鎖》/《越獄》/《激活》專業(yè)技術(shù)詞解析 在討論區(qū)里,大家看到:《有版》,《
    發(fā)表于 02-03 11:05 ?954次閱讀

    智能按鍵出現(xiàn)反應(yīng)或禁止操作的原因坤坤智能告訴你

    智能按鍵出現(xiàn)反應(yīng)或禁止操作的原因坤坤智能告訴你在日常生活中使用智能時,多多少少會遇到智能熱鍵
    發(fā)表于 12-14 14:47 ?1.1w次閱讀

    利用CAS技術(shù)實現(xiàn)隊列

    【 導(dǎo)讀 】:本文 主要講解利用CAS技術(shù)實現(xiàn)隊列。 關(guān)于隊列的
    的頭像 發(fā)表于 01-11 10:52 ?2282次閱讀
    利用<b class='flag-5'>CAS</b>技術(shù)<b class='flag-5'>實現(xiàn)</b><b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列

    關(guān)于CAS等原子操作介紹 隊列的鏈表實現(xiàn)方法

    ,X86下對應(yīng)的是 CMPXCHG 匯編指令。有了這個原子操作,我們就可以用其來實現(xiàn)各種(lock free)的數(shù)據(jù)結(jié)構(gòu)。
    的頭像 發(fā)表于 05-18 09:12 ?3406次閱讀
    關(guān)于<b class='flag-5'>CAS</b>等原子操作介紹 <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列的鏈表<b class='flag-5'>實現(xiàn)</b>方法

    源智能的應(yīng)用前景

    ,但應(yīng)用前景廣闊。源智能的發(fā)展優(yōu)勢:1.政策支持:近年來,國家大力推進物聯(lián)網(wǎng)、大數(shù)據(jù)、新能源的發(fā)展,并且陸續(xù)出臺各項產(chǎn)業(yè)政策,引導(dǎo)智能行業(yè)有序化、高端化發(fā)展,
    的頭像 發(fā)表于 09-22 10:18 ?1523次閱讀
    <b class='flag-5'>無</b>源智能<b class='flag-5'>鎖</b>的應(yīng)用前景

    新品上架——源智能把手

    為了迎合市場需求,2022年我司開始著手開發(fā)源智能把手。經(jīng)過幾個月的努力,2022年11月我司正式上架源智能把手。源智能把手
    的頭像 發(fā)表于 11-11 17:56 ?599次閱讀
    新品上架——<b class='flag-5'>無</b>源智能把手<b class='flag-5'>鎖</b>

    源智能系統(tǒng)之水務(wù)消防

    源智能系統(tǒng)之水務(wù)消防
    的頭像 發(fā)表于 05-22 09:48 ?464次閱讀
    <b class='flag-5'>無</b>源智能<b class='flag-5'>鎖</b>系統(tǒng)之水務(wù)消防

    如何實現(xiàn)一個多讀多寫的線程安全的隊列

    在ZMQ隊列的原理與實現(xiàn)一文中,我們已經(jīng)知道了ypipe可以實現(xiàn)一線程寫一線程讀的隊列,
    的頭像 發(fā)表于 11-08 15:25 ?1301次閱讀
    如何<b class='flag-5'>實現(xiàn)</b>一個多讀多寫的線程安全的<b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列

    隊列的潛在優(yōu)勢

    隊列 先大致介紹一下隊列。隊列的根本是CAS
    的頭像 發(fā)表于 11-09 09:23 ?586次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列的潛在優(yōu)勢

    CAS如何實現(xiàn)各種數(shù)據(jù)結(jié)構(gòu)

    匯編指令。有了這個原子操作,我們就可以用其來實現(xiàn)各種(lock free)的數(shù)據(jù)結(jié)構(gòu)。 這個操作用C語言來描述就是下面這個樣子:意思就
    的頭像 發(fā)表于 11-10 11:00 ?549次閱讀
    <b class='flag-5'>CAS</b>如何<b class='flag-5'>實現(xiàn)</b><b class='flag-5'>各種</b><b class='flag-5'>無</b><b class='flag-5'>鎖</b>的<b class='flag-5'>數(shù)據(jù)結(jié)構(gòu)</b>

    隊列解決的問題

    為什么需要隊列 隊列解決了什么問題?隊列解決了
    的頭像 發(fā)表于 11-10 15:33 ?930次閱讀
    <b class='flag-5'>無</b><b class='flag-5'>鎖</b>隊列解決的問題

    互斥和自旋實現(xiàn)原理

    互斥和自旋是操作系統(tǒng)中常用的同步機制,用于控制對共享資源的訪問,以避免多個線程或進程同時訪問同一資源,從而引發(fā)數(shù)據(jù)不一致或競爭條件等問題。 互斥(Mutex) 互斥
    的頭像 發(fā)表于 07-10 10:07 ?484次閱讀
    RM新时代网站-首页