pdffile_to_fdfs.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. # coding=utf-8
  2. '''
  3. Download PDF files and store them in the fdfs file system.
  4. @author: yingp
  5. '''
  6. from pymongo.mongo_client import MongoClient
  7. from util_common import html_downloader, Constant
  8. import threading
  9. import random
  10. import os
  11. import urllib
  12. import requests
  13. from PyPDF2.pdf import PdfFileReader, PdfFileWriter
  14. class FileMain():
  15. def __init__(self, userName='someone', maxThread=100, tempDir="/tmp/"):
  16. cli = MongoClient(Constant.MONGODB_URL)
  17. self.db = cli.spider
  18. self.user = self._get_user(userName)
  19. self.activeThread = 0
  20. self.maxThread = maxThread
  21. self.tempDir = tempDir
  22. if not os.path.exists(tempDir):
  23. os.mkdir(tempDir)
  24. self.successed = 0
  25. self.failured = 0
  26. self.total = 0
  27. # last one
  28. self.isLast = False
  29. def _get_user(self, userName):
  30. rs_user = self.db.user.find_one({"name": userName})
  31. if rs_user is None:
  32. # 初次接入
  33. print(userName, ":is new for this task, Welcome!")
  34. rs_user = self.db.user.insert({"name": userName, "starttime": 0})
  35. return rs_user
  36. # 获得N个任务
  37. def _get_task(self, size=1):
  38. try:
  39. if self.isLast:
  40. return self.db.component_original.find({"attachTask" : Constant.TODO}, {"_id": True, "attachUrl": True})
  41. rand = random.random()
  42. result = self.db.component_original.find({"attachTask" : Constant.TODO, "random": {"$gt": rand}}).sort("random", 1).limit(size)
  43. if result is None:
  44. result = self.db.component_original.find({"attachTask" : Constant.TODO, "random": {"$lt": rand}}).sort("random", -1).limit(size)
  45. return result
  46. except:
  47. return []
  48. def hasNext(self):
  49. try:
  50. count = self.db.component_original.find({"attachTask": Constant.TODO}).count()
  51. self.isLast = count <= self.maxThread - self.activeThread
  52. return count > 0
  53. except:
  54. return True
  55. def _save_one_result(self, _id, cont_file):
  56. # 保存并生成
  57. try:
  58. with open(cont_file, 'rb') as file:
  59. res = requests.post(Constant.FS_API_UPLOAD, files={'file': file})
  60. res_j = res.json()
  61. self.db.component_original.update_one({'_id': _id}, {'$set': {'attachTask': Constant.DONE, 'attachUrl_uu': res_j['path'], 'attach_download_user': self.user["name"]}})
  62. except Exception as e:
  63. print('failed on save', e)
  64. def _on_download_success(self, _id, cont_file):
  65. self.activeThread -= 1
  66. self._save_one_result(_id, cont_file)
  67. self.successed += 1
  68. def _save_with_url(self, _id, attachUrl_uu):
  69. self.activeThread -= 1
  70. self.db.component_original.update_one({'_id': _id}, {'$set': {'attachTask': Constant.DONE, 'attachUrl_uu': attachUrl_uu, 'attach_download_user': self.user["name"]}})
  71. self.successed += 1
  72. def _on_download_error(self, e, url):
  73. self.activeThread -= 1
  74. self.failured += 1
  75. print("failed! url: ", url, e)
  76. '''不同器件的attachUrl可能一样,按attachUrl先查是否有已经下载了的
  77. '''
  78. def _find_by_mouser_url(self, url):
  79. result = self.db.component_original.find_one({"attachTask": Constant.DONE, "attachUrl": url, "attachUrl_uu": {"$exists": True}});
  80. if result is not None:
  81. return result['attachUrl_uu']
  82. return None
  83. def _before_download(self, _id, url):
  84. exist_url = self._find_by_mouser_url(url)
  85. if exist_url is not None:
  86. self._save_with_url(_id, exist_url)
  87. return True
  88. return False
  89. def craw(self):
  90. if self.maxThread > self.activeThread:
  91. currentTasks = self._get_task(self.maxThread - self.activeThread)
  92. for task in currentTasks:
  93. # 只考虑.pdf文件
  94. if str(task["attachUrl"]).find(".pdf") > -1:
  95. crawer = CrawerThread(task["_id"], task["attachUrl"], self.tempDir, self._before_download, self._on_download_success, self._on_download_error)
  96. crawer.start()
  97. self.activeThread += 1
  98. self.total += 1
  99. def statistic(self):
  100. return self.successed, self.failured, self.activeThread, self.total
  101. def close(self):
  102. self.db.close()
  103. class CrawerThread(threading.Thread):
  104. def __init__(self, _id, attachUrl, tempDir, beforeHandler, success, error):
  105. threading.Thread.__init__(self)
  106. self.downloader = html_downloader.HtmlDownloader()
  107. self._id = _id
  108. self.attachUrl = attachUrl
  109. self.tempDir = tempDir
  110. self.beforeHandler = beforeHandler
  111. self.success = success
  112. self.error = error
  113. def run(self):
  114. if self.beforeHandler(self._id, self.attachUrl):
  115. return
  116. filename = self.tempDir + str(random.random())
  117. filename1 = self.tempDir + str(random.random()) + '.pdf'
  118. try:
  119. urllib.request.urlretrieve(self.attachUrl, filename)
  120. input_stream = open(filename, 'rb')
  121. pdf_input = PdfFileReader(input_stream)
  122. pdf_output = PdfFileWriter()
  123. page = 0
  124. pages = pdf_input.getNumPages() - 1
  125. # remove last page
  126. while page < pages:
  127. pdf_output.addPage(pdf_input.getPage(page))
  128. page += 1
  129. output_stream = open(filename1, 'wb')
  130. pdf_output.write(output_stream)
  131. output_stream.close()
  132. input_stream.close()
  133. if self.success is not None:
  134. self.success(self._id, filename1)
  135. except Exception as e:
  136. if self.error is not None:
  137. self.error(e, self.attachUrl)
  138. finally:
  139. if os.path.exists(filename):
  140. os.remove(filename)
  141. if os.path.exists(filename1):
  142. os.remove(filename1)