您的当前位置:首页正文

Python3 多线程 (全局观知识点,附示例)

2024-11-30 来源:个人技术集锦

整理于2020年10月下旬,献给不甘平凡的你

更多企业级爬虫知识请查收于:

 

 

多线程(英语:multithreading),是指从软件或者硬件上实现多个线程并发执行的技术

一:创建并启动一个线程

#函数模式
import threading

def runtask(name):
  print("%s线程1已启动"%name)
  print("%s线程2已启动" % name)

  # args因为是一个元组,如果只有一个参数,则参数后面加上,~~~,否则运行将报错
t = threading.Thread(target=runtask,args=("task1,task2",))
t.start()

运行结果:

知识点1:join

等待当前线程执行完毕,才执行下一条

import threading
import time
def runtask(name):
  print("%s线程已启动"%name)
  time.sleep(2)
t = threading.Thread(target=runtask,args=("task1",))
t.start()
t.join()
print("abc")  # 过了2s才会打印,若无等待将看不到等待2s的效果

Trick:join是经常用到

知识点2:setDaemon(True)

将线程设置为守护线程。若设置为守护线程,主线程结束后,子线程也将结束,并且主线程不会理会子线程是否结束,主线程不会等待子线程结束完后才结束。若没有设置为守护线程,主线程会等待子线程结束后才会结束。

知识点3:active_count

程序的线程数量,数量=主线程+子线程数量

知识点4:Lock(互斥锁)

Python编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中我们使用threading模块提供的Lock类。

import threading,time

def runtask(name):
    global count#引用全局变量时,不需要global声明;但是后面使用或者修改这个全局变量的时候,需要global声明
    time.sleep(1)
    lock.acquire()   # 获取锁资源,并返回是否获取成功
    count+=1
    print(name,count)
    lock.release()   # 释放资源
count = 0
lock = threading.Lock()   # 互斥锁
for index in range(50):
  t = threading.Thread(target=runtask,args=("thread%d"%index,))
  t.start()

结果如下:

知识点5:RLock(递归锁,可重入锁)

当一个线程中遇到锁嵌套情况该怎么办,又会遇到什么情况?

死锁。使用RLock,RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

import threading,time

def run1():
    global count1
    lock.acquire()
    count1 += 1
    lock.release()
    return count1
def run2():
    global count2
    lock.acquire()
    count2 += 1
    lock.release()
    return count2
def runtask():
    lock.acquire()
    r1 = run1()
    print("="*30)
    r2 = run2()
    lock.release()
    print(r1,r2)
count1,count2 = 0,0
lock = threading.Lock()
for index in range(50):
    t = threading.Thread(target=runtask,)
    t.start()

运行结果:程序卡死,没结果,手动停止

上面的代码只需做一些小小的改动

lock = threading.Lock() 改成lock = threading.RLock(),结果如下:

知识点6:最大可执行线程(threading.BoundedSemaphore(num)

threading.BoundedSemaphore(5)设置可同时执行的最大线程数为5个,后面的线程需排队等待前面的线程执行完毕

import time,threading
def runtask(name):
    global num
    semaphore.acquire()
    time.sleep(1)
    num += 1
    semaphore.release()
    print(name,num)
num = 0
semaphore = threading.BoundedSemaphore(5)
for index in range(50):
    t = threading.Thread(target=runtask,args=("线程%s"%index,))
    t.start()

上面的程序是每次只有5个线程在同时运行,其他线程需等待前面的线程执行完毕,这就是最大可执行线程。

知识点7:Event

Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位为假,则线程等待直到信号被其他线程设置成真。Event中提供了四个重要的方法来满足基本的需求。

  • - clear:清除标记
  • - set:设置标记
  • - is_set:是否被标记
  • - wait:等待被标记

示例:(死循环)

import threading,time
def lighter():
  num = 0
  event.set()   # 设置标记
  while True:
    if num >= 5 and num < 10:
      event.clear()  # 清除标记
      print("红灯亮起,车辆禁止通行")
    if num >= 10:
      event.set()   # 设置标记
      print("绿灯亮起,车辆可以通行")
      num = 0
    num += 1
    time.sleep(1)
def car():
  while True:
    if event.is_set():
      print("车辆正在跑...")
    else:
      print("车辆停下了")
      event.wait()
    time.sleep(1)
event = threading.Event()
t1 = threading.Thread(target=lighter,)
t2 = threading.Thread(target=car,)
t1.start()
t2.start()

解析:这是一个简单的红灯停绿灯行案例。初始设置为绿灯并标记,车辆看到标记后通行,当红灯亮起的时候取消标记,车辆看到没有标记时停下,等待标记。

知识点8:Queue队列

使任务按照某一种特定顺序有条不紊的进行。下面介绍几种常用的队列:

  • - queue.Queue():先进先出
  • - queue.LifoQueue():先进后出
  • - queue.PriorityQueue:优先级队列,优先级的值越小,越先执行

下面介绍几种常用的方法:

  • - get():获取item,如果队列已经取空将会卡住。可设置timeout参数,给定一个超时的值,或者设置参数block=False,队列空直接抛异常
  • - get_nowait():b获取item。如果队列取空了,将会直接抛异常
  • - put():放入队列
  • - empty():队列是否为空
  • - qsize():获取队列的item数量

Trick:Queue与多线程是我喜欢的搭配

知识点9:

t=Thread(target=func)
 # 启动子线程
t.start()
 # 阻塞子线程,待子线程结束后,再往下执行
t.join()
# 判断线程是否在执行状态,在执行返回True,否则返回False
t.is_alive()
t.isAlive()
# 设置线程是否随主线程退出而退出,默认为False
t.daemon = True
t.daemon = False
# 设置线程名
t.name = "My-Thread"

二:启动线程的两种基本方法:

方式1:函数实现:

import threading
import time
 
 
def main(name):
    print('线程%s 开始' % name)
    time.sleep(5)
    print('线程%s 结束' % name)
 
 
if __name__ == '__main__':
    threads = []
    thread_name = ['1', '2', '3']
    for name in thread_name:
        #利用threading 创建线程,args传递参数,参数指定的一定是一个元组类型

        #基本格式t = threading.Thread(target = 函数名,args = (1,2,) )

        t = threading.Thread(target=main, args=(name,))
        t.start()
        threads.append(t)
    for thread in threads:
        thread.join()

方式2:threading.Thread 模块 继承实现:

import threading
import time

# 核心是类里面覆写run()方法,用这个类的实例对象来调用start()方法


class TestThread(threading.Thread):
    def __init__(self, name):
        super(TestThread, self).__init__()#继承
        self.name = name

    def run(self):
        print('线程%s 开始' % self.name)
        time.sleep(5)
        print('线程%s 结束' % self.name)


def main():
    threads = []
    thread_name = ['1', '2', '3']
    for name in thread_name:
        t = TestThread(name)#name 参数为字符串类型

        threads.append(t)
    for thread in threads:
        thread.start()  # 启动线程
    for thread in threads:
        thread.join()  # 阻塞主线程


if __name__ == '__main__':
    main()

三:爬虫 多线程 实战

# -*- coding: utf-8 -*-
# Author       :   szy

from threading import Thread
from queue import Queue
import time
from lxml import etree
import requests, os

data_path = 'D:\liqinlin'
tsv_file_path = '1.tsv'


class DouBanSpider(Thread):
    def __init__(self, url, q):
        # 重写写父类的__init__方法
        super(DouBanSpider, self).__init__()
        self.url = url
        self.q = q
        self.headers = {

            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
        }

    def run(self):
        self.parse_page()

    def send_request(self, url):
        '''
        用来发送请求的方法
        :return: 返回网页源码
        '''
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                print(u"[INFO]请求url:" + url)
                ip_host = '******************'
                proxies = { 'http': ip_host, 'https': ip_host }

                html = requests.get(url=url, headers=self.headers, proxies=proxies, verify=False).content
            except Exception as e:
                print(u'[INFO] %s%s' % (e, url))

                i += 1
            else:
                return html

    def parse_page(self):
        '''
        解析网站源码,并采用xpath提取 电影名称和平分放到队列中
        :return:
        '''
        response = self.send_request(self.url)
        score = response
        self.q.put(score)


def all_url():
    res = []
    with open(tsv_file_path, 'r') as f:
        content = f.readlines()
    content = [x.strip() for x in content]
    for x in content:
        res.append(x.split("\t")[0])
    # print(res)
    return res


def main():
    # 创建一个队列用来保存进程获取到的数据
    q = Queue()
    # 构造所有url
    url_list = all_url()
    # 保存线程
    Thread_list = []
    # 创建并启动线程
    for url in url_list:
        p = DouBanSpider(url, q)
        p.start()
        with open(os.path.join(data_path, url.split('/')[-1]), 'wb') as f:
            f.write(q.get())
        Thread_list.append(p)

    # 让主线程等待子线程执行完成
    for i in Thread_list:
        i.join()
    while not q.empty():
        print(q.get())


if __name__ == "__main__":
    start = time.time()
    main()
    print('[info]耗时:%s' % (time.time() - start))

Trick:上面代码现在看是不太完美的,比如可以封装requests即可完成几个函数的功能,欢迎交流

 

 

 

 

显示全文