Linux_生产消费模型_Block_Queue

目录

一、互斥锁

1.1 错误的抢票

1.1.1 类的成员函数与构造

1.1.2 start 函数 

1.1.3 线程的回调函数 

1.1.4 main 函数

1.1.5 结果  

1.2 概念

1.3 相关系统调用

1.3.1 锁的创建

1.3.2 锁的初始化

1.3.2.1 动态初始化

1.3.2.2 静态初始化

1.3.3 锁的销毁

1.3.4 加锁和解锁

1.4 正确的抢票

1.4.1 简单锁的封装

1.4.2 线程封装的改进 

1.4.3 main 函数的改进

二、条件变量

2.1 概念

2.2 相关系统调用

2.2.1 条件变量的创建

2.2.2 条件变量的初始化

2.2.3 条件变量的销毁

2.2.4 条件变量的等待

2.2.5 条件变量的唤醒

三、单线程生产消费模型(BQ)

3.1 初步测试代码

3.2 阻塞队列的封装

3.2.1 成员变量

3.2.2 构造析构

3.2.3 生产者模型

3.2.4 消费者模型

3.2.5 整体代码 

3.3 Main.cc 的修改

3.3.1 整体代码

3.3.2 int main 部分

3.3.3 start 函数

3.3.4 wait 函数

3.4 整体代码

3.4.1 BlockQueue.hpp

3.4.2 Thread.hpp

3.4.3 Main.cc

四、多生产多消费模型


一、互斥锁

1.1 错误的抢票

下面先看封装的一个简单的多线程:

#pragma once

#include <iostream>
#include <string>
#include <pthread.h>
namespace ThreadMoudle
{
    typedef void(*func_t)(const std::string &name);//函数指针类型
    class Thread
    {
    public:
        void Excute()
        {
            std::cout << _name << " is running" << std:: endl;
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
    public:
        Thread(const std::string &name, func_t func):_name(name), _func(func)
        {
            std::cout << "create: " << name << "done" << std::endl;
        }
        std::string Name()
        {
            return _name;
        }
        static void *ThreadRoutine(void *args)
        {
            Thread *self = static_cast<Thread*>(args);//获得了当前对象
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0) return false;//创建函数成功返回0
            return true;
        }
        std::string Status()
        {
            if (_isrunning) return "running";
            else return "sleep";
        }
        void Stop()
        {
            if (_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
                std::cout << "Stop" << std::endl;
                std::cout << _name << " Stop" << std::endl;
            }
        }
        void Join()
        {
            if (!_isrunning)
            {
                ::pthread_join(_tid, nullptr);
                std::cout << _name << "Join" << std::endl;
                std::cout << _name << "Joined" << std::endl;
            }
        }
        ~Thread()
        {
        }
    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func;//线程要执行的回调函数
    };
}

1.1.1 类的成员函数与构造

现在来逐步看这个封装类,首先来看一下类成员与类的构造函数:

    typedef void(*func_t)(const std::string &name);//函数指针类型    
    public:
        Thread(const std::string &name, func_t func):_name(name), _func(func)
        {
            std::cout << "create: " << name << "done" << std::endl;
        }
    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func;//线程要执行的回调函数

这里为线程起了一个名字_name,线程的tid为_tid,线程的状态以表明该线程是否在运行_isrunning,以及线程要执行的回调函数。

1.1.2 start 函数 

        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if (n != 0) return false;//创建函数成功返回0
            return true;
        }

在start函数中,封装了 pthread_create 的函数调用,这里不进行详细说明,以后专门写一篇来描述单线程以及多线程方面的知识。这里的 pthreada_create 就是创建一个线程,而且因为传入的是_tid 的引用,所以该系统调用会自动把 _tid 修改为创建线程的 tid 。

1.1.3 线程的回调函数 

    public:
        void Excute()
        {
            std::cout << _name << " is running" << std:: endl;
            _isrunning = true;
            _func(_name);
            _isrunning = false;
        }
        static void *ThreadRoutine(void *args)
        {
            Thread *self = static_cast<Thread*>(args);//获得了当前对象
            self->Excute();
            return nullptr;
        }

