生产者消费者的问题及其解决办法
问题
在之前的生产者消费者模型中,生产者和消费者只有一个,
那么生产者往队列里put几次,消费者就get几次,但是存在一个问题,
生产者不一定只有一个,消费者也不一定只有一个,那么怎么确定生产者进程结束了,
从而让消费者进程跟着一起结束成为一个问题(不然消费者还是一直处于get的阻塞状态,无法终止)
解决办法
1、通过joinablequeue类实例化出来的对象来代替之前queue实例化产生的进程间的队列
2、q.join() 和 q.task_done()方法配合使用
q.join方法可以让主进程进入阻塞状态,满足传入队列的数据put次数等于q.task_done传过来的信号次数才可以接着往下执行,
3、所以为了让所有的生产者能生产完毕,主进程需要等这些子进程完毕才可以进入q.join的阻塞状态
4、主进程等待子进程可以用到Process中的join方法,
5、可以通过p.daemon = True把消费者进程设置为主进程的守护进程,那么主进程代码执行完,守护进程也会结束
从而解决上面不知道生产者生产完毕而消费者不知道一直等待取值的问题
import os from multiprocessing import JoinableQueue,Process def producer(q): for i in range(5): food = "第%s个包子"%i q.put(food) print("%s店%s做好了" % (os.getpid(),food)) def consumer(q): while True: food = q.get() print("%s吃了%s"%(os.getpid(),food)) q.task_done() if __name__ == '__main__': q = JoinableQueue() # 生产者 p1 = Process(target=producer,args=(q,)) p2 = Process(target=producer,args=(q,)) # 消费者 c1 = Process(target=consumer,args=(q,)) p1.start() p2.start() c1.daemon = True # 把消费者设置成主进程的守护进程,那么主进程代码执行完守护进程也会结束 c1.start() p1.join() p2.join() # 为了让主进程等待生产的子进程执行完毕 q.join() # 如果队列里的put数据个数和接收到的task_done信号次数不一致,主进程就会卡在这里,如果相同就会接着往下执行 # 走完q.join之后主进程代码就执行完了,c1的守护进程也会结束,不设置守护进程就会一直卡在接收
线程的定义以及线程与进程的关系
线程的定义
线程是操作系统最小的运算调度单位,被包含在进程中,一个线程就是一个固定的执行流程
线程和进程的关系
线程不能单独存在 必须存在于进程中
进程是一个资源单位,其包含了运行程序所需的所有资源
线程才是真正的执行单位
没有线程,进程中的资源无法被利用起来,所以一个进程至少包含一个线程,称之为主线程
当我们启动一个程序时,操作系统就会自己为这个程序创建一个主线程
线程可以由程序后期开启,自己开启线程称之为子线程(线程之间是平等的,并无子父之分)
为什么需要线程
目的只有一个,就是提高效率
就像一个车间,如果产量跟不上,就需要再造一条流水线
当然可以再造一个新的车间,那就需要把原车间的资源全部运一份过去,这个过程非常耗时
所以通常情况下是创建新的流水线 而不是车间 ,即造线程而不是造进程
如何使用
开启线程的方法和开启进程的方法基本一致,只不过开启进程是Process类,而开启线程是Thread类
开启进程的代码必须放在自执行判断下面,因为开启子进程会重新加载一遍代码,造成递归创建的问题
开启线程的代码可以放在任意位置,因为他们同一进程中,不会重新加载
import os import time from threading import Thread,current_thread,enumerate # 第一种方法,直接实例化 def task(): print("%s子线程 run" % os.getpid()) # 这里查看的进程的pid time.sleep(10) print("%s子线程 over"%current_thread()) # 这里查看的是当前的线程信息 t = Thread(target=task) # 没有必要放在自执行判断中,不会重新加载 t.start() # 第二种方法,继承Thread类,覆盖run方法 class MyThread(Thread): def run(self): print("%s子线程 run" % os.getpid()) time.sleep(10) print("%s子线程 over" % current_thread()) t = MyThread() t.start() print(enumerate()) # enumerate()会把当进程中前存活的线程添加到列表中
线程的特点
1、创建开销小
创建一个线程的速度对比进程来说是非常快的,因为他的开销是比较小的
import time from multiprocessing import Process from threading import Thread def task(): pass if __name__ == '__main__': now = time.time() ls = [] # for i in range(100): # p = Process(target=task()) # # 8.24426531791687 # p.start() # ls.append(p) for i in range(100): t = Thread(target=task) # 0.021970272064208984 t.start() ls.append(t) for m in ls: # 为了让所有的子进程结束 m.join() print(time.time() - now) # 可以看出,同样并发执行100个任务,速度差距非常大
2、同一个进程中的多个线程数据是共享的
用的都是同一个进程里的数据,所以可以同时操作同一个资源
from threading import Thread num = 10 def task(): global num num -= 1 for i in range(10): t = Thread(target=task) t.start() print(num) # 结果是0 ,可以看出在同一进程中,线程之间的资源是相互共享的 # 还可以看出线程的执行开启速度非常快,如果慢,可能会出现先打印,然后再该数据的情况
3、多个线程之间,是平等的没有父子关系 ,所有线程的PID都是相同的(属于同一个进程)
守护线程
一个线程可以设置为另一个线程的守护线程
设置守护线程也是通过 t.daemon = True 来设置成为的
特点是:
被守护的线程如果结束后,那么守护线程也会跟着结束
守护线程会等到所有非守护线程结束后结束 ,前提是除了主线程之外 ,还有别的非守护线程
当然,如果守护线程已经执行完毕,就会立马结束
在这里要区别一些守护线程和守护进程结束的条件
守护线程:
在主线程结束后结束(如果有非守护线程,主线程会等子线程结束后才算结束)
守护进程:
在主线程代码执行完毕后结束(如果有非守护的子进程,主进程也会等所有子进程结束完替他们收完尸后结束)
import time from threading import Thread def task1(): print("我是守护线程") time.sleep(2) print("我守护线程死了") def task2(): print("我是普通子线程") time.sleep(3) print("我,普通子线程死了") if __name__ == '__main__': t = Thread(target=task1) t2 = Thread(target=task2) # 如果没有t2子线程,那就只有一个守护线程,那么主线程执行完毕结束,守护进程也跟着结束 # 但是如果存在非守护线程的子线程,主线程就会等待它结束后而结束,那么此时守护线程也可以执行 t.daemon = True t.start() t2.start()
线程互斥锁
之前提到,线程之间有一个特点,就是在同一个进程中,所有线程是共享同一个资源的
共享就意味着会产生竞争问题,就会产生安全问题
多线程可以并发执行,一旦并发且同时访问了同一个资源就会出现问题
解决的方案还是互斥锁
import time from threading import Thread, enumerate, current_thread, Lock num = 10 lock = Lock() # 之前给进程访问同一资源的时候锁是在__main__中生成锁 # 那样才能保证是同一把锁,但是线程访问同一资源不会加载代码,就不会产生多把锁,也不需要放到判断中 def task(): global num lock.acquire() # 加锁,如果不加会出现下述的问题 temp = num # 每个线程过来先拿到10 time.sleep(1) # 睡1秒之后拿着10 - 1 赋值给num,所以下面打印会是9 num = temp - 1 lock.release() # 释放锁 if __name__ == '__main__': for i in range(10): t = Thread(target=task) t.start() for e in enumerate(): if e == current_thread(): # 如果是主线程就跳过去,不然主线程等主线程会报错 continue e.join() print(num) # 之前我们试过代码的结果是0,那是子线程开启运行非常快 # 如果让他修改时间增加,就会产生问题 # 加完锁之后问题就解决了
死锁问题
当程序出现了不止一把锁,分别被不同的线程持有,有一个资源 想要使用必须同时具备两把锁
这个时候程序就会进入无限卡死状态,这就称之为死锁问题
例如:
要吃饭,就必须具备筷子和盘子,但是一个人拿了筷子,另一个人拿了盘子,那就谁也吃不了,进入卡死状态
import time from threading import Thread, Lock lock1 = Lock() # 筷子 lock2 = Lock() # 盘子 def eat1(): lock1.acquire() # 先拿筷子 time.sleep(0.1) # 这里不睡的话两人都会迟到,因为线程开启速度快,第二个人可能没开,第一个人就吃完了 # 加了时间就会一人拿一个锁,进入卡死状态 lock2.acquire() # 再拿盘子 print("真香") lock2.release() lock1.release() def eat2(): lock2.acquire() # 先拿盘子 lock1.acquire() # 再拿筷子 print("真相") lock1.release() lock2.release() if __name__ == '__main__': t1 = Thread(target=eat1) t2 = Thread(target=eat2) t1.start() t2.start()
如何避免死锁问题
锁不要有多个,一个就足够了
如果真的发生了死锁问题,必须迫使一方先交出锁
可重入锁(递归锁)
Rlock 称之为递归锁或者可重入锁
需要注意的是:
Rlock锁不是用来解决死锁问题的
而是用来解决连续两次执行acquire把自己锁死的情况
与lock唯一的区别:
Rlock同一个线程可以执行多次acquire(相当于给锁加了计数,计数为0其他人才能进),
所以执行了几次acquire就应该对应几次release,不然计数不为0,别人也进不去
from threading import Thread,Lock lock = Lock() def task(): lock.acquire() print("进来咯") lock.release() if __name__ == '__main__': lock.acquire() # 主线程如果锁了一次不释放,那么子线程拿到的锁的计数就不为0 ,就进不去 t = Thread(target=task) t.start()
信号量
信号量Semaphore也是一种锁
它可以限制被锁的代码,同时被多少线程并发访问(解决不了安全问题,也不是用来解决安全问题的)
与Lock Rlock的区别
Lock : 相当于锁住一个马桶,同时只能有一个人访问
Semaphore : 相当于锁一个公共厕所,同时可以来一堆人
用途:
仅用于控制并发访问 并不能防止并发造成的问题
import time from threading import Semaphore, Thread s = Semaphore(5) # 里面有个value的属性默认为1,可以自己设置,就是限制并发访问的人数 # 里面限制设置几个就限制了几个并发访问 def task(): s.acquire() time.sleep(1) print("来了老哥") s.release() if __name__ == '__main__': for i in range(10): # 这里循环10次,结果会是5个一组5个一组的访问 t = Thread(target=task) t.start()