Просмотр исходного кода

异步处理询价单作废

git-svn-id: svn+ssh://10.10.101.21/source/platform/platform-b2b@8136 f3bf4e98-0cf0-11e4-a00c-a99a8b9d557d
yingp 9 лет назад
Родитель
Сommit
a85231377b

+ 276 - 0
src/main/java/com/uas/platform/b2b/core/util/ThreadUtils.java

@@ -0,0 +1,276 @@
+package com.uas.platform.b2b.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.springframework.util.CollectionUtils;
+
+import com.uas.platform.core.concurrent.ICallable;
+import com.uas.platform.core.concurrent.ICallableAdapter;
+import com.uas.platform.core.concurrent.IRunnable;
+import com.uas.platform.core.concurrent.IRunnableAdapter;
+import com.uas.platform.core.concurrent.Listener;
+import com.uas.platform.core.concurrent.Listeners;
+
+public class ThreadUtils {
+
+	public static <T> Runner<T> tasks(IRunnable<T> runnable, List<T> paramList) {
+		return new Runner<T>(runnable).tasks(paramList);
+	}
+
+	public static <T> void run(IRunnable<T> runnable, List<T> paramList) {
+		tasks(runnable, paramList).run();
+	}
+
+	public static <T> Runner<T> pool(IRunnable<T> runnable, int poolSize) {
+		return new Runner<T>(runnable, poolSize);
+	}
+
+	public static <V, T> Caller<V, T> tasks(ICallable<V, T> callable, List<T> paramList) {
+		return new Caller<V, T>(callable).tasks(paramList);
+	}
+
+	public static <V, T> List<V> call(ICallable<V, T> callable, List<T> paramList) {
+		return tasks(callable, paramList).call();
+	}
+
+	public static <V, T> Caller<V, T> pool(ICallable<V, T> callable, int poolSize) {
+		return new Caller<V, T>(callable, poolSize);
+	}
+
+	public static Executor task(Runnable runnable) {
+		return new Executor().task(runnable);
+	}
+
+	public static Executor pool(int poolSize) {
+		return new Executor(poolSize);
+	}
+
+	/**
+	 * @param <T>
+	 *            参数类型
+	 */
+	public static class Runner<T> {
+		private final ExecutorService threadPool;
+		private IRunnable<T> runner;
+		private int timeout = 900;
+		private int taskCount = 0;
+
+		public Runner() {
+			this.threadPool = Executors.newCachedThreadPool();
+		}
+
+		public Runner(int poolSize) {
+			this.threadPool = Executors.newFixedThreadPool(poolSize);
+		}
+
+		public Runner(IRunnable<T> runner) {
+			this();
+			this.runner = runner;
+		}
+
+		public Runner(IRunnable<T> runner, int poolSize) {
+			this(poolSize);
+			this.runner = runner;
+		}
+
+		public Runner<T> setTimeout(int timeout) {
+			this.timeout = timeout;
+			return this;
+		}
+
+		public Runner<T> task(T param) {
+			threadPool.execute(new IRunnableAdapter<T>(runner, param));
+			taskCount++;
+			return this;
+		}
+
+		public Runner<T> tasks(List<T> params) {
+			if (!CollectionUtils.isEmpty(params)) {
+				for (T param : params)
+					task(param);
+			}
+			return this;
+		}
+
+		public void run() {
+			if (taskCount > 0) {
+				threadPool.shutdown();
+				try {
+					threadPool.awaitTermination(timeout, TimeUnit.SECONDS);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+
+		public Runner<T> shutdownIf(Listeners closer) {
+			closer.addListener(new Listener() {
+
+				@Override
+				public void handler() {
+					threadPool.shutdownNow();
+				}
+			});
+			return this;
+		}
+
+	}
+
+	/**
+	 * @param <V>
+	 *            返回类型
+	 * @param <T>
+	 *            参数类型
+	 */
+	public static class Caller<V, T> {
+		private final ExecutorService threadPool;
+		private final CompletionService<V> service;
+		private ICallable<V, T> caller;
+		private int taskCount = 0;
+		private AtomicBoolean closed = new AtomicBoolean(false);
+
+		public Caller() {
+			this.threadPool = Executors.newCachedThreadPool();
+			this.service = new ExecutorCompletionService<V>(threadPool);
+		}
+
+		public Caller(int poolSize) {
+			this.threadPool = Executors.newFixedThreadPool(poolSize);
+			this.service = new ExecutorCompletionService<V>(threadPool);
+		}
+
+		public Caller(ICallable<V, T> caller) {
+			this();
+			this.caller = caller;
+		}
+
+		public Caller(ICallable<V, T> caller, int poolSize) {
+			this(poolSize);
+			this.caller = caller;
+		}
+
+		public Caller<V, T> task(T param) {
+			service.submit(new ICallableAdapter<V, T>(caller, param));
+			taskCount++;
+			return this;
+		}
+
+		public Caller<V, T> tasks(List<T> params) {
+			if (!CollectionUtils.isEmpty(params)) {
+				for (T param : params)
+					task(param);
+			}
+			return this;
+		}
+
+		public List<V> call() {
+			threadPool.shutdown();
+			int finish = 0;
+			List<V> results = new ArrayList<V>();
+			while (finish < taskCount && !closed.get()) {
+				Future<V> val = service.poll();
+				// 采用非阻塞方式
+				if (null != val) {
+					try {
+						results.add(val.get());
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					} catch (ExecutionException e) {
+						e.printStackTrace();
+					}
+					finish++;
+				}
+				try {
+					TimeUnit.MILLISECONDS.sleep(50);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+			closed.set(true);
+			return results;
+		}
+
+		public Caller<V, T> shutdownIf(Listeners closer) {
+			closer.addListener(new Listener() {
+
+				@Override
+				public void handler() {
+					if (!closed.get()) {
+						threadPool.shutdownNow();
+						closed.set(true);
+					}
+				}
+			});
+			return this;
+		}
+
+	}
+
+	/**
+	 * 无参、无返回执行
+	 */
+	public static class Executor {
+		private final ExecutorService threadPool;
+		private int timeout = 900;// 15 min
+		private int taskCount = 0;
+
+		public Executor() {
+			this.threadPool = Executors.newCachedThreadPool();
+		}
+
+		public Executor(int poolSize) {
+			this.threadPool = Executors.newFixedThreadPool(poolSize);
+		}
+
+		public Executor setTimeout(int timeout) {
+			this.timeout = timeout;
+			return this;
+		}
+
+		public Executor task(Runnable runnable) {
+			threadPool.execute(runnable);
+			taskCount++;
+			return this;
+		}
+
+		public Executor tasks(List<Runnable> runnables) {
+			if (!CollectionUtils.isEmpty(runnables)) {
+				for (Runnable runnable : runnables)
+					task(runnable);
+			}
+			return this;
+		}
+
+		public void run() {
+			if (taskCount > 0) {
+				threadPool.shutdown();
+				try {
+					threadPool.awaitTermination(timeout, TimeUnit.SECONDS);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+
+		public Executor shutdownIf(Listeners closer) {
+			closer.addListener(new Listener() {
+
+				@Override
+				public void handler() {
+					threadPool.shutdownNow();
+				}
+			});
+			return this;
+		}
+
+	}
+}

+ 21 - 12
src/main/java/com/uas/platform/b2b/service/impl/PurchaseInquiryServiceImpl.java

@@ -21,6 +21,7 @@ import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import org.springframework.util.StringUtils;
 
 
 import com.uas.platform.b2b.core.util.ContextUtils;
 import com.uas.platform.b2b.core.util.ContextUtils;
+import com.uas.platform.b2b.core.util.ThreadUtils;
 import com.uas.platform.b2b.dao.AttachDao;
 import com.uas.platform.b2b.dao.AttachDao;
 import com.uas.platform.b2b.dao.CommonDao;
 import com.uas.platform.b2b.dao.CommonDao;
 import com.uas.platform.b2b.dao.ProductDao;
 import com.uas.platform.b2b.dao.ProductDao;
@@ -50,6 +51,7 @@ import com.uas.platform.b2b.service.PurchaseInquiryService;
 import com.uas.platform.b2b.support.SystemSession;
 import com.uas.platform.b2b.support.SystemSession;
 import com.uas.platform.b2b.support.XingePusher;
 import com.uas.platform.b2b.support.XingePusher;
 import com.uas.platform.b2b.temporary.model.InquiryMessage;
 import com.uas.platform.b2b.temporary.model.InquiryMessage;
+import com.uas.platform.core.concurrent.IRunnable;
 import com.uas.platform.core.exception.IllegalOperatorException;
 import com.uas.platform.core.exception.IllegalOperatorException;
 import com.uas.platform.core.exception.IllegalStatusException;
 import com.uas.platform.core.exception.IllegalStatusException;
 import com.uas.platform.core.model.Constant;
 import com.uas.platform.core.model.Constant;
@@ -392,20 +394,27 @@ public class PurchaseInquiryServiceImpl implements PurchaseInquiryService {
 
 
 	@Override
 	@Override
 	public void onReplyInvalid(List<Inquiry> inquiries) {
 	public void onReplyInvalid(List<Inquiry> inquiries) {
-		Long enuu = SystemSession.getUser().getEnterprise().getUu();
-		List<PurchaseInquiryItem> newInquiryItems = new ArrayList<>();
-		for (Inquiry inquiry : inquiries) {
-			List<PurchaseInquiry> purchaseInquiries = purchaseInquiryDao.findByEnUUAndSourceId(enuu, inquiry.getIn_id());
-			if (!CollectionUtils.isEmpty(purchaseInquiries)) {
-				PurchaseInquiry purchaseInquiry = purchaseInquiries.get(0);
-				for (PurchaseInquiryItem item : purchaseInquiry.getInquiryItems()) {
-					item.setStatus((short) Status.DISABLED.value());
-					newInquiryItems.add(item);
+		final Long enuu = SystemSession.getUser().getEnterprise().getUu();
+		final List<PurchaseInquiryItem> newInquiryItems = new ArrayList<>();
+		ThreadUtils.tasks(new IRunnable<Inquiry>() {
+
+			@Override
+			public void run(Inquiry inquiry) {
+				List<PurchaseInquiry> purchaseInquiries = purchaseInquiryDao.findByEnUUAndSourceId(enuu, inquiry.getIn_id());
+				if (!CollectionUtils.isEmpty(purchaseInquiries)) {
+					PurchaseInquiry purchaseInquiry = purchaseInquiries.get(0);
+					for (PurchaseInquiryItem item : purchaseInquiry.getInquiryItems()) {
+						item.setStatus((short) Status.DISABLED.value());
+						synchronized (newInquiryItems) {
+							newInquiryItems.add(item);
+						}
+					}
 				}
 				}
 			}
 			}
-		}
-		newInquiryItems = purchaseInquiryItemDao.save(newInquiryItems);
+
+		}, inquiries).run();
+		List<PurchaseInquiryItem> savedInquiryItems = purchaseInquiryItemDao.save(newInquiryItems);
 		// 触发消息事件
 		// 触发消息事件
-		ContextUtils.publishEvent(new PurchaseInquiryItemInvalidReleaseEvent(PurchaseInquiryItem.distinct(newInquiryItems)));
+		ContextUtils.publishEvent(new PurchaseInquiryItemInvalidReleaseEvent(PurchaseInquiryItem.distinct(savedInquiryItems)));
 	}
 	}
 }
 }