这里并没有直接写线程要执行的回调函数操作,而是当作中介将参数作为 args参数包 传递给了Excute 函数,这样做的目的是为了让函数更简洁并且这里也有一个坑,成员函数不能直接用作线程函数,因为成员函数需要一个隐藏的 this 指针,而线程库无法自动传递这个指针(参考1.0.2的start函数中的 pthread_create 系统调用):

void* (*start_routine) (void*);//pthread库的线程函数签名是固定的

因此,需要定义一个静态的 ThreadRoutine 函数,这个函数不依赖于任何对象实例,可以被pthread_create 直接调用。在 Thread_Routine 中,通过 static_cast 将 void* 类型的参数转换为 Thread* 类型的指针,从而获得对象实例的指针,然后调用该对象的成员函数 Excute 。这样,Excute 函数就可以访问该对象的成员变量和成员函数。 

1.1.4 main 函数

首先以简单的思维创建一下多线程,模拟一下让几个线程一起进行抢票:

#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"
using namespace ThreadMoudle;
int tickets = 1000;

void route(const std::string &name)
{
    while(true)
    {
        if(tickets > 0)
        {
            // 抢票过程
            usleep(1000); // 1ms -> 抢票花费的时间
            printf("who: %s, get a ticket: %d\n", name.c_str(), tickets);
            tickets--;
        }
        else
        {
            break;
        }
    }
}
int main()
{
    Thread t1("thread-1", route);
    Thread t2("thread-2", route);
    Thread t3("thread-3", route);
    Thread t4("thread-4", route);

    t1.Start();
    t2.Start();
    t3.Start();
    t4.Start();

    t1.Join();
    t2.Join();
    t3.Join();
    t4.Join();
}

按代码来说,在回调函数 route 中,设置了while (tikects > 0) 的条件,也就是说,当 tickets 为正数时,才会进入循环内部执行打印操作,理应抢到还有 0 张票时就停止,但是,运行几次可能会发现,最后会出现某个线程 get 了第 0 张票甚至是负数票。

1.1.5 结果  

观看下面的结果,就会发现错误,这里引入一个概念,票数在这里其实相当于临界资源,所有线程都可以访问并修改,那么就存在线程执行的过快,最后一张票可能沦落到多个线程手中,但是最快执行完的线程打印出了 get a ticket:1 ,并把 1 进行了修改,其他的线程因为执行速度过快,在票还没有变为 0 之前就已经进入了循环,所以会继续执行后续的操作。

打个比方,如果一辆公交车限乘30人,每个人上车后需要进行刷卡,当第30个人已经上车时,后面还有两三个人也挤上来了,第30个人完成刷卡后,公交车检测刷卡人数等于30人,关闭了车门,但此时车上的人已经多余30人,多余的人也可以继续刷卡乘车。

此时,就需要一个规则,比如公交车一次上一人,这个人刷卡后才能上第二个,迁移到 Linux ,这就是互斥锁。

1.2 概念

临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区

互斥锁就可以锁住临界区,当开锁后,临界区才会打开来接收下一个线程,下一个线程被成功接收后又进行了上锁,这里,每个线程的关系称为互斥。就好比公交车一次一人的规则,这里的每个乘客就是互斥关系。

互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用

每个线程执行的操作不能被打断,要么就不进行,要么就进行完成,这就是原子性。好比一个人上公交车要么就不乘车,要么就刷卡上车,不能还没刷卡就被别人打断。

原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.3 相关系统调用

1.3.1 锁的创建

这是互斥锁的类型,当要设置互斥锁的时候就可以使用:

pthread_mutex_t _mutex;

1.3.2 锁的初始化

1.3.2.1 动态初始化
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数:
mutex:要初始化的互斥量,注意要传引用
attr:nullptr

1.3.2.2 静态初始化

以上是动态初始化,在定义 mutex 时,同样可以静态传值直接初始化:

pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;

1.3.3 锁的销毁

销毁互斥量需要注意:
1.使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
2.不要销毁一个已经加锁的互斥量
3.已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

注意这里也要传入锁的引用。

1.3.4 加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//返回值:成功返回0,失败返回错误号

加锁时,可能会遇到以下的情况:

1.互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
2.发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

这里的阻塞不是一直阻塞,当锁空着的时候就会自动加锁然后返回成功。

1.4 正确的抢票

