# coding=utf-8
'''
Created on 2016-03-09
This version talks to MongoDB directly (no intermediate service).
@author: ChenHao
'''
from pymongo.mongo_client import MongoClient
from proxy import ip_pool
from util_common import html_downloader, Constant
import threading
import random
from time import sleep


class DetailPageMain:

    def __init__(self, userName=None, maxThread=100):
        self.cli = MongoClient(Constant.MONGODB_URL)
        self.db = self.cli.spider
        self.user = self._get_user(userName)
        self.pool = ip_pool.Pool()
        self.activeThread = 0
        self.maxThread = maxThread
        self.successed = 0
        self.failured = 0
        self.total = 0
        # True once the remaining TODO tasks fit into the free thread slots.
        self.isLast = False
        # While True, no new tasks are fetched and no results are saved.
        self.wait = False

    def reset(self):
        """Re-open the MongoDB connection once all worker threads finish."""
        self.wait = True
        while self.activeThread > 0:
            sleep(0.5)
        try:
            if self.cli is not None:
                self.cli.close()
            self.cli = MongoClient(Constant.MONGODB_URL)
            self.db = self.cli.spider
            self.wait = False
        except Exception:
            self.reset()

    def _get_user(self, userName):
        rs_user = self.db.user.find_one({"name": userName})
        if rs_user is None:
            # First time this user joins the task.
            print(userName, "is new to this task, welcome!")
            rs_user = {"name": userName, "starttime": 0}
            # insert() returns the new _id, not the document, so insert the
            # dict we already hold (insert() adds the _id to it in place).
            self.db.user.insert(rs_user)
        return rs_user

    # Fetch up to `size` pending tasks, picked at a random offset via the
    # `random` field so concurrent workers rarely grab the same documents.
    def _get_task(self, size=1):
        if self.wait:
            return []
        try:
            if self.isLast:
                return self.db.detail_todo.find({"status": Constant.TODO})
            rand = random.random()
            result = self.db.detail_todo.find(
                {"status": Constant.TODO, "random": {"$gt": rand}}
            ).sort("random", 1).limit(size)
            # find() never returns None; fall back when the cursor is empty.
            if result.count(with_limit_and_skip=True) == 0:
                result = self.db.detail_todo.find(
                    {"status": Constant.TODO, "random": {"$lt": rand}}
                ).sort("random", -1).limit(size)
            return result
        except Exception:
            return []

    def hasNext(self):
        if self.wait:
            return False
        try:
            count = self.db.detail_todo.find({"status": Constant.TODO}).count()
            self.isLast = count < self.maxThread - self.activeThread
            return count > 0
        except Exception:
            return True

    def _save_one_result(self, url, cont):
        if self.wait:
            return
        try:
            task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
            if task is not None:
                task["download_user"] = self.user['name']
                task["str_html"] = cont
                task["status"] = Constant.DONE
                task["parserTask"] = Constant.TODO
                self.db.detail_todo.save(task)
            else:
                print('reload', url)
        except Exception:
            pass

    def _get_one_proxy(self):
        return self.pool.get()

    def _remove_one_proxy(self, proxy):
        self.pool.remove(proxy)

    # NOTE: the counters below are updated from worker threads without a
    # lock, so the statistics are approximate.
    def _on_download_success(self, cont, url, proxy):
        self.activeThread -= 1
        # Check whether this is a valid download: a real detail page is large
        # and contains '制造商零件编号' ("manufacturer part number").
        if len(cont) < 60000 or str(cont).find('制造商零件编号') == -1:
            self._remove_one_proxy(proxy)
            self.failured += 1
            if len(cont) > 150000:
                # Large page without the marker: treat the URL itself as bad.
                print("bad url", url)
                task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
                if task is not None:
                    task["status"] = Constant.ERROR
                    self.db.detail_todo.save(task)
            elif len(cont) > 60000:
                print("robot!", url)
            else:
                print("robot!")
        else:
            self._save_one_result(url, cont)
            self.successed += 1

    def _on_download_error(self, e, url, proxy):
        self.activeThread -= 1
        self._remove_one_proxy(proxy)
        self.failured += 1

    def craw(self):
        """Fill the free thread slots with new download tasks."""
        if self.maxThread > self.activeThread:
            currentTasks = self._get_task(self.maxThread - self.activeThread)
            for task in currentTasks:
                proxy = self._get_one_proxy()
                if proxy is not None:
                    crawer = CrawerThread(task["url"], proxy,
                                          self._on_download_success,
                                          self._on_download_error)
                    crawer.start()
                    self.activeThread += 1
                    self.total += 1

    def statistic(self):
        return self.successed, self.failured, self.activeThread, self.total


class CrawerThread(threading.Thread):

    def __init__(self, url, proxy, success, error):
        threading.Thread.__init__(self)
        self.downloader = html_downloader.HtmlDownloader()
        self.url = url
        self.proxy = proxy
        self.success = success
        self.error = error

    def run(self):
        try:
            cont = self.downloader.download(self.url, self.proxy)
            if self.success is not None:
                self.success(cont, self.url, self.proxy)
        except Exception as e:
            if self.error is not None:
                self.error(e, self.url, self.proxy)


if __name__ == '__main__':
    # Print per-user download counts (status 2 is presumably Constant.DONE).
    cli = MongoClient(Constant.MONGODB_URL)
    db = cli.spider
    for u in db.user.find():
        print(u['name'], db.detail_todo.find({"status": 2, "download_user": u['name']}).count())
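
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): one way a scheduler could
# drive DetailPageMain, assuming Constant.MONGODB_URL and the proxy pool are
# configured as above; "demo" is a placeholder user name. Kept commented out
# so importing this module stays side-effect free.
#
#     main = DetailPageMain(userName="demo", maxThread=50)
#     while main.hasNext():
#         main.craw()                 # fill free slots with new downloads
#         sleep(1)
#         ok, bad, active, total = main.statistic()
#         print("ok:", ok, "failed:", bad, "active:", active, "total:", total)
# ---------------------------------------------------------------------------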