Browse Source

删除旧的实时更新程序

sunyj 9 years ago
parent
commit
17451b9cc4

+ 0 - 129
search-console/src/main/java/com/uas/search/console/jms/AQListener.java

@@ -1,129 +0,0 @@
-package com.uas.search.console.jms;
-
-import java.sql.SQLException;
-import java.util.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.Session;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.log4j.Logger;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.alibaba.fastjson.JSONException;
-import com.uas.search.console.core.util.ContextUtils;
-import com.uas.search.console.model.ParsedQueueMessage;
-import com.uas.search.console.service.IndexService;
-import com.uas.search.console.util.SearchConstants;
-
-import oracle.jms.AQjmsAdtMessage;
-import oracle.jms.AQjmsFactory;
-import oracle.jms.AQjmsSession;
-
-/**
- * 对数据库的消息队列进行实时监听
- * 
- * @author sunyj
- * @since 2016年7月7日 下午8:49:26
- */
-@Service
-public class AQListener {
-
-	@Autowired
-	private IndexService indexService;
-
-	@Autowired
-	private QueueMessageParser queueMessageParser;
-
-	/**
-	 * 标志,判断是否已存在监听线程,防止重复开启监听线程
-	 */
-	private boolean listening = false;
-
-	Logger logger = Logger.getLogger(AQListener.class);
-
-	public void execute() {
-		if (listening) {
-			logger.warn("已存在线程正在实时监听!");
-			return;
-		}
-		listening = true;
-
-		logger.info("AQListener started...\n");
-		BasicDataSource dataSource = ContextUtils.getApplicationContext().getBean("dataSource",
-				org.apache.commons.dbcp.BasicDataSource.class);
-
-		try {
-			QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(dataSource.getUrl(),
-					new Properties());
-			QueueConnection connection = queueConnectionFactory.createQueueConnection(dataSource.getUsername(),
-					dataSource.getPassword());
-			AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			connection.start();
-
-			Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
-			MessageConsumer consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null,
-					false);
-
-			// 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
-			consumer.setMessageListener(new MessageListener() {
-				@Override
-				public void onMessage(Message message) {
-					AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
-					try {
-						QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
-						// 对出队的消息进行解析、处理
-						process(payload.getMessage());
-					} catch (JMSException e) {
-						e.printStackTrace();
-					} catch (SQLException e) {
-						e.printStackTrace();
-					}
-				}
-			});
-		} catch (JMSException e) {
-			e.printStackTrace();
-		}
-	}
-
-	/**
-	 * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
-	 * 
-	 * @param message
-	 */
-	private void process(String message) {
-		ParsedQueueMessage parsedQueueMessage = null;
-		try {
-			parsedQueueMessage = queueMessageParser.parse(message);
-		} catch (JSONException e) {
-			e.printStackTrace();
-		}
-
-		if (parsedQueueMessage == null) {
-			logger.error("message parsing failed!");
-			return;
-		}
-
-		// 新增索引
-		if (parsedQueueMessage.isInsert()) {
-			indexService.save(parsedQueueMessage.getObject());
-		}
-		// 更新索引
-		else if (parsedQueueMessage.isUpdate()) {
-			indexService.update(parsedQueueMessage.getObject());
-		}
-		// 删除索引
-		else if (parsedQueueMessage.isDelete()) {
-			indexService.delete(parsedQueueMessage.getObject());
-		} else {
-			logger.error("message parsing failed!");
-		}
-	}
-}

+ 0 - 149
search-console/src/main/java/com/uas/search/console/jms/QueueMessageParser.java

