柚子快報激活碼778899分享:信號量——Linux并發(fā)之魂
柚子快報激活碼778899分享:信號量——Linux并發(fā)之魂
歡迎來到?破曉的歷程的 博客
引言
今天,我們繼續(xù)學(xué)習(xí)Linux線程本分,在Linux條件變量中,我們對條件變量的做了詳細(xì)的說明,今天我們要利用條件變量來引出我們的另一個話題——信號量內(nèi)容的學(xué)習(xí)。
1.復(fù)習(xí)條件變量
在上一期博客中,我們沒有對條件變量做具體的使用,所以,這里我們通過一份代碼來復(fù)習(xí)一下,接下來,我們實現(xiàn)基于BlockingQueue的生產(chǎn)者消費者模型。
1.1何為基于BlockingQueue的生產(chǎn)者消費者模型
BlockingQueue在多線程編程中阻塞隊列(Blocking Queue)是一種常用于實現(xiàn)生產(chǎn)者和消費者模型的數(shù)據(jù)結(jié)構(gòu)。其與普通的隊列區(qū)別在于,當(dāng)隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素;當(dāng)隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出(以上的操作都是基于不同的線程來說的,線程在對阻塞隊列進(jìn)程操作時會被阻塞) 如圖:
1.2分析該模型
這里我想寫多個生產(chǎn)線程和多個消費線程的模型 我們來分析一下。
首先生產(chǎn)任務(wù)的過程和消費任務(wù)的過程必須是互斥關(guān)系,不可以同時訪問該隊列(此時,這個隊列是共享資源)。當(dāng)隊列滿時,生產(chǎn)線程就不能再生產(chǎn)任務(wù),必須在特定的條件變量下等待;同理當(dāng)隊列為空時,消費線程就不能再消費任務(wù),也必須在特定的條件變量下等待。 所以,類應(yīng)這樣設(shè)計:
template
class BlockQueue
{
public:
BlockQueue(const int &maxcap=gmaxcap):_maxcap(maxcap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_pcond,nullptr);
pthread_cond_init(&_ccond,nullptr);
}
void push(const T&in)//輸入型參數(shù),const &
{
pthread_mutex_lock(&_mutex);
while(is_full())
{
pthread_cond_wait(&_pcond,&_mutex);
}
_q.push(in);
pthread_cond_signal(&_ccond);
pthread_mutex_unlock(&_mutex);
}
void pop(T*out)
{
pthread_mutex_lock(&_mutex);
while(is_empty())
{
pthread_cond_wait(&_ccond,&_mutex);
}
*out=_q.front();
_q.pop();
pthread_cond_signal(&_pcond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_ccond);
pthread_cond_destroy(&_pcond);
}
private:
bool is_empty()
{
return _q.empty();
}
bool is_full()
{
return _q.size()==_maxcap;
}
private:
std::queue
int _maxcap; //隊列中元素的上線
pthread_mutex_t _mutex;
pthread_cond_t _pcond; //生產(chǎn)者對應(yīng)的條件變量
pthread_cond_t _ccond;
};
由于我們不知道存儲的數(shù)據(jù)類型,所以這里我們選擇使用泛型編程的方式。 接下來就是要生產(chǎn)任務(wù),為了可以觀察到整個生產(chǎn)和消費任務(wù)的過程,我們可以生成兩個隨機數(shù),然后進(jìn)行運算。代碼如下:
class CalTask
{
using func_t = function
public:
CalTask() {}
CalTask(int x, int y, char op, func_t func)
:_x(x),_y(y),_op(op),_callback(func)
{}
string operator()()
{
int result=_callback(_x,_y,_op);
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=%d",_x,_op,_y,result);
return buffer;
}
string toTaskstring()
{
char buffer[1024];
snprintf(buffer,sizeof buffer,"%d %c %d=?",_x,_op,_y);
return buffer;
}
private:
int _x;
int _y;
char _op;
func_t _callback;
};
const char*oper="+-*/%";
int mymath(int x,int y,char op)
{
int result=0;
switch(op)
{
case '+':
result=x+y;
break;
case '-':
result=x-y;
break;
case '*':
result=x*y;
break;
case '/':
if(y==0)
{
cerr<<"div zero error"< result=-1; } else { result=x/y; } break; case '%': if(y==0) { cerr<<"mod zero error"< result=-1; } else { result=x%y; } default: break; } return result; } 接下來,我們來寫整體的代碼。 1.3完整代碼 我們要創(chuàng)建三個文件:BlockQueue.hpp Task.hpp Main.cc各文件內(nèi)容如下所示: BlockQueue.hpp #pragma once #include #include #include #include #include #include using namespace std; const int gmaxcap=100; template class BlockQueue { public: BlockQueue(const int &maxcap=gmaxcap):_maxcap(maxcap) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_pcond,nullptr); pthread_cond_init(&_ccond,nullptr); } void push(const T&in)//輸入型參數(shù),const & { pthread_mutex_lock(&_mutex); while(is_full()) { pthread_cond_wait(&_pcond,&_mutex); } _q.push(in); pthread_cond_signal(&_ccond); pthread_mutex_unlock(&_mutex); } void pop(T*out) { pthread_mutex_lock(&_mutex); while(is_empty()) { pthread_cond_wait(&_ccond,&_mutex); } *out=_q.front(); _q.pop(); pthread_cond_signal(&_pcond); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_ccond); pthread_cond_destroy(&_pcond); } private: bool is_empty() { return _q.empty(); } bool is_full() { return _q.size()==_maxcap; } private: std::queue int _maxcap; //隊列中元素的上線 pthread_mutex_t _mutex; pthread_cond_t _pcond; //生產(chǎn)者對應(yīng)的條件變量 pthread_cond_t _ccond; }; Task.hpp #pragma once #include #include #include #include #include using namespace std; class CalTask { using func_t = function public: CalTask() {} CalTask(int x, int y, char op, func_t func) :_x(x),_y(y),_op(op),_callback(func) {} string operator()() { int result=_callback(_x,_y,_op); char buffer[1024]; snprintf(buffer,sizeof buffer,"%d %c %d=%d",_x,_op,_y,result); return buffer; } string toTaskstring() { char buffer[1024]; snprintf(buffer,sizeof buffer,"%d %c %d=?",_x,_op,_y); return buffer; } private: int _x; int _y; char _op; func_t _callback; }; const char*oper="+-*/%"; int mymath(int x,int y,char op) { int result=0; switch(op) { case '+': result=x+y; break; case '-': result=x-y; break; case '*': result=x*y; break; case '/': if(y==0) { cerr<<"div zero error"< result=-1; } else result=x/y; } break; case '%': if(y==0) { cerr<<"mod zero error"< result=-1; } else { result=x%y; } default: break; } return result; } Main.cc include "BlockQueue.hpp" #include "Task.hpp" #include #include #include #include using namespace std; void *productor(void *bqs_) { BlockQueue while(true) { int x=rand()%10+1; int y=rand()%5+1; int opercode=rand()%(sizeof(oper)); CalTask T(x,y,oper[opercode],mymath); bqs->push(T); cout<<"生產(chǎn)任務(wù): "; cout< sleep(1); } } void *consumer(void *bqs_) { BlockQueue while(true) { CalTask T; bqs->pop(&T); cout<<"消費任務(wù): "; cout< } } int main() { BlockQueue pthread_t p[5]; pthread_t c[5]; for(int i=0;i<5;i++) { pthread_create(&p[i],nullptr,productor,&bqs); pthread_create(&c[i],nullptr,consumer,&bqs); } for(int i=0;i<5;i++) { pthread_join(p[i],nullptr); pthread_join(c[i],nullptr); } } 在代碼中,有幾個點需要注意一下: 第一點: pthread_cond_wait的第二個參數(shù)一定是我們正在使用的互斥鎖,這個函數(shù)在被運行時,會以原子性的方式將鎖釋放,然后將自己掛起,等待被條件變量喚醒。該函數(shù)在被喚醒時,會自動重新獲取持有的鎖,然后繼續(xù)向下執(zhí)行。 假如數(shù)個生產(chǎn)者線程一起被喚醒,然后先后持有鎖,接著繼續(xù)生產(chǎn)任務(wù),當(dāng)隊列剩余的空間小于這些生產(chǎn)者生產(chǎn)的任務(wù)時,就會出現(xiàn)問題,所以讓所有被喚醒的線程先通過while循環(huán),如果有剩余的空間,再進(jìn)行任務(wù)的生產(chǎn)活動。 生產(chǎn)線程這樣處理,消費線程也要這樣處理 大家可以在自己試這敲一下,有問題可以在評論區(qū)和我交流。 接下來,我們來查找一下這些代碼有哪些"不足的地方" 2.代碼中的“不足” 一個線程在操作臨界資源時,臨界資源必須是滿足條件的,然后線程才能對臨界資源進(jìn)行操作。比如:在如上代碼中,生產(chǎn)者線程只有在隊列(臨界資源)有剩余空間的條件下,才能進(jìn)行下一步操作。 可是,臨界資源是否滿足生產(chǎn)和消費的條件,我們不能事前得知,只等進(jìn)入臨界資源后,再進(jìn)行進(jìn)一步的檢測。 所以,一般訪問臨界資源的過程為:先加鎖,再檢測,如果條件滿足,就進(jìn)行下一步的操作;反之,就將該線程掛起,釋放鎖,然后掛起等待,等到條件滿足時,重新獲得鎖,接著進(jìn)行下一步操作。 因為不可能事先得知是否滿足條件,所以我們只能先加鎖,進(jìn)入臨界資源內(nèi)部進(jìn)行檢測。 只要我們申請了信號量,就默認(rèn)對這部分資源的整體使用,但通常情況下,我們使用的僅僅是臨界資源的一小部分。 實際情況中,有沒有可能不同的線程訪問臨界資源不同部分的情況,有可能。所以,前輩大佬們給出了一種解決方案——信號量。 3.信號量 3.1什么是信號量 信號量的本質(zhì)是一把計數(shù)器,一把衡量臨界資源多少的計數(shù)器。只要擁有信號量,就在未來一定能夠擁有臨界資源的一部分。 申請信號量的本質(zhì):就是對臨界資源的預(yù)定機制。 比如:我想去看電影,首先我要買票。我一旦買到票,無論我去不去看電影,都會有一個位置屬于我。買票的過程==申請信號信號量的過程。 所以,在訪問臨界資源之前,我們可以申請信號量。通過申請信號量,我們就可以獲知臨界資源的使用情況。①只要申請成功,就一定有我可以訪問的資源。②只要申請失敗,說明條件不就緒,只能等待。如此,就不需要進(jìn)入臨界資源再進(jìn)行檢測了。 3.2信號量的相關(guān)接口 如上這些借口如果調(diào)用成功的話,返回0;調(diào)用失敗的話,返回-1,并且錯誤原因被設(shè)置。 我們知道信號量的本質(zhì)是一把計數(shù)器,所以信號量必須可以進(jìn)行遞增和遞減的操作。 信號量-1:申請資源,其過程必須是原子性的。簡稱P操作。信號量+1:歸還資源,其過程必須是原子性的。簡稱V操作。 所以,信號量的核心操作:PV原語。 接下來,我們就使用信號量來完成我們的基于環(huán)形隊列的生產(chǎn)消費模型。 3.3用信號量來實現(xiàn)基于環(huán)形隊列的生產(chǎn)消費模型 3.3.1對環(huán)形隊列的簡單介紹 相信大家在C++學(xué)習(xí)期間到都模擬實現(xiàn)過環(huán)形隊列隊列。如圖: 環(huán)形隊列的邏輯結(jié)構(gòu)為環(huán)形,但其存儲結(jié)構(gòu)實際上就是隊列,其實就是一個數(shù)組,只不過用下標(biāo)不斷的%上隊列的長度。 大家在模擬實現(xiàn)環(huán)形隊列時,大家必定遇到的問題是:當(dāng)rear==front時,究竟是環(huán)形隊列已滿還是環(huán)形隊列為空呢?其實,這個問題有多種處理方式,今天就不講了。 今天,我們的基于環(huán)形隊列的生產(chǎn)消費模型必須遵守哪些規(guī)則呢? 我們來講一個故事: 張三和李四在一個房間里做游戲,這個房間里有一張大圓桌,桌子上有很多的盤子。規(guī)定張三往每個盤子里放一個桃子?,然后李四在后邊吃桃子?,由于李四還要吃桃子,所以速度一定比張三放的速度滿。 總結(jié)一下,我們發(fā)現(xiàn)張三和李四必須滿足這些規(guī)律: 李四不可以超過張三——消費者不可以超過生產(chǎn)者。張三不可以把李四套一個圈——生產(chǎn)者不可以把消費者套一個圈。張三和李四什么時候在一起?①盤子全為空,張三和李四在一起,張三先運行(生產(chǎn)者先運行)。②盤子全為滿,張三和李四在一起,李四先運行(消費者先運行)。③其他情況,張三和李四指向不同的位置。 我們將這些規(guī)則遷移到環(huán)形隊列的生產(chǎn)消費模型,就是生產(chǎn)消費模型應(yīng)該遵守的規(guī)則: ①消費者不能超過生產(chǎn)者。②生產(chǎn)者不能把消費者套一個圈。③生產(chǎn)者和消費者什么情況下會在一起呢?空的時候和滿的時候,對應(yīng)不同的處理方式。④只要生產(chǎn)者和消費者指向不同的位置,就可以實現(xiàn)生產(chǎn)者和消費者的并發(fā)執(zhí)行。只有在為空和為 滿時,才會出現(xiàn)同步和互斥問題。 那這些規(guī)則由什么來保證呢?信號量。信號量是表征臨界資源中資源數(shù)目的。 1.對于生產(chǎn)者而言,看中的是隊列中的剩余空間——空間資源定義一個信號量。 2.對于消費者而言,看中的是隊列中的數(shù)據(jù)——數(shù)據(jù)資源定義一個信號量。 接下來,我們基于這份偽代碼來理解一下,看看能否滿足我們的規(guī)則。 生產(chǎn)者關(guān)注的是隊列里的剩余空間,在隊列為空時剩余空間為10,所以生產(chǎn)者可以順利申請到信號量。但是由于空間中這部分資源已經(jīng)被占用,所以無法歸還。但是消費者所關(guān)注的隊列中的數(shù)據(jù)資源不知不覺中已經(jīng)多了一份。所以對消費者信號量應(yīng)進(jìn)行V操作。 消費者關(guān)注的是隊列中的數(shù)據(jù)資源,隊列剛開始為空時,數(shù)據(jù)資源為0,消費者申請失敗。等到生產(chǎn)者申請神域空間成功后,生產(chǎn)了數(shù)據(jù)。所以消費者可以成功申請到數(shù)據(jù)資源信號量,然后消費數(shù)據(jù)。但不知不覺,隊列中的剩余空間多了一份,所以應(yīng)對剩余空間資源的信號量進(jìn)行V操作。 若隊列滿時,剩余空間信號量為0,生產(chǎn)者申請信號量失敗。此時,數(shù)據(jù)資源信號量為滿,消費者可以申請到信號量,從而進(jìn)行操作。所以必須消費者先運行。 若隊列空時,數(shù)據(jù)資源信號量為0,消費者申請信號量失敗。此時,剩余空間信號量為滿,生產(chǎn)者可以申請到信號量,從而進(jìn)行操作。所以必須生產(chǎn)者先運行。 所以,這偽代碼完全符合我們的規(guī)則。接下來,我們編寫單生產(chǎn)進(jìn)程和單消費進(jìn)程的代碼。 編寫代碼 我們創(chuàng)建三個源文件:RingQueue.hpp main.cc Task.hpp Ringqueue.hpp: #pragma once #include #include #include #include #include static const int gcap = 5; template class RingQueue { private: void P(sem_t &sem) { int n = sem_wait(&sem); assert(n == 0); // if (void)n; } void V(sem_t &sem) { int n = sem_post(&sem); assert(n == 0); (void)n; } public: RingQueue(const int &cap = gcap): _queue(cap), _cap(cap) { int n = sem_init(&_spaceSem, 0, _cap); assert(n == 0); n = sem_init(&_dataSem, 0, 0); assert(n == 0); _productorStep = _consumerStep = 0; pthread_mutex_init(&_pmutex, nullptr); pthread_mutex_init(&_cmutex, nullptr); } // 生產(chǎn)者 void Push(const T &in) { // ?: 這個代碼 有沒有優(yōu)化的可能 // 你認(rèn)為:現(xiàn)加鎖,后申請信號量,還是現(xiàn)申請信號量,在加鎖? P(_spaceSem); // 申請到了空間信號量,意味著,我一定能進(jìn)行正常的生產(chǎn) pthread_mutex_lock(&_pmutex); _queue[_productorStep++] = in; _productorStep %= _cap; pthread_mutex_unlock(&_pmutex); V(_dataSem); } // 消費者 void Pop(T *out) { // 你認(rèn)為:現(xiàn)加鎖,后申請信號量,還是現(xiàn)申請信號量,在加鎖? P(_dataSem); pthread_mutex_lock(&_cmutex); *out = _queue[_consumerStep++]; _consumerStep %= _cap; pthread_mutex_unlock(&_cmutex); V(_spaceSem); } ~RingQueue() { sem_destroy(&_spaceSem); sem_destroy(&_dataSem); pthread_mutex_destroy(&_pmutex); pthread_mutex_destroy(&_cmutex); } private: std::vector int _cap; sem_t _spaceSem; // 生產(chǎn)者 想生產(chǎn),看中的是什么資源呢? 空間資源 sem_t _dataSem; // 消費者 想消費,看中的是什么資源呢? 數(shù)據(jù)資源 int _productorStep; int _consumerStep; pthread_mutex_t _pmutex; pthread_mutex_t _cmutex; }; Task.hpp #pragma once #include #include #include #include class Task { using func_t = std::function // typedef std::function public: Task() {} Task(int x, int y, char op, func_t func) :_x(x), _y(y), _op(op), _callback(func) {} std::string operator()() { int result = _callback(_x, _y, _op); char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result); return buffer; } std::string toTaskString() { char buffer[1024]; snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y); return buffer; } private: int _x; int _y; char _op; func_t _callback; }; const std::string oper = "+-*/%"; int mymath(int x, int y, char op) { int result = 0; switch (op) { case '+': result = x + y; break; case '-': result = x - y; break; case '*': result = x * y; break; case '/': { if (y == 0) { std::cerr << "div zero error!" << std::endl; result = -1; } else result = x / y; } break; case '%': { if (y == 0) { std::cerr << "mod zero error!" << std::endl; result = -1; } else result = x % y; } break; default: // do nothing break; } return result; } main.cc #include "RingQueue.hpp" #include "Task.hpp" #include #include #include #include #include std::string SelfName() { char name[128]; snprintf(name, sizeof(name), "thread[0x%x]", pthread_self()); return name; } void *ProductorRoutine(void *rq) { // RingQueue RingQueue while(true) { // version1 // int data = rand() % 10 + 1; // ringqueue->Push(data); // std::cout << "生產(chǎn)完成,生產(chǎn)的數(shù)據(jù)是:" << data << std::endl; // version2 // 構(gòu)建or獲取任務(wù) --- 這個是要花時間的! int x = rand() % 10; int y = rand() % 5; char op = oper[rand()%oper.size()]; Task t(x, y, op, mymath); // 生產(chǎn)任務(wù) ringqueue->Push(t); // 輸出提示 std::cout << SelfName() << ", 生產(chǎn)者派發(fā)了一個任務(wù): " << t.toTaskString() << std::endl; // sleep(1); } } void *ConsumerRoutine(void *rq) { // RingQueue RingQueue while(true) { //version1 // int data; // ringqueue->Pop(&data); // std::cout << "消費完成,消費的數(shù)據(jù)是:" << data << std::endl; // sleep(1); // version2 Task t; //消費任務(wù) ringqueue->Pop(&t); std::string result = t(); // 消費也是要花時間的! std::cout << SelfName() << ", 消費者消費了一個任務(wù): " << result << std::endl; // sleep(1); } } int main() { srand((unsigned int)time(nullptr) ^ getpid() ^ pthread_self() ^ 0x71727374); // RingQueue RingQueue // 單生產(chǎn),單消費,多生產(chǎn),多消費 --> 只要保證,最終進(jìn)入臨界區(qū)的是一個生產(chǎn),一個消費就行! // 多生產(chǎn),多消費的意義?? pthread_t p[4], c[8]; for(int i = 0; i < 4; i++) pthread_create(p+i, nullptr, ProductorRoutine, rq); for(int i = 0; i < 8; i++) pthread_create(c+i, nullptr, ConsumerRoutine, rq); for(int i = 0; i < 4; i++) pthread_join(p[i], nullptr); for(int i = 0; i < 8; i++) pthread_join(c[i], nullptr); delete rq; return 0; } 大家可以自己敲一敲,試一下。 寫到這里,這篇博客就結(jié)束了,下篇博客我們再見。 柚子快報激活碼778899分享:信號量——Linux并發(fā)之魂 好文鏈接
本文內(nèi)容根據(jù)網(wǎng)絡(luò)資料整理,出于傳遞更多信息之目的,不代表金鑰匙跨境贊同其觀點和立場。
轉(zhuǎn)載請注明,如有侵權(quán),聯(lián)系刪除。