123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # coding=utf-8
- '''
- Created on 2016年6月17日
- @author: Pro1
- '''
- import threading
- import datetime
- from pymongo.mongo_client import MongoClient
- from util_common import Constant
- import cx_Oracle
- from concurrent2.blockQueue import BlockQueue
class Producer(threading.Thread):
    """Producer: stream one page of ``propertyvalue_0614`` documents into a BlockQueue.

    Reads the page ``(currentPage - 1) * pageSize .. + pageSize`` from the
    ``propertyvalue_0614`` collection of ``db`` and puts each document on
    ``blockQueue.getQueue()``. The BlockQueue is always closed when reading
    ends, whether normally, via ``stop()``, or because of an error. This lets
    the consumer detect end-of-data instead of waiting forever.
    """

    def __init__(self, db, currentPage, pageSize, blockQueue):
        threading.Thread.__init__(self)
        self.db = db
        self.currentPage = currentPage  # 1-based page number
        self.pageSize = pageSize
        self.blockQueue = blockQueue
        self.stoped = False  # set by stop(); checked between documents

    def run(self):
        """Enqueue every document of the page, then close the BlockQueue."""
        try:
            rows = self.db.propertyvalue_0614.find().skip(
                (self.currentPage - 1) * self.pageSize).limit(self.pageSize)
            try:
                for row in rows:
                    if self.stoped:
                        break
                    self.blockQueue.getQueue().put(row)
            finally:
                # Release the server-side cursor even when we stop early or fail.
                rows.close()
        finally:
            # Must always happen: the consumer loops until the queue is closed.
            self.blockQueue.close()
        print("page %s ready at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

    def stop(self):
        """Ask run() to stop after the document currently being enqueued."""
        self.stoped = True
-
-
class Consumer(threading.Thread):
    """Consumer: drain one page of rows from a BlockQueue and bulk-insert them into Oracle.

    Each row is read via ``row['componentid']``, ``row['detno']``,
    ``row['propertyid']`` and ``row['value']``.

    - Rows whose ``value`` is longer than 80 are skipped.
    - Rows whose ``value`` is missing or has no ``len()`` (e.g. ``None``) are
      inserted with a NULL ``pv_value``.
    - ``pv_id`` is ``(currentPage - 1) * pageSize`` plus the 1-based position
      of the row within the page. It also advances for skipped rows.
    """

    # Seconds to wait for a row before re-checking whether the producer has
    # closed the queue (or stop() was called); avoids blocking forever.
    poll_interval = 0.5

    def __init__(self, conn, currentPage, pageSize, blockQueue):
        threading.Thread.__init__(self)
        self.conn = conn  # Oracle connection used for the bulk insert
        self.currentPage = currentPage  # 1-based page number
        self.pageSize = pageSize
        self.blockQueue = blockQueue
        self.stoped = False  # set by stop(); checked between rows

    def run(self):
        """Collect the page's rows until the queue is closed and drained, then insert and commit."""
        # NOTE(review): assumes getQueue() returns a queue.Queue-compatible
        # object whose get(timeout=...) raises Empty on timeout.
        try:
            from queue import Empty
        except ImportError:  # Python 2
            from Queue import Empty
        rows = self.blockQueue.getQueue()
        cursor = self.conn.cursor()
        try:
            cursor.prepare('insert into product$propertyvalue$1 (pv_id,pv_componentid,pv_detno,pv_propertyid,pv_value) values(:1,:2,:3,:4,:5)')
            param_list = []
            currentIndex = (self.currentPage - 1) * self.pageSize
            # isClose() is evaluated before empty(): the producer closes only
            # after its last put, so once closed an empty queue is truly drained.
            while not (self.blockQueue.isClose() and rows.empty()):
                if self.stoped:
                    print("page %s break at %s" % (self.currentPage, currentIndex))
                    break
                try:
                    row = rows.get(timeout=self.poll_interval)
                except Empty:
                    continue  # nothing yet; re-check close/stop flags
                currentIndex += 1
                try:
                    value = row["value"]
                    too_long = len(value) > 80
                except (KeyError, TypeError):
                    # Missing value, or a value without a length (e.g. None): store NULL.
                    param_list.append((currentIndex, row['componentid'], row['detno'], row['propertyid'], None))
                    continue
                if too_long:
                    continue
                param_list.append((currentIndex, row['componentid'], row['detno'], row['propertyid'], value))
            # cx_Oracle may reject executemany() with zero rows, so skip empty pages.
            if param_list:
                cursor.executemany(None, param_list)
                self.conn.commit()
        finally:
            cursor.close()
        # Reported after the commit so "finish" means the rows are persisted.
        print("page %s finish at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

    def stop(self):
        """Ask run() to stop consuming at its next loop iteration."""
        self.stoped = True
class Converter(threading.Thread):
    """Copy one page of ``spider.propertyvalue_0614`` from MongoDB into Oracle.

    Opens its own MongoDB client and Oracle connection at construction time.
    ``run`` connects a Producer and a Consumer through a BlockQueue sized to
    the page and closes both connections when done.
    """

    def __init__(self, currentPage, pageSize):
        threading.Thread.__init__(self)
        self.currentPage = currentPage  # 1-based page number
        self.pageSize = pageSize
        self.cli = MongoClient(Constant.MONGODB_URL)
        self.db = self.cli.spider
        try:
            self.conn = cx_Oracle.connect(Constant.ORACLE_URL)
        except Exception:
            # Don't leak the already-opened Mongo client when Oracle is unreachable.
            self.cli.close()
            raise

    def run(self):
        """Run the producer/consumer pair for this page, then release both connections."""
        print("page %s start at %s" % (self.currentPage, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        # Capacity == pageSize: the producer reads at most pageSize rows, so its put() never blocks.
        blockQueue = BlockQueue(maxSize=self.pageSize)
        producer = Producer(self.db, self.currentPage, self.pageSize, blockQueue)
        consumer = Consumer(self.conn, self.currentPage, self.pageSize, blockQueue)
        try:
            producer.start()
            consumer.start()
            producer.join()
            consumer.join()
        except Exception as e:
            # Single formatted string: Python 2 would otherwise print a tuple.
            print("page %s error %s" % (self.currentPage, e))
        finally:
            producer.stop()
            consumer.stop()
            try:
                self.cli.close()
            finally:
                # Close Oracle even if closing the Mongo client raised.
                self.conn.close()
-
|