@@ -1,149 +0,0 @@
-package com.uas.search.console.jms;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import com.uas.search.console.dao.BrandSimpleInfoDao;
-import com.uas.search.console.dao.ComponentSimpleInfoDao;
-import com.uas.search.console.dao.KindSimpleInfoDao;
-import com.uas.search.console.model.BrandSimpleInfo;
-import com.uas.search.console.model.ComponentSimpleInfo;
-import com.uas.search.console.model.KindSimpleInfo;
-import com.uas.search.console.model.ParsedQueueMessage;
-import com.uas.search.console.util.SearchConstants;
-
-/**
- * 对得到的队列消息进行解析的工具
- * 
- * @author sunyj
- * @since 2016年7月7日 下午6:14:03
- */
-@Service
-public class QueueMessageParser {
-
-	@Autowired
-	private KindSimpleInfoDao kindDao;
-
-	@Autowired
-	private BrandSimpleInfoDao brandDao;
-
-	@Autowired
-	private ComponentSimpleInfoDao componentDao;
-
-	/**
-	 * 对得到的json消息进行解析
-	 * 
-	 * @param message
-	 * @return ParsedQueueMessage对象
-	 * @throws JSONException
-	 */
-	// {"method":"value1","table":"value2","param1":"value3","param2":"value4"}
-	public ParsedQueueMessage parse(String message) throws JSONException {
-		if (StringUtils.isEmpty(message) || message.equals("{}")) {
-			return null;
-		}
-		ParsedQueueMessage parsedQueueMessage = new ParsedQueueMessage();
-		JSONObject jsonObject = JSONObject.parseObject(message);
-
-		// 解析数据库表的更改类型
-		String method = jsonObject.getString("method");
-		if (method.equals("insert")) {
-			parsedQueueMessage.setMethodType(ParsedQueueMessage.INSERT);
-		}
-
-		else if (method.equals("update")) {
-			parsedQueueMessage.setMethodType(ParsedQueueMessage.UPDATE);
-		}
-
-		else if (method.equals("delete")) {
-			parsedQueueMessage.setMethodType(ParsedQueueMessage.DELETE);
-		} else {
-			return null;
-		}
-
-		// 解析哪个表有更改
-		Object object = null;
-		String table = jsonObject.getString("table");
-		if (table.equals(SearchConstants.KIND_TABLE_NAME)) {
-			object = parseKind(jsonObject);
-		} else if (table.equals(SearchConstants.BRAND_TABLE_NAME)) {
-			object = parseBrand(jsonObject);
-		} else if (table.equals(SearchConstants.COMPONENT_TABLE_NAME)) {
-			object = parseComponent(jsonObject);
-		} else {
-			return null;
-		}
-
-		if (object == null) {
-			return null;
-		}
-
-		parsedQueueMessage.setObject(object);
-		return parsedQueueMessage;
-	}
-
-	/**
-	 * 对kind类目进行解析
-	 * 
-	 * @param jsonObject
-	 * @return kind类目对象
-	 * @throws JSONException
-	 */
-	// {"method":"value1","table":"product$kind","ki_id":5}
-	private KindSimpleInfo parseKind(JSONObject jsonObject) throws JSONException {
-		KindSimpleInfo kind = new KindSimpleInfo();
-		Long kindid = jsonObject.getLong("ki_id");
-		kind.setId(kindid);
-
-		KindSimpleInfo temp = kindDao.findById(kindid);
-		// 如果更改是删除的话,根据id获取到的对象为null
-		if (temp != null) {
-			kind = temp;
-		}
-		return kind;
-	}
-
-	/**
-	 * 对brand品牌进行解析
-	 * 
-	 * @param jsonObject
-	 * @return brand品牌对象
-	 * @throws JSONException
-	 */
-	// {"method":"value1","table":"product$brand","br_id":60}
-	private BrandSimpleInfo parseBrand(JSONObject jsonObject) throws JSONException {
-		BrandSimpleInfo brand = new BrandSimpleInfo();
-		Long brandid = jsonObject.getLong("br_id");
-		brand.setId(brandid);
-
-		BrandSimpleInfo temp = brandDao.findById(brandid);
-		if (temp != null) {
-			brand = temp;
-		}
-		return brand;
-	}
-
-	/**
-	 * 对component器件进行解析
-	 * 
-	 * @param jsonObject
-	 * @return component器件对象
-	 * @throws JSONException
-	 */
-	// {"method":"value1","table":"product$component","cmp_id":2029}
-	private ComponentSimpleInfo parseComponent(JSONObject jsonObject) throws JSONException {
-		ComponentSimpleInfo component = new ComponentSimpleInfo();
-		Long componentid = jsonObject.getLong("cmp_id");
-		component.setId(componentid);
-
-		ComponentSimpleInfo temp = componentDao.findById(componentid);
-		if (temp != null) {
-			component = temp;
-		}
-		return component;
-	}
-
-}

