RM新时代网站-首页

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

協(xié)程的實現(xiàn)與原理

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-10 10:57 ? 次閱讀

前言

協(xié)程這個概念很久了,好多程序員是實現(xiàn)過這個組件的,網(wǎng)上關(guān)于協(xié)程的文章,博客,論壇都是汗牛充棟,在知乎,github上面也有很多大牛寫了關(guān)于協(xié)程的心得體會。突發(fā)奇想,我也來實現(xiàn)一個這樣的組件,并測試了一下性能。借鑒了很多大牛的思想,閱讀了很多大牛的代碼。于是把整個思考過程寫下來。實現(xiàn)代碼

https://github.com/wangbojing/NtyCotyCo

代碼簡單易讀,如果在你的項目中,NtyCo能夠為你解決些許工程問題,那就榮幸之至。

本文章的設(shè)計思路,是在每一個章的最前面以問題提出,每章節(jié)的學習目的。大家能夠帶著每章的問題來讀每章節(jié)的內(nèi)容,方便讀者能夠方便的進入每章節(jié)的思考。讀者讀完以后加上案例代碼閱讀,編譯,運行,能夠?qū)ι衩氐膮f(xié)程有一個全新的理解。能夠運用到工程代碼,幫助你更加方便高效的完成工程工作。

第一章 協(xié)程的起源

問題:協(xié)程存在的原因?協(xié)程能夠解決哪些問題?

在我們現(xiàn)在CS,BS開發(fā)模式下,服務(wù)器的吞吐量是一個很重要的參數(shù)。其實吞吐量是IO處理時間加上業(yè)務(wù)處理。為了簡單起見,比如,客戶端與服務(wù)器之間是長連接的,客戶端定期給服務(wù)器發(fā)送心跳包數(shù)據(jù)。客戶端發(fā)送一次心跳包到服務(wù)器,服務(wù)器更新該新客戶端狀態(tài)的。心跳包發(fā)送的過程,業(yè)務(wù)處理時長等于IO讀?。≧ECV系統(tǒng)調(diào)用)加上業(yè)務(wù)處理(更新客戶狀態(tài))。吞吐量等于1s業(yè)務(wù)處理次數(shù)。

業(yè)務(wù)處理(更新客戶端狀態(tài))時間,業(yè)務(wù)不一樣的,處理時間不一樣,我們就不做討論。

那如何提升recv的性能。若只有一個客戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計的客戶端長連接的情況,我們該如何提升。以Linux為例,在這里需要介紹一個“網(wǎng)紅”就是epoll。服務(wù)器使用epoll管理百萬計的客戶端長連接,代碼框架如下:

while (1) {

 int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);



 for (i = 0;i < nready;i ++) {



 int sockfd = events[i].data.fd;

 if (sockfd == listenfd) {

 int connfd = accept(listenfd, xxx, xxxx);



            setnonblock(connfd);



            ev.events = EPOLLIN | EPOLLET;

            ev.data.fd = connfd;

            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);



        } else {

            handle(sockfd);

        }

    }

}

對于響應(yīng)式服務(wù)器,所有的客戶端的操作驅(qū)動都是來源于這個大循環(huán)。來源于epoll_wait的反饋結(jié)果。

對于服務(wù)器處理百萬計的IO。Handle(sockfd)實現(xiàn)方式有兩種。

第一種,handle(sockfd)函數(shù)內(nèi)部對sockfd進行讀寫動作。代碼如下

int handle(int sockfd) {



 recv(sockfd, rbuffer, length, 0);



    parser_proto(rbuffer, length);



 send(sockfd, sbuffer, length, 0);



}

handle的io操作(send,recv)與epoll_wait是在同一個處理流程里面的。這就是IO同步操作。

優(yōu)點:

  1. sockfd管理方便。
  2. 操作邏輯清晰。

缺點:

  1. 服務(wù)器程序依賴epoll_wait的循環(huán)響應(yīng)速度慢。
  2. 程序性能差

第二種,handle(sockfd)函數(shù)內(nèi)部將sockfd的操作,push到線程池中,代碼如下:

int thread_cb(int sockfd) {

 // 此函數(shù)是在線程池創(chuàng)建的線程中運行。

    // 與handle不在一個線程上下文中運行

 recv(sockfd, rbuffer, length, 0);

    parser_proto(rbuffer, length);

 send(sockfd, sbuffer, length, 0);

}



