# coding=utf-8
'''
Created on March 9, 2016
This is the approach that connects to the database directly.
@author: ChenHao
'''
import random
import threading
from time import sleep

from pymongo.mongo_client import MongoClient

from proxy import ip_pool
from util_common import html_downloader, Constant
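
# The project-local helpers are expected to provide at least the interface
# used below (inferred from the calls in this file):
#   - ip_pool.Pool: get() returns a proxy or None, remove(proxy) discards it
#   - html_downloader.HtmlDownloader: download(url, proxy) returns the page
#     content as a string and raises on failure
#   - Constant: MONGODB_URL plus the task status flags TODO, DONE and ERROR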


class DetailPageMain():
    def __init__(self, userName=None, maxThread=100):
        self.cli = MongoClient(Constant.MONGODB_URL)
        self.db = self.cli.spider
        self.user = self._get_user(userName)
        self.pool = ip_pool.Pool()
        self.activeThread = 0        # crawler threads currently running
        self.maxThread = maxThread   # upper bound on concurrent crawler threads
        self.successed = 0
        self.failured = 0
        self.total = 0
        # True once the remaining TODO tasks fit into the free thread slots
        self.isLast = False
        # True while reset() is waiting for running threads to drain
        self.wait = False
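
    # Typical flow: a driver repeatedly calls hasNext() and craw(); craw()
    # spawns CrawerThread workers, and each worker reports back through
    # _on_download_success / _on_download_error, so those callbacks run on
    # the worker threads rather than on the caller's thread.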

    def reset(self):
        '''Wait for running threads to drain, then reopen the Mongo connection.'''
        self.wait = True
        while self.activeThread > 0:
            sleep(0.5)
        try:
            if self.cli is not None:
                self.cli.close()
            self.cli = MongoClient(Constant.MONGODB_URL)
            self.db = self.cli.spider
            self.wait = False
        except Exception:
            # reconnect failed, keep retrying
            self.reset()

    def _get_user(self, userName):
        rs_user = self.db.user.find_one({"name": userName})
        if rs_user is None:
            # first time this user joins the task
            print(userName, "is new for this task, welcome!")
            # insert() returns the new _id rather than the document,
            # so keep the record we just built as the user object
            rs_user = {"name": userName, "starttime": 0}
            self.db.user.insert(rs_user)
        return rs_user

    # fetch up to `size` pending tasks
    def _get_task(self, size=1):
        if self.wait:
            return []
        try:
            if self.isLast:
                # few tasks left: take everything still marked TODO
                return self.db.detail_todo.find({"status": Constant.TODO})
            # pick a random pivot so concurrent workers spread across the collection
            rand = random.random()
            result = list(self.db.detail_todo.find({"status": Constant.TODO, "random": {"$gt": rand}}).sort("random", 1).limit(size))
            if not result:
                # nothing above the pivot, look below it instead
                result = list(self.db.detail_todo.find({"status": Constant.TODO, "random": {"$lt": rand}}).sort("random", -1).limit(size))
            return result
        except Exception:
            return []
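
    # The pickup scheme above assumes every detail_todo document carries a
    # precomputed "random" field in [0, 1).  A seeded document would look
    # roughly like this (illustrative only; the seeding script is not part
    # of this module and the URL is made up):
    #
    #     db.detail_todo.insert({
    #         "url": "http://example.com/part/12345",
    #         "status": Constant.TODO,
    #         "random": random.random(),
    #     })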

    def hasNext(self):
        if self.wait:
            return False
        try:
            count = self.db.detail_todo.find({"status": Constant.TODO}).count()
            # once the leftovers fit into the free thread slots, switch
            # _get_task() into "take everything" mode
            self.isLast = count < self.maxThread - self.activeThread
            return count > 0
        except Exception:
            # on a transient database error, assume there is still work left
            return True
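
    # Cursor.count() and the insert()/save() calls used in this module are the
    # PyMongo 2.x/3.x collection API; under PyMongo 4+ the equivalents would be
    # count_documents(), insert_one() and replace_one().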

    def _save_one_result(self, url, cont):
        if self.wait:
            return
        try:
            task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
            if task is not None:
                task["download_user"] = self.user['name']
                task["str_html"] = cont
                task["status"] = Constant.DONE
                task["parserTask"] = Constant.TODO
                self.db.detail_todo.save(task)
            else:
                # the task was already claimed or re-queued elsewhere
                print('reload', url)
        except Exception:
            pass

    def _get_one_proxy(self):
        return self.pool.get()

    def _remove_one_proxy(self, proxy):
        self.pool.remove(proxy)

    def _on_download_success(self, cont, url, proxy):
        self.activeThread -= 1
        # check whether the download is valid: a real detail page is large and
        # contains the marker '制造商零件编号' ("Manufacturer Part Number")
        if len(cont) < 60000 or str(cont).find('制造商零件编号') == -1:
            self._remove_one_proxy(proxy)
            self.failured += 1
            if len(cont) > 150000:
                # oversized page without the marker: mark the URL itself as bad
                print("bad url", url)
                task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
                if task is not None:
                    task["status"] = Constant.ERROR
                    self.db.detail_todo.save(task)
            elif len(cont) > 60000:
                # mid-sized page without the marker: most likely an anti-robot page
                print("robot!", url)
            else:
                print("robot!")
        else:
            self._save_one_result(url, cont)
            self.successed += 1
            # print("success!")
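
    # The counters above (activeThread, successed, failured) are updated from
    # the worker threads without synchronization.  A minimal thread-safe
    # variant would guard them with a lock (sketch only; self.lock does not
    # exist in this class and would have to be created in __init__):
    #
    #     self.lock = threading.Lock()      # in __init__
    #     with self.lock:                   # around each counter update
    #         self.activeThread -= 1
    #         self.failured += 1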

    def _on_download_error(self, e, url, proxy):
        self.activeThread -= 1
        self._remove_one_proxy(proxy)
        self.failured += 1
        # print("failed!", e)

    def craw(self):
        # top the worker pool back up to maxThread, one new thread per fetched task
        if self.maxThread > self.activeThread:
            currentTasks = self._get_task(self.maxThread - self.activeThread)
            for task in currentTasks:
                proxy = self._get_one_proxy()
                if proxy is not None:
                    crawer = CrawerThread(task["url"], proxy, self._on_download_success, self._on_download_error)
                    crawer.start()
                    self.activeThread += 1
                    self.total += 1

    def statistic(self):
        return self.successed, self.failured, self.activeThread, self.total
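
    # A minimal driver sketch (the variable names below are made up; only the
    # methods of this class are real):
    #
    #     main = DetailPageMain(userName="worker-1", maxThread=50)
    #     while main.hasNext():
    #         main.craw()
    #         sleep(2)
    #         print(main.statistic())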


class CrawerThread(threading.Thread):
    def __init__(self, url, proxy, success, error):
        threading.Thread.__init__(self)
        self.downloader = html_downloader.HtmlDownloader()
        self.url = url
        self.proxy = proxy
        self.success = success
        self.error = error

    def run(self):
        try:
            cont = self.downloader.download(self.url, self.proxy)
            if self.success is not None:
                self.success(cont, self.url, self.proxy)
        except Exception as e:
            if self.error is not None:
                self.error(e, self.url, self.proxy)
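
# Each CrawerThread handles a single URL and then finishes; sustained
# throughput comes from DetailPageMain.craw() repeatedly topping the worker
# pool back up to maxThread.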


if __name__ == '__main__':
    # quick progress report: how many detail pages each user has downloaded
    cli = MongoClient(Constant.MONGODB_URL)
    db = cli.spider
    for u in db.user.find():
        print(u['name'], db.detail_todo.find({"status": 2, "download_user": u['name']}).count())
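    # The literal status value 2 above presumably corresponds to Constant.DONE
    # used elsewhere in this module; that mapping is inferred, not defined here.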