imgfile_main_by_mongodb_with_thread.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # coding=utf-8
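'''
Created on 2016-03-09

This implementation talks to the database directly.
Every image file name uses the .jpg extension with content_type="image/jpeg",
because the data is stored as raw binary and png/jpg can be swapped freely.

@author: ChenHao
'''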
  9. from pymongo.mongo_client import MongoClient
  10. from proxy import ip_pool
  11. from util_common import html_downloader, Constant
  12. import threading
  13. import uuid
  14. import gridfs
  15. class ImgFileMain():
  16. def __init__(self, userName=None, maxThread=100):
  17. cli = MongoClient(Constant.MONGODB_URL)
  18. self.db = cli.spider
  19. self.fs = gridfs.GridFS(cli.pictures)
  20. self.user = self._get_user(userName)
  21. self.pool = ip_pool.Pool()
  22. self.activeThread = 0
  23. self.maxThread = maxThread
  24. self.successed = 0
  25. self.failured = 0
  26. self.total = 0
  27. # last one
  28. self.isLast = False
  29. def _get_user(self, userName):
  30. rs_user = self.db.user.find_one({"name": userName})
  31. if rs_user is None:
  32. # 初次接入
  33. print(userName, ":is new for this task, Welcome!")
  34. rs_user = self.db.user.insert({"name": userName, "starttime": 0})
  35. return rs_user
  36. # 获得N个任务
  37. def _get_task(self, size=1):
  38. try:
  39. if self.isLast:
  40. return [self.db.component_original.find_one({"imgTask" : Constant.TODO}, {"_id": True, "img_url_mouser": True})]
  41. # rand = random.random()
  42. result = self.db.component_original.find({"imgTask" : Constant.TODO}).limit(size)
  43. if result is None:
  44. result = self.db.component_original.find({"imgTask" : Constant.TODO}).limit(size)
  45. return result
  46. except:
  47. return []
  48. # 生成图片文件名
  49. def _get_imgName_and_img_url_uu(self):
  50. uuid_str = str(uuid.uuid1())
  51. imgName = uuid_str.replace("-", "") + ".jpg"
  52. img_url_uu = Constant.IMG_URL_HEADER + imgName
  53. return imgName, img_url_uu
  54. def hasNext(self):
  55. try:
  56. count = self.db.component_original.find({"imgTask": Constant.TODO}).count()
  57. self.isLast = count == 1
  58. return count > 0
  59. except:
  60. return True
  61. def _save_one_result(self, _id, cont_file):
  62. # 保存并生成
  63. try:
  64. imgName, img_url_uu = self._get_imgName_and_img_url_uu()
  65. print (imgName, img_url_uu)
  66. self.fs.put(cont_file, content_type="image/jpeg", filename=imgName)
  67. # self.db.component_original.update_one({'_id': _id}, {'$set': {'imgTask': Constant.DONE, 'img_url_uu': img_url_uu, 'img_download_user': self.user["name"]}})
  68. except:
  69. None
  70. def _get_one_proxy(self):
  71. return self.pool.get()
  72. def _remove_one_proxy(self, proxy):
  73. self.pool.remove(proxy)
  74. def _on_download_success(self, _id, cont_file, proxy):
  75. self.activeThread -= 1
  76. # 检查是不是有效下载(以5KB为标准)
  77. if len(cont_file) < 5000:
  78. self._remove_one_proxy(proxy)
  79. self.failured += 1
  80. else:
  81. self._save_one_result(_id, cont_file)
  82. self.successed += 1
  83. def _on_download_error(self, e, url, proxy):
  84. self.activeThread -= 1
  85. self._remove_one_proxy(proxy)
  86. self.failured += 1
  87. # print("failed! proxy ", proxy, ", url: ", url, e)
  88. def craw(self):
  89. if self.maxThread > self.activeThread:
  90. currentTasks = self._get_task(self.maxThread - self.activeThread)
  91. for task in currentTasks:
  92. crawer = CrawerThread(task["_id"], task["img_url_mouser"], None, self._on_download_success, self._on_download_error)
  93. crawer.start()
  94. self.activeThread += 1
  95. self.total += 1
  96. def statistic(self):
  97. return self.successed, self.failured, self.activeThread, self.total
  98. class CrawerThread(threading.Thread):
  99. def __init__(self, _id, img_url_mouser, proxy, success, error):
  100. threading.Thread.__init__(self)
  101. self.downloader = html_downloader.HtmlDownloader()
  102. self._id = _id
  103. self.img_url_mouser = img_url_mouser
  104. self.proxy = proxy
  105. self.success = success
  106. self.error = error
  107. def run(self):
  108. try:
  109. cont_file = self.downloader.download_file(self.img_url_mouser, self.proxy)
  110. if self.success is not None:
  111. self.success(self._id, cont_file, self.proxy)
  112. except Exception as e:
  113. if self.error is not None:
  114. self.error(e, self.img_url_mouser, self.proxy)
  115. if __name__ == '__main__':
  116. task = ImgFileMain(maxThread=1)
  117. while task.hasNext():
  118. task.craw()
  119. task.close()