int handle(int sockfd) {

 //此函數(shù)在主線程 main_thread 中運行

    //在此處之前,確保線程池已經(jīng)啟動。

    push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運行。

}

Handle函數(shù)是將sockfd處理方式放到另一個已經(jīng)其他的線程中運行,如此做法,將io操作(recv,send)與epoll_wait 不在一個處理流程里面,使得io操作(recv,send)與epoll_wait實現(xiàn)解耦。這就叫做IO異步操作。

優(yōu)點:

  1. 子模塊好規(guī)劃。
  2. 程序性能高。

缺點:

正因為子模塊好規(guī)劃,使得模塊之間的sockfd的管理異常麻煩。每一個子線程都需要管理好sockfd,避免在IO操作的時候,sockfd出現(xiàn)關(guān)閉或其他異常。

上文有提到IO同步操作,程序響應(yīng)慢,IO異步操作,程序響應(yīng)快。

下面來對比一下IO同步操作與IO異步操作。

代碼如下:

https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的486行,#if 1, 打開的時候,為IO異步操作。關(guān)閉的時候,為IO同步操作。

接下來把我測試接入量的結(jié)果粘貼出來。

IO異步操作,每1000個連接接入的服務(wù)器響應(yīng)時間(900ms左右)。

IO同步操作,每1000個連接接入的服務(wù)器響應(yīng)時間(6500ms左右)。

IO異步操作與IO同步操作

對比項

IO同步操作

IO異步操作

Sockfd管理

管理方便

多個線程共同管理

代碼邏輯

程序整體邏輯清晰

子模塊邏輯清晰

程序性能

響應(yīng)時間長,性能差

響應(yīng)時間短,性能好

有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對IO操作的組件呢?有,采用一種輕量級的協(xié)程來實現(xiàn)。在每次send或者recv之前進行切換,再由調(diào)度器來處理epoll_wait的流程。

就是采用了基于這樣的思考,寫了NtyCo,實現(xiàn)了一個IO異步操作與協(xié)程結(jié)合的組件。https://https://github.com/wangbojing/NtyCo

第二章 協(xié)程的案例

問題:協(xié)程如何使用?與線程使用有何區(qū)別?

在做網(wǎng)絡(luò)IO編程的時候,有一個非常理想的情況,就是每次accept返回的時候,就為新來的客戶端分配一個線程,這樣一個客戶端對應(yīng)一個線程。就不會有多個線程共用一個sockfd。每請求每線程的方式,并且代碼邏輯非常易讀。但是這只是理想,線程創(chuàng)建代價,調(diào)度代價就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {

 socklen_t len = sizeof(struct sockaddr_in);

 int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);



 pthread_t thread_id;

 pthread_create(&thread_id, NULL, client_cb, &clientfd);



}

這樣的做法,寫完放到生產(chǎn)環(huán)境下面,如果你的老板不打死你,你來找我。我來幫你老板,為民除害。

如果我們有協(xié)程,我們就可以這樣實現(xiàn)。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c


while (1) {

    socklen_t len = sizeof(struct sockaddr_in);

    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);



    nty_coroutine *read_co;

    nty_coroutine_create(&read_co, server_reader, &cli_fd);



}

這樣的代碼是完全可以放在生成環(huán)境下面的。如果你的老板要打死你,你來找我,我?guī)湍惆涯憷习宕蛩?,為民除害?/p>

線程的API思維來使用協(xié)程,函數(shù)調(diào)用的性能來測試協(xié)程。

NtyCo封裝出來了若干接口,一類是協(xié)程本身的,二類是posix的異步封裝

協(xié)程API:while

  1. 協(xié)程創(chuàng)建

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

  1. 協(xié)程調(diào)度器的運行

void nty_schedule_run(void)

POSIX異步封裝API:

int nty_socket(int domain, int type, int protocol)

int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)

int nty_recv(int fd, void *buf, int length)

int nty_send(int fd, const void *buf, int length)

int nty_close(int fd)

接口格式與POSIX標準的函數(shù)定義一致。

第三章 協(xié)程的實現(xiàn)之工作流程

問題:協(xié)程內(nèi)部是如何工作呢?

先來看一下協(xié)程服務(wù)器案例的代碼, 代碼參考:https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分別討論三個協(xié)程的比較晦澀的工作流程。第一個協(xié)程的創(chuàng)建;第二個IO異步操作;第三個協(xié)程子過程回調(diào)

