信号量本质就是一把计数器,描述临界资源中资源数目的大小,合理使用可以达到对临界资源进行预订的目的。(最多能有多少资源分配给线程)
多线程预定资源的方法:临界资源如果可以被划分成为一个一个的小资源,如果处理得当,我们也有可能让多个线程同时访问临界资源的不同区域,从而实现并发并行的访问临界资源。
比如信号量s,我们对信号量进行+1,或者-1的操作时,在底层实际上是用互斥锁保证了线程安全。当对信号量进行-1操作时,先加锁,判断当前信号量是否 <=0,如果是,则挂起当前线程,然后解锁,等待被唤醒,尝试重新去竞争锁;如果当前信号量 >0,则对信号量进行-1操作,然后解锁。这实际上对应的就是PV操作里面的P操作。
如果要进行+1操作,底层也是进行了加锁和解锁的,对应的是PV操作中的V操作。
sem_wait和sem_post保证了对信号量的申请是原子的,见下方
sem_t sem
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
成功返回0,-1表示失败,错误码(默认是-1)可以设置。
int sem_destroy(sem_t *sem);
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空(我们采用的方式,用两个信号量就可以判断队列是空还是满)。另外也可以预留一个空的位置,作为满的状态
当队列为空的时候,应该只让生产者执行;队列为满的时候,应该只让消费者执行。队列为空或者队列为满的时候,生产者和消费者指向的是同一个位置。
那么,当队列不为空,不为满的时候,生产者和消费者一定指向的不是同一个位置,此时生产和消费可以并发执行
对于生产者:
对于消费者
消费者要消费的数据,就是要执行的任务,这里使用的Task和我上一篇文章中的Task一样,对两个操作数执行加减乘除运算
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; // 操作符
public:
Task() {}
Task(int x, int y, char op) : _x(x), _y(y), _op(op)
{
}
std::string Show()
{
std::string message = std::to_string(_x);
message += _op;
message += std::to_string(_y);
message += "=?";
return message;
}
int run()
{
int res = 0;
switch (_op)
{
case '+':
res = _x + _y;
break;
case '-':
res = _x - _y;
break;
case '*':
res = _x * _y;
break;
case '/':
res = _x / _y;
break;
case '%':
res = _x % _y;
break;
default:
std::cout << "something wrong" << std::endl;
break;
}
std::cout << "当前任务正在被线程: " << pthread_self() << " 处理: "
<< _x << _op << _y << "=" << res << std::endl;
return res;
}
~Task() {}
};
}
使用POSIX库中的信号量实现环形队列,定义两个_c_pos和_p_pos分别表示生产者要生产资源的位置和消费者要消费资源的位置,分别对应的是当前环形队列中,数据的头部+1的位置和尾部的位置。此时环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,定义的两个信号量就很好的解决了这个问题,对于消费者,如果有data资源,就可以进行消费;对于生产者,如果有blank空白资源,就可以进行生产,两者通过信号量实现同步。
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
namespace ns_circular_queue
{
const int default_len = 10;
template <class T>
class CircularQueue
{
private:
std::vector<T> _circular_queue;
int _len;
// 生产者关心空位置资源
sem_t _blank_sem;
// 消费者关心数据资源
sem_t _data_sem;
int _c_pos;
int _p_pos;
pthread_mutex_t _c_mtx;
pthread_mutex_t _p_mtx;
public:
CircularQueue(int len = default_len) : _circular_queue(len), _len(len)
{
sem_init(&_blank_sem, 0, len);
sem_init(&_data_sem, 0, 0);
_c_pos = _p_pos = 0;
pthread_mutex_init(&_c_mtx, nullptr);
pthread_mutex_init(&_p_mtx, nullptr);
}
~CircularQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mtx);
pthread_mutex_destroy(&_p_mtx);
}
public:
void push(const T &in)
{
// 生产接口
sem_wait(&_blank_sem); // P(blank),尝试申请blank资源,如果没有blank资源(信号值 <= 0),当前线程会被挂起
pthread_mutex_lock(&_p_mtx);
_circular_queue[_p_pos] = in; // 对于生产者,直接在_p_pos处放入生产的数据即可,当前_p_pos处是没有data的
// 它也变成了临界资源
_p_pos++;
_p_pos %= _len;
pthread_mutex_unlock(&_p_mtx);
sem_post(&_data_sem); // V(data),释放data资源,如果信号值>0,其他正在调用sem_wait等待信号量的线程将被唤醒
}
void pop(T *out)
{
// 消费接口
sem_wait(&_data_sem); // P(data)
pthread_mutex_lock(&_c_mtx);
*out = _circular_queue[_c_pos];
_c_pos++;
_c_pos %= _len;
pthread_mutex_unlock(&_c_mtx);
sem_post(&_blank_sem); // V(blank)
}
};
}
创建3个消费者线程,2个生产者线程,分别消费任务和生产任务。消费任务就是把数据从队列中拿出来,然后处理;生产任务就是把数据放到队列中
#include "circular_queue.hpp"
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include "Task.hpp"
using namespace ns_circular_queue;
using namespace ns_task;
void *consumer(void *args)
{
CircularQueue<Task> *cq = (CircularQueue<Task> *)args;
while (true)
{
Task t;
cq->pop(&t);
std::cout << "消费数据: " << t.Show()<< "我是线程: " << pthread_self() << std::endl;
t.run();
sleep(1); //1s
}
}
void *producter(void *args)
{
CircularQueue<Task> *cq = (CircularQueue<Task> *)args;
const std::string ops = "+-*/%";
while (true)
{
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = ops[rand() % ops.size()];
Task t(x, y, op);
std::cout << "生产数据: " << t.Show() << "我是线程: " << pthread_self() << std::endl;
cq->push(t);
usleep(100000); //0.1s
}
}
int main()
{
srand((long long)time(nullptr));
CircularQueue<Task> *rq = new CircularQueue<Task>();
pthread_t c1, c2, c3, p1, p2;
pthread_create(&c1, nullptr, consumer, (void *)rq);
pthread_create(&c2, nullptr, consumer, (void *)rq);
pthread_create(&c3, nullptr, consumer, (void *)rq);
pthread_create(&p1, nullptr, producter, (void *)rq);
pthread_create(&p2, nullptr, producter, (void *)rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
return 0;
}