detail_main_by_mongodb.py

# coding=utf-8
'''
Created on 2016-03-09
This is the variant that talks to the database directly.
@author: ChenHao
'''
from pymongo.mongo_client import MongoClient
from proxy import ip_pool
from util_common import html_downloader, Constant
import threading
import random
from time import sleep


class DetailPageMain():

    def __init__(self, userName=None, maxThread=100):
        self.cli = MongoClient(Constant.MONGODB_URL)
        self.db = self.cli.spider
        self.user = self._get_user(userName)
        self.pool = ip_pool.Pool()
        self.activeThread = 0
        self.maxThread = maxThread
        # progress counters, updated from worker-thread callbacks
        self.successed = 0
        self.failured = 0
        self.total = 0
        # True once the remaining todo count fits into the free thread slots
        self.isLast = False
        self.wait = False

    def reset(self):
        # block new work, drain the running threads, then reconnect
        self.wait = True
        while self.activeThread > 0:
            sleep(0.5)
        try:
            if self.cli is not None:
                self.cli.close()
            self.cli = MongoClient(Constant.MONGODB_URL)
            self.db = self.cli.spider
            self.wait = False
        except:
            self.reset()

    def _get_user(self, userName):
        rs_user = self.db.user.find_one({"name": userName})
        if rs_user is None:
            # first time this user joins the task
            print(userName, ":is new for this task, Welcome!")
            # insert() returns the new _id, not the document, so re-fetch it
            self.db.user.insert({"name": userName, "starttime": 0})
            rs_user = self.db.user.find_one({"name": userName})
        return rs_user

    # fetch up to `size` pending tasks
    def _get_task(self, size=1):
        if self.wait:
            return []
        try:
            if self.isLast:
                return self.db.detail_todo.find({"status": Constant.TODO})
            # probe near a random point so concurrent workers rarely collide
            rand = random.random()
            result = self.db.detail_todo.find({"status": Constant.TODO, "random": {"$gt": rand}}).sort("random", 1).limit(size)
            if result.count() == 0:
                # nothing above the random point; search below it instead
                result = self.db.detail_todo.find({"status": Constant.TODO, "random": {"$lt": rand}}).sort("random", -1).limit(size)
            return result
        except:
            return []
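
    # The queries above rely on every detail_todo document carrying a
    # "random" float. Seeding it at queue time is an assumption here (the
    # queueing code is not in this file), but the usual pattern is:
    #   db.detail_todo.insert({"url": url, "status": Constant.TODO,
    #                          "random": random.random()})
    # With an index on "random", the $gt/$lt probes plus sort/limit give
    # each worker a cheap pseudo-random batch.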

    def hasNext(self):
        if self.wait:
            return False
        try:
            count = self.db.detail_todo.find({"status": Constant.TODO}).count()
            self.isLast = count < self.maxThread - self.activeThread
            return count > 0
        except:
            return True

    def _save_one_result(self, url, cont):
        if self.wait:
            return
        try:
            task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
            if task is not None:
                task["download_user"] = self.user['name']
                task["str_html"] = cont
                task["status"] = Constant.DONE
                # hand the page over to the parser stage
                task["parserTask"] = Constant.TODO
                self.db.detail_todo.save(task)
            else:
                print('reload', url)
        except:
            pass

    def _get_one_proxy(self):
        return self.pool.get()

    def _remove_one_proxy(self, proxy):
        self.pool.remove(proxy)

    def _on_download_success(self, cont, url, proxy):
        self.activeThread -= 1
        # a valid detail page is large and contains the marker string
        # '制造商零件编号' ("manufacturer part number")
        if len(cont) < 60000 or str(cont).find('制造商零件编号') == -1:
            self._remove_one_proxy(proxy)
            self.failured += 1
            if len(cont) > 150000:
                # huge page without the marker: the url itself is bad
                print("bad url", url)
                task = self.db.detail_todo.find_one({"status": Constant.TODO, "url": url})
                if task is not None:
                    task["status"] = Constant.ERROR
                    self.db.detail_todo.save(task)
            elif len(cont) > 60000:
                # right size but no marker: anti-robot page
                print("robot!", url)
            else:
                print("robot!")
        else:
            self._save_one_result(url, cont)
            self.successed += 1
            # print("success!")

    def _on_download_error(self, e, url, proxy):
        self.activeThread -= 1
        self._remove_one_proxy(proxy)
        self.failured += 1
        # print("failed!", e)

    def craw(self):
        # top up the worker pool to maxThread
        if self.maxThread > self.activeThread:
            currentTasks = self._get_task(self.maxThread - self.activeThread)
            for task in currentTasks:
                proxy = self._get_one_proxy()
                if proxy is not None:
                    crawer = CrawerThread(task["url"], proxy, self._on_download_success, self._on_download_error)
                    crawer.start()
                    self.activeThread += 1
                    self.total += 1

    def statistic(self):
        return self.successed, self.failured, self.activeThread, self.total


class CrawerThread(threading.Thread):

    def __init__(self, url, proxy, success, error):
        threading.Thread.__init__(self)
        self.downloader = html_downloader.HtmlDownloader()
        self.url = url
        self.proxy = proxy
        self.success = success
        self.error = error

    def run(self):
        try:
            cont = self.downloader.download(self.url, self.proxy)
            if self.success is not None:
                self.success(cont, self.url, self.proxy)
        except Exception as e:
            if self.error is not None:
                self.error(e, self.url, self.proxy)
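
# html_downloader is the project's own module and is not shown in this file.
# A minimal sketch of the download(url, proxy) interface CrawerThread expects
# from it, assuming Python 3's urllib and that `proxy` is a "host:port"
# string (both assumptions; the real implementation may differ):
#
#   import urllib.request
#
#   class HtmlDownloader(object):
#       def download(self, url, proxy):
#           handler = urllib.request.ProxyHandler({"http": proxy, "https": proxy})
#           opener = urllib.request.build_opener(handler)
#           return opener.open(url, timeout=10).read().decode("utf-8", "ignore")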

if __name__ == '__main__':
    # report how many pages each user has downloaded
    # (status 2 is presumably Constant.DONE)
    cli = MongoClient(Constant.MONGODB_URL)
    db = cli.spider
    for u in db.user.find():
        print(u['name'], db.detail_todo.find({"status": 2, "download_user": u['name']}).count())
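
The __main__ block above only prints per-user progress; it never drives the
crawler. A minimal driver sketch, assuming this file is importable as
detail_main_by_mongodb and that Constant and the ip_pool proxy pool are
already configured (the userName and sleep interval below are illustrative):

    # driver.py -- illustrative sketch, not part of the original project
    from time import sleep
    from detail_main_by_mongodb import DetailPageMain

    main = DetailPageMain(userName="worker-1", maxThread=50)
    while main.hasNext():
        main.craw()               # spawn CrawerThread workers up to maxThread
        sleep(1)
        print(main.statistic())   # (successed, failured, activeThread, total)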