propertyValue.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # coding=utf-8
  2. '''
  3. Created on 2016-06-17
  4. @author: Pro1
  5. '''
  6. import threading
  7. import datetime
  8. from pymongo.mongo_client import MongoClient
  9. from util_common import Constant
  10. import cx_Oracle
  11. from concurrent2.blockQueue import BlockQueue
  12. '''
  13. Producer
  14. '''
  15. class Producer(threading.Thread):
  16. def __init__(self, db, currentPage, pageSize, blockQueue):
  17. threading.Thread.__init__(self)
  18. self.db = db
  19. self.currentPage = currentPage
  20. self.pageSize = pageSize
  21. self.blockQueue = blockQueue
  22. self.stoped = False
  23. def run(self):
  24. value_list = self.db.propertyvalue_0614.find().skip((self.currentPage - 1) * self.pageSize).limit(self.pageSize)
  25. for index, row in enumerate(value_list):
  26. if self.stoped:
  27. break
  28. self.blockQueue.getQueue().put(row)
  29. self.blockQueue.close()
  30. print("page %s ready at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
  31. def stop(self):
  32. self.stoped = True
  33. '''
  34. Consumer
  35. '''
  36. class Consumer(threading.Thread):
  37. def __init__(self, conn, currentPage, pageSize, blockQueue):
  38. threading.Thread.__init__(self)
  39. self.conn = conn
  40. self.currentPage = currentPage
  41. self.pageSize = pageSize
  42. self.blockQueue = blockQueue
  43. self.stoped = False
  44. def run(self):
  45. cursor = self.conn.cursor()
  46. cursor.prepare('insert into product$propertyvalue$1 (pv_id,pv_componentid,pv_detno,pv_propertyid,pv_value) values(:1,:2,:3,:4,:5)')
  47. param_list = list()
  48. currentIndex = (self.currentPage - 1) * self.pageSize
  49. while not self.blockQueue.getQueue().empty() or not self.blockQueue.isClose():
  50. if self.stoped:
  51. print("page %s break at %s" % (self.currentPage, currentIndex))
  52. break
  53. row = self.blockQueue.getQueue().get()
  54. currentIndex += 1
  55. try:
  56. if len(row["value"]) > 80:
  57. continue
  58. param_list.append((currentIndex, row['componentid'], row['detno'], row['propertyid'], row['value']))
  59. except:
  60. param_list.append((currentIndex, row['componentid'], row['detno'], row['propertyid'], None))
  61. print("page %s finish at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
  62. cursor.executemany(None, param_list)
  63. self.conn.commit()
  64. cursor.close()
  65. def stop(self):
  66. self.stoped = True
  67. class Converter(threading.Thread):
  68. def __init__(self, currentPage, pageSize):
  69. threading.Thread.__init__(self)
  70. self.currentPage = currentPage
  71. self.pageSize = pageSize
  72. self.cli = MongoClient(Constant.MONGODB_URL)
  73. self.db = self.cli.spider
  74. self.conn = cx_Oracle.connect(Constant.ORACLE_URL)
  75. def run(self):
  76. print("page %s start at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
  77. blockQueue = BlockQueue(maxSize=self.pageSize)
  78. producer = Producer(self.db, self.currentPage, self.pageSize, blockQueue)
  79. consumer = Consumer(self.conn, self.currentPage, self.pageSize, blockQueue)
  80. try:
  81. producer.start()
  82. consumer.start()
  83. producer.join()
  84. consumer.join()
  85. except Exception as e:
  86. print("page %s error" % self.currentPage, e)
  87. finally:
  88. producer.stop()
  89. consumer.stop()
  90. self.cli.close()
  91. self.conn.close()