知道了 mutex 的使用,就可以在上述代码中把抢票的代码改为临界区。

1.4.1 简单锁的封装

这里当调用锁的时候就会自动构造初始化加锁,出临界区后自动调用析构函数进行解锁。 

#pragma once
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);//构造函数时直接加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);//析构函数时直接解锁
    }
private:
    pthread_mutex_t *_mutex;
};

1.4.2 线程封装的改进 

这里只是多加了一个线程的数据类型的类 ThreadData ,在这个类中设置了一个锁,并且在线程封装中使用 ThreadData 新建了变量 td ,这样在需要加锁时只需要创建 td 中的锁(详见main函数)即可。

所以这里的 Thread 的构造函数也调整了,可以直接使用 Thread 的构造函数为 _td 传入锁。

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>

namespace ThreadMoudle
{
    class ThreadData
    {
    public:
        ThreadData(const std::string &name, pthread_mutex_t *lock):_name(name), _lock(lock)
        {}
    public:
        std::string _name;
        pthread_mutex_t *_lock;
    };

    typedef void (*func_t)(ThreadData *td); // 函数指针类型

    class Thread
    {
    public:
        void Excute()
        {
            std::cout << _name << " is running" << std::endl;
            _isrunning = true;
            _func(_td);
            _isrunning = false;
        }
    public:
        Thread(const std::string &name, func_t func, ThreadData *td):_name(name), _func(func), _td(td)
        {
            std::cout << "create " << name << " done" << std::endl;
        }
        static void *ThreadRoutine(void *args) // 新线程都会执行该方法!
        {
            Thread *self = static_cast<Thread*>(args); // 获得了当前对象
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);
            if(n != 0) return false;
            return true;
        }
        std::string Status()
        {
            if(_isrunning) return "running";
            else return "sleep";
        }
        void Stop()
        {
            if(_isrunning)
            {
                ::pthread_cancel(_tid);
                _isrunning = false;
                std::cout << _name << " Stop" << std::endl;
            }
        }
        void Join()
        {
            ::pthread_join(_tid, nullptr);
            std::cout << _name << " Joined" << std::endl;
            delete _td;
        }
        std::string Name()
        {
            return _name;
        }
        ~Thread()
        {
        }

    private:
        std::string _name;
        pthread_t _tid;
        bool _isrunning;
        func_t _func; // 线程要执行的回调函数
        ThreadData *_td;
    };
} // namespace ThreadModle

1.4.3 main 函数的改进

首先进行了锁的创建于初始化,其次将锁传入创建的线程中,当使用回调函数后,传入类的锁然后进行锁的构造,就可以完成加锁的操作,出生命域临时变量被销毁此时自动解锁。

#include <iostream>
#include <vector>
#include <cstdio>
#include <unistd.h>
#include "Thread.hpp"
#include "LockGuard.hpp"

using namespace ThreadMoudle;

int tickets = 10000; // 共享资源,造成了数据不一致的问题

void route(ThreadData *td)
{
    while (true)
    {
        LockGuard lockguard(td->_lock); // RAII风格的锁
        if (tickets > 0)
        {
            // 抢票过程
            usleep(1000); // 1ms -> 抢票花费的时间
            printf("who: %s, get a ticket: %d\n", td->_name.c_str(), tickets);
            tickets--;
        }
        else
        {
            break;
        }
    }
}

static int threadnum = 4;

int main()
{
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex, nullptr);

    std::vector<Thread> threads;
    for(int i = 0; i < threadnum; i++)
    {
        std::string name = "thread-" + std::to_string(i+1);
        ThreadData *td = new ThreadData(name, &mutex);
        threads.emplace_back(name, route, td);
    }

    for(auto &thread : threads)
    {
        thread.Start();
    }


    for(auto &thread : threads)
    {
        thread.Join();
    }

    pthread_mutex_destroy(&mutex);

}

二、条件变量

