Browse Source

搜索时不检测索引是否需要重新加载,加载工作放在更新索引之后延迟一定时间进行

sunyj 9 years ago
parent
commit
710abd1321

+ 12 - 3
search-console-b2b/src/main/java/com/uas/search/console/b2b/service/impl/IndexServiceImpl.java

@@ -39,6 +39,7 @@ import com.uas.search.console.b2b.jms.QueueMessageParser;
 import com.uas.search.console.b2b.model.LuceneQueueMessage;
 import com.uas.search.console.b2b.model.ParsedQueueMessage;
 import com.uas.search.console.b2b.service.IndexService;
+import com.uas.search.console.b2b.support.IndexSearcherManager;
 import com.uas.search.console.b2b.support.IndexWriterManager;
 import com.uas.search.console.b2b.util.ClassAndTableNameUtils;
 import com.uas.search.console.b2b.util.FileUtils;
@@ -66,6 +67,8 @@ public class IndexServiceImpl implements IndexService {
 	@Autowired
 	private LuceneProperties luceneProperties;
 
+	private IndexSearcherManager indexSearcherManager = IndexSearcherManager.getInstance();
+
 	private Logger logger = LoggerFactory.getLogger(IndexServiceImpl.class);
 
 	/**
@@ -375,9 +378,11 @@ public class IndexServiceImpl implements IndexService {
 			Document document = ObjectToDocumentUtils.toDocument(obj);
 			if (document != null) {
 				try {
-					indexWriter = indexWriterManager.get(ClassAndTableNameUtils.toTableName(obj.getClass()));
+					Table_name tableName = ClassAndTableNameUtils.toTableName(obj.getClass());
+					indexWriter = indexWriterManager.get(tableName);
 					indexWriter.addDocument(document);
 					indexWriter.commit();
+					indexSearcherManager.flushCache(tableName, null);
 					return obj;
 				} catch (IOException | InterruptedException e) {
 					logger.error("", e);
@@ -397,10 +402,12 @@ public class IndexServiceImpl implements IndexService {
 			Document document = ObjectToDocumentUtils.toDocument(obj);
 			if (document != null) {
 				try {
-					indexWriter = indexWriterManager.get(ClassAndTableNameUtils.toTableName(obj.getClass()));
+					Table_name tableName = ClassAndTableNameUtils.toTableName(obj.getClass());
+					indexWriter = indexWriterManager.get(tableName);
 					indexWriter.updateDocument(new Term(ClassAndTableNameUtils.getIdField(obj.getClass()),
 							String.valueOf(ClassAndTableNameUtils.getId(obj))), document);
 					indexWriter.commit();
+					indexSearcherManager.flushCache(tableName, null);
 					return obj;
 				} catch (IOException | InterruptedException e) {
 					logger.error("", e);
@@ -418,10 +425,12 @@ public class IndexServiceImpl implements IndexService {
 	public <T> T delete(T obj) {
 		if (obj != null) {
 			try {
-				indexWriter = indexWriterManager.get(ClassAndTableNameUtils.toTableName(obj.getClass()));
+				Table_name tableName = ClassAndTableNameUtils.toTableName(obj.getClass());
+				indexWriter = indexWriterManager.get(tableName);
 				indexWriter.deleteDocuments(new Term(ClassAndTableNameUtils.getIdField(obj.getClass()),
 						String.valueOf(ClassAndTableNameUtils.getId(obj))));
 				indexWriter.commit();
+				indexSearcherManager.flushCache(tableName, null);
 				return obj;
 			} catch (IOException e) {
 				logger.error("", e);

+ 75 - 24
search-console-b2b/src/main/java/com/uas/search/console/b2b/support/IndexSearcherManager.java

@@ -8,12 +8,12 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.NIOFSDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
 
 import com.uas.search.b2b.service.SearchService.Table_name;
 import com.uas.search.console.b2b.util.ClassAndTableNameUtils;
@@ -27,6 +27,8 @@ import com.uas.search.console.b2b.util.SearchUtils;
  */
 public class IndexSearcherManager {
 
+	private static volatile IndexSearcherManager instance;
+
 	/**
 	 * 用于存放指定表的索引IndexSearcher
 	 */
@@ -37,15 +39,31 @@ public class IndexSearcherManager {
 	 */
 	private Map<Table_name, Boolean> reopening = new ConcurrentHashMap<>();
 
+	/**
+	 * 标识在后台是否存在线程在等待刷新缓存中的索引
+	 */
+	private Map<Table_name, Boolean> waitingFlushCache = new ConcurrentHashMap<>();
+
 	private Logger logger = LoggerFactory.getLogger(getClass());
 
-	public IndexSearcherManager() {
+	private IndexSearcherManager() {
 		List<Table_name> tableNames = ClassAndTableNameUtils.getTableNames();
 		for (Table_name tableName : tableNames) {
 			reopening.put(tableName, false);
 		}
 	}
 
+	public static IndexSearcherManager getInstance() {
+		if (instance == null) {
+			synchronized (IndexSearcherManager.class) {
+				if (instance == null) {
+					instance = new IndexSearcherManager();
+				}
+			}
+		}
+		return instance;
+	}
+
 	/**
 	 * 对指定表的IndexSearcher进行初始化操作
 	 * 
@@ -53,21 +71,21 @@ public class IndexSearcherManager {
 	 *            表名
 	 * @return 指定表的FSDirectory
 	 */
-	private synchronized FSDirectory warm(Table_name tableName) {
-		String indexPath = SearchUtils.getIndexPath(tableName);
-		FSDirectory directory = null;
+	private synchronized void warm(Table_name tableName) {
 		IndexSearcher indexSearcher = indexSearchers.get(tableName);
-		// 本来NIOFSDirectory速度更快,但因为jre的bug,在Windows平台上,其性能很差
-		try {
-			if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
-				directory = FSDirectory.open(Paths.get(indexPath));
-			} else {
-				directory = NIOFSDirectory.open(Paths.get(indexPath));
-			}
+		// 不为空的话说明索引已成功加载
+		if (indexSearcher == null) {
+			String indexPath = SearchUtils.getIndexPath(tableName);
+			FSDirectory directory = null;
+			// 本来NIOFSDirectory速度更快,但因为jre的bug,在Windows平台上,其性能很差
+			try {
+				if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
+					directory = FSDirectory.open(Paths.get(indexPath));
+				} else {
+					directory = NIOFSDirectory.open(Paths.get(indexPath));
+				}
 
-			File[] files = directory.getDirectory().toFile().listFiles();
-			// 不为空的话说明索引已成功加载
-			if (indexSearcher == null) {
+				File[] files = directory.getDirectory().toFile().listFiles();
 				// 路径不为空文件夹
 				if (files.length != 0) {
 					try {
@@ -79,11 +97,10 @@ public class IndexSearcherManager {
 				} else {
 					logger.error("索引文件不存在:" + tableName.value());
 				}
+			} catch (IOException e) {
+				logger.error("索引路径打开失败:" + indexPath, e);
 			}
-		} catch (IOException e) {
-			logger.error("索引路径打开失败:" + indexPath, e);
 		}
-		return directory;
 	}
 
 	/**
@@ -99,18 +116,15 @@ public class IndexSearcherManager {
 		startReopen(tableName);
 
 		try {
-			// 每次都要进行初始化处理,是为了防止加载索引时,索引为空,而后来索引不为空时,却不能够正确加载
-			FSDirectory directory = warm(tableName);
 			IndexSearcher currentSearcher = indexSearchers.get(tableName);
 			if (currentSearcher == null) {
 				return;
 			}
 			IndexSearcher searcher = get(tableName);
-			Long currentVersion = ((DirectoryReader) searcher.getIndexReader()).getVersion();
-			// 版本号不一致(本地索引有更改),更新IndexReader
 			try {
-				if (DirectoryReader.open(directory).getVersion() != currentVersion) {
-					IndexReader newReader = DirectoryReader.open(directory);
+				// 本地索引有更改,更新IndexReader
+				DirectoryReader newReader = DirectoryReader.openIfChanged((DirectoryReader) searcher.getIndexReader());
+				if (newReader != null) {
 					IndexSearcher newSearcher = new IndexSearcher(newReader);
 					updateIndexSearcher(tableName, newSearcher);
 				}
@@ -151,6 +165,7 @@ public class IndexSearcherManager {
 		if (tableName == null) {
 			return null;
 		}
+		warm(tableName);
 		IndexSearcher currentSearcher = indexSearchers.get(tableName);
 		if (currentSearcher != null) {
 			currentSearcher.getIndexReader().incRef();
@@ -187,4 +202,40 @@ public class IndexSearcherManager {
 		release(currentSearcher);
 		indexSearchers.put(tableName, newIndexSearcher);
 	}
+
+	/**
+	 * 刷新缓存中的索引
+	 * 
+	 * @param tableName
+	 *            表名
+	 * @param delay
+	 *            延迟一定时间刷新/ms,可为空,默认延迟3000ms
+	 */
+	public void flushCache(final Table_name tableName, final Long delay) {
+		if (StringUtils.isEmpty(tableName)) {
+			return;
+		}
+		Boolean waiting = waitingFlushCache.get(tableName);
+		// 有等待的线程,则不增加线程
+		if (waiting == null || !waiting.booleanValue()) {
+			final long delayCopy = (delay == null || delay.longValue() < 0) ? 3000 : delay.longValue();
+			new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						// 延迟一定时间再刷新缓存
+						Thread.sleep(delayCopy);
+						maybeReopen(tableName);
+					} catch (InterruptedException e) {
+						logger.error("", e);
+					} finally {
+						// 线程执行完毕,无论是否成功,均将状态改为false
+						waitingFlushCache.put(tableName, false);
+					}
+				}
+			}).start();
+			// 将状态改为true
+			waitingFlushCache.put(tableName, true);
+		}
+	}
 }

+ 1 - 2
search-console-b2b/src/main/java/com/uas/search/console/b2b/util/SearchUtils.java

@@ -51,7 +51,7 @@ public class SearchUtils {
 	/**
 	 * IndexSearcher的管理器
 	 */
-	private static IndexSearcherManager searcherManager = new IndexSearcherManager();
+	private static IndexSearcherManager searcherManager = IndexSearcherManager.getInstance();
 
 	/**
 	 * 判断搜索词是否为无效的(比如只包含特殊字符,不含有任何字母、数字、汉字等有意义的字符)
@@ -147,7 +147,6 @@ public class SearchUtils {
 	 * @return IndexSearcher对象
 	 */
 	public static IndexSearcher getIndexSearcher(Table_name tableName) {
-		searcherManager.maybeReopen(tableName);
 		IndexSearcher indexSearcher = searcherManager.get(tableName);
 		if (indexSearcher == null) {
 			throw new SearchException("获取索引文件失败");

+ 6 - 0
search-console/src/main/java/com/uas/search/console/service/impl/IndexServiceImpl.java

@@ -48,6 +48,7 @@ import com.uas.search.console.model.ParsedQueueMessage;
 import com.uas.search.console.model.PurchaseInvoiceSimpleInfo;
 import com.uas.search.console.model.PurchaseSimpleInfo;
 import com.uas.search.console.service.IndexService;
+import com.uas.search.console.support.IndexSearcherManager;
 import com.uas.search.console.support.IndexWriterManager;
 import com.uas.search.console.util.ObjectToDocumentUtils;
 import com.uas.search.console.util.SearchConstants;
@@ -101,6 +102,8 @@ public class IndexServiceImpl implements IndexService {
 	@Autowired
 	private LuceneProperties luceneProperties;
 
+	private IndexSearcherManager indexSearcherManager = IndexSearcherManager.getInstance();
+
 	/**
 	 * 是否正在创建索引
 	 */
@@ -448,6 +451,7 @@ public class IndexServiceImpl implements IndexService {
 					indexWriter = indexWriterManager.get(tableName);
 					indexWriter.addDocument(document);
 					indexWriter.commit();
+					indexSearcherManager.flushCache(tableName, null);
 					return obj;
 				} catch (IOException | InterruptedException e) {
 					logger.error("", e);
@@ -495,6 +499,7 @@ public class IndexServiceImpl implements IndexService {
 						throw new SearchException("Message parsing failed!");
 					}
 					indexWriter.commit();
+					indexSearcherManager.flushCache(tableName, null);
 					return obj;
 				} catch (IOException | InterruptedException e) {
 					logger.error("", e);
@@ -539,6 +544,7 @@ public class IndexServiceImpl implements IndexService {
 					throw new SearchException("Message parsing failed!");
 				}
 				indexWriter.commit();
+				indexSearcherManager.flushCache(tableName, null);
 				return obj;
 			} catch (IOException e) {
 				logger.error("", e);

+ 74 - 24
search-console/src/main/java/com/uas/search/console/support/IndexSearcherManager.java

@@ -9,7 +9,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.NIOFSDirectory;
@@ -26,6 +25,8 @@ import com.uas.search.console.util.SearchUtils;
  */
 public class IndexSearcherManager {
 
+	private static volatile IndexSearcherManager instance;
+
 	/**
 	 * 用于存放指定表的索引IndexSearcher
 	 */
@@ -36,15 +37,31 @@ public class IndexSearcherManager {
 	 */
 	private Map<String, Boolean> reopening = new ConcurrentHashMap<>();
 
+	/**
+	 * 标识在后台是否存在线程在等待刷新缓存中的索引
+	 */
+	private Map<String, Boolean> waitingFlushCache = new ConcurrentHashMap<>();
+
 	private static Logger logger = LoggerFactory.getLogger(IndexSearcherManager.class);
 
-	public IndexSearcherManager() {
+	private IndexSearcherManager() {
 		List<String> tableNames = SearchUtils.getTableNames();
 		for (String tableName : tableNames) {
 			reopening.put(tableName, false);
 		}
 	}
 
+	public static IndexSearcherManager getInstance() {
+		if (instance == null) {
+			synchronized (IndexSearcherManager.class) {
+				if (instance == null) {
+					instance = new IndexSearcherManager();
+				}
+			}
+		}
+		return instance;
+	}
+
 	/**
 	 * 对指定表的IndexSearcher进行初始化操作
 	 * 
@@ -52,21 +69,21 @@ public class IndexSearcherManager {
 	 *            表名
 	 * @return 指定表的FSDirectory
 	 */
-	private synchronized FSDirectory warm(String tableName) {
-		String indexPath = SearchUtils.getIndexPath(tableName);
-		FSDirectory directory = null;
+	private synchronized void warm(String tableName) {
 		IndexSearcher indexSearcher = indexSearchers.get(tableName);
-		// 本来NIOFSDirectory速度更快,但因为jre的bug,在Windows平台上,其性能很差
-		try {
-			if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
-				directory = FSDirectory.open(Paths.get(indexPath));
-			} else {
-				directory = NIOFSDirectory.open(Paths.get(indexPath));
-			}
+		// 不为空的话说明索引已成功加载
+		if (indexSearcher == null) {
+			String indexPath = SearchUtils.getIndexPath(tableName);
+			FSDirectory directory = null;
+			// 本来NIOFSDirectory速度更快,但因为jre的bug,在Windows平台上,其性能很差
+			try {
+				if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
+					directory = FSDirectory.open(Paths.get(indexPath));
+				} else {
+					directory = NIOFSDirectory.open(Paths.get(indexPath));
+				}
 
-			File[] files = directory.getDirectory().toFile().listFiles();
-			// 不为空的话说明索引已成功加载
-			if (indexSearcher == null) {
+				File[] files = directory.getDirectory().toFile().listFiles();
 				// 路径不为空文件夹
 				if (files.length != 0) {
 					try {
@@ -78,11 +95,10 @@ public class IndexSearcherManager {
 				} else {
 					logger.error("索引文件不存在:" + tableName);
 				}
+			} catch (IOException e) {
+				logger.error("索引路径打开失败:" + indexPath, e);
 			}
-		} catch (IOException e) {
-			logger.error("索引路径打开失败:" + indexPath, e);
 		}
-		return directory;
 	}
 
 	/**
@@ -98,18 +114,15 @@ public class IndexSearcherManager {
 		startReopen(tableName);
 
 		try {
-			// 每次都要进行初始化处理,是为了防止加载索引时,索引为空,而后来索引不为空时,却不能够正确加载
-			FSDirectory directory = warm(tableName);
 			IndexSearcher currentSearcher = indexSearchers.get(tableName);
 			if (currentSearcher == null) {
 				return;
 			}
 			IndexSearcher searcher = get(tableName);
-			Long currentVersion = ((DirectoryReader) searcher.getIndexReader()).getVersion();
-			// 版本号不一致(本地索引有更改),更新IndexReader
 			try {
-				if (DirectoryReader.open(directory).getVersion() != currentVersion) {
-					IndexReader newReader = DirectoryReader.open(directory);
+				// 本地索引有更改,更新IndexReader
+				DirectoryReader newReader = DirectoryReader.openIfChanged((DirectoryReader) searcher.getIndexReader());
+				if (newReader != null) {
 					IndexSearcher newSearcher = new IndexSearcher(newReader);
 					updateIndexSearcher(tableName, newSearcher);
 				}
@@ -150,6 +163,7 @@ public class IndexSearcherManager {
 		if (StringUtils.isEmpty(tableName)) {
 			return null;
 		}
+		warm(tableName);
 		IndexSearcher currentSearcher = indexSearchers.get(tableName);
 		if (currentSearcher != null) {
 			currentSearcher.getIndexReader().incRef();
@@ -186,4 +200,40 @@ public class IndexSearcherManager {
 		release(currentSearcher);
 		indexSearchers.put(tableName, newIndexSearcher);
 	}
+
+	/**
+	 * 刷新缓存中的索引
+	 * 
+	 * @param tableName
+	 *            表名
+	 * @param delay
+	 *            延迟一定时间刷新/ms,可为空,默认延迟3000ms
+	 */
+	public void flushCache(final String tableName, final Long delay) {
+		if (StringUtils.isEmpty(tableName)) {
+			return;
+		}
+		Boolean waiting = waitingFlushCache.get(tableName);
+		// 有等待的线程,则不增加线程
+		if (waiting == null || !waiting.booleanValue()) {
+			final long delayCopy = (delay == null || delay.longValue() < 0) ? 3000 : delay.longValue();
+			new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						// 延迟一定时间再刷新缓存
+						Thread.sleep(delayCopy);
+						maybeReopen(tableName);
+					} catch (InterruptedException e) {
+						logger.error("", e);
+					} finally {
+						// 线程执行完毕,无论是否成功,均将状态改为false
+						waitingFlushCache.put(tableName, false);
+					}
+				}
+			}).start();
+			// 将状态改为true
+			waitingFlushCache.put(tableName, true);
+		}
+	}
 }

+ 1 - 2
search-console/src/main/java/com/uas/search/console/util/SearchUtils.java

@@ -49,7 +49,7 @@ public class SearchUtils {
 	/**
 	 * IndexSearcher的管理器
 	 */
-	private static IndexSearcherManager searcherManager = new IndexSearcherManager();
+	private static IndexSearcherManager searcherManager = IndexSearcherManager.getInstance();
 
 	private static LuceneProperties luceneProperties = ContextUtils.getBean(LuceneProperties.class);
 
@@ -139,7 +139,6 @@ public class SearchUtils {
 	 * @return IndexSearcher对象
 	 */
 	public static IndexSearcher getIndexSearcher(String tableName) {
-		searcherManager.maybeReopen(tableName);
 		IndexSearcher indexSearcher = searcherManager.get(tableName);
 		if (indexSearcher == null) {
 			throw new SearchException("获取索引文件失败");