3.1 創(chuàng)建協(xié)程

當我們需要異步調(diào)用的時候,我們會創(chuàng)建一個協(xié)程。比如accept返回一個新的sockfd,創(chuàng)建一個客戶端處理的子過程。再比如需要監(jiān)聽多個端口的時候,創(chuàng)建一個server的子過程,這樣多個端口同時工作的,是符合微服務(wù)的架構(gòu)的。

創(chuàng)建協(xié)程的時候,進行了如何的工作?創(chuàng)建API如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

參數(shù)1:nty_coroutine **new_co,需要傳入空的協(xié)程的對象,這個對象是由內(nèi)部創(chuàng)建的,并且在函數(shù)返回的時候,會返回一個內(nèi)部創(chuàng)建的協(xié)程對象。

參數(shù)2:proc_coroutine func,協(xié)程的子過程。當協(xié)程被調(diào)度的時候,就會執(zhí)行該函數(shù)。

參數(shù)3:void *arg,需要傳入到新協(xié)程中的參數(shù)。

協(xié)程不存在親屬關(guān)系,都是一致的調(diào)度關(guān)系,接受調(diào)度器的調(diào)度。調(diào)用create API就會創(chuàng)建一個新協(xié)程,新協(xié)程就會加入到調(diào)度器的就緒隊列中。

創(chuàng)建的協(xié)程具體步驟會在《協(xié)程的實現(xiàn)之原語操作》來描述。

3.2 實現(xiàn)IO異步操作

大部分的朋友會關(guān)心IO異步操作如何實現(xiàn),在send與recv調(diào)用的時候,如何實現(xiàn)異步操作的。

先來看一下一段代碼:

while (1) {

 int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);



 for (i = 0;i < nready;i ++) {



 int sockfd = events[i].data.fd;

 if (sockfd == listenfd) {

 int connfd = accept(listenfd, xxx, xxxx);



            setnonblock(connfd);



            ev.events = EPOLLIN | EPOLLET;

            ev.data.fd = connfd;

            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);



        } else {



            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);

 recv(sockfd, buffer, length, 0);



 //parser_proto(buffer, length);



 send(sockfd, buffer, length, 0);

            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);

        }

    }

}

在進行IO操作(recv,send)之前,先執(zhí)行了 epoll_ctl的del操作,將相應(yīng)的sockfd從epfd中刪除掉,在執(zhí)行完IO操作(recv,send)再進行epoll_ctl的add的動作。這段代碼看起來似乎好像沒有什么作用。

如果是在多個上下文中,這樣的做法就很有意義了。能夠保證sockfd只在一個上下文中能夠操作IO的。不會出現(xiàn)在多個上下文同時對一個IO進行操作的。協(xié)程的IO異步操作正式是采用此模式進行的。

把單一協(xié)程的工作與調(diào)度器的工作的劃分清楚,先引入兩個原語操作 resume,yield會在《協(xié)程的實現(xiàn)之原語操作》來講解協(xié)程所有原語操作的實現(xiàn),yield就是讓出運行,resume就是恢復運行。調(diào)度器與協(xié)程的上下文切換如下圖所示

在協(xié)程的上下文IO異步操作(nty_recv,nty_send)函數(shù),步驟如下:

  1. 將sockfd 添加到epoll管理中。
  2. 進行上下文環(huán)境切換,由協(xié)程上下文yield到調(diào)度器的上下文。
  3. 調(diào)度器獲取下一個協(xié)程上下文。Resume新的協(xié)程

IO異步操作的上下文切換的時序圖如下:

3.3 回調(diào)協(xié)程的子過程

在create協(xié)程后,何時回調(diào)子過程?何種方式回調(diào)子過程?

首先來回顧一下x86_64寄存器的相關(guān)知識。匯編與寄存器相關(guān)知識還會在《協(xié)程的實現(xiàn)之切換》繼續(xù)深入探討的。x86_64 的寄存器有16個64位寄存器,分別是:%rax, %rbx,

%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。

%rax 作為函數(shù)返回值使用的。

%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲,遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改

%r10, %r11 用作數(shù)據(jù)存儲,就是使用前要先保存原值

以NtyCo的實現(xiàn)為例,來分析這個過程。CPU有一個非常重要的寄存器叫做EIP,用來存儲CPU運行下一條指令的地址。我們可以把回調(diào)函數(shù)的地址存儲到EIP中,將相應(yīng)的參數(shù)存儲到相應(yīng)的參數(shù)寄存器中。實現(xiàn)子過程調(diào)用的邏輯代碼如下:

