您的当前位置:首页正文

多线程操作:互斥、同步与信号量(生产者消费者模型两种实现方式)

2024-11-07 来源:个人技术集锦

互斥与同步的概念

  • 互斥:在一个进程中,有时候我们需要创建多个线程来完成任务,由于这多个线程是共享该进程的资源,那么就可能多个线程同时访问同一块资源,那么这块资源就称为“临界资源”,访问临界资源的代码称为“临界区”由于线程的调度是由操作系统决定的,所以不能确定线程的执行顺序,有可能在A线程访问临界资源的时候,对临界资源进行了修改,而此时还有一个B线程也在临界区内,但线程B不知道线程A已经修改了数据,就会造成数据二义性。互斥的意思就是在同一时刻内临界资源只能有一个线程访问,当有进程在访问临界资源时,其他进程只能等该进程访问结束后才可以访问临界资源。
  • 同步:多个线程都要访问临界资源,有时候访问临界资源必须要有先后顺序,如A线程需要对临界资源进行处理,而B线程要从临界资源拿到A线程处理后的数据。那么B线程就必须等A线程访问临界资源后才可以访问。这就是同步。

基于锁实现互斥与同步

既然一次只能有一个线程访问临界资源,那我们就可以考虑给临界资源加上一把锁,每当有一个线程要开始临界资源,都要先查看锁的状态,如果锁已经处于锁住状态则表明现在有线程正在访问临界资源。当一个线程访问结束后再释放锁,这样其他线程就可以访问临界资源了。

下面介绍一中锁:互斥锁

pthread_mutex_t lock;

pthread_mutex_t是锁的类型,对锁的操作都是通过相关函数进行的。

初始化锁与销毁锁:

pthread_mutex_init();
pthread_mutex_destroy();

加锁:

pthread_mutex_lock();

解锁:

pthread_mutex_unlock();

下面实现一个互斥的例子,模拟抢票。
票务系统一次放出100张票,多个线程同时开始抢票。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>


int ticket = 100;//全局变量,每个线程都可以看到
pthread_mutex_t lock;

void* get_ticket(void* arg){
  while(1){
    //pthread_mutex_lock(&lock);
    if(ticket > 0){
      usleep(1000);//抢票需要时间
      printf("I am thread :%p, I get a ticket: %d\n",pthread_self(), ticket);
      ticket--;
    }
    else{
     // pthread_mutex_unlock(&lock);
      break;
    }
    //pthread_mutex_unlock(&lock);
  }

}

int main(){
  pthread_t tid[5];
  pthread_mutex_init(&lock, NULL);
  int i = 0;
  for(; i < 5; i++){
  	//创建5个线程,每个线程都执行抢票函数。
    pthread_create(tid+i, NULL, get_ticket, (void*)&i);
  }

  for(i = 0; i < 5; i++){
  	//等待5个线程
    pthread_join(tid[i], NULL);
  }
  pthread_mutex_destroy(&lock);
  return 0;
}

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>


int ticket = 100;//全局变量,每个线程都可以看到
pthread_mutex_t lock;//定义锁

void* get_ticket(void* arg){
  while(1){
  	usleep(1000);//防止票都被一个线程抢光
    pthread_mutex_lock(&lock);//加锁
    if(ticket > 0){
      usleep(1000);//抢票需要时间
      printf("I am thread :%p, I get a ticket: %d\n",pthread_self(), ticket);
      ticket--;
    }
    else{
      pthread_mutex_unlock(&lock);//票已经抢完,解锁
      break;
    }
    pthread_mutex_unlock(&lock);//正常抢到一张票,解锁
  }

}

int main(){
  pthread_t tid[5];
  pthread_mutex_init(&lock, NULL);
  int i = 0;
  for(; i < 5; i++){
    pthread_create(tid+i, NULL, get_ticket, (void*)&i);
  }

  for(i = 0; i < 5; i++){
    pthread_join(tid[i], NULL);
  }
  pthread_mutex_destroy(&lock);
  return 0;
}

基于条件变量和锁实现同步

要实现同步,就要有对应的等待条件,我们使用条件变量来与锁来实现同步。

phhread_t cond

定义一个条件变量,对条件变量的操作都是通过函数来实现的。

初始化与销毁条件变量:

pthread_cond_init();
pthread_cond_destroy();

pthread_cond_destroy();
参数:要销毁的条件变量。

在某个条件变量下等待

pthread_cond_wait();

因为同步访问的也都是临界资源,一个线程发现条件不满足时开始等待,当条件满足时该线程获取到信息,同时申请锁,继续在临界区进行访问。

通知某个条件变量下等待的线程继续工作

pthread_cond_signal();

我们通过生产者消费者模型来实现同步。

基于阻塞队列的生产者与消费者模型

  • 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

  • 生产者消费者模型优点
    解耦
    支持并发
    支持忙闲不均

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

使用C++来编写代码:
model.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>

#define NUM 5//队列最多存储5个数据


class BlockQueue{
    public:
        BlockQueue(size_t cap = NUM):_cap(cap){
            pthread_mutex_init(&_lock, NULL);
            pthread_cond_init(&_full, NULL);
            pthread_cond_init(&_empty, NULL);
        }
        ~BlockQueue(){
            pthread_mutex_destroy(&_lock);
            pthread_cond_destroy(&_full);
            pthread_cond_destroy(&_empty);
        }

        bool isfull(){
            return q.size() >= _cap;
        }

