
Getting Started with Web Crawlers (2)

1. Introduction

In the previous post, we took a brief look at how a crawler works, implemented a simple crawler, and at the end briefly analyzed the problems that remained. In this post we address those problems and improve the crawler in the following areas.

2. Improvements

(1) Bloom Filter

First, we use a Bloom filter to improve the visitedSet in UrlQueue.

In the previous post we used visitedSet (a HashSet) to store the urls that had already been visited. We chose a hash set because we constantly insert urls into visitedSet and frequently need to check whether a given url is already in it, and with a hash table all dictionary operations complete in O(1) time on average (for the detailed analysis, see 散列表(hash table)——算法导论(13)). The downside is that maintaining the hash table costs a lot of memory. Can we reduce its space requirements?

Consider what visitedSet is actually for: it only has to answer whether a url is already contained in it, nothing more. So there is no need to keep the full information of every url; a fingerprint is enough. The obvious candidates are digest algorithms such as MD5 and SHA-1, but even though they compress each url, we still have to store the compressed values. Can we do better? This is where the Bloom filter comes in (an introduction to Bloom filters and an implementation can be found in the last section of 散列表(hash table)——算法导论(13)).
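
The BloomFilter class used in the code below comes from that article and is not reproduced here. As a minimal sketch of the interface the crawler relies on (a constructor taking a false-positive probability and an expected capacity, plus add and contains), something like the following would do; the sizing formulas are the standard ones, but the double-hashing scheme is a simplifying assumption, not the referenced implementation:

import java.util.BitSet;

// Minimal Bloom filter sketch. m = -n*ln(p)/ln(2)^2 bits, k = m/n*ln(2) hash functions.
// The double-hashing index derivation below is an assumption for illustration only.
public class BloomFilter<T> {

    private final BitSet bits;
    private final int bitSize;    // m: number of bits
    private final int hashCount;  // k: number of hash functions

    public BloomFilter(double falsePositiveProbability, int expectedElements) {
        this.bitSize = (int) Math.ceil(-expectedElements * Math.log(falsePositiveProbability)
                / (Math.log(2) * Math.log(2)));
        this.hashCount = Math.max(1, (int) Math.round((double) bitSize / expectedElements * Math.log(2)));
        this.bits = new BitSet(bitSize);
    }

    public void add(T element) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(indexFor(element, i));
        }
    }

    public boolean contains(T element) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(indexFor(element, i))) {
                return false;  // definitely never added
            }
        }
        return true;           // probably added (false positives are possible)
    }

    // Derive the i-th bit index from the element's string form via double hashing.
    private int indexFor(T element, int i) {
        int h1 = element.toString().hashCode();
        int h2 = h1 >>> 16;
        return Math.floorMod(h1 + i * (h2 | 1), bitSize);
    }
}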

(2) Berkeley DB

Next, we use Berkeley DB to improve the unvisitedList in UrlQueue.

Berkeley DB is an embedded database system that is simple, small, and fast (simple and small are beyond doubt; as for performance, I have not verified it myself). For downloads and usage instructions, see its official site: http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/overview/index.html

With Berkeley DB in place, the urls parsed out of each page are written directly to the DB, and unvisitedList only acts as a buffer pool in front of it: a dedicated thread reads urls from the DB into unvisitedList at a fixed rate, while the threads that issue page requests keep taking urls from unvisitedList, as sketched below.
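
To make that division of labor concrete, here is a rough sketch of the refill idea in isolation. The class and field names are illustrative only; the real version is the Feeder inner class of CrawlerEngine in the implementation section, which also adds a stop condition when the DB stays empty.

// Illustrative only -- the actual implementation is CrawlerEngine.Feeder further down.
class RefillThread extends Thread {

    private final DBHelper<Url> db;        // persistent pool of parsed urls
    private final BloomQueue<Url> buffer;  // small in-memory buffer the fetch threads read from
    private long nextId = 1;               // primary key of the next url to load

    RefillThread(DBHelper<Url> db, BloomQueue<Url> buffer) {
        this.db = db;
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            try {
                Thread.sleep(10);          // poll the DB at a fixed, modest rate
            } catch (InterruptedException e) {
                return;
            }
            if (buffer.isFull()) {
                continue;                  // the fetchers already have plenty of work queued
            }
            Url next = db.get(nextId);
            if (next != null) {
                buffer.enqueue(next);      // blocks briefly if the buffer fills in the meantime
                nextId++;
            }
        }
    }
}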

(3) Multithreading

Finally, we introduce multithreading to improve the crawler's throughput.

The key issues in multithreading are synchronization and communication between threads; please look these topics up on your own.

The overall architecture after these improvements is shown below:

[Figure: architecture of the improved crawler]

3. Implementation

(1) Code

Here is the improved code:

① First, the improved UrlQueue.java, which we rename to BloomQueue.java (the BloomFilter class it uses can be found in 散列表(hash table)——算法导论(13); a rough sketch of its interface was given above).

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BloomQueue<T> {

    private BloomFilter<T> bloomFilter;
    // The in-memory buffer of elements flowing through the queue (despite the name, these are urls waiting to be fetched).
    private LinkedBlockingQueue<T> visitedList;
    private AtomicInteger flowedCount;
    private int queueCapacity;

    public BloomQueue() {
        this(0.000001, 10000000, 500);
    }

    public BloomQueue(double falsePositiveProbability, int filterCapacity, int queueCapacity) {
        this.queueCapacity = queueCapacity;
        bloomFilter = new BloomFilter<>(falsePositiveProbability, filterCapacity);
        visitedList = new LinkedBlockingQueue<>(queueCapacity);
        flowedCount = new AtomicInteger(0);
    }

    /**
     * Enqueue (blocks for up to 3 seconds by default when the element cannot be enqueued immediately)
     *
     * @param t
     * @return
     */
    public boolean enqueue(T t) {
        return enqueue(t, 3000);
    }

    /**
     * Enqueue
     *
     * @param t
     * @param timeout timeout in milliseconds
     */
    public boolean enqueue(T t, long timeout) {
        try {
            boolean result = visitedList.offer(t, timeout, TimeUnit.MILLISECONDS);
            if (result) {
                bloomFilter.add(t);
                flowedCount.getAndIncrement();
            }
            return result;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * Dequeue (blocks for up to 3 seconds by default when the queue is empty)
     *
     * @return
     */
    public T dequeue() {
        return dequeue(3000);
    }

    /**
     * Dequeue
     *
     * @param timeout timeout in milliseconds
     * @return
     */
    public T dequeue(long timeout) {
        try {
            return visitedList.poll(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
        return null;
    }

    /**
     * Whether the queue currently contains the element
     *
     * @return
     */
    public boolean contains(T t) {
        return visitedList.contains(t);
    }

    /**
     * Whether the element has ever passed through the queue
     *
     * @param t
     * @return
     */
    public boolean contained(T t) {
        return bloomFilter.contains(t);
    }

    public boolean isEmpty() {
        return visitedList.isEmpty();
    }

    public boolean isFull() {
        return visitedList.size() == queueCapacity;
    }

    public int size() {
        return visitedList.size();
    }

    public int flowedCount() {
        return flowedCount.get();
    }

    @Override
    public String toString() {
        return visitedList.toString();
    }
}
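
The following snippet is not part of the crawler; it is just a quick illustration of how the two membership checks differ: contains reflects what is currently buffered, while contained remembers everything that has ever flowed through the queue.

public class BloomQueueDemo {
    public static void main(String[] args) {
        BloomQueue<String> queue = new BloomQueue<>();

        queue.enqueue("http://www.cnblogs.com/artech/");
        System.out.println(queue.contains("http://www.cnblogs.com/artech/"));   // true: still in the queue
        System.out.println(queue.contained("http://www.cnblogs.com/artech/"));  // true: has passed through the filter

        String url = queue.dequeue();
        System.out.println(url);                  // the url we enqueued
        System.out.println(queue.contains(url));  // false: no longer queued
        System.out.println(queue.contained(url)); // still true: the Bloom filter remembers it
    }
}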

② Next, a thin wrapper around Berkeley DB to make it easier to use.

import java.io.File;

import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.StoreConfig;

public class DBHelper<T> {

    public static final String DEFAULT_DB_DIR = "C:/Users/Administrator/Desktop/db/";
    public static final String DEFAULT_Entity_Store = "EntityStore";

    public Environment myEnv;
    public EntityStore store;
    public PrimaryIndex<Long, T> primaryIndex;

    public DBHelper(Class<T> clazz) {
        this(clazz, DEFAULT_DB_DIR, DEFAULT_Entity_Store, false);
    }

    public DBHelper(Class<T> clazz, String dbDir, String storeName, boolean isRead) {
        File dir = new File(dbDir);
        if (!dir.exists()) {
            dir.mkdirs();
        }
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(!isRead);
        // Environment
        myEnv = new Environment(dir, envConfig);
        // StoreConfig
        StoreConfig storeConfig = new StoreConfig();
        storeConfig.setAllowCreate(!isRead);
        // store
        store = new EntityStore(myEnv, storeName, storeConfig);
        // PrimaryIndex
        primaryIndex = store.getPrimaryIndex(Long.class, clazz);
    }

    public void put(T t) {
        primaryIndex.put(t);
        store.sync();
        myEnv.sync();
    }

    public EntityCursor<T> entities() {
        return primaryIndex.entities();
    }

    public T get(long key) {
        return primaryIndex.get(key);
    }

    public void close() {
        if (store != null) {
            store.close();
        }
        if (myEnv != null) {
            myEnv.cleanLog();
            myEnv.close();
        }
    }
}

③ Then we write an entity class for urls so they can be stored.

import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey;

@Entity
public class Url {

    @PrimaryKey(sequence = "Sequence_Namespace")
    private long id;
    private String url;

    public Url() {
    }

    public Url(String url) {
        super();
        this.url = url;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    @Override
    public String toString() {
        return url;
    }

    public boolean isEmpty() {
        return url == null || url.isEmpty();
    }
}
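
Putting DBHelper and Url together, a minimal round trip against the store looks roughly like this. The demo class and the example urls are made up for illustration, and the DB directory is whatever DEFAULT_DB_DIR points to; adjust it for your machine.

import com.sleepycat.persist.EntityCursor;

public class DBHelperDemo {
    public static void main(String[] args) {
        DBHelper<Url> dbHelper = new DBHelper<>(Url.class);

        // The @PrimaryKey sequence assigns ids automatically on put(), normally starting at 1
        // (the Feeder below relies on the same assumption).
        dbHelper.put(new Url("http://www.cnblogs.com/artech/"));
        dbHelper.put(new Url("http://www.cnblogs.com/artech/p/"));

        // Read back by primary key, the same way the Feeder does.
        Url first = dbHelper.get(1);
        System.out.println(first);

        // Or iterate over everything that has been stored.
        EntityCursor<Url> cursor = dbHelper.entities();
        try {
            for (Url u : cursor) {
                System.out.println(u.getId() + " -> " + u.getUrl());
            }
        } finally {
            cursor.close();
        }

        dbHelper.close();
    }
}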

④ Finally, our core class, CrawlerEngine. It has two inner classes: Feeder and Fetcher. The Feeder is responsible for adding urls to urlQueue; the Fetcher takes urls out of urlQueue, requests the pages, and parses them. The JsoupDownloader class it uses is the same as in the previous post and is unchanged.

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jsoup.nodes.Document;

public class CrawlerEngine {

    public static final String DEFAULT_SAVE_DIR = "C:/Users/Administrator/Desktop/html/";
    private static final long FEEDER_SLEEP_TIME = 10;
    // Maximum time the feeder waits when no url can be read from the DB (if the DB is still empty after this long, the feeder stops).
    private static final long FEEDER_MAX_WAIT_TIME = 3 * 1000;
    // The same limit expressed as a number of polling rounds.
    private static final int FEEDER_MAX_WAIT_COUNT = (int) (FEEDER_MAX_WAIT_TIME / FEEDER_SLEEP_TIME);
    private static final boolean LOG = false;

    private BloomQueue<Url> urlQueue;
    private ExecutorService fetcherPool;
    private int fetcherCount;
    private boolean running;
    private DBHelper<Url> dbHelper;
    private JsoupDownloader downloader;
    private String parseRegex;
    private String saveRegex;
    private String saveDir;
    private String saveName;
    private long maxCount = 1000;
    private long startTime;
    private long endTime = Long.MAX_VALUE;

    public CrawlerEngine() {
        this(20, DEFAULT_SAVE_DIR, null);
    }

    public CrawlerEngine(int fetcherCount, String saveDir, String saveName) {
        this.fetcherCount = fetcherCount;
        urlQueue = new BloomQueue<>();
        fetcherPool = Executors.newFixedThreadPool(fetcherCount);
        dbHelper = new DBHelper<>(Url.class);
        downloader = JsoupDownloader.getInstance();
        this.saveDir = saveDir;
        this.saveName = saveName;
    }

    public void startUp(String[] seeds) {
        if (running) {
            return;
        }
        running = true;
        startTime = System.currentTimeMillis();
        for (String seed : seeds) {
            Url url = new Url(seed);
            urlQueue.enqueue(url);
        }
        for (int i = 0; i < fetcherCount; i++) {
            fetcherPool.execute(new Fetcher());
        }
        new Feeder().start();
    }

    public void shutdownNow() {
        running = false;
        fetcherPool.shutdown();
    }

    public void shutdownAtTime(long time) {
        if (time > startTime) {
            endTime = time;
        }
    }

    public void shutdownDelayed(long delayed) {
        shutdownAtTime(startTime + delayed);
    }

    public void shutdownAtCount(long count) {
        maxCount = count;
    }

    private boolean isEnd() {
        return urlQueue.flowedCount() > maxCount || System.currentTimeMillis() > endTime;
    }

    private long currId = 1;
    private int currWaitCount;

    /**
     * Feeder
     * <p>
     * Moves urls from the DB into the queue.
     * </p>
     *
     * @author D.K
     *
     */
    private class Feeder extends Thread {
        @Override
        public void run() {
            while (!isEnd() && running && currWaitCount != FEEDER_MAX_WAIT_COUNT) {
                try {
                    sleep(FEEDER_SLEEP_TIME);
                    if (urlQueue.isFull()) {
                        log("Feeder", "queue is full");
                        continue;
                    }
                    Url url = dbHelper.get(currId);
                    if (url == null) {
                        currWaitCount++;
                        log("Feeder", "url is null, currWaitCount = " + currWaitCount);
                    } else {
                        // Skip urls that have already passed through the queue (null guard added in case the DB runs out mid-scan).
                        while (url != null && urlQueue.contained(url)) {
                            currId++;
                            url = dbHelper.get(currId);
                        }
                        if (url != null) {
                            log("Feeder", "about to enqueue url");
                            urlQueue.enqueue(url);
                            currId++;
                            log("Feeder", "url enqueued, currId = " + currId);
                            currWaitCount = 0;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            log("Feeder", "finished...");
            while (true) {
                try {
                    sleep(100);
                    log("Feeder", "waiting for the Fetchers to finish...");
                } catch (InterruptedException e) {
                }
                if (urlQueue.isEmpty()) {
                    shutdownNow();
                    System.out.println(">>>>>>>>>>>> Crawl finished: requested " + urlQueue.flowedCount() + " pages in " + (System.currentTimeMillis() - startTime) + " ms <<<<<<<<<<<<");
                    return;
                }
            }
        }
    }

    /**
     * Fetcher
     * <p>
     * Takes urls from the queue, downloads and parses the pages, and writes newly found urls to the DB.
     * </p>
     *
     * @author D.K
     *
     */
    private class Fetcher implements Runnable {
        @Override
        public void run() {
            while (!isEnd() && (running || !urlQueue.isEmpty())) {
                log("Fetcher", "taking a url from the queue, size=" + urlQueue.size());
                Url url = urlQueue.dequeue();
                if (url == null) {
                    log("Fetcher", "url is null");
                    continue;
                }
                log("Fetcher", "got a url");
                Document doc = downloader.downloadPage(url.getUrl());
                Set<String> urlSet = downloader.parsePage(doc, parseRegex);
                downloader.savePage(doc, saveDir, saveName, saveRegex);
                for (String str : urlSet) {
                    Url u = new Url(str);
                    if (!urlQueue.contained(u)) {
                        dbHelper.put(u);
                    }
                }
            }
        }
    }

    private void log(String talker, String content) {
        if (LOG) {
            System.out.println("[" + talker + "] " + content);
        }
    }

    public String getParseRegex() {
        return parseRegex;
    }

    public void setParseRegex(String parseRegex) {
        this.parseRegex = parseRegex;
    }

    public String getSaveRegex() {
        return saveRegex;
    }

    public void setSaveRegex(String saveRegex) {
        this.saveRegex = saveRegex;
    }

    public void setSavePath(String saveDir, String saveName) {
        this.saveDir = saveDir;
        this.saveName = saveName;
    }
}

(2) Testing

We run the same test as in the previous post to see how much the optimizations help. Here is the test code:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CrawlerEngine crawlerEngine = new CrawlerEngine();
        crawlerEngine.setParseRegex("(http://www.cnblogs.com/artech/p|http://www.cnblogs.com/artech/default|http://www.cnblogs.com/artech/archive/\\d{4}/\\d{2}/\\d{2}/).*");
        crawlerEngine.setSaveRegex("(http://www.cnblogs.com/artech/p|http://www.cnblogs.com/artech/archive/\\d{4}/\\d{2}/\\d{2}/).*");
        crawlerEngine.startUp(new String[] { "http://www.cnblogs.com/artech/" });
        crawlerEngine.shutdownAtCount(1000);
    }
}

Here are the results:

[Screenshots: run output and the saved pages]

(3) Summary

Compared with the 61 s run time in the previous post, the improved crawler finishes in 14 s, a clear gain in efficiency.

In the next post we will make further small optimizations and polish some details, such as handling HTTP status codes and extracting interfaces to reduce coupling between components and increase flexibility.
