您的当前位置:首页正文

多线程~POSIX信号量实现生产者消费者模型,PV操作

2024-11-27 来源:个人技术集锦


1.信号量的概念

        信号量本质就是一把计数器,描述临界资源中资源数目的大小,合理使用可以达到对临界资源进行预订的目的。(最多能有多少资源分配给线程)

        多线程预定资源的方法:临界资源如果可以被划分成为一个一个的小资源,如果处理得当,我们也有可能让多个线程同时访问临界资源的不同区域,从而实现并发并行的访问临界资源。

2.sem_t信号量的操作函数

(1).原理

        比如信号量s,我们对信号量进行+1,或者-1的操作时,在底层实际上是用互斥锁保证了线程安全。当对信号量进行-1操作时,先加锁,判断当前信号量是否 <=0,如果是,则挂起当前线程,然后解锁,等待被唤醒,尝试重新去竞争锁;如果当前信号量 >0,则对信号量进行-1操作,然后解锁。这实际上对应的就是PV操作里面的P操作。

        如果要进行+1操作,底层也是进行了加锁和解锁的,对应的是PV操作中的V操作。

sem_wait和sem_post保证了对信号量的申请是原子的,见下方

(2).sem_t函数的使用

  • 定义信号量

sem_t sem

  • 初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

        pshared:0表示线程间共享,非零表示进程间共享

        value:信号量初始值

成功返回0,-1表示失败,错误码(默认是-1)可以设置。

  • 销毁信号量

int sem_destroy(sem_t *sem);

  • 等待信号量

功能:等待信号量,会将信号量的值减1

int sem_wait(sem_t *sem); //P()

  • 发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

int sem_post(sem_t *sem);//V()

(3).基于信号量和环形队列的生产者消费者模型

1).大致实现思路

环形队列采用数组模拟,用模运算来模拟环状特性

        环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空(我们采用的方式,用两个信号量就可以判断队列是空还是满)。另外也可以预留一个空的位置,作为满的状态

        当队列为空的时候,应该只让生产者执行;队列为满的时候,应该只让消费者执行。队列为空或者队列为满的时候,生产者和消费者指向的是同一个位置。

        那么,当队列不为空,不为满的时候,生产者和消费者一定指向的不是同一个位置,此时生产和消费可以并发执行

对于生产者:

对于消费者

  1. 对data资源进行P操作,表示消费者用掉一个data资源
  2. 生产完以后,将数据data从环形队列中删除,空出一个black资源
  3. 对black资源进行V操作,表示消费者消费掉一个data,空出一个black资源

Task.hpp

        消费者要消费的数据,就是要执行的任务,这里使用的Task和我上一篇文章中的Task一样,对两个操作数执行加减乘除运算

#pragma once

#include <iostream>
#include <pthread.h>

namespace ns_task
{
    class Task
    {
    private:
        int _x;
        int _y;
        char _op; // 操作符
    public:
        Task() {}
        Task(int x, int y, char op) : _x(x), _y(y), _op(op)
        {
        }
        std::string Show()
        {
            std::string message = std::to_string(_x);
            message += _op;
            message += std::to_string(_y);
            message += "=?";
            return message;
        }
        int run()
        {
            int res = 0;
            switch (_op)
            {
            case '+':
                res = _x + _y;
                break;
            case '-':
                res = _x - _y;
                break;
            case '*':
                res = _x * _y;
                break;
            case '/':
                res = _x / _y;
                break;
            case '%':
                res = _x % _y;
                break;
            default:
                std::cout << "something wrong" << std::endl;
                break;
            }
            std::cout << "当前任务正在被线程: " << pthread_self() << " 处理: "
                      << _x << _op << _y << "=" << res << std::endl;
            return res;
        }
        ~Task() {}
    };
}

circular_queue.hpp

        使用POSIX库中的信号量实现环形队列,定义两个_c_pos和_p_pos分别表示生产者要生产资源的位置和消费者要消费资源的位置,分别对应的是当前环形队列中,数据的头部+1的位置和尾部的位置。此时环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,定义的两个信号量就很好的解决了这个问题,对于消费者,如果有data资源,就可以进行消费;对于生产者,如果有blank空白资源,就可以进行生产,两者通过信号量实现同步。

