|
|
@@ -1,202 +1,202 @@
|
|
|
-package com.uas.search.console.b2b.jms;
|
|
|
-
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.uas.search.b2b.exception.SearchException;
|
|
|
-import com.uas.search.b2b.model.SPage;
|
|
|
-import com.uas.search.console.b2b.util.JSONUtils;
|
|
|
-import com.uas.search.console.b2b.util.SearchConstants;
|
|
|
-import com.uas.search.util.StringUtils;
|
|
|
-import oracle.sql.STRUCT;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-
|
|
|
-import javax.sql.DataSource;
|
|
|
-import java.sql.*;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-
|
|
|
-/**
|
|
|
- * @author sunyj
|
|
|
- * @since 2016年12月1日 下午5:35:24
|
|
|
- */
|
|
|
-public class LuceneQueueMessageDao {
|
|
|
-
|
|
|
- /**
|
|
|
- * 默认的页码
|
|
|
- */
|
|
|
- private static final int PAGE_INDEX = 1;
|
|
|
-
|
|
|
- /**
|
|
|
- * 默认每页的大小
|
|
|
- */
|
|
|
- private static final int PAGE_SIZE = 20;
|
|
|
-
|
|
|
- private DataSource dataSource;
|
|
|
-
|
|
|
- private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
-
|
|
|
- public LuceneQueueMessageDao(DataSource dataSource) {
|
|
|
- this.dataSource = dataSource;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取关于实时更新情况的详细信息
|
|
|
- *
|
|
|
- * @param page
|
|
|
- * 页码
|
|
|
- * @param size
|
|
|
- * 页大小
|
|
|
- * @param searchContent
|
|
|
- * 对消息内容进行搜索
|
|
|
- * @return 消息(实时更新情况的详细信息)
|
|
|
- */
|
|
|
- public SPage<LuceneQueueMessage> findAll(Integer page, Integer size, String searchContent)
|
|
|
- throws SQLRecoverableException {
|
|
|
- if (StringUtils.isEmpty(searchContent)) {
|
|
|
- searchContent = "";
|
|
|
- }
|
|
|
- SPage<LuceneQueueMessage> sPage = new SPage<>();
|
|
|
-
|
|
|
- // 先获取总数目
|
|
|
- String sql = "select count(*) from " + SearchConstants.LUCENE_QUEUE_TABLE_NAME
|
|
|
- + " t where upper(t.user_data.message) like upper('%" + searchContent + "%')";
|
|
|
- Connection connection = null;
|
|
|
- PreparedStatement preparedStatement = null;
|
|
|
- ResultSet resultSet = null;
|
|
|
-
|
|
|
- try {
|
|
|
- connection = dataSource.getConnection();
|
|
|
- preparedStatement = connection.prepareStatement(sql);
|
|
|
- resultSet = preparedStatement.executeQuery();
|
|
|
- resultSet.next();
|
|
|
- int totalElement = resultSet.getInt(1);
|
|
|
- sPage.setTotalElement(totalElement);
|
|
|
- // 总数目为0,返回
|
|
|
- if (totalElement == 0) {
|
|
|
- return sPage;
|
|
|
- }
|
|
|
- preparedStatement.close();
|
|
|
- resultSet.close();
|
|
|
-
|
|
|
- // 处理分页信息
|
|
|
- if (page == null || page <= 0) {
|
|
|
- page = PAGE_INDEX;
|
|
|
- }
|
|
|
- if (size == null || size <= 0) {
|
|
|
- size = PAGE_SIZE;
|
|
|
- }
|
|
|
-
|
|
|
- int totalPage = (int) Math.ceil(totalElement / (1.0 * size));
|
|
|
- sPage.setTotalPage(totalPage);
|
|
|
- // 如果页码过大
|
|
|
- if (page > totalPage) {
|
|
|
- page = totalPage;
|
|
|
- }
|
|
|
- int minRownum = 1 + (page - 1) * size;
|
|
|
- int maxRownum = page * size;
|
|
|
- sPage.setPage(page);
|
|
|
- sPage.setSize(size);
|
|
|
- if (page == 1) {
|
|
|
- sPage.setFirst(true);
|
|
|
- }
|
|
|
- if (page == totalPage) {
|
|
|
- sPage.setLast(true);
|
|
|
- }
|
|
|
-
|
|
|
- // 分页获取数据
|
|
|
- // enq_time为GMT时间,转为本地时间,需加上8小时
|
|
|
- sql = "select msgid, to_char(enq_time+8/24,'yyyy-mm-dd hh24:mi:ss'),user_data from (select tt.*,rownum r from (select * from "
|
|
|
- + SearchConstants.LUCENE_QUEUE_TABLE_NAME + " t where upper(t.user_data.message) like upper('%"
|
|
|
- + searchContent + "%') order by enq_time) tt where rownum<=?) ttt where ttt.r >=?";
|
|
|
- List<LuceneQueueMessage> luceneQueueMessages = new ArrayList<>();
|
|
|
- preparedStatement = connection.prepareStatement(sql);
|
|
|
- preparedStatement.setInt(1, maxRownum);
|
|
|
- preparedStatement.setInt(2, minRownum);
|
|
|
- resultSet = preparedStatement.executeQuery();
|
|
|
- while (resultSet.next()) {
|
|
|
- LuceneQueueMessage luceneQueueMessage = new LuceneQueueMessage();
|
|
|
- luceneQueueMessage.setMessageId(resultSet.getString(1));
|
|
|
- luceneQueueMessage.setModifyTime(resultSet.getString(2));
|
|
|
- STRUCT userData = (STRUCT) resultSet.getObject(3);
|
|
|
- Object[] attributes = userData.getAttributes();
|
|
|
- // 自定义消息格式 {"method":"value1","table":"value2","ids":"[1,2,3]"}
|
|
|
- // 从队列消息中获取表的改动信息
|
|
|
- JSONObject jsonObject = JSONUtils.parseObject(attributes[0].toString());
|
|
|
- luceneQueueMessage.setTableName(jsonObject.getString("table"));
|
|
|
- luceneQueueMessage.setIds(jsonObject.getString("ids"));
|
|
|
- luceneQueueMessage.setData(JSONUtils.reverseFormat(jsonObject.getString("data")));
|
|
|
- luceneQueueMessage.setMethod(jsonObject.getString("method"));
|
|
|
- luceneQueueMessages.add(luceneQueueMessage);
|
|
|
- }
|
|
|
- sPage.setContent(luceneQueueMessages);
|
|
|
- } catch (Throwable e) {
|
|
|
- throw new SearchException(e).setDetailedMessage(e);
|
|
|
- } finally {
|
|
|
- if (resultSet != null) {
|
|
|
- try {
|
|
|
- resultSet.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- if (preparedStatement != null) {
|
|
|
- try {
|
|
|
- preparedStatement.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- if (connection != null) {
|
|
|
- try {
|
|
|
- connection.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return sPage;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息队列出队消息
|
|
|
- *
|
|
|
- * @param messageId
|
|
|
- * 所要出队的消息的id
|
|
|
- * @return 是否出队成功
|
|
|
- */
|
|
|
- public boolean dequeueLuceneQueueMessage(String messageId) throws SQLRecoverableException {
|
|
|
- if (StringUtils.isEmpty(messageId)) {
|
|
|
- return false;
|
|
|
- } // 通过dbms_aq.dequeue出队,无法指定所出队的消息,所以直接通过删除表的数据间接达到出队的效果
|
|
|
- String sql = "delete from " + SearchConstants.LUCENE_QUEUE_TABLE_NAME + " where msgid=?";
|
|
|
- Connection connection = null;
|
|
|
- PreparedStatement preparedStatement = null;
|
|
|
-
|
|
|
- try {
|
|
|
- connection = dataSource.getConnection();
|
|
|
- preparedStatement = connection.prepareStatement(sql);
|
|
|
- preparedStatement.setString(1, messageId);
|
|
|
- int result = preparedStatement.executeUpdate();
|
|
|
- connection.commit();
|
|
|
- return result > 0;
|
|
|
- } catch (SQLException e) {
|
|
|
- throw new SearchException("出队消息失败:msgid=" + messageId).setDetailedMessage(e);
|
|
|
- } finally {
|
|
|
- if (preparedStatement != null) {
|
|
|
- try {
|
|
|
- preparedStatement.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- if (connection != null) {
|
|
|
- try {
|
|
|
- connection.close();
|
|
|
- } catch (SQLException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
+package com.uas.search.console.b2b.jms;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.uas.search.b2b.exception.SearchException;
|
|
|
+import com.uas.search.b2b.model.SPage;
|
|
|
+import com.uas.search.console.b2b.util.JSONUtils;
|
|
|
+import com.uas.search.console.b2b.util.SearchConstants;
|
|
|
+import com.uas.search.util.StringUtils;
|
|
|
+import oracle.sql.STRUCT;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import javax.sql.DataSource;
|
|
|
+import java.sql.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author sunyj
|
|
|
+ * @since 2016年12月1日 下午5:35:24
|
|
|
+ */
|
|
|
+public class LuceneMessageDao {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 默认的页码
|
|
|
+ */
|
|
|
+ private static final int PAGE_INDEX = 1;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 默认每页的大小
|
|
|
+ */
|
|
|
+ private static final int PAGE_SIZE = 20;
|
|
|
+
|
|
|
+ private DataSource dataSource;
|
|
|
+
|
|
|
+ private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
+
|
|
|
+ public LuceneMessageDao(DataSource dataSource) {
|
|
|
+ this.dataSource = dataSource;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取关于实时更新情况的详细信息
|
|
|
+ *
|
|
|
+ * @param page
|
|
|
+ * 页码
|
|
|
+ * @param size
|
|
|
+ * 页大小
|
|
|
+ * @param searchContent
|
|
|
+ * 对消息内容进行搜索
|
|
|
+ * @return 消息(实时更新情况的详细信息)
|
|
|
+ */
|
|
|
+ public SPage<LuceneMessage> findAll(Integer page, Integer size, String searchContent)
|
|
|
+ throws SQLRecoverableException {
|
|
|
+ if (StringUtils.isEmpty(searchContent)) {
|
|
|
+ searchContent = "";
|
|
|
+ }
|
|
|
+ SPage<LuceneMessage> sPage = new SPage<>();
|
|
|
+
|
|
|
+ // 先获取总数目
|
|
|
+ String sql = "select count(*) from " + SearchConstants.LUCENE_QUEUE_TABLE_NAME
|
|
|
+ + " t where upper(t.user_data.message) like upper('%" + searchContent + "%')";
|
|
|
+ Connection connection = null;
|
|
|
+ PreparedStatement preparedStatement = null;
|
|
|
+ ResultSet resultSet = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ connection = dataSource.getConnection();
|
|
|
+ preparedStatement = connection.prepareStatement(sql);
|
|
|
+ resultSet = preparedStatement.executeQuery();
|
|
|
+ resultSet.next();
|
|
|
+ int totalElement = resultSet.getInt(1);
|
|
|
+ sPage.setTotalElement(totalElement);
|
|
|
+ // 总数目为0,返回
|
|
|
+ if (totalElement == 0) {
|
|
|
+ return sPage;
|
|
|
+ }
|
|
|
+ preparedStatement.close();
|
|
|
+ resultSet.close();
|
|
|
+
|
|
|
+ // 处理分页信息
|
|
|
+ if (page == null || page <= 0) {
|
|
|
+ page = PAGE_INDEX;
|
|
|
+ }
|
|
|
+ if (size == null || size <= 0) {
|
|
|
+ size = PAGE_SIZE;
|
|
|
+ }
|
|
|
+
|
|
|
+ int totalPage = (int) Math.ceil(totalElement / (1.0 * size));
|
|
|
+ sPage.setTotalPage(totalPage);
|
|
|
+ // 如果页码过大
|
|
|
+ if (page > totalPage) {
|
|
|
+ page = totalPage;
|
|
|
+ }
|
|
|
+ int minRownum = 1 + (page - 1) * size;
|
|
|
+ int maxRownum = page * size;
|
|
|
+ sPage.setPage(page);
|
|
|
+ sPage.setSize(size);
|
|
|
+ if (page == 1) {
|
|
|
+ sPage.setFirst(true);
|
|
|
+ }
|
|
|
+ if (page == totalPage) {
|
|
|
+ sPage.setLast(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 分页获取数据
|
|
|
+ // enq_time为GMT时间,转为本地时间,需加上8小时
|
|
|
+ sql = "select msgid, to_char(enq_time+8/24,'yyyy-mm-dd hh24:mi:ss'),user_data from (select tt.*,rownum r from (select * from "
|
|
|
+ + SearchConstants.LUCENE_QUEUE_TABLE_NAME + " t where upper(t.user_data.message) like upper('%"
|
|
|
+ + searchContent + "%') order by enq_time) tt where rownum<=?) ttt where ttt.r >=?";
|
|
|
+ List<LuceneMessage> luceneMessages = new ArrayList<>();
|
|
|
+ preparedStatement = connection.prepareStatement(sql);
|
|
|
+ preparedStatement.setInt(1, maxRownum);
|
|
|
+ preparedStatement.setInt(2, minRownum);
|
|
|
+ resultSet = preparedStatement.executeQuery();
|
|
|
+ while (resultSet.next()) {
|
|
|
+ LuceneMessage luceneMessage = new LuceneMessage();
|
|
|
+ luceneMessage.setMessageId(resultSet.getString(1));
|
|
|
+ luceneMessage.setModifyTime(resultSet.getString(2));
|
|
|
+ STRUCT userData = (STRUCT) resultSet.getObject(3);
|
|
|
+ Object[] attributes = userData.getAttributes();
|
|
|
+ // 自定义消息格式 {"method":"value1","table":"value2","ids":"[1,2,3]"}
|
|
|
+ // 从队列消息中获取表的改动信息
|
|
|
+ JSONObject jsonObject = JSONUtils.parseObject(attributes[0].toString());
|
|
|
+ luceneMessage.setTableName(jsonObject.getString("table"));
|
|
|
+ luceneMessage.setIds(jsonObject.getString("ids"));
|
|
|
+ luceneMessage.setData(JSONUtils.reverseFormat(jsonObject.getString("data")));
|
|
|
+ luceneMessage.setMethod(jsonObject.getString("method"));
|
|
|
+ luceneMessages.add(luceneMessage);
|
|
|
+ }
|
|
|
+ sPage.setContent(luceneMessages);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new SearchException(e).setDetailedMessage(e);
|
|
|
+ } finally {
|
|
|
+ if (resultSet != null) {
|
|
|
+ try {
|
|
|
+ resultSet.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (preparedStatement != null) {
|
|
|
+ try {
|
|
|
+ preparedStatement.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (connection != null) {
|
|
|
+ try {
|
|
|
+ connection.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return sPage;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消息队列出队消息
|
|
|
+ *
|
|
|
+ * @param messageId
|
|
|
+ * 所要出队的消息的id
|
|
|
+ * @return 是否出队成功
|
|
|
+ */
|
|
|
+ public boolean dequeueLuceneQueueMessage(String messageId) throws SQLRecoverableException {
|
|
|
+ if (StringUtils.isEmpty(messageId)) {
|
|
|
+ return false;
|
|
|
+ } // 通过dbms_aq.dequeue出队,无法指定所出队的消息,所以直接通过删除表的数据间接达到出队的效果
|
|
|
+ String sql = "delete from " + SearchConstants.LUCENE_QUEUE_TABLE_NAME + " where msgid=?";
|
|
|
+ Connection connection = null;
|
|
|
+ PreparedStatement preparedStatement = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ connection = dataSource.getConnection();
|
|
|
+ preparedStatement = connection.prepareStatement(sql);
|
|
|
+ preparedStatement.setString(1, messageId);
|
|
|
+ int result = preparedStatement.executeUpdate();
|
|
|
+ connection.commit();
|
|
|
+ return result > 0;
|
|
|
+ } catch (SQLException e) {
|
|
|
+ throw new SearchException("出队消息失败:msgid=" + messageId).setDetailedMessage(e);
|
|
|
+ } finally {
|
|
|
+ if (preparedStatement != null) {
|
|
|
+ try {
|
|
|
+ preparedStatement.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (connection != null) {
|
|
|
+ try {
|
|
|
+ connection.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|