Development environment:
RT-Thread version: 4.0.3
Operating system: Windows 10
Keil version: V5.30
RT-Thread Studio version: 2.0.1
lwIP: 2.0.2
3 Select/Poll Overview
In lwIP, a concurrent server can be built on the sequential (netconn) API, but that approach requires multithreading: one thread is created per connection to handle its data. On resource-constrained embedded devices, creating a thread for every connection is prohibitively expensive, so we need a different approach: I/O multiplexing, i.e. the select mechanism.
select and poll are specified by POSIX, and most operating systems and protocol stacks implement them.
Note that on Linux both poll and select are implemented on top of the kernel's sys_poll machinery; the difference is historical: select was inherited from BSD Unix, while poll came from System V Unix, so the two behave very similarly. poll has no fixed limit on the number of file descriptors. Like select, however, poll copies the whole descriptor array between user and kernel address space on every call, so the overhead grows linearly with the number of descriptors.
3.1 The select Function
In the BSD socket API, the prototype of select is:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
[Parameters]
- nfds: the number of file descriptors select monitors, normally set to the highest descriptor value in the monitored sets plus 1.
- readfds: the set of descriptors watched for readability. When select returns, the non-readable descriptors have been cleared from readfds, leaving only those that are readable.
- writefds: the set of descriptors watched for writability. When select returns, the non-writable descriptors have been cleared from writefds, leaving only those that are writable.
- exceptfds: the set of descriptors watched for exceptional conditions; it can serve other purposes, e.g. monitoring out-of-band (OOB) data, which is sent to a socket with the MSG_OOB flag. When select returns, exceptfds keeps only the descriptors with a pending exceptional condition such as OOB data.
- timeout: a pointer to a struct timeval; it makes select() return after the given time if no descriptor has become ready. The timeval structure specifies the interval in seconds and microseconds. It puts select into one of three modes:
(1) Passing NULL (no time structure) blocks select indefinitely, until some descriptor in the monitored sets changes state.
(2) Setting the time to 0 seconds and 0 microseconds makes select purely non-blocking: it returns immediately whether or not any descriptor changed state, returning 0 if none did and a positive value otherwise.
(3) A timeout greater than 0 is the maximum wait: select blocks for at most that long, returns early if an event arrives, and returns in any case once the time expires, with the same return values as above.
The timeval structure is defined as:
struct timeval
{
int tv_sec;/* seconds */
int tv_usec;/* microseconds */
};
[Return value]
- int: the number of ready descriptors, 0 on timeout, or -1 on error.
The following macros set, clear, and test file descriptor sets:
FD_ZERO(fd_set *set);// clear a descriptor set
FD_SET(int fd, fd_set *set);// add a descriptor to a set
FD_CLR(int fd, fd_set *set);// remove a descriptor from a set
FD_ISSET(int fd, fd_set *set);// test whether a descriptor is set
fd_set can be thought of as a collection of file descriptors (file handles). The middle three parameters specify the sets of descriptors the kernel should test for read, write, and exceptional conditions; if you are not interested in one of these conditions, pass a null pointer for it.
The select() mechanism provides the fd_set data structure, in practice an array of longs in which each bit can be associated with an open file handle (a socket, another file, a named pipe, or a device handle). The programmer establishes those associations; when select() is called, the kernel modifies the contents of fd_set according to the I/O state, thereby telling the process that called select() which sockets or files are readable.
3.2 The poll Function
The prototype of poll:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
[Parameters]
- fds: an array of struct pollfd holding the socket descriptors whose state should be checked; unlike select's sets, the fds array is not cleared by the call. Each pollfd structure describes one monitored file descriptor, so passing fds lets poll() watch multiple descriptors at once.
struct pollfd is defined as:
typedef struct pollfd {
int fd; // file descriptor to be checked
short events; // events of interest on fd, set by the caller
short revents; // events that actually occurred on fd, filled in by the kernel
} pollfd_t;
The events field is the event mask for the descriptor, set by the caller; the revents field is the result mask, filled in by the kernel when the call returns.
- nfds: the total number of descriptors in the fds array.
- timeout: the number of milliseconds to wait; poll() returns once it expires whether or not any I/O is ready, much like select.
[Return value]
- int: the number of descriptors in fds that are ready for reading or writing or have an error; 0 indicates a timeout, -1 an error.
poll describes the descriptor set differently: it uses the pollfd structure instead of select's fd_set, so the number of descriptors poll can handle is not bound by select's limit of 1024. This is the main practical difference from select.
4 lwIP's select/poll Implementation
Now let's see how lwIP implements select and poll.
4.1 The lwip_select Implementation
lwIP now fully implements select, built on a semaphore mechanism; the function is lwip_select.
The basic flow of lwIP's select implementation is:
1. Check the event flags of each socket in the set in turn; if any are active, record that socket.
2. If one or more events are pending, return; otherwise create a semaphore and block. Waiting entries are tracked in select_cb_list, a linked list defined in [sockets.c]:
static struct lwip_select_cb *select_cb_list;// linked list of pending select calls
The lwip_select_cb structure is defined as follows:
/** Description for a task waiting in select */
struct lwip_select_cb {
/** Pointer to the next waiting task */
struct lwip_select_cb *next;
/** Pointer to the previous waiting task */
struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
/** readset passed to select */
fd_set *readset;
/** writeset passed to select */
fd_set *writeset;
/** unimplemented: exceptset passed to select */
fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
/** fds passed to poll; NULL if select */
struct pollfd *poll_fds;
/** nfds passed to poll; 0 if select */
nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
/** don't signal the same semaphore twice: set to 1 when signalled */
int sem_signalled;// whether the semaphore has been signalled
/** semaphore to wake up a task waiting for select */
SELECT_SEM_T sem;// semaphore that select blocks on
};
3. When the socket set is initialized, the callback function event_callback is registered with the netconn structure. Whenever an event occurs, the callback runs; it walks select_cb_list, and for every entry waiting on that socket it releases the entry's semaphore.
Now let's look at lwIP's concrete select implementation; its source is as follows:
int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
struct timeval *timeout)
{
u32_t waitres = 0;// result of the semaphore wait (time waited, or timeout)
int nready;
fd_set lreadset, lwriteset, lexceptset;// local sets recording the sockets with events
u32_t msectimeout;
int i;
int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
fd_set used_sockets;
#endif
SYS_ARCH_DECL_PROTECT(lev);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",
maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));
if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
set_errno(EINVAL);
return -1;
}
lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);
/* Go through each socket in each list to count number of sockets which
currently match */
//scan the socket sets for pending events
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (nready < 0) {
/* one of the sockets in one of the fd_sets was invalid */
set_errno(EBADF);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
} else if (nready > 0) {
/* one or more sockets are set, no need to wait */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
} else {
/* If we don't have any current events, then suspend if we are supposed to */
if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables (unless we're running in MPU compatible
mode). */
API_SELECT_CB_VAR_DECLARE(select_cb);
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
API_SELECT_CB_VAR_REF(select_cb).readset = readset;
API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(ENOMEM);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in */
maxfdp2 = maxfdp1;
for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
sock->select_waiting++;// the socket exists and is watched for read/write/except, so bump select_waiting
if (sock->select_waiting == 0) {
/* overflow - too many threads waiting */
sock->select_waiting--;
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
set_errno(EBUSY);
break;
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
/* Not a valid socket */
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
set_errno(EBADF);
break;
}
}
}
if (nready >= 0) {
/* Call lwip_selscan again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
//after the steps above, scan once more in case events arrived in between
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout == 0) {
/* Wait forever */
msectimeout = 0;
} else {
long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
if (msecs_long <= 0) {
/* Wait 1ms at least (0 means wait forever) */
msectimeout = 1;
} else {
msectimeout = (u32_t)msecs_long;
}
}
//sleep for the given time, yielding the CPU
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
}
/* Decrease select_waiting for each socket we are interested in */
for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
/* for now, handle select_waiting==0... */
LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
if (sock->select_waiting > 0) {
sock->select_waiting--;// the wait is over; decrement this socket's select_waiting
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
SYS_ARCH_UNPROTECT(lev);
/* Not a valid socket */
nready = -1;
set_errno(EBADF);
}
}
}
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* See what's set now after waiting */
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
}
}
}
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
set_errno(0);
if (readset) {
*readset = lreadset;
}
if (writeset) {
*writeset = lwriteset;
}
if (exceptset) {
*exceptset = lexceptset;
}
return nready;
}
The heart of the code above is where socket->select_waiting is incremented and decremented. When the socket exists and really is being monitored for events, and neither an event was already pending on entry nor the timeout was zero, the counter is always incremented; the thread may then go to sleep. Normally, when the sleep ends, socket->select_waiting is decremented, so on leaving the function the counter is back to its original value. But if close(socket) runs during the sleep, tryget_socket_unconn_locked() can no longer find the socket structure, the decrement is skipped, and the function exits with select_waiting one higher than on entry. Within this function the number of increments is always >= the number of decrements, so once the counter fails to return to its original value it can never reach 0 again; the socket's resources are then stuck in a phantom "in use" state and that socket can never be used again.
The flow of the lwip_select implementation is as follows:
The select implementation relies on an important structure, lwip_sock, defined as follows:
/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
/** sockets currently are built on netconns, each socket has one netconn */
struct netconn *conn;
/** data that was left from the previous read */
union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
/** number of times data was received, set by event_callback(),
tested by the receive and select functions */
s16_t rcvevent;
/** number of times data was ACKed (free send buffer), set by event_callback(),
tested by select */
u16_t sendevent;
/** error happened for this socket, set by event_callback(), tested by select */
u16_t errevent;
/** counter of how many threads are waiting for this socket using select */
SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
/* counter of how many threads are using a struct lwip_sock (not the 'int') */
u8_t fd_used;
/* status of pending close/delete actions */
u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP 1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif
#ifdef SAL_USING_POSIX
rt_wqueue_t wait_head;
#endif
};
When a socket receives data, lwip_sock obtains a pbuf (for TCP) or a netbuf (for UDP) through the netconn receive functions. The data these wrap may exceed the length the socket user asked to read, in which case the packet must be kept in the socket until the user's next read: lastdata points to the packet the user has not yet fully consumed (older lwIP versions paired it with a lastoffset field giving the offset of the unread data within the packet). The remaining select-related fields of lwip_sock exist to support the select mechanism.
lwip_sock is the upper-layer socket API's view of a connection: a wrapper that extends the netconn structure to describe one specific connection. All of its logic is built on the kernel's netconn; the conn field points to the netconn associated with the socket. The netconn structure is:
/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);
/** A netconn descriptor */
struct netconn {
/** type of the netconn (TCP, UDP or RAW) */
enum netconn_type type;
/** current state of the netconn */
enum netconn_state state;
/** the lwIP internal protocol control block */
union {
struct ip_pcb *ip;
struct tcp_pcb *tcp;
struct udp_pcb *udp;
struct raw_pcb *raw;
} pcb;
/** the last asynchronous unreported error this netconn had */
err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
/** sem that is used to synchronously execute functions in the core context */
sys_sem_t op_completed;
#endif
/** mbox where received packets are stored until they are fetched
by the netconn application thread (can grow quite big) */
sys_mbox_t recvmbox;
#if LWIP_TCP
/** mbox where new connections are stored until processed
by the application thread */
sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX
/** number of threads waiting on an mbox. This is required to unblock
all threads when closing while threads are waiting. */
int mbox_threads_waiting;
#endif
/** only used for socket layer */
#if LWIP_SOCKET
int socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO
/** timeout to wait for sending data (which means enqueueing data for sending
in internal buffers) in milliseconds */
s32_t send_timeout;
#endif /* LWIP_SO_SNDTIMEO */
#if LWIP_SO_RCVTIMEO
/** timeout in milliseconds to wait for new data to be received
(or connections to arrive for listening netconns) */
u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF
/** maximum amount of bytes queued in recvmbox
not used for TCP: adjust TCP_WND instead! */
int recv_bufsize;
/** number of bytes currently in recvmbox to be received,
tested against recv_bufsize to limit bytes on recvmbox
for UDP and RAW, used for FIONREAD */
int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER
/** values <0 mean linger is disabled, values > 0 are seconds to linger */
s16_t linger;
#endif /* LWIP_SO_LINGER */
/** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */
u8_t flags;
#if LWIP_TCP
/** TCP: when data passed to netconn_write doesn't fit into the send buffer,
this temporarily stores the message.
Also used during connect and close. */
struct api_msg *current_msg;
#endif /* LWIP_TCP */
/** A callback function that is informed about events for this netconn */
netconn_callback callback;
};
As mentioned earlier, when the socket set is initialized, the callback event_callback is registered with the netconn structure; that callback is the netconn_callback member of struct netconn. Let's look at the event_callback function:
/**
* Callback registered in the netconn layer for each socket-netconn.
* Processes recvevent (data available) and wakes up tasks waiting for select.
*
* @note for LWIP_TCPIP_CORE_LOCKING any caller of this function
* must have the core lock held when signaling the following events
* as they might cause select_list_cb to be checked:
* NETCONN_EVT_RCVPLUS: raised when data has been received by the core
* NETCONN_EVT_SENDPLUS: raised when data has been sent successfully
* NETCONN_EVT_ERROR: raised on a connection error
* This requirement will be asserted in select_check_waiters()
*/
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{
int s, check_waiters;
struct lwip_sock *sock;
SYS_ARCH_DECL_PROTECT(lev);
LWIP_UNUSED_ARG(len);
/* Get socket */
if (conn) {
s = conn->socket;
if (s < 0) {
/* Data comes in right away after an accept, even though
* the server task might not have created a new socket yet.
* Just count down (or up) if that's the case and we
* will use the data later. Note that only receive events
* can happen before the new socket is set up. */
SYS_ARCH_PROTECT(lev);
if (conn->socket < 0) {
if (evt == NETCONN_EVT_RCVPLUS) {
/* conn->socket is -1 on initialization
lwip_accept adjusts sock->recvevent if conn->socket < -1 */
conn->socket--;
}
SYS_ARCH_UNPROTECT(lev);
return;
}
s = conn->socket;
SYS_ARCH_UNPROTECT(lev);
}
sock = get_socket(s);// fetch the lwip_sock structure for this socket
if (!sock) {
return;
}
} else {
return;
}
check_waiters = 1;
//enter the critical section and update the socket's event counters for this event
SYS_ARCH_PROTECT(lev);
/* Set event as required */
switch (evt) {
case NETCONN_EVT_RCVPLUS:// data received by the core
sock->rcvevent++;
if (sock->rcvevent > 1) {
check_waiters = 0;
}
break;
case NETCONN_EVT_RCVMINUS:// data consumed by the user
sock->rcvevent--;
check_waiters = 0;
break;
case NETCONN_EVT_SENDPLUS:// output sent successfully
if (sock->sendevent) {
check_waiters = 0;
}
sock->sendevent = 1;
break;
case NETCONN_EVT_SENDMINUS:// user wrote data into the send buffer
sock->sendevent = 0;
check_waiters = 0;
break;
case NETCONN_EVT_ERROR:// connection error
sock->errevent = 1;
break;
default:
LWIP_ASSERT("unknown event", 0);
break;
}
//events updated; wake up any select call blocked on this socket
if (sock->select_waiting && check_waiters) {
/* Save which events are active */
int has_recvevent, has_sendevent, has_errevent;
has_recvevent = sock->rcvevent > 0;// data readable
has_sendevent = sock->sendevent != 0;// data writable
has_errevent = sock->errevent != 0;// error/exceptional condition
SYS_ARCH_UNPROTECT(lev);
/* Check any select calls waiting on this socket */
select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
} else {
SYS_ARCH_UNPROTECT(lev);
}
done_socket(sock);
}
In summary, event_callback is the monitoring half behind the readset/writeset/exceptset sets: it updates rcvevent, sendevent, and errevent and signals the semaphore of any lwip_select call blocked on the socket. lwip_select, in turn, essentially reads rcvevent, sendevent, and errevent and acts on them, scanning for events mainly through lwip_selscan.
4.2 The lwip_poll Implementation
lwIP also fully implements poll; the function is lwip_poll. Its mechanism is much like lwip_select's, except that the descriptors are kept in the caller-supplied array of pollfd structures, so lwip_poll has no fixed limit on the number of file descriptors. The lwip_poll source is as follows:
int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
u32_t waitres = 0;
int nready;
u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
(void*)fds, (int)nfds, timeout));
LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),
set_errno(EINVAL); return -1;);
lwip_poll_inc_sockets_used(fds, nfds);
/* Go through each struct pollfd to count number of structures
which currently match */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);
if (nready < 0) {
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
/* If we don't have any current events, then suspend if we are supposed to */
if (!nready) {
API_SELECT_CB_VAR_DECLARE(select_cb);
if (timeout == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
goto return_success;
}
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables. */
API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;
API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(EAGAIN);
lwip_poll_dec_sockets_used(fds, nfds);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in.
Also, check for events again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout < 0) {
/* Wait forever */
msectimeout = 0;
} else {
/* timeout == 0 would have been handled earlier. */
LWIP_ASSERT("timeout > 0", timeout > 0);
msectimeout = timeout;
}
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
/* Decrease select_waiting for each socket we are interested in,
and check which events occurred while we waited. */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
goto return_success;
}
}
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
lwip_poll_dec_sockets_used(fds, nfds);
set_errno(0);
return nready;
}
Like lwip_select, it scans for events, just with lwip_pollscan as the scanning function. The remaining details are not analyzed here; see the lwIP source if you are interested.
The flow of the lwip_poll implementation is as follows:
5 Implementing a Concurrent Server
The previous sections covered how the select/poll mechanism is implemented in lwIP; now let's use it to build a concurrent server, taking select as the example.
The select concurrent-server model:
socket(...); // create the socket
bind(...); // bind
listen(...); // listen
while(1)
{
if(select(...) > 0) // check whether the listening socket is readable
{
if(FD_ISSET(...)>0) // readable: a new client is connecting to the server
{
accept(...);// take the completed connection
process(...);// handle the request and send back the result
}
}
close(...); // close the connection socket returned by accept()
}
The select-based concurrent server model is therefore as follows:
In terms of flow, issuing I/O requests with select is not very different from the synchronous blocking model; it even adds the extra work of registering the watched sockets and calling select, so a single request is handled less efficiently. The big advantage of select is that one thread can service I/O on many sockets at once: the user registers multiple sockets and repeatedly calls select to pick up whichever have become active, handling many I/O requests in a single thread. In the synchronous blocking model, that is only possible with multiple threads.
Server:
/**
******************************************************************************
* @file server.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-08
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實驗樓
* @brief Select-based server
******************************************************************************
*/
/* typical headers for an RT-Thread SAL socket application; the original list was lost, so the exact set may differ per BSP */
#include <rtthread.h>
#include <sys/socket.h>
#include <netdev.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#define SERVER_PORT 8888
#define BUFF_SIZE 1024
static char recvbuff[BUFF_SIZE];
static void net_server_thread_entry(void *parameter)
{
int sfd, cfd, maxfd, i, nready, n;
struct sockaddr_in server_addr, client_addr;
struct netdev *netdev = RT_NULL;
char sendbuff[] = "Hello client!";
socklen_t client_addr_len;
fd_set all_set, read_set;
//FD_SETSIZE includes the server's own fd
int clientfds[FD_SETSIZE - 1];
/* get the netdev network interface object by name */
netdev = netdev_get_by_name((char*)parameter);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);
}
//create the socket
if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
}
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
//server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* use the IP address from the netdev object */
server_addr.sin_addr.s_addr = netdev->ip_addr.addr;
//bind the socket
if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sfd);
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
//listen on the socket
if(listen(sfd, 5) == -1)
{
rt_kprintf("listen error");
}
else
{
rt_kprintf("listening...\n");
}
client_addr_len = sizeof(client_addr);
//initialize maxfd to sfd
maxfd = sfd;
//clear the fd set
FD_ZERO(&all_set);
//add the sfd descriptor to the set
FD_SET(sfd, &all_set);
//initialize the client fd array
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
//initialize each slot to -1
clientfds[i] = -1;
}
while(1)
{
//select modifies the fd_set passed to it, so the same set cannot be reused directly;
//keep a master set (all_set) and copy it into the working read set each iteration
read_set = all_set;
nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);
//with no timeout, select never returns 0
if(nready < 0)
{
rt_kprintf("select error \r\n");
}
//check whether the listening socket has a pending connection
if(FD_ISSET(sfd, &read_set))
{
//a client is connecting
cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
if(cfd < 0)
{
rt_kprintf("accept socket error\r\n");
//go back to select
continue;
}
rt_kprintf("new client connect fd = %d\r\n", cfd);
//add the new cfd to the fd_set
FD_SET(cfd, &all_set);
//update the maxfd passed to select
maxfd = (cfd > maxfd)?cfd:maxfd;
//store the new cfd in the clientfds array
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds[i] == -1)
{
clientfds[i] = cfd;
//stored; stop searching
break;
}
}
//no other ready sockets to handle; skip the rest of the loop to avoid redundant work
if(--nready == 0)
{
//go back to select
continue;
}
}
//walk all client file descriptors
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds[i] == -1)
{
//keep scanning
continue;
}
//check whether this descriptor is in the ready set
if(FD_ISSET(clientfds[i], &read_set))
{
n = recv(clientfds[i], recvbuff, sizeof(recvbuff) - 1, 0);
if(n <= 0)
{
//remove it from the set
FD_CLR(clientfds[i], &all_set);
//mark this client slot as free
clientfds[i] = -1;
}
else
{
recvbuff[n] = '\0';//terminate before printing as a string
rt_kprintf("clientfd %d: %s \r\n", clientfds[i], recvbuff);
//send a reply back to the client
n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);
if(n < 0)
{
//remove it from the set
FD_CLR(clientfds[i], &all_set);
//mark this client slot as free
clientfds[i] = -1;
}
}
}
}
}
}
static int server(int argc, char **argv)
{
rt_err_t ret = RT_EOK;
if (argc != 2)
{
rt_kprintf("bind_test [netdev_name] --bind network interface device by name.\n");
return -RT_ERROR;
}
/* create the server thread */
rt_thread_t thread = rt_thread_create("server",
net_server_thread_entry,
argv[1],
4096,
10,
10);
/* start the thread if creation succeeded */
if (thread != RT_NULL)
{
rt_thread_startup(thread);
}
else
{
ret = RT_ERROR;
}
return ret;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */
Client (Linux version):
/* standard headers for a Linux TCP client; the original list was lost to extraction */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVPORT 8888
int main(int argc,char *argv[])
{
char sendbuf[] = "Client1 : Hello Rtthread!";
char recvbuf[2014];
int sockfd,sendbytes;
struct sockaddr_in serv_addr;//address of the server to connect to
if (argc != 2)
{
fprintf(stderr, "usage: %s <server_ip>\n", argv[0]);
exit(1);
}
//1. create the socket
//AF_INET means IPv4
//SOCK_STREAM means TCP
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
{
perror("socket");
exit(1);
}
//fill in the server address
serv_addr.sin_family = AF_INET; //network-layer protocol: IPv4
serv_addr.sin_port = htons(SERVPORT); //transport-layer port number
serv_addr.sin_addr.s_addr = inet_addr(argv[1]); //network-layer address: the actual server IP
bzero(&(serv_addr.sin_zero), 8); //zero the 8 reserved bytes
//2. initiate the connection to the server
//three-way handshake; the sockaddr_in must be cast to struct sockaddr
if((connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(struct sockaddr))) < 0) {
perror("connect failed!");
exit(1);
}
printf("connect successful! \n");
//3. exchange messages with the server
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf), 0);
sendbytes = recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
if (sendbytes <= 0) break;//server closed the connection or an error occurred
recvbuf[sendbytes] = '\0';//terminate before printing
printf("Server : %s \n", recvbuf);
sleep(2);
}
//4. close the socket
close(sockfd);
return 0;
}
Client (RT-Thread version):
/**
******************************************************************************
* @file client.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-01
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實驗樓
* @brief Client
******************************************************************************
*/
/* typical headers for an RT-Thread SAL socket application; the original list was lost, so the exact set may differ per BSP */
#include <rtthread.h>
#include <sys/socket.h>
#include <netdev.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#define SERVER_HOST "192.168.101.8"
#define SERVER_PORT 8888
static int client(int argc, char **argv)
{
struct sockaddr_in client_addr;
struct sockaddr_in server_addr;
struct netdev *netdev = RT_NULL;
int sockfd = -1;
char sendbuf[] = "Hello RT-Thread! \r\n";
char recvbuf[2014];
if (argc != 2)
{
rt_kprintf("bind_test [netdev_name] --bind network interface device by name.\n");
return -RT_ERROR;
}
/* get the netdev network interface object by name */
netdev = netdev_get_by_name(argv[1]);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", argv[1]);
return -RT_ERROR;
}
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
return -RT_ERROR;
}
/* initialize the client address to bind */
client_addr.sin_family = AF_INET;
client_addr.sin_port = htons(8080);
/* use the IP address from the netdev object */
client_addr.sin_addr.s_addr = netdev->ip_addr.addr;
rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));
if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sockfd);
return -RT_ERROR;
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
/* initialize the server address to connect to */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));
/* connect to the server */
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket connect failed!\n");
closesocket(sockfd);
return -RT_ERROR;
}
else
{
rt_kprintf("socket connect success!\n");
}
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf), 0);
memset(recvbuf, 0, sizeof(recvbuf));/* clear first so the buffer is always terminated */
recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
fputs(recvbuf, stdout);
rt_thread_mdelay(500);
}
/* close the connection */
closesocket(sockfd);
return RT_EOK;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */
Next comes verification. Getting the ART-Pi online was covered in earlier chapters and is not repeated here.
Start the server on the ART-Pi:
Server:
Then start the clients; mine run on Ubuntu:
Client:
Only two clients are used here; if you are interested, try more.
Of course, if you would rather not write a client, a network debugging assistant tool also works for testing.