#pragma once

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>

namespace ns_circular_queue
{
    const int default_len = 10;

    template <class T>
    class CircularQueue
    {
    private:
        std::vector<T> _circular_queue;
        int _len;
        // 生产者关心空位置资源
        sem_t _blank_sem;
        // 消费者关心数据资源
        sem_t _data_sem;

        int _c_pos;
        int _p_pos;

        pthread_mutex_t _c_mtx;
        pthread_mutex_t _p_mtx;

    public:
        CircularQueue(int len = default_len) : _circular_queue(len), _len(len)
        {
            sem_init(&_blank_sem, 0, len);
            sem_init(&_data_sem, 0, 0);
            _c_pos = _p_pos = 0;

            pthread_mutex_init(&_c_mtx, nullptr);
            pthread_mutex_init(&_p_mtx, nullptr);
        }
        ~CircularQueue()
        {
            sem_destroy(&_blank_sem);
            sem_destroy(&_data_sem);

            pthread_mutex_destroy(&_c_mtx);
            pthread_mutex_destroy(&_p_mtx);
        }

    public:
        void push(const T &in)
        {
            // 生产接口
            sem_wait(&_blank_sem); // P(blank),尝试申请blank资源,如果没有blank资源(信号值 <= 0),当前线程会被挂起

            pthread_mutex_lock(&_p_mtx);

            _circular_queue[_p_pos] = in; // 对于生产者,直接在_p_pos处放入生产的数据即可,当前_p_pos处是没有data的

            // 它也变成了临界资源
            _p_pos++;
            _p_pos %= _len;
            pthread_mutex_unlock(&_p_mtx);

            sem_post(&_data_sem); // V(data),释放data资源,如果信号值>0,其他正在调用sem_wait等待信号量的线程将被唤醒
        }
        void pop(T *out)
        {
            // 消费接口
            sem_wait(&_data_sem); // P(data)

            pthread_mutex_lock(&_c_mtx);
            *out = _circular_queue[_c_pos];
            _c_pos++;
            _c_pos %= _len;
            pthread_mutex_unlock(&_c_mtx);

            sem_post(&_blank_sem); // V(blank)
        }
    };
}

circular_cp.cc

        创建3个消费者线程,2个生产者线程,分别消费任务和生产任务。消费任务就是把数据从队列中拿出来,然后处理;生产任务就是把数据放到队列中

#include "circular_queue.hpp"
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include "Task.hpp"

using namespace ns_circular_queue;
using namespace ns_task;

void *consumer(void *args)
{
    CircularQueue<Task> *cq = (CircularQueue<Task> *)args;
    while (true)
    {
        Task t;
        cq->pop(&t);
        std::cout << "消费数据: " << t.Show()<< "我是线程: " << pthread_self() << std::endl;
        t.run();
        sleep(1);  //1s
    }
}

void *producter(void *args)
{
    CircularQueue<Task> *cq = (CircularQueue<Task> *)args;
    const std::string ops = "+-*/%";
    while (true)
    {
        int x = rand() % 20 + 1;
        int y = rand() % 10 + 1;
        char op = ops[rand() % ops.size()];
        Task t(x, y, op);

        std::cout << "生产数据:  " << t.Show() << "我是线程: " << pthread_self() << std::endl;
        cq->push(t);
        usleep(100000);  //0.1s
    }
}

int main()
{
    srand((long long)time(nullptr));
    CircularQueue<Task> *rq = new CircularQueue<Task>();

    pthread_t c1, c2, c3, p1, p2;

    pthread_create(&c1, nullptr, consumer, (void *)rq);
    pthread_create(&c2, nullptr, consumer, (void *)rq);
    pthread_create(&c3, nullptr, consumer, (void *)rq);
    pthread_create(&p1, nullptr, producter, (void *)rq);
    pthread_create(&p2, nullptr, producter, (void *)rq);

    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);

    return 0;
}

运行结果

显示全文