Reposted

[Problem Analysis] Request Is Not Interrupted Immediately After a Hystrix Timeout

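1. Problem

Background

A standard Spring Cloud setup with no custom implementation: after a request reaches the Zuul gateway, Ribbon load-balances it to a node of the target service, and Hystrix forwards the request.

Key configuration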

hystrix.command.default.execution.isolation.strategy=THREAD
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000

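Symptom

In one call we observed that once the request had been running for more than 10 s, the backend had already logged a HystrixTimeoutException and entered the custom FallbackProvider, yet the front end still received no response. Only after the whole request chain had finished processing did the front end receive the error response produced by the FallbackProvider.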

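2. Analysis

Attempt 1

Read the documentation. According to the official Hystrix documentation, in THREAD isolation mode a timed-out request has its executing thread cancelled so that the caller gets a response immediately, whereas in SEMAPHORE mode Hystrix waits for the response to come back and only then determines whether it timed out. Since the configuration above makes THREAD isolation the default for all routes, I concluded that the configuration was fine.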
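The difference between the two modes is easier to see in a small model. The sketch below uses only the JDK and is not Hystrix code; the class and method names are made up for illustration. It assumes that "SEMAPHORE-like" means the slow call runs on the caller's own thread, and "THREAD-like" means it runs on a pool thread that the caller can stop waiting for.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified model of the two isolation strategies; this is not Hystrix code. */
final class IsolationSketch {

    /** Stands in for a slow downstream call; returns early if the thread is interrupted. */
    static String slowCall(long workMillis) {
        try {
            Thread.sleep(workMillis);
            return "response";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    /** SEMAPHORE-like: the work runs on the caller's thread, so the caller is stuck until it ends. */
    static long semaphoreLikeElapsedMillis(long workMillis, long timeoutMillis) {
        long start = System.nanoTime();
        slowCall(workMillis);
        // A timeout of timeoutMillis can only be noticed here, after the call has returned.
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** THREAD-like: the work runs on a pool thread; the caller waits at most timeoutMillis. */
    static long threadLikeElapsedMillis(long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        long start = System.nanoTime();
        Callable<String> task = () -> slowCall(workMillis);
        Future<String> future = pool.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up waiting and interrupt the worker thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("semaphore-like: " + semaphoreLikeElapsedMillis(500, 100) + " ms");
        System.out.println("thread-like:    " + threadLikeElapsedMillis(500, 100) + " ms");
    }
}
```

With a 500 ms call and a 100 ms timeout, the first method should take roughly the full call duration while the second returns shortly after the timeout, which mirrors the symptom described in section 1.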

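Attempt 2

Traced the source code. Hystrix is built on RxJava and I am not familiar with reactive programming, so early on I set breakpoints everywhere like a headless fly and could not make sense of what I saw (I still can't, entirely). Most material online merely translates the documentation mentioned above. All I learned was that HystrixCommand.execute() sends the request and AbstractCommand.handleTimeoutViaFallback() triggers the FallbackProvider, with no clear account of how the timeout is handled in between.

Attempt 3

Enabled DEBUG-level logging and followed a single request through the logs from start to finish. The logged configuration turned out to be executionIsolationStrategy=SEMAPHORE! After consulting the Spring Cloud material, I found that with Hystrix + Ribbon the request is sent through AbstractRibbonCommand, an implementation of HystrixCommand, and that this class overrides some of the settings in HystrixCommandProperties. The isolation strategy is one of them: it takes the default value from ZuulProperties, which is SEMAPHORE: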

AbstractRibbonCommand

protected AbstractRibbonCommand(Setter setter, LBC client,
                                RibbonCommandContext context,
                                ZuulFallbackProvider fallbackProvider, IClientConfig config) {
    // Pass the setter on to the HystrixCommand constructor
    super(setter);
    this.client = client;
    this.context = context;
    this.zuulFallbackProvider = fallbackProvider;
    this.config = config;
}

// Build the Setter
protected static HystrixCommandProperties.Setter createSetter(IClientConfig config, String commandKey, ZuulProperties zuulProperties) {
    int hystrixTimeout = getHystrixTimeout(config, commandKey);
    return HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
            // executionIsolationStrategy takes its value from ZuulProperties
            zuulProperties.getRibbonIsolationStrategy()).withExecutionTimeoutInMilliseconds(hystrixTimeout);
}

ZuulProperties

// Defaults to SEMAPHORE
private ExecutionIsolationStrategy ribbonIsolationStrategy = SEMAPHORE;

HystrixCommandProperties

// The setter ends up here as the builder argument
protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
    this.key = key;
    // other properties omitted
    this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
    // ...
}
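Why does a value supplied through the Setter beat hystrix.command.default.*? As far as I understand the precedence described in the Hystrix configuration documentation, a property is resolved in this order: the instance-level dynamic property for the command key, then the value set in code through the builder, then the global default dynamic property, then the hard-coded default. The sketch below models that order with plain JDK types; the class and the resolve method are hypothetical and are not the real HystrixCommandProperties.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of Hystrix property precedence; not the real HystrixCommandProperties. */
final class PropertyPrecedenceSketch {

    static String resolve(Map<String, String> dynamicProps, String commandKey,
                          String name, String builderValue, String hardCodedDefault) {
        // 1. An instance-level property for this command key wins over everything else.
        String instance = dynamicProps.get("hystrix.command." + commandKey + "." + name);
        if (instance != null) {
            return instance;
        }
        // 2. Next comes the value passed in code through the Setter (the builder).
        if (builderValue != null) {
            return builderValue;
        }
        // 3. Only then is the global hystrix.command.default.* property consulted.
        String global = dynamicProps.get("hystrix.command.default." + name);
        if (global != null) {
            return global;
        }
        // 4. Finally, the library's hard-coded default.
        return hardCodedDefault;
    }

    public static void main(String[] args) {
        String name = "execution.isolation.strategy";
        Map<String, String> props = new HashMap<>();
        props.put("hystrix.command.default." + name, "THREAD");
        // The builder value (SEMAPHORE, as supplied by Zuul) hides the global default property.
        System.out.println(resolve(props, "aService", name, "SEMAPHORE", "THREAD"));
        // An instance-level property for the command key overrides the builder value.
        props.put("hystrix.command.aService." + name, "THREAD");
        System.out.println(resolve(props, "aService", name, "SEMAPHORE", "THREAD"));
    }
}
```

Under this model, the configuration from section 1 resolves to SEMAPHORE because Zuul always passes a builder value, and only a property keyed by the specific command name can override it.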

3. Solutions

a. Set the strategy for a specific commandKey

hystrix.command.aService.execution.isolation.strategy=THREAD

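b. Change the Zuul configuration. Note that useSeparateThreadPools defaults to false, in which case all services share a single thread pool keyed RibbonCommand

# Isolation strategy for Ribbon commands
zuul.ribbonIsolationStrategy=THREAD
# One thread pool per commandKey
zuul.threadPool.useSeparateThreadPools=true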
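To make the effect of useSeparateThreadPools concrete, here is a minimal sketch of how the thread pool key appears to be chosen for Zuul's Ribbon commands in THREAD mode. The class and method names are hypothetical, and the prefix parameter assumes the zuul.threadPool.threadPoolKeyPrefix option found in later Spring Cloud Netflix releases; treat it as an illustration rather than the framework's actual code.

```java
/** Hypothetical model of thread pool key selection for Zuul's Ribbon commands (THREAD mode only). */
final class ThreadPoolKeySketch {

    /** Shared pool name used when separate pools are disabled (assumed from the note above). */
    static final String SHARED_POOL_KEY = "RibbonCommand";

    static String threadPoolKey(String commandKey, boolean useSeparateThreadPools, String prefix) {
        if (useSeparateThreadPools) {
            // One pool per command key, so a slow service cannot exhaust the threads of the others.
            return prefix + commandKey;
        }
        // Every route shares the same pool.
        return SHARED_POOL_KEY;
    }

    public static void main(String[] args) {
        System.out.println(threadPoolKey("aService", false, ""));
        System.out.println(threadPoolKey("aService", true, ""));
    }
}
```

The practical consequence is that switching to THREAD without separate pools trades one shared semaphore for one shared thread pool, so pool sizing deserves a look at the same time.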

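4. Hystrix Source Walkthrough

With a rough understanding of RxJava in hand, the key request flow in Hystrix can be laid out as follows:

a. The request is sent and enters the Zuul filter RibbonRoutingFilter, which creates an AbstractRibbonCommand through a factory class and calls its execute method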

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());
    // Create the AbstractRibbonCommand
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        // Call its execute method
        ClientHttpResponse response = command.execute();
        this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
        return response;
    }
    catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }

}

b. Control enters HystrixCommand.execute(), which simply calls Future.get() to block for the result of the asynchronous method HystrixCommand.queue()

public R execute() {
    try {
        // queue() returns a Future
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
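The relationship between execute() and queue() can be shown with a toy command built on the JDK's CompletableFuture. This is only a sketch of the pattern: BlockingCommandSketch is a made-up class, and its exception unwrapping is a loose stand-in for what decomposeException does in Hystrix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Toy command showing the execute()/queue() relationship; not the Hystrix class. */
final class BlockingCommandSketch<R> {
    private final Supplier<R> work;

    BlockingCommandSketch(Supplier<R> work) {
        this.work = work;
    }

    /** Asynchronous entry point: starts the work and hands back a Future. */
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    /** Synchronous entry point: blocks on the Future returned by queue(). */
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            // Unwrap so that the caller sees the original failure.
            Throwable cause = e.getCause();
            throw cause instanceof RuntimeException
                    ? (RuntimeException) cause
                    : new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BlockingCommandSketch<String>(() -> "ok").execute());
    }
}
```

Because execute() is nothing more than a blocking get(), the caller is released only when the Future completes, so what matters for the symptom in section 1 is which thread is able to complete that Future once the timeout fires.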

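c. AbstractCommand.toObservable() creates an observable that is yet to be subscribed to (an Observable). It is built as follows:

  • When the response is not served from the cache, the applyHystrixSemantics function is used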
Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // put in cache
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();
                    }
                } else {
                    // No cache: use the observable built from applyHystrixSemantics
                    afterCache = hystrixObservable;
                }
  • Once the semaphore is acquired, executeCommandAndObserve is called. In THREAD mode executionSemaphore is a TryableSemaphoreNoOp, whose tryAcquire() always returns true
// Acquire the semaphore (a no-op in THREAD mode)
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
  • The Observable is decorated with event triggers, state recording and exception handling, and then executeCommandWithSpecifiedIsolation is called
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    // Timeout exceptions are handled here
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };
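  • executeCommandWithSpecifiedIsolation handles each isolation strategy differently. The main difference is that in THREAD mode subscribeOn is called on the Observable, which moves execution onto a thread from threadPool.getScheduler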
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
            ...
            }).doOnTerminate(new Action0() {
            ...
            }).doOnUnsubscribe(new Action0() {
            ...
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // Interrupt only if interruptOnTimeout is enabled and the original HystrixCommand's status is TIMED_OUT
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        }
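  • threadPool is the HystrixThreadPool created when AbstractCommand is initialized; the default implementation is HystrixThreadPoolDefault. Its getScheduler method refreshes the configuration dynamically and returns a HystrixContextScheduler. The shouldInterruptThread parameter indicates whether the thread executing the request should be interrupted after a timeout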
@Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            // Refresh the configuration dynamically
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }
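  • HystrixContextScheduler.createWorker is then called (the author could not find where from; in RxJava 1.x it is the subscribeOn operator that calls createWorker on its scheduler) and returns a HystrixContextSchedulerWorker, whose schedule method is called next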
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        // actualScheduler is a ThreadPoolScheduler
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }
    
    @Override
    public Worker createWorker() {
        // Create a HystrixContextSchedulerWorker wrapping a ThreadPoolWorker
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }
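  • The schedule method is carried out by the ThreadPoolWorker implementation: it wraps the action, obtains the executor from AbstractCommand's threadPool and submits the action to it, then builds a FutureCompleterWithConfigurableInterrupt from the shouldInterruptThread parameter above and adds it to the scheduled action as a subscription, so that it is unsubscribed after a timeout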
@Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed
                return Subscriptions.unsubscribed();
            }

            // This is internal RxJava API but it is too useful.
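            // Wrap the action
            ScheduledAction sa = new ScheduledAction(action);

            // The author was unsure what these two calls are for; they appear to tie the action's lifetime to the worker's subscription
            subscription.add(sa);
            sa.addParent(subscription);

            // Get the executor
            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            // Submit the action for execution
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            // Add the subscription that interrupts the thread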
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }
  • When FutureCompleterWithConfigurableInterrupt is unsubscribed, it removes the task and interrupts the request
@Override
        public void unsubscribe() {
            // Remove the action submitted above
            executor.remove(f);
            if (shouldInterruptThread.call()) { // true: cancel the future and interrupt its thread
                f.cancel(true);
            } else {
                f.cancel(false);
            }
        }
  • The rest of the flow is not covered here, since it is unrelated to this problem
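The last few steps of the walkthrough can be tied together in a JDK-only model. The sketch below is not Hystrix code and all names in it are made up: it assumes the timeout is a compare-and-set on a status flag that races against normal completion, and that the winner of a timeout then cancels the worker's Future, interrupting it only when the configurable predicate says so. The executor.remove(f) call from the excerpt is not modeled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of the THREAD-mode timeout path; not Hystrix code. */
final class TimeoutPathSketch {

    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    /**
     * Runs a slow task on a pool thread, fires a "timeout" while it is still running,
     * and reports whether the worker thread was interrupted.
     */
    static boolean workerInterrupted(boolean interruptOnTimeout) {
        AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(300); // stands in for the slow downstream call
                    status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    finished.countDown();
                }
            });
            started.await();
            // The timeout tick: only one of "completed" and "timed out" can win the CAS.
            if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // Same shape as the shouldInterruptThread predicate in the walkthrough.
                boolean shouldInterrupt = interruptOnTimeout
                        && status.get() == TimedOutStatus.TIMED_OUT;
                future.cancel(shouldInterrupt);
            }
            finished.await();
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interruptOnTimeout=true:  " + workerInterrupted(true));
        System.out.println("interruptOnTimeout=false: " + workerInterrupted(false));
    }
}
```

In this model cancel(true) stops the worker mid-sleep, while cancel(false) marks the Future as cancelled but lets the worker run to completion, which is why the interrupt-on-timeout setting matters for actually freeing the thread.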
Original: https://segmentfault.com/a/1190000020230380