更多企业级爬虫知识请查收于:
多线程(英语:multithreading),是指从软件或者硬件上实现多个线程并发执行的技术
#函数模式
import threading
def runtask(name):
print("%s线程1已启动"%name)
print("%s线程2已启动" % name)
# args因为是一个元组,如果只有一个参数,则参数后面加上,~~~,否则运行将报错
t = threading.Thread(target=runtask,args=("task1,task2",))
t.start()
运行结果:
知识点1:join
等待当前线程执行完毕,才执行下一条
import threading
import time
def runtask(name):
print("%s线程已启动"%name)
time.sleep(2)
t = threading.Thread(target=runtask,args=("task1",))
t.start()
t.join()
print("abc") # 过了2s才会打印,若无等待将看不到等待2s的效果
Trick:join是经常用到
知识点2:setDaemon(True)
将线程设置为守护线程。若设置为守护线程,主线程结束后,子线程也将结束,并且主线程不会理会子线程是否结束,主线程不会等待子线程结束完后才结束。若没有设置为守护线程,主线程会等待子线程结束后才会结束。
知识点3:active_count
程序的线程数量,数量=主线程+子线程数量
知识点4:Lock(互斥锁)
Python编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为” 互斥锁” 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。在Python中我们使用threading模块提供的Lock类。
import threading,time
def runtask(name):
global count#引用全局变量时,不需要global声明;但是后面使用或者修改这个全局变量的时候,需要global声明
time.sleep(1)
lock.acquire() # 获取锁资源,并返回是否获取成功
count+=1
print(name,count)
lock.release() # 释放资源
count = 0
lock = threading.Lock() # 互斥锁
for index in range(50):
t = threading.Thread(target=runtask,args=("thread%d"%index,))
t.start()
结果如下:
知识点5:RLock(递归锁,可重入锁)
当一个线程中遇到锁嵌套情况该怎么办,又会遇到什么情况?
死锁。使用RLock,RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
import threading,time
def run1():
global count1
lock.acquire()
count1 += 1
lock.release()
return count1
def run2():
global count2
lock.acquire()
count2 += 1
lock.release()
return count2
def runtask():
lock.acquire()
r1 = run1()
print("="*30)
r2 = run2()
lock.release()
print(r1,r2)
count1,count2 = 0,0
lock = threading.Lock()
for index in range(50):
t = threading.Thread(target=runtask,)
t.start()
运行结果:程序卡死,没结果,手动停止
上面的代码只需做一些小小的改动
将lock
=
threading.Lock() 改成lock = threading.RLock(),结果如下:
知识点6:最大可执行线程(threading.BoundedSemaphore(num)
)
threading.BoundedSemaphore(5)
设置可同时执行的最大线程数为5个,后面的线程需排队等待前面的线程执行完毕
import time,threading
def runtask(name):
global num
semaphore.acquire()
time.sleep(1)
num += 1
semaphore.release()
print(name,num)
num = 0
semaphore = threading.BoundedSemaphore(5)
for index in range(50):
t = threading.Thread(target=runtask,args=("线程%s"%index,))
t.start()
上面的程序是每次只有5个线程在同时运行,其他线程需等待前面的线程执行完毕,这就是最大可执行线程。
知识点7:Event
Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位为假,则线程等待直到信号被其他线程设置成真。Event中提供了四个重要的方法来满足基本的需求。
示例:(死循环)
import threading,time
def lighter():
num = 0
event.set() # 设置标记
while True:
if num >= 5 and num < 10:
event.clear() # 清除标记
print("红灯亮起,车辆禁止通行")
if num >= 10:
event.set() # 设置标记
print("绿灯亮起,车辆可以通行")
num = 0
num += 1
time.sleep(1)
def car():
while True:
if event.is_set():
print("车辆正在跑...")
else:
print("车辆停下了")
event.wait()
time.sleep(1)
event = threading.Event()
t1 = threading.Thread(target=lighter,)
t2 = threading.Thread(target=car,)
t1.start()
t2.start()
解析:这是一个简单的红灯停绿灯行案例。初始设置为绿灯并标记,车辆看到标记后通行,当红灯亮起的时候取消标记,车辆看到没有标记时停下,等待标记。
知识点8:Queue队列
使任务按照某一种特定顺序有条不紊的进行。下面介绍几种常用的队列:
queue.Queue()
:先进先出queue.LifoQueue()
:先进后出queue.PriorityQueue
:优先级队列,优先级的值越小,越先执行下面介绍几种常用的方法:
get()
:获取item,如果队列已经取空将会卡住。可设置timeout参数,给定一个超时的值,或者设置参数block=False,队列空直接抛异常get_nowait()
:b获取item。如果队列取空了,将会直接抛异常put()
:放入队列empty()
:队列是否为空qsize()
:获取队列的item数量Trick:Queue与多线程是我喜欢的搭配
知识点9:
t=Thread(target=func)
# 启动子线程
t.start()
# 阻塞子线程,待子线程结束后,再往下执行
t.join()
# 判断线程是否在执行状态,在执行返回True,否则返回False
t.is_alive()
t.isAlive()
# 设置线程是否随主线程退出而退出,默认为False
t.daemon = True
t.daemon = False
# 设置线程名
t.name = "My-Thread"
方式1:函数实现:
import threading
import time
def main(name):
print('线程%s 开始' % name)
time.sleep(5)
print('线程%s 结束' % name)
if __name__ == '__main__':
threads = []
thread_name = ['1', '2', '3']
for name in thread_name:
#利用threading 创建线程,args传递参数,参数指定的一定是一个元组类型
#基本格式t = threading.Thread(target = 函数名,args = (1,2,) )
t = threading.Thread(target=main, args=(name,))
t.start()
threads.append(t)
for thread in threads:
thread.join()
方式2:threading.Thread 模块 继承实现:
import threading
import time
# 核心是类里面覆写run()方法,用这个类的实例对象来调用start()方法
class TestThread(threading.Thread):
def __init__(self, name):
super(TestThread, self).__init__()#继承
self.name = name
def run(self):
print('线程%s 开始' % self.name)
time.sleep(5)
print('线程%s 结束' % self.name)
def main():
threads = []
thread_name = ['1', '2', '3']
for name in thread_name:
t = TestThread(name)#name 参数为字符串类型
threads.append(t)
for thread in threads:
thread.start() # 启动线程
for thread in threads:
thread.join() # 阻塞主线程
if __name__ == '__main__':
main()
# -*- coding: utf-8 -*-
# Author : szy
from threading import Thread
from queue import Queue
import time
from lxml import etree
import requests, os
data_path = 'D:\liqinlin'
tsv_file_path = '1.tsv'
class DouBanSpider(Thread):
def __init__(self, url, q):
# 重写写父类的__init__方法
super(DouBanSpider, self).__init__()
self.url = url
self.q = q
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',
}
def run(self):
self.parse_page()
def send_request(self, url):
'''
用来发送请求的方法
:return: 返回网页源码
'''
# 请求出错时,重复请求3次,
i = 0
while i <= 3:
try:
print(u"[INFO]请求url:" + url)
ip_host = '******************'
proxies = { 'http': ip_host, 'https': ip_host }
html = requests.get(url=url, headers=self.headers, proxies=proxies, verify=False).content
except Exception as e:
print(u'[INFO] %s%s' % (e, url))
i += 1
else:
return html
def parse_page(self):
'''
解析网站源码,并采用xpath提取 电影名称和平分放到队列中
:return:
'''
response = self.send_request(self.url)
score = response
self.q.put(score)
def all_url():
res = []
with open(tsv_file_path, 'r') as f:
content = f.readlines()
content = [x.strip() for x in content]
for x in content:
res.append(x.split("\t")[0])
# print(res)
return res
def main():
# 创建一个队列用来保存进程获取到的数据
q = Queue()
# 构造所有url
url_list = all_url()
# 保存线程
Thread_list = []
# 创建并启动线程
for url in url_list:
p = DouBanSpider(url, q)
p.start()
with open(os.path.join(data_path, url.split('/')[-1]), 'wb') as f:
f.write(q.get())
Thread_list.append(p)
# 让主线程等待子线程执行完成
for i in Thread_list:
i.join()
while not q.empty():
print(q.get())
if __name__ == "__main__":
start = time.time()
main()
print('[info]耗时:%s' % (time.time() - start))
Trick:上面代码现在看是不太完美的,比如可以封装requests即可完成几个函数的功能,欢迎交流