2.1 概念

  • 等待特定条件的发生: 条件变量允许一个或多个线程等待某个条件变为真,而无需不断轮询某个变量的状态,从而节省CPU资源线程可以在等待条件变量的同时进入休眠状态,当条件满足时被唤醒。

  • 线程间的协调和通信: 条件变量提供了一种机制,使得一个线程可以通知另一个线程某个条件已经满足。这对于需要在多个线程之间共享资源并协调访问的场景非常有用。

  • 解决生产者-消费者问题: 在生产者-消费者问题中,生产者线程生成数据并放入缓冲区,消费者线程从缓冲区中取出数据处理。条件变量可以用于协调生产者和消费者之间的工作,例如,当缓冲区为空时消费者等待,当缓冲区有数据时通知消费者。

这里也打个比方,如果想买某个商品但是商店缺货,我们需要反复跑去超市确认,毋庸置疑,这是一件费事费力的事情, 但是如果当第一次去超市时,把我们的电话留给店员,当有货时就电话通知我们,是不是就方便灵活了很多?

2.2 相关系统调用

2.2.1 条件变量的创建

pthread_cond_t

和互斥锁类似,条件变量的数据类型为 pthread_cond_t 

2.2.2 条件变量的初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);

参数:
cond:要初始化的条件变量
attr:nullptr 

2.2.3 条件变量的销毁

int pthread_cond_destroy(pthread_cond_t *cond)

2.2.4 条件变量的等待

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释

2.2.5 条件变量的唤醒

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

其中第一个是唤醒一个线程,第二个是唤醒所有的线程。 

三、单线程生产消费模型(BQ)

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

3.1 初步测试代码

这里直接复用上述封装的线程库:

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T &data, const std::string &name="none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif

main 函数中,使用 vector<Thread<int>> 包装一个线程数组, 

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
int a = 10;
void Consumer(int &data)
{
    while (data)
    {
        std::cout << "Consumer: " << data-- << std::endl;
        sleep(1);
    }
}
void StartConsumer(std::vector<Thread<int>> *threads, int num)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads->emplace_back(Consumer, a, name);//执行的函数 向函数传入的参数 name
        threads->back().Start();//取到最后一个线程并启动
    }
}

void Productor(int &data)
{
    while (data)
    {
        std::cout << "Productor: " << data-- << std::endl;
        sleep(1);
    }
}
void StartProductor(std::vector<Thread<int>> *threads, int num)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "Thread-" + std::to_string(i + 1);
        threads->emplace_back(Productor, a, name);
        threads->back().Start();
    }
}

void WaitAllThread(std::vector<Thread<int>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    std::vector<Thread<int>> threads;
    StartConsumer(&threads, 1);
    StartProductor(&threads, 1);
    WaitAllThread(threads);
    return 0;
}



以上是对消费者与生产者提供的接口,每次启动时把线程的数组传入,以及创建的线程数量传入即可,这里因为是单线程, num 默认传为 1 .



在对消费者与生产者提供的接口中,封装一个消费者与生产者执行的行为。其次,代表其行为的函数可以传入引用来标准执行的次数,所以就需要传入全局变量而不是局部变量,可以看到最上方定义了一个全局变量 a 来规定执行的次数。

下面看一下程序,生产者和消费者都在有序的访问并修改同一份空间。

3.2 阻塞队列的封装

3.2.1 成员变量

下面就要开始封装 blockqueue 阻塞队列,首先来看一下成员变量:

private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;

阻塞队列其中一定要包含一个队列,所以定义了一个 _block_queue 的队列,以后生产者和消费者就在这个队列中添加与取数据。 

其次,我们需要生产者与消费者有序的访问而且访问时不再受其他线程的打扰,所以还需要带锁, _mutex 。
最后,在阻塞队列中,当队列满了以后,生产者无法生产;当队列为空时,消费者无法消费。这就要求我们为生产者与消费者各定义一个条件变量:_product_cond  _consume_cond

3.2.2 构造析构

这两个成员函数相对来说比较简单,只需要继续使用系统调用即可:

public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }    
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }

3.2.3 生产者模型

生产者模型其实就是入队列的过程,当队列为满时,停止入队,此时让它的条件变量休眠,反之,唤醒条件变量并将生产的数据加入队列:

    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }

其中, pthread_cond_wait 的作用值得再次阐述一遍:

释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。

线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足并重新拿到锁。

3.2.4 消费者模型

消费者模型与生产者模型类似,都是一样的逻辑,但是当队列为空时:wait ;当队列为满时:signal 

    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }

3.2.5 整体代码 

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template<typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }
    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }
