init_detail_by_thread.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # coding=utf-8
  2. '''
  3. Created on 2016年3月7日
  4. 使用线程的方式为详情页生成下载任务
  5. @author: ChenHao
  6. '''
  7. from pymongo.mongo_client import MongoClient
  8. from util_common import Constant, html_parser
  9. from bs4 import BeautifulSoup
  10. import random
  11. import threading
  12. from pymongo.errors import ConnectionFailure
  13. class InitDetail():
  14. def __init__(self, maxThread=200):
  15. self.maxThread = maxThread
  16. self.parser = html_parser.HtmlParser()
  17. # 连接数据库
  18. self.cli = MongoClient(Constant.MONGODB_URL)
  19. self.db = self.cli.spider
  20. self.startThread = 0
  21. self.finishThread = 0
  22. self.successed = 0
  23. self.failed = 0
  24. self.details = 0
  25. def hasNext(self):
  26. try:
  27. return self.db.kindlist_todo.find({"status": Constant.DONE , "creatDetailTask": Constant.TODO}).count() > 0
  28. except:
  29. return True
  30. def _get_task(self, size=1):
  31. return self.db.kindlist_todo.find({"status": Constant.DONE , "creatDetailTask": Constant.TODO}).limit(size)
  32. def _on_success(self, url, kinds, details):
  33. self.finishThread += 1
  34. self.details += len(details)
  35. self.successed += 1
  36. print('success', url)
  37. def _on_error(self, url, e):
  38. self.finishThread += 1
  39. self.failed += 1
  40. print('failure', url, e)
  41. def run(self):
  42. tasks = self._get_task(self.maxThread)
  43. for task in tasks:
  44. thread = InitThread(self.db, self.parser, task['url'], task['str_html'], self._on_success, self._on_error)
  45. thread.start()
  46. def stat(self):
  47. return self.successed, self.failed
  48. def close(self):
  49. self.cli.close()
  50. class InitThread(threading.Thread):
  51. def __init__(self, db, parser, url, html, success, error):
  52. threading.Thread.__init__(self)
  53. self.db = db
  54. self.parser = parser
  55. self.url = url
  56. self.html = html
  57. self.success = success
  58. self.error = error
  59. def _parse_kind(self, soup):
  60. # 解析并存储解析出来的类目
  61. kindls = self.parser._get_kindlist_by_listPage(soup)
  62. kindls_list = list()
  63. for kind in kindls:
  64. kind_d = dict()
  65. kind_d["kindls"] = kind
  66. kindls_list.append(kind_d)
  67. return kindls_list
  68. def _parse_detail(self, soup):
  69. # 解析并存储detailTask
  70. detail_urls = self.parser._get_detail_urls_from_listPage(soup)
  71. # 组装detail任务
  72. task_list = list()
  73. for detail_url in detail_urls:
  74. d = dict()
  75. d["url"] = detail_url
  76. d["random"] = random.random()
  77. d["status"] = Constant.TODO
  78. d["analysisTask"] = Constant.TODO
  79. task_list.append(d)
  80. return task_list
  81. def _on_success(self, kinds, details):
  82. # 类目
  83. self.db.kind_from_listpage.insert_many(kinds)
  84. # 详情
  85. self.db.detail_todo.insert_many(details)
  86. # 将此listPage修改状态
  87. self.db.kindlist_todo.update_one({'url': self.url}, {'$set': {'creatDetailTask': Constant.DONE}})
  88. def _on_error(self, e):
  89. self.db.kindlist_todo.update_one({'url': self.url}, {'$set': {'creatDetailTask': Constant.ERROR}})
  90. def run(self):
  91. if len(self.html) < 150000 or str(self.html).find('购买所选商品') == -1:
  92. e = Exception("无效HTML")
  93. self._on_error(e)
  94. if self.error is not None:
  95. self.error(self.url, e)
  96. return
  97. try:
  98. soup = BeautifulSoup(self.html, 'html.parser', from_encoding='utf-8')
  99. kinds = self._parse_kind(soup)
  100. details = self._parse_detail(soup)
  101. self._on_success(kinds, details)
  102. if self.success is not None:
  103. self.success(self.url, kinds, details)
  104. except ConnectionFailure as e:
  105. if self.error is not None:
  106. self.error(self.url, e)
  107. except Exception as e:
  108. self._on_error(e)
  109. if self.error is not None:
  110. self.error(self.url, e)
  111. if __name__ == '__main__':
  112. task = InitDetail(maxThread=100)
  113. while task.hasNext():
  114. task.run()
  115. task.close()