converter.py.svn-base 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. # coding=utf-8
  2. '''
  3. Created on 2016-06-17
  4. @author: Pro1
  5. '''
  6. from pymongo.mongo_client import MongoClient
  7. import math
  8. import datetime
  9. from util_common import Constant
  10. from concurrent2.propertyValue import Converter
  11. import time
  12. class PooledThread(object):
  13. def __init__(self, poolSize=50):
  14. self.poolSize = poolSize
  15. self.threads = list()
  16. def waitFor(self):
  17. if len(self.threads) < self.poolSize:
  18. return
  19. for thread in self.threads:
  20. if not thread.isAlive():
  21. self.threads.remove(thread)
  22. if len(self.threads) >= self.poolSize:
  23. time.sleep(1)
  24. self.waitFor()
  25. def startThread(self, currentPage, pageSize):
  26. converter = Converter(currentPage, pageSize)
  27. converter.start()
  28. self.threads.append(converter)
  29. self.waitFor()
  30. if __name__ == '__main__':
  31. cli = MongoClient(Constant.MONGODB_URL)
  32. db = cli.spider
  33. # 65149825
  34. totalElements = db.propertyvalue_0614.find({}, {'_id':True}).count()
  35. cli.close()
  36. pageSize = 10000
  37. # 6520
  38. pages = math.ceil(totalElements / pageSize)
  39. pool = PooledThread(poolSize=50)
  40. print("totalElements: %s, pages: %s, task start at %s" % (totalElements, pages, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
  41. for currentPage in range(1, pages):
  42. pool.startThread(currentPage, pageSize)