生产者消费者模式

    • o.引言
    • 1.demo
    • 2.cartographer 中的生产消费者

o.引言

  • 参考

假设有两个进程(或线程)A、B和一个固定大小的缓冲区,A进程生产数据放入缓冲区,B进程从缓冲区中取出数据进行计算,这就是一个简单的生产者-消费者模型。这里的A进程相当于生产者,B进程相当于消费者。

生产者消费者模式插图

生产者/消费者模式是一种常见的并发编程模式,其中生产者线程生成数据并将其放入缓冲区,而消费者线程从缓冲区中取出数据进行处理。这种模式通过缓冲区解耦了生产者和消费者的速度差异。

生产者-消费者模型特点

  • 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据
  • 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入休眠状态,知道生产者往缓冲区添加数据时才会被唤醒

生产者-消费者模型的优点

  • (1)解耦合:将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理
  • (2)复用:通过生产者和消费者类独立开来,那么可以对生产者类型和消费者类进行独立的复用和扩展
  • (3)调整并发数:由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度
  • (4)异步:对于生产者和消费者来说能各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;同样对于消费者涞水,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行put()的时间比较短,而支持高并发。
  • (5)支持分布式:生产者和消费者通过队列进行通信,所有不需要在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同事还能支持集群的伸缩性,当某台机器宕掉的时候不会导致整个集群宕掉。

生产者消费者模式插图(1)

生产者消费者模式插图(2)

1.demo

#include 
#include 
#include 
#include 
#include 
#include 
// 共享队列和相关同步机制
std::queue<int> buffer;
const unsigned int buffer_size = 10;
std::mutex mtx;
std::condition_variable cv_producer, cv_consumer;
// 生产者函数
void producer(int id, int produce_count) {
for (int i = 0; i < produce_count; ++i) {
std::unique_lock<std::mutex> lock(mtx);
cv_producer.wait(lock, [] { return buffer.size() < buffer_size; });
buffer.push(i);
std::cout << "Producer " << id << " produced: " << i << std::endl;
lock.unlock();
cv_consumer.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产延迟
}
}
// 消费者函数
void consumer(int id, int consume_count) {
for (int i = 0; i < consume_count; ++i) {
std::unique_lock<std::mutex> lock(mtx);
cv_consumer.wait(lock, [] { return !buffer.empty(); });
int item = buffer.front();
buffer.pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
lock.unlock();
cv_producer.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费延迟
}
}
int main() {
const int num_producers = 2;
const int num_consumers = 2;
const int items_per_producer = 20;
const int items_per_consumer = 20;
std::thread producers[num_producers];
std::thread consumers[num_consumers];
// 创建生产者线程
for (int i = 0; i < num_producers; ++i) {
producers[i] = std::thread(producer, i + 1, items_per_producer);
}
// 创建消费者线程
for (int i = 0; i < num_consumers; ++i) {
consumers[i] = std::thread(consumer, i + 1, items_per_consumer);
}
// 等待所有线程完成
for (int i = 0; i < num_producers; ++i) {
producers[i].join();
}
for (int i = 0; i < num_consumers; ++i) {
consumers[i].join();
}
return 0;
}

两个生产者两个消费者处理过程:

生产者消费者模式插图(3)

2.cartographer 中的生产消费者

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
template <typename T>
class BlockingQueue {
public:
static constexpr size_t kInfiniteQueueSize = 0;
// Constructs a blocking queue with infinite queue size.
BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
BlockingQueue(const BlockingQueue&) = delete;
BlockingQueue& operator=(const BlockingQueue&) = delete;
// Constructs a blocking queue with a size of 'queue_size'.
explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
// Pushes a value onto the queue. Blocks if the queue is full.
void Push(std::optional<T> t) {
std::unique_lock<std::mutex> lock(mutex_);
cond_full_.wait(lock, [this] { return QueueNotFullCondition(); });
queue_.push(std::move(t));
cond_empty_.notify_one();
}
// Like push, but returns false if 'timeout' is reached.
bool PushWithTimeout(std::optional<T> t, const std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mutex_);
if (!cond_full_.wait_for(lock, timeout, [this] { return QueueNotFullCondition(); })) {
return false;
}
queue_.push(std::move(t));
cond_empty_.notify_one();
return true;
}
// Pops the next value from the queue. Blocks until a value is available.
std::optional<T> Pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_empty_.wait(lock, [this] { return !QueueEmptyCondition(); });
std::optional<T> t = std::move(queue_.front());
queue_.pop();
cond_full_.notify_one();
return t;
}
// Like Pop, but can timeout. Returns nullopt in this case.
std::optional<T> PopWithTimeout(const std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mutex_);
if (!cond_empty_.wait_for(lock, timeout, [this] { return !QueueEmptyCondition(); })) {
return std::nullopt;
}
std::optional<T> t = std::move(queue_.front());
queue_.pop();
cond_full_.notify_one();
return t;
}
// Returns the number of items currently in the queue.
size_t Size() {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
// Blocks until the queue is empty.
void WaitUntilEmpty() {
std::unique_lock<std::mutex> lock(mutex_);
cond_empty_.wait(lock, [this] { return QueueEmptyCondition(); });
}
private:
bool QueueEmptyCondition() const {
return queue_.empty();
}
bool QueueNotFullCondition() const {
return queue_size_ == kInfiniteQueueSize || queue_.size() < queue_size_;
}
mutable std::mutex mutex_;
const size_t queue_size_;
std::queue<std::optional<T>> queue_;
std::condition_variable cond_full_;
std::condition_variable cond_empty_;
};
void Producer(BlockingQueue<int>& queue, int id, int num_items) {
for (int i = 0; i < num_items; ++i) {
queue.Push(i);
std::cout << "Producer " << id << " produced: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));  // 模拟生产时间
}
}
void Consumer(BlockingQueue<int>& queue, int id) {
while (true) {
auto item = queue.Pop();
if (item) {
std::cout << "Consumer " << id << " consumed: " << *item << std::endl;
} else {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(150));  // 模拟消费时间
}
}
int main() {
BlockingQueue<int> queue(10);  // 队列容量为10
const int num_producers = 2;
const int num_consumers = 2;
const int num_items_per_producer = 10;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < num_producers; ++i) {
producers.emplace_back(Producer, std::ref(queue), i, num_items_per_producer);
}
for (int i = 0; i < num_consumers; ++i) {
consumers.emplace_back(Consumer, std::ref(queue), i);
}
for (auto& producer : producers) {
producer.join();
}
// 使用空对象来通知消费者停止
for (int i = 0; i < num_consumers; ++i) {
queue.Push(std::optional<int>());  // Push an empty optional to signal end
}
for (auto& consumer : consumers) {
consumer.join();
}
return 0;
}

两个生产者两个消费者处理过程:

生产者消费者模式插图(4)

本站无任何商业行为
个人在线分享-虚灵IT资料分享 » 生产者消费者模式
E-->