+ 0 - 58
search-console/src/main/java/com/uas/search/console/jms/QueueMessageTypeFactory.java

@@ -1,58 +0,0 @@
-package com.uas.search.console.jms;
-
-import java.sql.SQLException;
-
-import oracle.jdbc.OracleTypes;
-import oracle.jdbc.driver.OracleConnection;
-import oracle.jpub.runtime.MutableStruct;
-import oracle.sql.CustomDatum;
-import oracle.sql.CustomDatumFactory;
-import oracle.sql.Datum;
-import oracle.sql.STRUCT;
-
-/**
- * 对数据库的数据格式进行转换
- * 
- * @author sunyj
- * @since 2016年7月7日 下午8:49:44
- */
-@SuppressWarnings("deprecation")
-public class QueueMessageTypeFactory implements CustomDatum, CustomDatumFactory {
-
-	public static final int SQL_TYPECODE = OracleTypes.STRUCT;
-
-	MutableStruct struct;
-
-	// 12表示字符串
-	static int[] sqlType = { 12 };
-	static CustomDatumFactory[] factory = new CustomDatumFactory[1];
-	static final QueueMessageTypeFactory messageFactory = new QueueMessageTypeFactory();
-
-	public QueueMessageTypeFactory() {
-		struct = new MutableStruct(new Object[1], sqlType, factory);
-	}
-
-	public static CustomDatumFactory getFactory() {
-		return messageFactory;
-	}
-
-	@Override
-	public CustomDatum create(Datum datum, int sqlType) throws SQLException {
-		if (datum == null) {
-			return null;
-		}
-		QueueMessageTypeFactory queueMessageType = new QueueMessageTypeFactory();
-		queueMessageType.struct = new MutableStruct((STRUCT) datum, QueueMessageTypeFactory.sqlType, factory);
-		return queueMessageType;
-	}
-
-	@Override
-	public Datum toDatum(OracleConnection connection) throws SQLException {
-		return struct.toDatum(connection, "QueueMessageTypeFactory");
-	}
-
-	public String getMessage() throws SQLException {
-		return (String) struct.getAttribute(0);
-	}
-
-}

+ 0 - 84
search-console/src/main/java/com/uas/search/console/model/ParsedQueueMessage.java

@@ -1,84 +0,0 @@
-package com.uas.search.console.model;
-
-/**
- * 对数据库队列里的消息进行解析后所得到的数据
- * 
- * @author sunyj
- * @since 2016年7月7日 下午8:50:13
- */
-public class ParsedQueueMessage {
-
-	/**
-	 * 数据库中表的改动为insert类型
-	 */
-	public static final int INSERT = 1;
-
-	/**
-	 * 改动为update类型
-	 */
-	public static final int UPDATE = 2;
-
-	/**
-	 * 改动为delete类型
-	 */
-	public static final int DELETE = 3;
-
-	/**
-	 * 表改动后,解析消息时,更改的类型
-	 */
-	private int methodType;
-
-	/**
-	 * 存放解析出来的对象:kind、brand或component对象
-	 */
-	private Object object;
-
-	/**
-	 * 是否为insert类型
-	 * 
-	 * @return
-	 */
-	public boolean isInsert() {
-		return methodType == INSERT;
-	}
-
-	/**
-	 * 是否为update类型
-	 * 
-	 * @return
-	 */
-	public boolean isUpdate() {
-		return methodType == UPDATE;
-	}
-
-	/**
-	 * 是否为delete类型
-	 * 
-	 * @return
-	 */
-	public boolean isDelete() {
-		return methodType == DELETE;
-	}
-
-	public int getMethodType() {
-		return methodType;
-	}
-
-	public void setMethodType(int methodType) {
-		this.methodType = methodType;
-	}
-
-	public Object getObject() {
-		return object;
-	}
-
-	public void setObject(Object object) {
-		this.object = object;
-	}
-
-	@Override
-	public String toString() {
-		return "ParsedQueueMessage [methodType=" + methodType + ", object=" + object + "]";
-	}
-
-}