void _exec(nty_coroutine *co) {

    co- >func(co- >arg); //子過程的回調(diào)函數(shù)

}



void nty_coroutine_init(nty_coroutine *co) {

 //ctx 就是協(xié)程的上下文

    co- >ctx.edi = (void*)co; //設(shè)置參數(shù)

    co- >ctx.eip = (void*)_exec; //設(shè)置回調(diào)函數(shù)入口

 //當實現(xiàn)上下文切換的時候,就會執(zhí)行入口函數(shù)_exec , _exec 調(diào)用子過程func

}

第四章 協(xié)程的實現(xiàn)之原語操作

問題:協(xié)程的內(nèi)部原語操作有哪些?分別如何實現(xiàn)的?

協(xié)程的核心原語操作:create, resume, yield。協(xié)程的原語操作有create怎么沒有exit?以NtyCo為例,協(xié)程一旦創(chuàng)建就不能有用戶自己銷毀,必須得以子過程執(zhí)行結(jié)束,就會自動銷毀協(xié)程的上下文數(shù)據(jù)。以_exec執(zhí)行入口函數(shù)返回而銷毀協(xié)程的上下文與相關(guān)信息。co->func(co->arg) 是子過程,若用戶需要長久運行協(xié)程,就必須要在func函數(shù)里面寫入循環(huán)等操作。所以NtyCo里面沒有實現(xiàn)exit的原語操作。

create:創(chuàng)建一個協(xié)程。

  1. 調(diào)度器是否存在,不存在也創(chuàng)建。調(diào)度器作為全局的單例。將調(diào)度器的實例存儲在線程的私有空間pthread_setspecific。
  2. 分配一個coroutine的內(nèi)存空間,分別設(shè)置coroutine的數(shù)據(jù)項,??臻g,棧大小,初始狀態(tài),創(chuàng)建時間,子過程回調(diào)函數(shù),子過程的調(diào)用參數(shù)。
  3. 將新分配協(xié)程添加到就緒隊列 ready_queue中

實現(xiàn)代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {



    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);

    nty_schedule *sched = nty_coroutine_get_sched();



 if (sched == NULL) {

        nty_schedule_create(0);



        sched = nty_coroutine_get_sched();

 if (sched == NULL) {

            printf("Failed to create schedulern");

 return -1;

        }

    }



    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));

 if (co == NULL) {

        printf("Failed to allocate memory for new coroutinen");

 return -2;

    }



 //

 int ret = posix_memalign(&co- >stack, getpagesize(), sched- >stack_size);

 if (ret) {

        printf("Failed to allocate stack for new coroutinen");

        free(co);

 return -3;

    }



    co- >sched = sched;

    co- >stack_size = sched- >stack_size;

    co- >status = BIT(NTY_COROUTINE_STATUS_NEW); //

    co- >id = sched- >spawned_coroutines ++;

co- >func = func;



    co- >fd = -1;

co- >events = 0;



    co- >arg = arg;

    co- >birth = nty_coroutine_usec_now();

    *new_co = co;



    TAILQ_INSERT_TAIL(&co- >sched- >ready, co, ready_next);



 return 0;

}

yield:讓出CPU。

void nty_coroutine_yield(nty_coroutine *co)

參數(shù):當前運行的協(xié)程實例

調(diào)用后該函數(shù)不會立即返回,而是切換到最近執(zhí)行resume的上下文。該函數(shù)返回是在執(zhí)行resume的時候,會有調(diào)度器統(tǒng)一選擇resume的,然后再次調(diào)用yield的。resume與yield是兩個可逆過程的原子操作。

resume:恢復協(xié)程的運行權(quán)

int nty_coroutine_resume(nty_coroutine *co)

參數(shù):需要恢復運行的協(xié)程實例

調(diào)用后該函數(shù)也不會立即返回,而是切換到運行協(xié)程實例的yield的位置。返回是在等協(xié)程相應(yīng)事務(wù)處理完成后,主動yield會返回到resume的地方。

第五章 協(xié)程的實現(xiàn)之切換

問題:協(xié)程的上下文如何切換?切換代碼如何實現(xiàn)?

首先來回顧一下x86_64寄存器的相關(guān)知識。x86_64 的寄存器有16個64位寄存器,分別是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,

