使用STL线程库模拟生产者消费者问题

Apr 20, 2020 · 2 min read

C++11的STL线程库中没有现成的信号量实现,但可以使用互斥量和条件变量来实现信号量机制。

建议使用自己定义的namespace以防止与现成的函数/类产生冲突。

由于使用了lambda函数和C++11的STL线程库,因此编译时需要指定 -std=c++11

/**
 * @file: main.cpp
 * @brief: Simulate the Producer/Consumer Problem
 * @author: YangLei
 * @date: 2020-04-20
 */

#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>

//  Initial value of the `product` semaphore, i.e. the number of free
//  buffer slots available before the producer has to wait.
constexpr int BUFFER_SIZE = 5;

namespace userdef {
/**
 * @brief: Counting semaphore assembled from a std::mutex and a
 *         std::condition_variable.
 *
 * wait() blocks whenever its decrement leaves the counter negative.
 * signal() then hands over exactly one wake-up per blocked caller by
 * bumping a separate wake-up count that the waiter consumes.
 */
class semaphore {
 public:
    /**
     * @brief: Build a semaphore
     * @params: value(int) -- initial counter value, 1 by default
     */
    explicit semaphore(int value = 1): count_(value), wakeups_(0) {}

    /**
     * @brief: semWait (operation P)
     *
     * Decrements the counter and returns at once if it is still
     * non-negative; otherwise blocks until a wake-up is available.
     */
    void wait() {
        std::unique_lock<std::mutex> guard(mutex_);
        count_ -= 1;
        if (count_ >= 0) {
            return;
        }
        //  Re-check the wake-up count after every return from the
        //  condition variable so that only a real signal() releases us.
        while (wakeups_ <= 0) {
            condition_.wait(guard);
        }
        wakeups_ -= 1;
    }

    /**
     * @brief: semSignal (operation V)
     *
     * Increments the counter; if it is still not positive, records one
     * wake-up and notifies one thread waiting on the condition variable.
     */
    void signal() {
        std::lock_guard<std::mutex> guard(mutex_);
        count_ += 1;
        if (count_ > 0) {
            return;
        }
        wakeups_ += 1;
        condition_.notify_one();
    }

 private:
    int count_;                             //  semaphore counter
    int wakeups_;                           //  wake-ups granted but not yet consumed
    std::mutex mutex_;                      //  protects count_ and wakeups_
    std::condition_variable condition_;     //  where blocked wait() callers sleep
};  //  class semaphore
}  //  namespace userdef

//  Shared state used by the producer and consumer threads.
userdef::semaphore product(BUFFER_SIZE);    //  free slots left in the buffer
userdef::semaphore consume(0);              //  products waiting in the buffer
userdef::semaphore mutex(1);                //  guards `products` and std::cout output
userdef::semaphore quit(0);                 //  signalled by productor() after its last product
std::list<int> products;                    //  products buffer
//  Seed state passed to rand_r(); initialised from the current time.
unsigned int seed = static_cast<unsigned int>(std::time(nullptr));

/**
 * @brief: Thread body for the producer
 * @params: n(int) -- total number of products to create
 *
 * For every product: reserves a free buffer slot, draws a random value
 * in [0, 9], sleeps for a random multiple of 200 ms, then appends the
 * value to the buffer and announces it to the consumer. After the last
 * product it signals `quit`, which must be a file-scope
 * userdef::semaphore.
 */
void productor(int n) {
    for (int left = n; left > 0; --left) {
        //  Block until the buffer has room for one more product.
        product.wait();

        //  Two draws from the shared seed, in this order: first the
        //  product value, then the length of the simulated work.
        const int value = rand_r(&seed) % 10;
        const int pause_units = rand_r(&seed) % 10;
        const std::chrono::milliseconds work_time(200 * pause_units);
        std::this_thread::sleep_for(work_time);

        //  Critical section: log and enqueue the product.
        mutex.wait();
        std::cout << "Product: "  << value << std::endl;
        products.push_back(value);
        mutex.signal();

        //  One more product is available to the consumer.
        consume.signal();
    }
    quit.signal();
}

/**
 * @brief: Thread body for the consumer
 * @params: None
 *
 * Repeatedly waits for a product, removes it from the front of the
 * buffer, prints it, frees the buffer slot and then sleeps for
 * 200 ms * value to simulate consuming it.
 *
 * Shutdown: a wake-up on `consume` that finds the buffer empty cannot
 * come from the producer (it always pushes before signalling), so it is
 * treated as a request to stop and the function returns.
 */
void consumer() {
    while (true) {
        consume.wait();                     //  wait for products
        mutex.wait();                       //  mutex lock
        if (products.empty()) {
            //  Woken without a product: stop instead of calling
            //  front() on an empty list (undefined behaviour).
            mutex.signal();                 //  mutex unlock
            return;
        }
        int size = products.front();
        products.pop_front();
        std::cout << "Consume: " << size << std::endl;
        mutex.signal();                     //  mutex unlock
        product.signal();                   //  signal to productor
        std::this_thread::sleep_for(std::chrono::milliseconds(200 * size));
    }
}

/**
 * @brief: main function for simulation
 *
 * Runs one producer thread that creates `number` products and one
 * consumer thread, then shuts both down cleanly. Command-line arguments
 * are not used.
 */
int main(int argc, char* argv[]) {
    (void)argc;
    (void)argv;
    const int number = 10;
    std::thread prod(productor, number);    //  productor thread
    std::thread cons(consumer);             //  consumer thread
    prod.join();
    //  Every product has been pushed and signalled by now. One extra
    //  signal lets the consumer wake up on an empty buffer once it has
    //  drained the remaining products, which tells it to return; without
    //  it cons.join() would block forever.
    consume.signal();
    cons.join();
    return 0;
}