Browse Source

使用TransmittableThreadLocal类,修改hystrix隔离线程池,处理Hystrix 线程隔离导致ThreadLocal数据丢失问题

guq 7 years ago
parent
commit
08af916f2a

+ 3 - 0
applications/document/document-server/src/main/resources/config/application-dev.yml

@@ -11,3 +11,6 @@ eureka:
     registryFetchIntervalSeconds: 5
     serviceUrl:
       defaultZone: http://${spring.security.user.name}:${spring.security.user.password}@127.0.0.1:8500/eureka/
+logging:
+  level:
+    com.usoftchina.saas.auth.client: DEBUG

+ 4 - 1
framework/core/pom.xml

@@ -65,7 +65,10 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>transmittable-thread-local</artifactId>
-            <version>2.10.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>

+ 222 - 0
framework/core/src/main/java/com/netflix/hystrix/strategy/concurrency/HystrixContextScheduler.java

@@ -0,0 +1,222 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.hystrix.strategy.concurrency;
+
+import java.util.concurrent.*;
+
+import com.alibaba.ttl.TtlRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.*;
+import rx.functions.Action0;
+import rx.functions.Func0;
+import rx.internal.schedulers.ScheduledAction;
+import rx.subscriptions.*;
+
+import com.netflix.hystrix.HystrixThreadPool;
+import com.netflix.hystrix.strategy.HystrixPlugins;
+
+/**
+ * 处理Hystrix线程隔离导致ThreadLocal数据丢失问题
+ * @author: guq
+ * @create: 2019-01-10 11:05
+ */
+public class HystrixContextScheduler extends Scheduler {
+
+    private final HystrixConcurrencyStrategy concurrencyStrategy;
+    private final Scheduler actualScheduler;
+    private final HystrixThreadPool threadPool;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HystrixContextScheduler.class);
+
+    public HystrixContextScheduler(Scheduler scheduler) {
+        this.actualScheduler = scheduler;
+        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
+        this.threadPool = null;
+    }
+
+    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) {
+        this.actualScheduler = scheduler;
+        this.concurrencyStrategy = concurrencyStrategy;
+        this.threadPool = null;
+    }
+
+    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
+        this(concurrencyStrategy, threadPool, new Func0<Boolean>() {
+            @Override
+            public Boolean call() {
+                return true;
+            }
+        });
+    }
+
+    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
+        this.concurrencyStrategy = concurrencyStrategy;
+        this.threadPool = threadPool;
+        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
+    }
+
+    @Override
+    public Worker createWorker() {
+        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
+    }
+
+    private class HystrixContextSchedulerWorker extends Worker {
+
+        private final Worker worker;
+
+        private HystrixContextSchedulerWorker(Worker actualWorker) {
+            this.worker = actualWorker;
+        }
+
+        @Override
+        public void unsubscribe() {
+            worker.unsubscribe();
+        }
+
+        @Override
+        public boolean isUnsubscribed() {
+            return worker.isUnsubscribed();
+        }
+
+        @Override
+        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
+            if (threadPool != null) {
+                if (!threadPool.isQueueSpaceAvailable()) {
+                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
+                }
+            }
+            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
+        }
+
+        @Override
+        public Subscription schedule(Action0 action) {
+            if (threadPool != null) {
+                if (!threadPool.isQueueSpaceAvailable()) {
+                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
+                }
+            }
+            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
+        }
+
+    }
+
+    private static class ThreadPoolScheduler extends Scheduler {
+
+        private final HystrixThreadPool threadPool;
+        private final Func0<Boolean> shouldInterruptThread;
+
+        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
+            this.threadPool = threadPool;
+            this.shouldInterruptThread = shouldInterruptThread;
+        }
+
+        @Override
+        public Worker createWorker() {
+            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
+        }
+
+    }
+
+    /**
+     * Purely for scheduling work on a thread-pool.
+     * <p>
+     * This is not natively supported by RxJava as of 0.18.0 because thread-pools
+     * are contrary to sequential execution.
+     * <p>
+     * For the Hystrix case, each Command invocation has a single action so the concurrency
+     * issue is not a problem.
+     */
+    private static class ThreadPoolWorker extends Worker {
+
+        private final HystrixThreadPool threadPool;
+        private final CompositeSubscription subscription = new CompositeSubscription();
+        private final Func0<Boolean> shouldInterruptThread;
+
+        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
+            this.threadPool = threadPool;
+            this.shouldInterruptThread = shouldInterruptThread;
+        }
+
+        @Override
+        public void unsubscribe() {
+            subscription.unsubscribe();
+        }
+
+        @Override
+        public boolean isUnsubscribed() {
+            return subscription.isUnsubscribed();
+        }
+
+        @Override
+        public Subscription schedule(final Action0 action) {
+            if (subscription.isUnsubscribed()) {
+                // don't schedule, we are unsubscribed
+                return Subscriptions.unsubscribed();
+            }
+
+            // This is internal RxJava API but it is too useful.
+            ScheduledAction sa = new ScheduledAction(action);
+
+            subscription.add(sa);
+            sa.addParent(subscription);
+
+            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
+            LOGGER.info("创建新线程...............");
+            Runnable runnable = TtlRunnable.get(sa);
+            FutureTask<?> f = (FutureTask<?>) executor.submit(runnable);
+            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
+
+            return sa;
+        }
+
+        @Override
+        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
+            throw new IllegalStateException("Hystrix does not support delayed scheduling");
+        }
+    }
+
+    /**
+     * Very similar to rx.internal.schedulers.ScheduledAction.FutureCompleter, but with configurable interrupt behavior
+     */
+    private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
+        private final FutureTask<?> f;
+        private final Func0<Boolean> shouldInterruptThread;
+        private final ThreadPoolExecutor executor;
+
+        private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
+            this.f = f;
+            this.shouldInterruptThread = shouldInterruptThread;
+            this.executor = executor;
+        }
+
+        @Override
+        public void unsubscribe() {
+            executor.remove(f);
+            if (shouldInterruptThread.call()) {
+                f.cancel(true);
+            } else {
+                f.cancel(false);
+            }
+        }
+
+        @Override
+        public boolean isUnsubscribed() {
+            return f.isCancelled();
+        }
+    }
+
+}

+ 6 - 0
pom.xml

@@ -45,6 +45,7 @@
         <fastdfs.client.version>1.26.3</fastdfs.client.version>
         <poi.version>3.17</poi.version>
         <zxing.version>3.3.0</zxing.version>
+        <threadloacl.version>2.10.2</threadloacl.version>
     </properties>
 
     <repositories>
@@ -394,6 +395,11 @@
                 <artifactId>javase</artifactId>
                 <version>${zxing.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>transmittable-thread-local</artifactId>
+                <version>${threadloacl.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>