%r13, %r14, %r15。

%rax 作為函數(shù)返回值使用的。

%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲,遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改

%r10, %r11 用作數(shù)據(jù)存儲,就是使用前要先保存原值。

上下文切換,就是將CPU的寄存器暫時保存,再將即將運行的協(xié)程的上下文寄存器,分別mov到相對應(yīng)的寄存器上。此時上下文完成切換。如下圖所示:

切換_switch函數(shù)定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

參數(shù)1:即將運行協(xié)程的上下文,寄存器列表

參數(shù)2:正在運行協(xié)程的上下文,寄存器列表

我們nty_cpu_ctx結(jié)構(gòu)體的定義,為了兼容x86,結(jié)構(gòu)體項命令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {

void *esp; //

void *ebp;

void *eip;

void *edi;

void *esi;

void *ebx;

void *r1;

void *r2;

void *r3;

void *r4;

void *r5;

} nty_cpu_ctx;

_switch返回后,執(zhí)行即將運行協(xié)程的上下文。是實現(xiàn)上下文的切換

_switch的實現(xiàn)代碼:

0: __asm__ (

1: "    .text                                  n"

2: "       .p2align 4,,15                                   n"

3: ".globl _switch                                          n"

4: ".globl __switch                                         n"

5: "_switch:                                                n"

6: "__switch:                                               n"

7: "       movq %rsp, 0(%rsi)      # save stack_pointer     n"

8: "       movq %rbp, 8(%rsi)      # save frame_pointer     n"

9: "       movq (%rsp), %rax       # save insn_pointer      n"

10: "       movq %rax, 16(%rsi)                              n"

11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       n"

12: "       movq %r12, 32(%rsi)                              n"

13: "       movq %r13, 40(%rsi)                              n"

14: "       movq %r14, 48(%rsi)                              n"

15: "       movq %r15, 56(%rsi)                              n"

16: "       movq 56(%rdi), %r15                              n"

17: "       movq 48(%rdi), %r14                              n"

18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    n"

19: "       movq 32(%rdi), %r12                              n"

20: "       movq 24(%rdi), %rbx                              n"

21: "       movq 8(%rdi), %rbp      # restore frame_pointer  n"

22: "       movq 0(%rdi), %rsp      # restore stack_pointer  n"

23: "       movq 16(%rdi), %rax     # restore insn_pointer   n"

24: "       movq %rax, (%rsp)                                n"

25: "       ret                                              n"

26: );

按照x86_64的寄存器定義,%rdi保存第一個參數(shù)的值,即new_ctx的值,%rsi保存第二個參數(shù)的值,即保存cur_ctx的值。X86_64每個寄存器是64bit,8byte。

Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實例的rsp項

Movq %rbp, 8(%rsi)

Movq (%rsp), %rax #將棧頂?shù)刂防锩娴闹荡鎯Φ絩ax寄存器中。Ret后出棧,執(zhí)行棧頂

Movq %rbp, 8(%rsi) #后續(xù)的指令都是用來保存CPU的寄存器到new_ctx的每一項中

Movq 8(%rdi), %rbp #將new_ctx的值

Movq 16(%rdi), %rax #將指令指針rip的值存儲到rax中

Movq %rax, (%rsp) # 將存儲的rip值的rax寄存器賦值給棧指針的地址的值。

Ret # 出棧,回到棧指針,執(zhí)行rip指向的指令。

上下文環(huán)境的切換完成。

第六章 協(xié)程的實現(xiàn)之定義

問題:協(xié)程如何定義? 調(diào)度器如何定義?

先來一道設(shè)計題:

設(shè)計一個協(xié)程的運行體R與運行體調(diào)度器S的結(jié)構(gòu)體

  1. 運行體R:包含運行狀態(tài){就緒,睡眠,等待},運行體回調(diào)函數(shù),回調(diào)參數(shù),棧指針,棧大小,當前運行體
  2. 調(diào)度器S:包含執(zhí)行集合{就緒,睡眠,等待}

這道設(shè)計題拆分兩個個問題,一個運行體如何高效地在多種狀態(tài)集合更換。調(diào)度器與運行體的功能界限。

6.1 運行體如何高效地在多種狀態(tài)集合更換

新創(chuàng)建的協(xié)程,創(chuàng)建完成后,加入到就緒集合,等待調(diào)度器的調(diào)度;協(xié)程在運行完成后,進行IO操作,此時IO并未準備好,進入等待狀態(tài)集合;IO準備就緒,協(xié)程開始運行,后續(xù)進行sleep操作,此時進入到睡眠狀態(tài)集合。

