转载

Python网络爬虫4 – 多线程抓取

之前的内容已经大致实现了如何获取网页、分析网页、获取目标内容。接下来的目标是如何让网页抓取进行得更效率些。在进行抓取的时候,时间的消耗主要是在请求等待的时间上,所以一个最容易想到的优化方式就是使用多线程。

多线程

多线程的实现还是比较简单的,下面是一个简单的线程实现方案:

#!python
# encoding: utf-8
from threading import Thread
 
 
def func():
    print("this is multi thread")
 
 
def start():
    t = Thread(target=func)
    t.start()
 
 
if __name__ == '__main__':
    start()

就是这么简单。

线程池

在抓取网页的时候,一个简单的思路就是为每个网页启动一个线程。在要抓取的网页比较少的时候——比如百十来个——这样子还是可行的。但是网页比较多的时候,这样做就不太合理了。因为线程的创建启动和运行都会消耗很多的资源,线程启动太多会耗尽资源导致机器卡死。而且,创建线程后只执行一次也是一种浪费。为了减少线程的创建、实现线程的重复利用,我们需要引入线程池。

可以使用python的ThreadPoolExecutor来创建线程池:

#!python
# encoding: utf-8
from concurrent.futuresimport ThreadPoolExecutor
 
 
def func():
    print("this is multi thread")
 
 
def start_pool():
    pool = ThreadPoolExecutor(64)
    for i in range(10):
        pool.submit(func)
 
 
if __name__ == '__main__':
    start_pool()

也很简单,甚至不比单线程多一行代码。在代码里创建了一个总数为64的线程池,然后在一个循环中每次取出一条线程来执行func函数,没有空闲线程时则会进入等待。

按照这样的思路,我们也可以使用Queue来自己创建线程池:

class CustomThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue
 
    def run(self):
        while True:
            cmd = self.__queue.get()
            cmd()
            self.__queue.task_done()
 
 
def custom_pool():
    queue = Queue(5)
    for i in range(queue.maxsize):
        t = CustomThread(queue)
        t.setDaemon(True)
        t.start()
 
    for i in range(20):
        queue.put(func)
    queue.join()

在上面的代码里创建了一个长度为5的队列,然后在队列里入了几个线程,并立刻启动。每个线程随时待命,一旦队列里面有了要执行的任务就会立即执行,并在执行完成后发送通知给队列。queue.join()方法则会在队列中的所有任务执行完成前阻塞住线程,待所有任务执行完成后再继续执行后面的代码。

并行

前面所述的方案是并发处理的方案。并发处理的方案可以充分利用CPU。不过现在的CPU通常都是多核的,为了利用多核CPU的特点,可以考虑使用并行处理的方案。下面是一个进程的演示:

#!python
# encoding: utf-8
from multiprocessing import Process
 
 
def process():
    p = Process(target=func)
    p.start()

当然也有对应的进程池了:

#!python
# encoding: utf-8
from multiprocessing.pool import Pool
 
from multiprocessing import Process
 
 
def func():
    print("this is multi thread")
 
 
def process_pool():
    pool = Pool(processes=3)
    for i in range(6):
        pool.apply_async(func)
    pool.close()
    pool.join()

在我使用爬虫的经历中极少会用到多进程或进程池。不过,既然说起来了就得说个全套,所以才耐着性子把这块儿写完。

很多人可能就是因为要学习爬虫才开始看Python的。不过看了python却只知道爬虫未免有些可惜,我还是希望能够多接触到一些较深入的东西的。多线程和多进程是学习Python的经历中无论如何也不应该绕过的部分,如果不想浅尝而止的话,还是建议多看看。

完整的抓取程序在下面。程序写于半年前,有很多不成熟的地方,也没心思修改了,凑合看吧。因为代码太长所以折叠了起来:

#!python
# encoding: utf8
# zhyea.com
from urllib.requestimport Request
from urllib import parse
import urllib.request
import threading
from bs4import BeautifulSoup
from queue import Queue
 
 
class Client:
    def __init__(self):
        self.__TIMEOUT = 300
        self.__CHARSET = "utf8"
        self.__HEADERS = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:43.0) Gecko/20100101 Firefox/43.0"}
 
    def get(self, url):
        request = Request(url, headers=self.__HEADERS)
        response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
        content = response.read().decode(self.__CHARSET)
        response.close()
        return content
 
    def post(self, url, **paras):
        data = parse.urlencode(paras).encode(self.__CHARSET)
        request = Request(url, data, headers=self.__HEADERS)
        response = urllib.request.urlopen(request, timeout=self.__TIMEOUT)
        content = response.read().decode(self.__CHARSET)
        response.close()
        return content
 
 
class GrabThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.__queue = queue
 
    def run(self):
        while True:
            cmd, arg = self.__queue.get()
            cmd(arg)
            self.__queue.task_done()
 
 
class Grabber:
    def __init__(self):
        self.__URL_SEARCH = "http://www.xxxxxxx.net/search/"
        self.__SAVE_PATH = "D://xxxx.log"
        self.__OUTPUT = open(self.__SAVE_PATH, 'a', encoding='utf-8')
        self.__queue = Queue()
 
    def __write(self, title, magnent):
        self.__OUTPUT.write(
            "".join(["/n", title, "/n", magnent, "/n", "---------------------------------------------"]))
 
    def __grab_anchor(self, url):
        content = Client().get(url);
        magnet_anchors = BeautifulSoup(content, "html.parser").select('a[href^="magnet:?xt"]')
        for i in range(len(magnet_anchors)):
            self.__write(magnet_anchors[i].attrs['title'], magnet_anchors[i].attrs['href'])
 
    def __grab(self, key):
        content = Client().post(self.__URL_SEARCH, q=parse.quote(key));
        page_anchors = BeautifulSoup(content, "html.parser").select(".pagination > a")
        if len(page_anchors) == 0:
            self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key])))
        else:
            page_total = int(page_anchors[-2].get_text())
            for i in range(page_total):
                self.__queue.put((self.__grabAnchor, "".join([self.__URL_SEARCH, "/", key, "/", "%d" % i])))
        self.__queue.join()
 
    def start(self):
        for i in range(32):
            t = GrabThread(self.__queue)
            t.daemon = True
            t.start()
 
        keys = ["key1", "key2"]
        for i in range(len(keys)):
            self.__grab(keys[i])
        self.__OUTPUT.close()
 
 
if __name__ == "__main__":
    Grabber().start()

参考文档

  • python cookbook: http://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p07_creating_thread_pool.html
  • Python 并发编程之使用多线程和多处理器: http://developer.51cto.com/art/201405/438178.htm

####################

原文  http://www.zhyea.com/2016/08/13/python-spider-4-multi-thread.html
正文到此结束
Loading...