# coding=utf-8
'''
Created on 2016-06-17

@author: Pro1

Copy one page of documents from the MongoDB collection
``spider.propertyvalue_0614`` into the Oracle table
``product$propertyvalue$1``.  A Converter thread handles one page by running
a Producer (reads MongoDB) and a Consumer (batch-inserts into Oracle) that
communicate through a BlockQueue.
'''
import threading
import datetime

from pymongo.mongo_client import MongoClient
from util_common import Constant
import cx_Oracle
from concurrent2.blockQueue import BlockQueue

# Marker the Producer always enqueues last (even after an error), so the
# Consumer stops waiting instead of blocking forever in get().
_END_OF_PAGE = object()

# Rows whose 'value' is longer than this are not copied.
_MAX_VALUE_LENGTH = 80

_INSERT_SQL = ('insert into product$propertyvalue$1 '
               '(pv_id,pv_componentid,pv_detno,pv_propertyid,pv_value) '
               'values(:1,:2,:3,:4,:5)')


def _now():
    """Return the current local time formatted for progress messages."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


class Producer(threading.Thread):
    """Producer: read one page of documents from MongoDB onto the queue.

    The page is selected with ``skip((currentPage - 1) * pageSize)`` and
    ``limit(pageSize)``, so at most ``pageSize`` documents are enqueued,
    followed by the end-of-page marker.
    """

    def __init__(self, db, currentPage, pageSize, blockQueue):
        threading.Thread.__init__(self)
        self.db = db
        self.currentPage = currentPage  # 1-based page number
        self.pageSize = pageSize
        self.blockQueue = blockQueue
        self.stoped = False
        self.error = None  # exception that ended run(), if any

    def run(self):
        queue = self.blockQueue.getQueue()
        try:
            # NOTE(review): find() has no sort, so page boundaries depend on
            # the collection's natural order being the same for every page's
            # query; consider .sort('_id', 1) for a fresh import.
            documents = (self.db.propertyvalue_0614.find()
                         .skip((self.currentPage - 1) * self.pageSize)
                         .limit(self.pageSize))
            for row in documents:
                if self.stoped:
                    break
                queue.put(row)
        except Exception as e:
            self.error = e
            raise
        finally:
            # Always signal end-of-page so the Consumer cannot hang, whether
            # the page was empty, stopped early, or failed part-way.
            self.blockQueue.close()
            queue.put(_END_OF_PAGE)
        print("page %s ready at %s" % (self.currentPage, _now()))

    def stop(self):
        self.stoped = True


class Consumer(threading.Thread):
    """Consumer: drain the queue and batch-insert one page into Oracle.

    Each row becomes ``(pv_id, componentid, detno, propertyid, value)`` where
    ``pv_id`` is the row's 1-based position in the page query's result
    (offset by the page start), so different pages never share an id.
    """

    def __init__(self, conn, currentPage, pageSize, blockQueue):
        threading.Thread.__init__(self)
        self.conn = conn
        self.currentPage = currentPage  # 1-based page number
        self.pageSize = pageSize
        self.blockQueue = blockQueue
        self.stoped = False
        self.error = None  # exception that ended run(), if any

    def run(self):
        try:
            param_list = self._collect_rows()
            self._insert_rows(param_list)
        except Exception as e:
            self.error = e
            raise
        print("page %s finish at %s" % (self.currentPage, _now()))

    def _collect_rows(self):
        """Read rows until the end-of-page marker; return insert parameters."""
        queue = self.blockQueue.getQueue()
        param_list = []
        currentIndex = (self.currentPage - 1) * self.pageSize
        while True:
            if self.stoped:
                print("page %s break at %s" % (self.currentPage, currentIndex))
                break
            row = queue.get()
            if row is _END_OF_PAGE:
                break
            # Counted before the length filter, so skipped rows still consume
            # an id and the ids of later rows do not shift.
            currentIndex += 1
            value = row.get('value')
            try:
                too_long = len(value) > _MAX_VALUE_LENGTH
            except TypeError:
                # Missing or unsized values (None, numbers) are stored as NULL.
                value = None
                too_long = False
            if too_long:
                continue
            param_list.append((currentIndex, row['componentid'], row['detno'],
                               row['propertyid'], value))
        return param_list

    def _insert_rows(self, param_list):
        """Insert all collected rows in one executemany and commit."""
        if not param_list:
            return
        cursor = self.conn.cursor()
        try:
            cursor.prepare(_INSERT_SQL)
            cursor.executemany(None, param_list)
            self.conn.commit()
        finally:
            cursor.close()

    def stop(self):
        self.stoped = True


class Converter(threading.Thread):
    """Copy one page: owns the MongoDB/Oracle connections and runs a
    Producer/Consumer pair for that page, closing both connections at the end.
    """

    def __init__(self, currentPage, pageSize):
        threading.Thread.__init__(self)
        self.currentPage = currentPage
        self.pageSize = pageSize
        self.cli = MongoClient(Constant.MONGODB_URL)
        self.db = self.cli.spider
        try:
            self.conn = cx_Oracle.connect(Constant.ORACLE_URL)
        except Exception:
            # Don't leak the Mongo client when the Oracle connection fails.
            self.cli.close()
            raise

    def run(self):
        print("page %s start at %s" % (self.currentPage, _now()))
        # One spare slot for the end-of-page marker: the Producer enqueues at
        # most pageSize rows plus the marker, so (assuming maxSize is the
        # queue capacity) it never blocks even if the Consumer has died.
        blockQueue = BlockQueue(maxSize=self.pageSize + 1)
        producer = Producer(self.db, self.currentPage, self.pageSize, blockQueue)
        consumer = Consumer(self.conn, self.currentPage, self.pageSize, blockQueue)
        try:
            producer.start()
            consumer.start()
            producer.join()
            consumer.join()
            # Exceptions raised inside the worker threads do not propagate to
            # join(); report the ones they recorded.
            for worker in (producer, consumer):
                if worker.error is not None:
                    print("page %s error" % self.currentPage, worker.error)
        except Exception as e:
            print("page %s error" % self.currentPage, e)
        finally:
            producer.stop()
            consumer.stop()
            self.cli.close()
            self.conn.close()