就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數(shù)據(jù)結(jié)構(gòu)來存儲?

就緒(ready)集合并不沒有設(shè)置優(yōu)先級的選型,所有在協(xié)程優(yōu)先級一致,所以可以使用隊列來存儲就緒的協(xié)程,簡稱為就緒隊列(ready_queue)。

睡眠(sleep)集合需要按照睡眠時長進行排序,采用紅黑樹來存儲,簡稱睡眠樹(sleep_tree)紅黑樹在工程實用為, key為睡眠時長,value為對應(yīng)的協(xié)程結(jié)點。

等待(wait)集合,其功能是在等待IO準備就緒,等待IO也是有時長的,所以等待(wait)集合采用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑒nginx的設(shè)計。

數(shù)據(jù)結(jié)構(gòu)如下圖所示:

Coroutine就是協(xié)程的相應(yīng)屬性,status表示協(xié)程的運行狀態(tài)。sleep與wait兩顆紅黑樹,ready使用的隊列,比如某協(xié)程調(diào)用sleep函數(shù),加入睡眠樹(sleep_tree),status |= S即可。比如某協(xié)程在等待樹(wait_tree)中,而IO準備就緒放入ready隊列中,只需要移出等待樹(wait_tree),狀態(tài)更改status &= ~W即可。有一個前提條件就是不管何種運行狀態(tài)的協(xié)程,都在就緒隊列中,只是同時包含有其他的運行狀態(tài)。

6.2 調(diào)度器與協(xié)程的功能界限

每一協(xié)程都需要使用的而且可能會不同屬性的,就是協(xié)程屬性。每一協(xié)程都需要的而且數(shù)據(jù)一致的,就是調(diào)度器的屬性。比如棧大小的數(shù)值,每個協(xié)程都一樣的后不做更改可以作為調(diào)度器的屬性,如果每個協(xié)程大小不一致,則可以作為協(xié)程的屬性。

用來管理所有協(xié)程的屬性,作為調(diào)度器的屬性。比如epoll用來管理每一個協(xié)程對應(yīng)的IO,是需要作為調(diào)度器屬性。

按照前面幾章的描述,定義一個協(xié)程結(jié)構(gòu)體需要多少域,我們描述了每一個協(xié)程有自己的上下文環(huán)境,需要保存CPU的寄存器ctx;需要有子過程的回調(diào)函數(shù)func;需要有子過程回調(diào)函數(shù)的參數(shù) arg;需要定義自己的??臻g stack;需要有自己??臻g的大小 stack_size;需要定義協(xié)程的創(chuàng)建時間 birth;需要定義協(xié)程當前的運行狀態(tài) status;需要定當前運行狀態(tài)的結(jié)點(ready_next, wait_node, sleep_node);需要定義協(xié)程id;需要定義調(diào)度器的全局對象 sched。

協(xié)程的核心結(jié)構(gòu)體如下:

typedef struct _nty_coroutine {



    nty_cpu_ctx ctx;

    proc_coroutine func;

 void *arg;

 size_t stack_size;



    nty_coroutine_status status;

    nty_schedule *sched;



 uint64_t birth;

 uint64_t id;



 void *stack;



 RB_ENTRY(_nty_coroutine) sleep_node;

 RB_ENTRY(_nty_coroutine) wait_node;



 TAILQ_ENTRY(_nty_coroutine) ready_next;

 TAILQ_ENTRY(_nty_coroutine) defer_next;



} nty_coroutine;

調(diào)度器是管理所有協(xié)程運行的組件,協(xié)程與調(diào)度器的運行關(guān)系。

調(diào)度器的屬性,需要有保存CPU的寄存器上下文 ctx,可以從協(xié)程運行狀態(tài)yield到調(diào)度器運行的。從協(xié)程到調(diào)度器用yield,從調(diào)度器到協(xié)程用resume

以下為協(xié)程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;



typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;

typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;



typedef struct _nty_schedule {

 uint64_t birth;

nty_cpu_ctx ctx;



 struct _nty_coroutine *curr_thread;

 int page_size;



 int poller_fd;

 int eventfd;

 struct epoll_event eventlist[NTY_CO_MAX_EVENTS];

 int nevents;



 int num_new_events;



    nty_coroutine_queue ready;

    nty_coroutine_rbtree_sleep sleeping;

    nty_coroutine_rbtree_wait waiting;



} nty_schedule;

