C++并發(fā)開發(fā)(1)- 生產(chǎn)者消費者模型

打算先看一下生產(chǎn)者消費者模型,在進行從頭到尾系統(tǒng)的學習。
參考文章:C++11 并發(fā)指南九(綜合運用: C++11 多線程下生產(chǎn)者消費者模型詳解)

1 單生產(chǎn)者-單消費者模型


源碼及解析如下:

#include <unistd.h>

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize  = 10; // Item buffer size.
static const int kItemsToProduce  = 1000;   // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize]; // 產(chǎn)品緩沖區(qū), 配合 read_position 和 write_position 模型環(huán)形隊列.
    size_t read_position; // 消費者讀取產(chǎn)品位置.
    size_t write_position; // 生產(chǎn)者寫入產(chǎn)品位置.
    std::mutex mtx; // 互斥量,保護產(chǎn)品緩沖區(qū)
    std::condition_variable repo_not_full; // 條件變量, 指示產(chǎn)品緩沖區(qū)不為滿.
    std::condition_variable repo_not_empty; // 條件變量, 指示產(chǎn)品緩沖區(qū)不為空.
} gItemRepository; // 產(chǎn)品庫全局變量, 生產(chǎn)者和消費者操作該變量.

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);//可以簡單的理解為開啟線程鎖
    while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) 
    { // item buffer is full, just wait here.這時說明倉庫滿了,也就是寫入的指針已經(jīng)追著讀取的指針追到一圈了
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock); // 生產(chǎn)者等待"產(chǎn)品庫緩沖區(qū)不為滿"這一條件發(fā)生.
    }

    (ir->item_buffer)[ir->write_position] = item; // 在生產(chǎn)者指針位置寫入產(chǎn)品.
    (ir->write_position)++; // 寫入位置后移.

    if (ir->write_position == kItemRepositorySize) // 寫入位置若是在隊列最后則重新設置為初始位置.
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all(); // 通知消費者產(chǎn)品庫不為空.
    lock.unlock(); // 解鎖.
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // item buffer is empty, just wait here.
    while(ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock); // 消費者等待"產(chǎn)品庫緩沖區(qū)不為空"這一條件發(fā)生.
    }

    data = (ir->item_buffer)[ir->read_position]; // 讀取某一產(chǎn)品
    (ir->read_position)++; // 讀取位置后移

    if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最后,則重新置位.
        ir->read_position = 0;

    (ir->repo_not_full).notify_all(); // 通知消費者產(chǎn)品庫不為滿.
    lock.unlock(); // 解鎖.

    return data; // 返回產(chǎn)品.
}


void ProducerTask() // 生產(chǎn)者任務
{
    for (int i = 1; i <= kItemsToProduce; ++i) {
        // sleep(1);
        std::cout << "Produce the " << i << "^th item..." << std::endl;
        ProduceItem(&gItemRepository, i); // 循環(huán)生產(chǎn) kItemsToProduce 個產(chǎn)品.
    }
}

void ConsumerTask() // 消費者任務
{
    static int cnt = 0;
    while(1) {
        sleep(1);
        int item = ConsumeItem(&gItemRepository); // 消費一個產(chǎn)品.
        std::cout << "Consume the " << item << "^th item" << std::endl;
        if (++cnt == kItemsToProduce) break; // 如果產(chǎn)品消費個數(shù)為 kItemsToProduce, 則退出.
    }
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0; // 初始化產(chǎn)品寫入位置.
    ir->read_position = 0; // 初始化產(chǎn)品讀取位置.
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer(ProducerTask); // 創(chuàng)建生產(chǎn)者線程.
    std::thread consumer(ConsumerTask); // 創(chuàng)建消費之線程.
    producer.join();
    consumer.join();
}

2 多生產(chǎn)者-單消費者模型

這個可能近期會用到,所以記錄一下。例如數(shù)據(jù)庫操作,可以把要存儲的數(shù)據(jù)緩存下來,這樣可以提高數(shù)據(jù)采集速度,緩存下來的數(shù)據(jù)可以采用事務處理批量操作,又提高了數(shù)據(jù)庫存儲的速度。多個線程采集數(shù)據(jù),單個線程進行數(shù)據(jù)庫存儲。

#include <unistd.h>

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize  = 4; // Item buffer size.
static const int kItemsToProduce  = 10;   // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize];
    size_t read_position;
    size_t write_position;
    size_t item_counter;
    std::mutex mtx;
    std::mutex item_counter_mtx;
    std::condition_variable repo_not_full;
    std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);
    while(((ir->write_position + 1) % kItemRepositorySize)
        == ir->read_position) { // item buffer is full, just wait here.
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock);
    }

    (ir->item_buffer)[ir->write_position] = item;
    (ir->write_position)++;

    if (ir->write_position == kItemRepositorySize)
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all();
    lock.unlock();
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // item buffer is empty, just wait here.
    while(ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock);
    }

    data = (ir->item_buffer)[ir->read_position];
    (ir->read_position)++;

    if (ir->read_position >= kItemRepositorySize)
        ir->read_position = 0;

    (ir->repo_not_full).notify_all();
    lock.unlock();

    return data;
}

void ProducerTask()
{
    bool ready_to_exit = false;
    while(1) {
        sleep(1);
        std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);//區(qū)別是這里加了個鎖
        if (gItemRepository.item_counter < kItemsToProduce) {
            ++(gItemRepository.item_counter);
            ProduceItem(&gItemRepository, gItemRepository.item_counter);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " is producing the " << gItemRepository.item_counter
                << "^th item" << std::endl;
        } else ready_to_exit = true;
        lock.unlock();
        if (ready_to_exit == true) break;
    }
    std::cout << "Producer thread " << std::this_thread::get_id()
                << " is exiting..." << std::endl;
}

void ConsumerTask()
{
    static int item_consumed = 0;
    while(1) {
        sleep(1);
        ++item_consumed;
        if (item_consumed <= kItemsToProduce) {
            int item = ConsumeItem(&gItemRepository);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        } else break;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is exiting..." << std::endl;
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0;
    ir->read_position = 0;
    ir->item_counter = 0;
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer1(ProducerTask);
    std::thread producer2(ProducerTask);
    std::thread producer3(ProducerTask);
    std::thread producer4(ProducerTask);
    std::thread consumer(ConsumerTask);

    producer1.join();
    producer2.join();
    producer3.join();
    producer4.join();
    consumer.join();

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容