之前的内容已经大致实现了如何获取网页、分析网页、获取目标内容。接下来的目标是如何让网页抓取进行得更效率些。在进行抓取的时候,时间的消耗主要是在请求等待的时间上,所以一个最容易想到的优化方式就是使用多线程。
多线程的实现还是比较简单的,下面是一个简单的线程实现方案:
#!python
# encoding: utf-8
from threading import Thread
def func():
print("this is multi thread")
def start():
t = Thread(target=func)
t.start()
if __name__ == '__main__':
start()
就是这么简单。
在抓取网页的时候,一个简单的思路就是为每个网页启动一个线程。在要抓取的网页比较少的时候——比如百十来个——这样子还是可行的。但是网页比较多的时候,这样做就不太合理了。因为线程的创建启动和运行都会消耗很多的资源,线程启动太多会耗尽资源导致机器卡死。而且,创建线程后只执行一次也是一种浪费。为了减少线程的创建、实现线程的重复利用,我们需要引入线程池。
可以使用python的ThreadPoolExecutor来创建线程池:
#!python
# encoding: utf-8
from concurrent.futuresimport ThreadPoolExecutor
def func():
print("this is multi thread")
def start_pool():
pool = ThreadPoolExecutor(64)
for i in range(10):
pool.submit(func)
if __name__ == '__main__':
start_pool()
也很简单,甚至不比单线程多一行代码。在代码里创建了一个总数为64的线程池,然后在一个循环中每次取出一条线程来执行func函数,没有空闲线程时则会进入等待。
按照这样的思路,我们也可以使用Queue来自己创建线程池:
class CustomThread(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.__queue = queue def run(self): while True: cmd = self.__queue.get() cmd() self.__queue.task_done() def custom_pool(): queue = Queue(5) for i in range(queue.maxsize): t = CustomThread(queue) t.setDaemon(True) t.start() for i in range(20): queue.put(func) queue.join()
在上面的代码里创建了一个长度为5的队列,然后在队列里入了几个线程,并立刻启动。每个线程随时待命,一旦队列里面有了要执行的任务就会立即执行,并在执行完成后发送通知给队列。queue.join()方法则会在队列中的所有任务执行完成前阻塞住线程,待所有任务执行完成后再继续执行后面的代码。
前面所述的方案是并发处理的方案。并发处理的方案可以充分利用CPU。不过现在的CPU通常都是多核的,为了利用多核CPU的特点,可以考虑使用并行处理的方案。下面是一个进程的演示:
#!python # encoding: utf-8 from multiprocessing import Process def process(): p = Process(target=func) p.start()
当然也有对应的进程池了:
#!python
# encoding: utf-8
from multiprocessing.pool import Pool
from multiprocessing import Process
def func():
print("this is multi thread")
def process_pool():
pool = Pool(processes=3)
for i in range(6):
pool.apply_async(func)
pool.close()
pool.join()
在我使用爬虫的经历中极少会用到多进程或进程池。不过,既然说起来了就得说个全套,所以才耐着性子把这块儿写完。
很多人可能就是因为要学习爬虫才开始看Python的。不过看了python却只知道爬虫未免有些可惜,我还是希望能够多接触到一些较深入的东西的。多线程和多进程是学习Python的经历中无论如何也不应该绕过的部分,如果不想浅尝而止的话,还是建议多看看。
完整的抓取程序在下面。程序写于半年前,有很多不成熟的地方,也没心思修改了,凑合看吧。因为代码太长所以折叠了起来:
#!python
# encoding: utf8
# zhyea.com
from urllib.requestimport Request
from urllib import parse
import urllib.request
import threading
from bs4import BeautifulSoup
from queue import Queue
class Client:
def __init__(self):
self.__TIMEOUT = 300
self.__CHARSET = "utf8"
self.__HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0"}
def get(self, url):
request = Request(url, headers=self.__HEADERS)
response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
content = response.read().decode(self.__CHARSET)
response.close()
return content
def post(self, url, **paras):
data = parse.urlencode(paras).encode(self.__CHARSET)
request = Request(url, data, headers=self.__HEADERS)
response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
content = response.read().decode(self.__CHARSET)
response.close()
return content
class GrabThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.__queue = queue
def run(self):
while True:
cmd, arg = self.__queue.get()
cmd(arg)
self.__queue.task_done()
class Grabber:
def __init__(self):
self.__URL_SEARCH = "http://www.xxxxxxx.net/search/"
self.__SAVE_PATH = "D://xxxx.log"
self.__OUTPUT = open(self.__SAVE_PATH, 'a', encoding='utf-8')
self.__queue = Queue()
def __write(self, title, magnent):
self.__OUTPUT.write(
"".join(["/n", title, "/n", magnent, "/n", "---------------------------------------------"]))
def __grab_anchor(self, url):
content = Client().get(url);
magnet_anchors = BeautifulSoup(content, "html.parser").select('a[href^="magnet:?xt"]')
for i in range(len(magnet_anchors)):
self.__write(magnet_anchors[i].attrs['title'], magnet_anchors[i].attrs['href'])
def __grab(self, key):
content = Client().post(self.__URL_SEARCH, q=parse.quote(key));
page_anchors = BeautifulSoup(content, "html.parser").select(".pagination > a")
if len(page_anchors) == 0:
self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key])))
else:
page_total = int(page_anchors[-2].get_text())
for i in range(page_total):
self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key, "/", "%d" % i])))
self.__queue.join()
def start(self):
for i in range(32):
t = GrabThread(self.__queue)
t.daemon = True
t.start()
keys = ["key1", "key2"]
for i in range(len(keys)):
self.__grab(keys[i])
self.__OUTPUT.close()
if __name__ == "__main__":
Grabber().start()
####################