第七章 協(xié)程的實現(xiàn)之調(diào)度器

問題:協(xié)程如何被調(diào)度?

調(diào)度器的實現(xiàn),有兩種方案,一種是生產(chǎn)者消費者模式,另一種多狀態(tài)運行。

7.1 生產(chǎn)者消費者模式

邏輯代碼如下:

while (1) {



 //遍歷睡眠集合,將滿足條件的加入到ready

        nty_coroutine *expired = NULL;

 while ((expired = sleep_tree_expired(sched)) != ) {

            TAILQ_ADD(&sched- >ready, expired);

        }



 //遍歷等待集合,將滿足添加的加入到ready

        nty_coroutine *wait = NULL;

 int nready = epoll_wait(sched- >epfd, events, EVENT_MAX, 1);

 for (i = 0;i < nready;i ++) {

            wait = wait_tree_search(events[i].data.fd);

            TAILQ_ADD(&sched- >ready, wait);

        }



 // 使用resume回復ready的協(xié)程運行權(quán)

 while (!TAILQ_EMPTY(&sched- >ready)) {

            nty_coroutine *ready = TAILQ_POP(sched- >ready);

            resume(ready);

        }

    }

7.2 多狀態(tài)運行

實現(xiàn)邏輯代碼如下:

while (1) {



 //遍歷睡眠集合,使用resume恢復expired的協(xié)程運行權(quán)

        nty_coroutine *expired = NULL;

 while ((expired = sleep_tree_expired(sched)) != ) {

            resume(expired);

        }



 //遍歷等待集合,使用resume恢復wait的協(xié)程運行權(quán)

        nty_coroutine *wait = NULL;

 int nready = epoll_wait(sched- >epfd, events, EVENT_MAX, 1);

 for (i = 0;i < nready;i ++) {

            wait = wait_tree_search(events[i].data.fd);

            resume(wait);

        }



 // 使用resume恢復ready的協(xié)程運行權(quán)

 while (!TAILQ_EMPTY(sched- >ready)) {

            nty_coroutine *ready = TAILQ_POP(sched- >ready);

            resume(ready);

        }

    }

第八章 協(xié)程性能測試

測試環(huán)境:4臺VMWare 虛擬機

1臺服務(wù)器 6G內(nèi)存,4核CPU

3臺客戶端 2G內(nèi)存,2核CPU

操作系統(tǒng):ubuntu 14.04

服務(wù)器端測試代碼:https://https://github.com/wangbojing/NtyCotyCo

客戶端測試代碼:https://https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c1000k_test/blob/master/client_mutlport_epoll.c

按照每一個連接啟動一個協(xié)程來測試。每一個協(xié)程??臻g 4096byte

6G內(nèi)存 –> 測試協(xié)程數(shù)量100W無異常。并且能夠正常收發(fā)數(shù)據(jù)。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學習之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 程序
    +關(guān)注

    關(guān)注

    117

    文章

    3785

    瀏覽量

    81004
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4779

    瀏覽量

    68521
  • 調(diào)度器
    +關(guān)注

    關(guān)注

    0

    文章

    98

    瀏覽量

    5245