private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;
};
#endif

3.3 Main.cc 的修改

3.3.1 整体代码

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
int a = 10;

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;

    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

3.3.2 int main 部分

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;

    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

下面从 main 函数作为切入点,首先我们把线程数组中的线程节点改为阻塞队列,紧接着就开始调用生产者与消费者的 start 函数,并传入了新建的阻塞队列 bq ,保证生产者与消费者使用的是同一份阻塞队列

3.3.3 start 函数

为了函数的简洁,我们创建了一个公共区域:

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

现在不论生产者或是消费者在启动时,只需要访问同一份代码即可,不同的是传入的线程执行方法不同。 

在 StartComm 中,对线程数组进行了 emplace_back ,emplace_back 会根据传入的参数自动构造一个数组的成员,也就是 thread ,传入参数为线程的执行方法、阻塞队列、线程名称。

然后来看一下 StartComm 中不同线程要执行的不同方法,就是分别打印信息:

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

下面重新回看 int main 部分:

int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;
    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

首行的 bq 默认的容量设置为 10 ,我们创建的生产者线程数量为 1 ,所以生产者的执行函数会执行 10 次,依次打印 1 - 11 的线程名称。然后创建了消费者线程数量,此时消费者的执行函数又会执行 10 次,所以打印内容为:
 
可以看到它们访问的地址空间都是同一份。

3.3.4 wait 函数

线程等待就比较简单了,直接调用系统调用即可。

3.4 整体代码

3.4.1 BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>

template<typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    BlockQueue(int cap):_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consume_cond, nullptr);
    }
    void Enqueue(T &in)//提供给生产者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsFull())//队列已满,无法继续生产
        {
            //释放锁:当线程调用pthread_cond_wait时,它会自动释放传入的mutex,这样其他线程就可以获取这个互斥锁并修改共享资源。
            //线程在条件变量上等待,直到其他线程发出信号(通过pthread_cond_signal或pthread_cond_broadcast)来通知条件满足。
            pthread_cond_wait(&_product_cond, &_mutex);
        }
        //满足条件可以生产
        _block_queue.push(in);
        //唤醒消费者来消费
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out)//提供给消费者的接口
    {
        pthread_mutex_lock(&_mutex);
        if (IsEmpty())
        {
            pthread_cond_wait(&_consume_cond, &_mutex);
        }
        *out = _block_queue.front();
        _block_queue.pop();
        //唤醒生产者来生产
        pthread_cond_signal(&_consume_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        _cap = 0;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consume_cond);
    }
private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _product_cond;
    pthread_cond_t _consume_cond;
};
#endif

