SpringCloud框架,没有特殊的实现。即,请求到达Zuul网关后,由Ribbon负载均衡到目标组件节点,由Hystrix转发请求。
hystrix.command.default.execution.isolation.strategy=THREAD
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
一次调用中发现,请求过程超过10s后,后台已经打印HystrixTimeoutException且进入了自定义的FallbackProvider,前端仍然没有收到响应,直到请求链路处理完成,前端才返回FallbackProvider中返回的异常响应。
看文档- Hystrix官方文档 ,THREAD隔离模式下是请求超时是会取消调用线程从而立即返回的,SEMAPHORE模式下会等待响应回来再判断是否超时。而上述配置的是所有的Route都默认是THREAD-线程隔离模式,遂认为配置没问题;
跟踪源码,RxJava实现的,响应式编程不熟悉,初期调试时没头苍蝇一样到处打断点,看得不明所以(现在也是)。网上的资料大多是翻译上述文档,只知道是HystrixCommand.execute()发送请求,AbstractCommand.handleTimeoutViaFallback()触发FallbackProvider,中间超时如何处理没有说清;
开启DEBUG级别日志,用日志跟踪一个请求的全部过程。发现打印的配置是 executionIsolationStrategy=SEMAPHORE !!!查阅SpringCloud相关资料,发现用Hystrix+Ribbon的时候,发送请求用的是HystrixCommand的AbstractRibbonCommand实现,而后者的部分配置会覆盖掉HystrixCommandProperties中的配置,其中就有隔离模式这项配置,用的是ZuulProperties中的默认值SEMAPHORE:
protected AbstractRibbonCommand(Setter setter, LBC client,
RibbonCommandContext context,
ZuulFallbackProvider fallbackProvider, IClientConfig config) {
//将setter传到HystrixCommand的构造方法中
super(setter);
this.client = client;
this.context = context;
this.zuulFallbackProvider = fallbackProvider;
this.config = config;
}
//创建Setter
protected static HystrixCommandProperties.Setter createSetter(IClientConfig config, String commandKey, ZuulProperties zuulProperties) {
int hystrixTimeout = getHystrixTimeout(config, commandKey);
return HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
//executionIsolationStrategy用的是ZuulProperties中的值
zuulProperties.getRibbonIsolationStrategy()).withExecutionTimeoutInMilliseconds(hystrixTimeout);
}
//默认是SEMAPHORE
private ExecutionIsolationStrategy ribbonIsolationStrategy = SEMAPHORE;
//最终setter作为参数builder传入
protected HystrixCommandProperties(HystrixCommandKey key, HystrixCommandProperties.Setter builder, String propertyPrefix) {
this.key = key;
// 省略其它配置
this.executionIsolationStrategy = getProperty(propertyPrefix, key, "execution.isolation.strategy", builder.getExecutionIsolationStrategy(), default_executionIsolationStrategy);
a. 指定commandKey的方式
hystrix.command.aService.execution.isolation.strategy=THREAD
b. 修改Zuul配置的方式,注意useSeparateThreadPools默认为false,此时所有组件共用一个commandKey=RibbinCommand的线程池
zuul.ribbonIsolationStrategy=THREAD指定ribbon的隔离模式 zuul.threadPool.useSeparateThreadPools=true每个commandKey一个线程池
在对RxJava有大概了解的准备下,梳理Hystrix关键请求流程如下:
a. 发送请求,进入Zuul过滤器RibbonRoutingFilter,通过工厂类创建AbstractRibbonCommand,调用其execute方法
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
// 创建AbstractRibbonCommand
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
// 调用execute方法
ClientHttpResponse response = command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}
}
b. 进入HystrixCommand.execute(),实际是调用Future.get()来立即获取异步方法HystrixCommand.queue()的结果
public R execute() {
try {
//queue方法返回的是Future
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
c. 通过AbstractCommand.toObservable()创建一个待订阅的被观察对象(即Observable),创建过程:
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
//没有缓存使用applyHystrixSemantics
afterCache = hystrixObservable;
}
// 获取信号量,THREAD模式下
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
//超时异常在此处理
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
...
}).doOnTerminate(new Action0() {
...
}).doOnUnsubscribe(new Action0() {
...
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
//原始HystrixCommand的状态为TIMED_OUT时
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
//动态更新配置
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
// actualScheduler是ThreadPoolScheduler
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
@Override
public Worker createWorker() {
// 创建HystrixContextSchedulerWorker,参数是ThreadPoolWorker
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
// 包装action
ScheduledAction sa = new ScheduledAction(action);
// 这里不懂这个操作啥意思
subscription.add(sa);
sa.addParent(subscription);
// 获取执行器
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
// 执行action
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
// 加入中断线程的subscription
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
@Override
public void unsubscribe() {
// 移除上述action
executor.remove(f);
if (shouldInterruptThread.call()) {//结果为true取消future
f.cancel(true);
} else {
f.cancel(false);
}
}