        bool isempty(){
            return q.size() == 0;
        }
        void put(int& data){//生产数据
            pthread_mutex_lock(&_lock);//访问临界资源先加锁
            while(isfull()){//当前容器已满,条件不满足
                pthread_cond_signal(&_empty);//唤醒正在等待的消费者
                std::cout << "the queue is full... i am waiting!" << std::endl;
                pthread_cond_wait(&_full, &_lock);//生产者开始等待
            }
            q.push(data);//条件满足,将数据放入容器
            pthread_mutex_unlock(&_lock);//离开临界区,解锁
        }

        void pop(){//消费数据
            pthread_mutex_lock(&_lock);//访问临界资源,先加锁
            while(isempty()){//当前容器为空,条件不满足
                pthread_cond_signal(&_full);//唤醒正在等待的生产者
                std::cout << "the queue is empty.. i am waiting!" << std::endl;
                pthread_cond_wait(&_empty, &_lock);//消费者开始等待
            }
            int data = q.front();
            q.pop();//取出数据
            pthread_mutex_unlock(&_lock);//离开临界区,解锁
            std::cout << "i got you! :" << data << " * 2 = " << data * 2 << std::endl; 
        }
    private:
        size_t _cap;//容器的大小
        std::queue<int> q;//容器
        pthread_cond_t _full;//容器满的条件变量
        pthread_cond_t _empty;//容器空的条件变量
        pthread_mutex_t _lock;//锁

};

model.cc

#include "model.hpp"


void* producer(void* arg){
    BlockQueue* bq = (BlockQueue*)arg;
    while(1){
        int data = rand() % 100;
        bq->put(data);
        sleep(1);
    }
}

void* consumer(void* arg){
    BlockQueue* bq = (BlockQueue*)arg;
    while(1){
        bq->pop();
        sleep(1);
    }
}

int main(){
    BlockQueue bq;
    pthread_t a, b;
    //创建两个线程,一个负责生产数据,一个负责消费数据
    pthread_create(&a, NULL, producer, (void*)&bq);
    pthread_create(&b, NULL, consumer, (void*)&bq);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return 0;
}

运行结果:

什么是信号量?

对于临界资源,我们最开始的认知是一次只能有一个线程访问,那有没有这种情况,临界资源很大,一个线程只访问其中的一部分,而另外一个线程访问临界资源的另一部分,两个线程互不干扰,这样可不可以实现多个线程同时访问一块临界资源呢?
答案当然是可以的。

信号量可以简单理解为一个计数器。记录当前临界资源还可以被多少个线程访问。

sem_t sem;

定义一个信号量。

初始化与销毁信号量:

sem_init();
sem_destroy();

sem_wait();
sem_post();

下面我们还是通过生产者与消费者模型来理解信号量。

基于信号量的生产者与消费者模型

我们这次使用vector来作为底层容器,因为vector支持下标访问,便于操作。
该模型由于使用信号量实现,所以生产者与消费者会同时访问临界资源,但是互不干扰。过程可理解为一个环形。

head放数据,tail拿数据。如果head速度慢,tail会追上head,此时应该tail停止(等待),head放过数据后,tail才可以拿数据。如果tail速度慢,head会追上tail,此时head应该停止(等待),tail拿过数据后,有空的地方,head才能继续放数据。

代码实现:
Ring.hpp

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <unistd.h>

#define NUM 5

class RingQueue{
    public:
        RingQueue(int cap = NUM) :_cap(cap), _consumerStep(0), _producerStep(0){
            _v.resize(_cap);
            sem_init(&_dataSem, 0, 0);
            sem_init(&_spaceSem, 0, cap);
        }

        ~RingQueue(){
            sem_destroy(&_dataSem);
            sem_destroy(&_spaceSem);
        }

        void push(int& data){
            sem_wait(&_spaceSem);//空间信号量-1
            _v[_producerStep++] = data;//生产者把数据放入对应格子
            std::cout << "I put a data, you can get it..." << std::endl;
            _producerStep %= _cap;//防止越界
            sem_post(&_dataSem);//数据信号量+1
        }

        void pop(){
            sem_wait(&_dataSem);//数据信号量-1
            int data = _v[_consumerStep++];//消费者从对应位置拿数据
            _consumerStep %= _cap;//防止越界
            std::cout << "I get a data: " << data << " * 2 = " << data * 2 << std::endl;
            sem_post(&_spaceSem);//空间信号量+1
        }

    private:
        std::vector<int> _v;//容器
        int _cap;//容器大小
        sem_t _dataSem;//数据信号量,代表当前容器中还有多少个数据
        sem_t _spaceSem;//空间信号量,代表当前容器还有多少个空间
        int _consumerStep;//消费者下一步要从哪个位置取数据
        int _producerStep;//生产者下一步要向哪个位置放数据
};

Ring.cc

#include "RingQueue.hpp"



void* producer(void* arg){
    
    RingQueue* rq = (RingQueue*)arg;
    while(true){
        int data = rand() % 100;
        rq->push(data);
    }
}

void* consumer(void* arg){
    RingQueue* rq = (RingQueue*)arg;
    while(true){
        rq->pop();
        sleep(1);
    }
}

int main(){
    pthread_t a, b;
    RingQueue rq;
    //创建两个线程,一个负责生产数据,一个负责消费数据
    pthread_create(&a, NULL, consumer, (void*)&rq);
    pthread_create(&b, NULL, producer, (void*)&rq);

    pthread_join(a, NULL);
    pthread_join(b, NULL);
}

运行结果:

基于信号量的生产者消费者模型的效率要大于基于条件变量和互斥锁的生产者消费者模型。因为在这个模型中,生产者和消费者可以同时访问临界资源。

显示全文