3.4.2 Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        Thread(func_t<T> func, T &data, const std::string &name="none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T> *self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T &_data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} // namespace ThreadModule

#endif

3.4.3 Main.cc

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <unistd.h>

using namespace ThreadModule;
//int a = 10;

void Consumer(BlockQueue<int> &bq)
{
    while (true)
    {
        int data;
        bq.Pop(&data);
        std::cout << "Consumer Consum data is : " << data << " addr: " << &bq << std::endl;
        sleep(1);
    }
}

void Productor(BlockQueue<int> &bq)
{
    int cnt = 1;
    while (true)
    {
        bq.Enqueue(cnt);
        std::cout << "Productor product data is : " << cnt << " addr: " << &bq << std::endl;
        cnt++;
    }
}

void StartComm(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq, func_t<BlockQueue<int>> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<BlockQueue<int>>> *threads, int num, BlockQueue<int> &bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{
    for (auto &thread:threads)
    {
        thread.Join();
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(10);
    std::vector<Thread<BlockQueue<int>>> threads;
    StartProductor(&threads, 1, *bq);
    StartConsumer(&threads, 1, *bq);
    WaitAllThread(threads);
    return 0;
}

四、多生产多消费模型

假设一个场景,假设以下为消费者,当队列有1个数据时,多个线程同时被唤醒,那么此时只有一个线程可以竞争得到锁,看一下我们的程序:


此时线程已经被唤醒,应该直接执行下面的代码,但是锁并不在它身上,当它访问的时候,阻塞队列中唯一的数据已经被竞争锁成功的线程拿走了,此时再进行访问就会出现错误,这种唤醒被称作伪唤醒。所以我们需要把判断中的 if 改为 while ,每次都要判断阻塞队列是否为空,同时,生产者线程中每次都要判断队列是否为满。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/762077.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux系统】文件描述符fd

1.回顾一下文件 我们之前对文件的理解是在语言层上&#xff0c;而语言层去理解文件是不可能的&#xff01;&#xff01;&#xff01; 下面是一份c语言文件操作代码&#xff01;&#xff01;&#xff01; #include<stdio.h> int main() {FILE* fd fopen("lo…

节点级、系统级、实车级的LIN测试主要差异点

文章目录 前言一、节点级1.前期准备2.测试执行 二、系统级1.前期准备2.测试执行 三、实车级1.前期准备2.测试执行 总结 前言 LIN协议一致性测试主要指的是物理层&#xff08;电阻、电容、电压、地偏移、显隐性电平、频率占空比、位时间等&#xff09;、数据链路层&#xff08;…

mysql mgr集群断电重启

一、前言 mysql mgr集群所有节点都断电重启时&#xff0c;就会面临一个问题&#xff0c;应该怎么重新构建mgr集群 二、操作 查询所有节点的master状态 show master status; 查看同步状态&#xff0c;可以通过uuid知道是通过哪个节点进行同步的数据 查看所有节点的uuid&#x…

鸿蒙 HarmonyOs 动画效果 快速入门

一、理论 1.1 animation属性 名称参数类型必填描述durationnumber否设置动画时长&#xff0c;默认值&#xff1a;1000&#xff0c;单位&#xff1a;毫秒temponumber否动画播放速度。数值越大&#xff0c;速度越快&#xff0c;默认为1curvestring | Curve否 设置动画曲线。 默…

element el-table表格切换分页保留分页数据+限制多选数量

el-table表格并没有相关的方法来禁用表头里面的多选按钮 那么我们可以另辟蹊径&#xff0c;来实现相同的多选切换分页&#xff08;保留分页数据&#xff09; 限制多选数量的效果 <el-table:data"tableData"style"width: 100%">// 不使用el-talbe自带…

EDI是什么?与ERP有何关系

EDI的发展过程 电子数据交换&#xff08;Electronic Data Interchange&#xff0c;EDI&#xff09;是一种通过电子方式传输商业文件的技术。EDI的历史可以追溯到20世纪60年代&#xff0c;当时企业开始使用计算机进行数据处理。最早的EDI系统是为解决大型企业间的信息交换问题而…

微信AI机器人智能助手:利用大模型定制训练知识库

随着人工智能技术的迅速发展&#xff0c;AI已经渗透到了我们生活得方方面面。AI文本撰写、AI绘画、AI生成视频、AI换脸等各类应用层出不穷。作为领先的创新人工智能和元宇宙厂商&#xff0c;道可云凭借自身在人工智能、元宇宙、虚拟数字人等领域的技术积累&#xff0c;将AI技术…

文本超长省略的几种方式(vue)

第一种&#xff0c;纯css 在给容器设置宽度后&#xff0c;使用css来省略文本超长部分&#xff0c;但是这样就看不到全部的内容 <template><div class"content"><div class"text">{{ text }}</div></div> </template>&…

Vue3 登录成功,浏览器存在toke,再次访问/login路由到/index 首页页面

文章目录 目录 文章目录 流程 小结 概要流程技术细节小结 概要 首先需要清楚知道浏览器localstorage和Session storage的区别 localStorage 和 sessionStorage 是 HTML5 提供的两种客户端存储数据的方法&#xff0c;它们在使用和生命周期上有一些区别&#xff1a; 1. 生命周期…

1.回溯算法.题目

1.回溯算法.题目 题目9.子集问题10.子集||11.递增子序列12.全排列13.全排列||14.回溯算法去重问题的另外一个写法15.重新安排行程16.N皇后 总结去重方式的不同 题目 9.子集问题 &#xff08;题目链接&#xff09; 给定一组不含重复元素的整数数组 nums&#xff0c;返回该数组…

宝塔linux网站迁移步骤

网站迁移到新服务器步骤 1.宝塔网站迁移&#xff0c;有个一键迁移工具&#xff0c;参考官网 宝塔一键迁移API版本 3.0版本教程 - Linux面板 - 宝塔面板论坛 (bt.cn)2 2.修改域名解析为新ip 3.如果网站没有域名&#xff0c;而是用ip访问的&#xff0c;则新宝塔数据库的wp_o…

mysql主键自增连续新增时报错主键重复-Duplicate entry “x” for key PRIMARY

mysql主键自增连续新增时报错主键重复 1、mysql数据库设置数据库主键自增的规律 id -- AUTO_INCREMENT2、可视化工具查看自增没问题 3、问题描述 新增第一个时操作成功&#xff0c;新增第二个时候操作失败报错&#xff1a; Duplicate entry “x” for key PRIMARY4、分析&a…

[BUUCTF从零单排] Web方向 02.Web入门篇之『常见的搜集』解题思路(dirsearch工具详解)

这是作者新开的一个专栏《BUUCTF从零单排》&#xff0c;旨在从零学习CTF知识&#xff0c;方便更多初学者了解各种类型的安全题目&#xff0c;后续分享一定程度会对不同类型的题目进行总结&#xff0c;并结合CTF书籍和真实案例实践&#xff0c;希望对您有所帮助。当然&#xff0…

手把手教你考下39张免费亚马逊AWS证书和学习徽章

小李哥目前共考了39项亚马逊云(AWS)徽章&#xff0c;这也是普通用户可考的全部徽章。这篇文章会介绍如何报名、复习、通过这39张徽章提升云计算基本技能&#xff0c;了解全球第一大云厂亚马逊云科技前沿技术。这篇文章在领英爆&#x1f525;&#xff0c;有将近100k浏览量和11k的…

Linux:系统安全及应用

目录 一、系统账号管理 1.1、系统账号清理 1.2、密码安全控制 1.3、命令历史限制 二、限制su命令用户 三、PAM安全认证 四、sudo机制提升权限 4.1、sudo机制介绍 4.2、用户别名案例 4.3、启用sudo操作日志 4.4、其他案列sudo 4.5、开关机安全控制 4.6、限制更改GR…

root密码忘了怎么办(从系统引导过程解决)

目录 1.Linux系统密码忘记 2.系统引导过程 2.1 systemd 2.2 GRUB和GRUB2 2.3 运行级别 3.修复MBR扇区故障和GRUB引导故障 3.1 MBR扇区故障 3.2 GRUB引导故障 1.Linux系统密码忘记 我们在生活中经常遇到这类困扰&#xff0c;就是某个账号还是账户密码忘了&#xff0c;这…

Docker 部署 Nacos v2.3.2 版本

文章目录 Github官网文档Nacos 生态图Nacos Dockerdocker-compose.ymlapplication.propertiesNacos 官方示例 Github https://github.com/alibaba/nacos 官网 https://nacos.io/ 文档 https://nacos.io/docs/latest/what-is-nacos/ Nacos 生态图 Nacos Docker 镜像&…

《信创数据库沙龙上海站:共话发展,智启未来》

2024 年 6 月 29 日周六 14:00&#xff0c;信创数据库沙龙在上海市徐汇区建国西路 285 号科投大厦 13 楼金星厅成功举办。本次活动吸引了众多学术界和产业界的专家、学者以及技术爱好者参与。 活动中&#xff0c;多位嘉宾带来了精彩分享。薛晓刚探讨了 Oracle 在国内的前景&a…

Java全套智慧校园系统源码:微信小程序+电子班牌 让教育更智能化的一套数字化校园管理系统源码

Java全套智慧校园系统源码&#xff1a;微信小程序电子班牌 让教育更智能化的一套数字化校园管理系统源码 智慧校园管理系统是一种利用科技手段优化学校教育和管理的平台。它可以涵盖多个方面&#xff0c;例如教学、管理、服务等。其中包括智能化教室、智慧校园卡、校园安全监控…

基于flask的闪现、g对象、蓝图

【 一 】闪现&#xff08;flash&#xff09; # 1 flask中得闪现存放数据的地方&#xff0c;一旦取了&#xff0c;数据就没了-实现跨请求间传递数据 # 2 django中有没有类似的东西&#xff1f;message 消息框架# 3 基本使用1 设置&#xff1a;flash(欢迎你、欢迎来到澳门赌场&a…