我需要的 pthread 線程集結(jié)點(diǎn)功能,使用同一集結(jié)點(diǎn)的線程將通過(guò) rend_wait 函數(shù)等待,當(dāng)集結(jié)點(diǎn)到達(dá)指定數(shù)量的線程后同時(shí)激發(fā)繼續(xù)執(zhí)行。使用 pthread 的 mutex 和 cond 超輕量實(shí)現(xiàn)。下面 rend.h 是集結(jié)點(diǎn)實(shí)現(xiàn),rendezvous.c 是測(cè)試應(yīng)用。
- /*
- *rend.h
- *
- *Createdon:2009-11-14
- *Author:liuzy(lzy.dev@gmail.com)
- */
- #ifndefREND_H_
- #defineREND_H_
- #include<pthread.h>
- #include<assert.h>
- struct rend_t{
- volatile int count;
- pthread_mutex_tcount_lock;
- pthread_cond_tready;
- };
- #defineDECLARE_REND(name,count)/
- struct rend_tname={(count),PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER}
- int rend_init( struct rend_t*prend, int count){
- int ret=0;
- assert(prend);
- prend->count=count;
- if ((ret=pthread_mutex_init(&prend->count_lock,NULL)))
- return ret;
- if ((ret=pthread_cond_init(&prend->ready,NULL)))
- return ret;
- return EXIT_SUCCESS;
- }
- int rend_wait( struct rend_t*prend){
- int ret=0;
- assert(prend);
- if ((ret=pthread_mutex_lock(&prend->count_lock)))
- return ret;
- /*checkcountvalueisreadytoweakupblockcode*/
- if (prend->count==1){
- if ((ret=pthread_cond_broadcast(&prend->ready)))
- return ret;
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- } else {
- prend->count--;
- ret=pthread_cond_wait(&prend->ready,&prend->count_lock);
- prend->count++;
- if (ret){
- pthread_mutex_unlock(&prend->count_lock);
- return ret;
- }
- if ((ret=pthread_mutex_unlock(&prend->count_lock)))
- return ret;
- }
- return EXIT_SUCCESS;
- }
- int rend_free( struct rend_t*prend){
- int ret=0;
- assert(prend);
- prend->count=0;
- if ((ret=pthread_mutex_destroy(&prend->count_lock)))
- return ret;
- if ((ret=pthread_cond_destroy(&prend->ready)))
- return ret;
- return EXIT_SUCCESS;
- }
- #endif/*REND_H_*/
rend 使用更簡(jiǎn)單:
- 定義/初始化 rend_t 集結(jié)點(diǎn)對(duì)象。DECLARE_REND 宏用于靜態(tài)定義,rend_init 函數(shù)可以對(duì)動(dòng)態(tài)創(chuàng)建的集結(jié)點(diǎn)結(jié)構(gòu)初始化;
- pthread 線程通過(guò)調(diào)用 rend_wait 函數(shù) P/V 集結(jié)狀態(tài)。集結(jié)關(guān)系的線程要 P/V 在同一個(gè) rend_t 集結(jié)對(duì)象上;
- 釋放集結(jié)對(duì)象,rend_free 函數(shù)。
以上函數(shù)都是成功返回 0,出錯(cuò)返回 errno 值(非 0)。
- /*
- ==============================
- Name:rendezvous.c
- Author:liuzy(lzy.dev@gmail.com)
- Version:0.1
- ==============================
- */
- #include<stdio.h>
- #include<stdlib.h>
- #include<stdarg.h>/*va_list*/
- #include<unistd.h>
- #include<string.h>
- #include<errno.h>/*errno*/
- #include<syslog.h>/*forsyslog(2)andlevel*/
- #include<pthread.h>
- #include"rend.h"
- static int daemon_proc=0; /*forsysloginerr_doit*/
- #defineMAXLINE4096/*maxtextlinelength*/
- void err_doit( int errnoflag, int level, const char *fmt, va_list ap){
- char buf[MAXLINE+1]={0};
- int errno_save=errno,n=0;
- #ifdefHAVE_VSNPRINTF
- vsnprintf(buf,MAXLINE,fmt,ap);
- #else
- vsprintf(buf,fmt,ap);
- #endif/*HAVE_VSNPRINTF*/
- n=strlen(buf);
- if (errnoflag)
- snprintf(buf+n,MAXLINE-n, ":%s" ,strerror(errno_save));
- strcat(buf, "/n" );
- if (daemon_proc){
- syslog(level, "%s" ,buf);
- } else {
- fflush(stdout);
- fputs(buf,stderr);
- fflush(stderr);
- }
- return ;
- }
- void err_msg( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(0,LOG_INFO,fmt,ap);
- va_end(ap);
- return ;
- }
- void err_sys( const char *fmt,...){
- va_list ap;
- va_start(ap,fmt);
- err_doit(1,LOG_ERR,fmt,ap);
- va_end(ap);
- exit(EXIT_FAILURE);
- }
- #defineTHREAD_COUNT100/*rendezvoustestthreadworkers*/
- struct worker_arg{
- int worker_id;
- struct rend_t*prend;
- };
- static void *pthread_worker( void *arg){
- struct worker_arg*parg=( struct worker_arg*)arg;
- err_msg( "worker#%drunning." ,( int )parg->worker_id);
- srand(parg->worker_id*2);
- sleep(rand()%5);
- rend_wait(parg->prend); /*workersrendezvous*/
- err_msg( "worker#%dexiting." ,( int )parg->worker_id);
- return EXIT_SUCCESS;
- }
- int main( void ){
- int idx=0;
- void *exitcode=NULL;
- pthread_tthds[THREAD_COUNT];
- struct worker_argarg[THREAD_COUNT];
- DECLARE_REND(rend,THREAD_COUNT);
- err_msg( "workerscreating." );
- for (idx=0;idx<THREAD_COUNT;idx++){
- arg[idx].prend=&rend;
- arg[idx].worker_id=idx;
- if (pthread_create(thds+idx,NULL,pthread_worker,( void *)&arg[idx]))
- err_sys( "worker#%dcreateerror." ,idx);
- }
- puts( "workersexiting." );
- for (idx=0;idx<THREAD_COUNT;idx++)
- if (pthread_join(thds[idx],&exitcode)||(exitcode!=EXIT_SUCCESS))
- err_msg( "worker#%dexiterror." ,idx);
- err_msg( "alldone.exit0." );
- rend_free(&rend);
- return EXIT_SUCCESS;
- }
看了下 semaphore os syscall 及其 infrastructure,也許以后還需要進(jìn)程間(非 pthread)集結(jié)時(shí)用得上。kernel 實(shí)現(xiàn)的超強(qiáng)啊,呵呵~
// 2009.11.17 14:34 添加 ////
futex在非競(jìng)爭(zhēng)情況下可從用戶空間獲取和釋放,不需要進(jìn)入內(nèi)核。與信號(hào)量類似,它有一個(gè)可以原子增減的計(jì)數(shù)器,進(jìn)程可以等待計(jì)數(shù)器值變?yōu)檎龜?shù)。用戶進(jìn)程通過(guò)系統(tǒng)調(diào)用對(duì)資源的競(jìng)爭(zhēng)作一個(gè)公斷。
futex 是一個(gè)用戶空間的整數(shù)值,被多個(gè)線程或進(jìn)程共享。Futex的系統(tǒng)調(diào)用對(duì)該整數(shù)值時(shí)進(jìn)行操作,仲裁競(jìng)爭(zhēng)的訪問(wèn)。 glibc中的NPTL庫(kù)封裝了futex 系統(tǒng)調(diào)用,對(duì)futex接口進(jìn)行了抽象。用戶通過(guò)NPTL庫(kù)像傳統(tǒng)編程一樣地使用線程同步API函數(shù),而不會(huì)感覺(jué)到futex的存在。
futex 的實(shí)現(xiàn)機(jī)制是:如果當(dāng)前進(jìn)程訪問(wèn)臨界區(qū)時(shí),該臨界區(qū)正被另一個(gè)進(jìn)程使用,當(dāng)前進(jìn)程將鎖用一個(gè)值標(biāo)識(shí),表示“有一個(gè)等待者正掛起”,并且調(diào)用 sys_futex(FUTEX_WAIT)等待其他進(jìn)程釋放它。內(nèi)核在內(nèi)部創(chuàng)建futex隊(duì)列,以便以后與喚醒者匹配等待者。當(dāng)臨界區(qū)擁有者線程釋放了 futex,它通過(guò)變量值發(fā)出通知表示還有多個(gè)等待者在掛起,并調(diào)用系統(tǒng)調(diào)用sys_futex(FUTEX_WAKE)喚醒它們。一旦所有等待者已獲取資源并釋放鎖時(shí),futex回到非競(jìng)爭(zhēng)狀態(tài),并沒(méi)有內(nèi)核狀態(tài)與它相關(guān)。
robust futex是為了解決futex鎖崩潰而對(duì)futex進(jìn)行了增強(qiáng)。例如:當(dāng)一個(gè)進(jìn)程在持有pthread_mutex_t鎖正與其他進(jìn)程發(fā)生競(jìng)爭(zhēng)時(shí),進(jìn)程因某種意外原因而提前退出,如:進(jìn)程發(fā)生段錯(cuò)誤,或者被用戶用shell命令kill -9-ed”強(qiáng)行退出,此時(shí),需要有一種機(jī)制告訴等待者“鎖的最一個(gè)持有者已經(jīng)非正常地退出”?!?
為了解決此類問(wèn)題,NPTL創(chuàng)建了robust mutex用戶空間API pthread_mutex_lock(),如果鎖的擁有者進(jìn)程提前退出,pthread_mutex_lock()返回一個(gè)錯(cuò)誤值,新的擁有者進(jìn)程可以決定是否可以安全恢復(fù)被鎖保護(hù)的數(shù)據(jù)。
有幾點(diǎn)不還不理解:
- “futex 如果說(shuō)是一個(gè)用戶空間的整數(shù)值,那怎么被多個(gè)進(jìn)程共享?Futex 系統(tǒng)調(diào)用在 kernel 態(tài)怎么操作該值并仲裁競(jìng)爭(zhēng)?這是那種直接映射到 userspace 的 kernel 地址么。 這個(gè)需要程序間通過(guò) mmap 在共享段中訪問(wèn),與 futex 沒(méi)什么關(guān)系。
- 這個(gè)“robust futex”機(jī)制指的應(yīng)該就是 SVRx 傳統(tǒng) sem IPC 里的 SEM_UNDO flag 吧?
一篇不錯(cuò)的文章,引發(fā)對(duì) glibc nptl 實(shí)現(xiàn)源碼的探索:
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