收藏 人收藏

    評論

    相關(guān)推薦

    談?wù)?b class='flag-5'>協(xié)的那些事兒

    隨著異步編程的發(fā)展以及各種并發(fā)框架的普及,協(xié)作為一種異步編程規(guī)范在各類語言中地位逐步提高。我們不單單會在自己的程序中使用協(xié),各類框架如fastapi,aiohttp等也都是基于異步
    的頭像 發(fā)表于 01-26 11:36 ?1111次閱讀
    談?wù)?b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的那些事兒

    協(xié)和線程有什么區(qū)別

    協(xié)和線程的區(qū)別協(xié)和線程的共同目的之一是實現(xiàn)系統(tǒng)資源的上下文調(diào)用,不過它們的實現(xiàn)層級不同;線程
    發(fā)表于 12-10 06:23

    怎樣使用C語言去實現(xiàn)Linux系統(tǒng)協(xié)

    Linux系統(tǒng)編程練手項目:使用C語言實現(xiàn)協(xié) 6年嵌入式開發(fā)經(jīng)驗,在多家半...
    發(fā)表于 12-23 06:58

    Tars在ARM平臺上的移植是如何去實現(xiàn)

    計時器來實現(xiàn),具體實現(xiàn)如下。原x86嵌匯編實現(xiàn):支持ARM64平臺后的實現(xiàn):3 協(xié)
    發(fā)表于 03-30 11:30

    Tars移植到ARM64平臺上的過程實現(xiàn)

    計時器來實現(xiàn),具體實現(xiàn)如下。原x86嵌匯編實現(xiàn):支持ARM64平臺后的實現(xiàn):3 協(xié)
    發(fā)表于 07-05 14:59

    關(guān)于C++ 20協(xié)最全面詳解

    花了一兩周的時間后,我想寫寫 C++20 協(xié)的基本用法,因為 C++ 的協(xié)讓我感到很奇怪,寫一個協(xié)
    的頭像 發(fā)表于 04-12 11:10 ?1.3w次閱讀
    關(guān)于C++ 20<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>最全面詳解

    Python后端項目的協(xié)是什么

    最近公司 Python 后端項目進行重構(gòu),整個后端邏輯基本都變更為采用“異步”協(xié)的方式實現(xiàn)??粗鴿M屏幕經(jīng)過 async await(協(xié)
    的頭像 發(fā)表于 09-23 14:38 ?1326次閱讀

    Python協(xié)與JavaScript協(xié)的對比及經(jīng)驗技巧

    對這兩個語言有興趣的新人理解和吸收。 共同訴求隨著 cpu 多核化,都需要實現(xiàn)由于自身歷史原因(單線程環(huán)境)下的并發(fā)功能 簡化代碼,避免回調(diào)地獄,關(guān)鍵字支持 有效利用操作系統(tǒng)資源和硬件:協(xié)相比線程,占用資源更少,上下文更快 什
    的頭像 發(fā)表于 10-20 14:30 ?1928次閱讀

    通過例子由淺入深的理解yield協(xié)

    send:send() 方法致使協(xié)程前進到下一個yield 語句,另外,生成器可以作為協(xié)使用
    的頭像 發(fā)表于 08-23 11:12 ?2017次閱讀

    使用channel控制協(xié)數(shù)量

    goroutine 是輕量級線程,調(diào)度由 Go 運行時進行管理的。Go 語言的并發(fā)控制主要使用關(guān)鍵字 go 開啟協(xié) goroutine。Go 協(xié)(Goroutine)之間通過信道(
    的頭像 發(fā)表于 09-19 15:06 ?1133次閱讀

    詳解Linux線程、線程與異步編程、協(xié)與異步

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認為
    的頭像 發(fā)表于 03-16 15:49 ?977次閱讀

    協(xié)的概念及協(xié)的掛起函數(shù)介紹

    協(xié)是一種輕量級的線程,它可以在單個線程中實現(xiàn)并發(fā)執(zhí)行。與線程不同,協(xié)不需要操作系統(tǒng)的上下文切換,因此可以更高效地使用系統(tǒng)資源。Kotli
    的頭像 發(fā)表于 04-19 10:20 ?887次閱讀

    FreeRTOS任務(wù)與協(xié)介紹

    FreeRTOS 中應(yīng)用既可以使用任務(wù),也可以使用協(xié)(Co-Routine),或者兩者混合使用。但是任務(wù)和協(xié)使用不同的API函數(shù),因此不能通過隊列(或信號量)將數(shù)據(jù)從任務(wù)發(fā)送給協(xié)
    的頭像 發(fā)表于 09-28 11:02 ?986次閱讀

    協(xié)的作用、結(jié)構(gòu)及原理

    本文介紹了協(xié)的作用、結(jié)構(gòu)、原理,并使用C++和匯編實現(xiàn)了64位系統(tǒng)下的協(xié)池。文章內(nèi)容避免了協(xié)
    的頭像 發(fā)表于 11-08 16:39 ?1121次閱讀
    <b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>的作用、結(jié)構(gòu)及原理

    Linux線程、線程與異步編程、協(xié)與異步介紹

    協(xié)不是系統(tǒng)級線程,很多時候協(xié)被稱為“輕量級線程”、“微線程”、“纖(fiber)”等。簡單來說可以認為
    的頭像 發(fā)表于 11-11 11:35 ?1138次閱讀
    Linux線程、線程與異步編程、<b class='flag-5'>協(xié)</b><b class='flag-5'>程</b>與異步介紹
    RM新时代网站-首页