Hystrix作为后端弹性架构的一把利器,用处可以说非常的广泛,最近在写代码的时候接触到了这个框架,所以趁着业余时间粗粗的看了下其中的源码,发现有很多地方值得学习,于是准备写几篇文章记录一下。
Hystrix的功能比较多,这一篇文章先探讨其中一个比较简单的功能——fallback的具体实现。
下面我们先来看一下fallback的一个具体使用场景。
@HystrixCommand(
fallbackMethod = "fallbackFunc",
commandProperties = {
//超过此时间,HystrixCommand被标记为TIMEOUT,并执行回退逻辑,默认1000ms
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "500"),
//设置在回路被打开,拒绝请求到再次尝试请求并决定回路是否继续打开的时间,默认5000ms
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "60000"),
//设置打开回路并启动回退逻辑的错误比率,默认值50%
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50")
},
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "200")
})
@Override
public Map<String, Boolean> someFunc() {
.......
}
可以看到只要一个方法被注解上了HystrixCommand注解,那么这个方法就会被Hystrix监控,注解中有一个参数叫fallbackMethod,很显然, 就是当被注解的方法发生异常之后,会调用fallbackMethod ,在上文中也就是fallbackFunc。值得一提的是HystrixCommand注解有很多的参数,这也是Hystrix功能核心所在,具体的参数可以参考 这篇文章 。
既然是注解,又是在SpringMVC中,自然而然的就联想到了AOP,翻一翻源码,果不其然有一个HystrixCommandAspect类。
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
Pointcut指向2个注解,其中一个就是我们用到的HystrixCommand。
具体的编织方法,我们来看下面这一段:
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause() != null ? e.getCause() : e;
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
很有意思的第四点,竟然有RxJava的代码在其中,其实看过源码你就会知道,Hystrix内部已经深度集成了RxJava了,这对做Android的我来说还是有点惊喜的,我之前的博客中也有几篇关于RxJava的文章哦~
我们看返回值不是Observable的,也就是下面这一行执行代码:
result = CommandExecutor.execute(invokable, executionType, metaHolder);
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
Validate.notNull(invokable);
Validate.notNull(metaHolder);
switch (executionType) {
case SYNCHRONOUS: {
return castToExecutable(invokable, executionType).execute();
}
case ASYNCHRONOUS: {
HystrixExecutable executable = castToExecutable(invokable, executionType);
if (metaHolder.hasFallbackMethodCommand()
&& ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
return new FutureDecorator(executable.queue());
}
return executable.queue();
}
case OBSERVABLE: {
HystrixObservable observable = castToObservable(invokable);
return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
}
default:
throw new RuntimeException("unsupported execution type: " + executionType);
}
}
根据executionType去执行,我们看其中的同步执行:
private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
if (invokable instanceof HystrixExecutable) {
return (HystrixExecutable) invokable;
}
throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
}
其实很简单啦,就是调用command的execute方法。
command是Hystrix的核心理念,其中的基类是AbstractCommand,我们来看其子类HystrixCommand的execute方法:
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
其中调用了queue方法,而queue方法中最核心的一行代码是:
final Future<R> delegate = toObservable().toBlocking().toFuture();
又见Rxjava的影子,可见其真的是深度集成啊~
toObservable的代码灰常的长,但是如果你对RxJava有所了解的话,其实逻辑是非常清晰的,我们提取其中关键的代码:
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
......
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
通过defer去做延时绑定,并且判断缓存的逻辑,我们这篇文章不关注缓存,而已afterCache就是hystrixObservable,最后通过doOnTerminate,doOnUnsubscribe和doOnComplete去做收尾的一些工作。
可以看到Hystrix内部是如何结合Rxjava去做异步操作的,逻辑很清晰,这里不得不吹一下RxJava了,真是好用啊!!
下面我们来看applyHystrixSemantics这个observable。
rivate Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
其中又是RxJava的代码,最关键的代码是:
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
这里我们就不一一分析的,其实光看名字就知道了,executeCommandAndObserve做具体的方法执行:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
.....
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
execution的链式调用中有onErrorResumeNext,并传入了handlerFallback,而handlerFallback中处理了具体的一些错误,入超时等。说道超时,我们可以看一下和它相关的代码,execution在生成的时候,有一个HystrixObservableTimeoutOperator操作符:
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
// if the child unsubscribes we unsubscribe our parent as well
child.add(s);
//capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
TimerListener listener = new TimerListener() {
@Override
public void tick() {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
}
}
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
它的call方法中去做了计时,如果超时,就抛出HystrixTimeoutException这个exception。
我们来看和HystrixTimeoutException相关的handleTimeoutViaFallback。
private Observable<R> handleTimeoutViaFallback() {
return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}
调用了getFallbackOrThrowException方法,而其中则会调用getFallbackAction方法。
protected CommandAction getFallbackAction() {
return commandActions.getFallbackAction();
}
commandActions是怎么传入的呢?回到Aspect类中:
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
command是通过工厂类生成的。
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
看最后一个else里的代码是去new了一个command。
protected AbstractHystrixCommand(HystrixCommandBuilder builder) {
super(builder.getSetterBuilder().build());
this.commandActions = builder.getCommandActions();
this.collapsedRequests = builder.getCollapsedRequests();
this.cacheResultInvocationContext = builder.getCacheResultInvocationContext();
this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext();
this.ignoreExceptions = builder.getIgnoreExceptions();
this.executionType = builder.getExecutionType();
}
在构造函数中传入了commandActions。那HystrixCommandBuilder又是怎么生成呢?
public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
validateMetaHolder(metaHolder);
return HystrixCommandBuilder.builder()
.setterBuilder(createGenericSetterBuilder(metaHolder))
.commandActions(createCommandActions(metaHolder))
.collapsedRequests(collapsedRequests)
.cacheResultInvocationContext(createCacheResultInvocationContext(metaHolder))
.cacheRemoveInvocationContext(createCacheRemoveInvocationContext(metaHolder))
.ignoreExceptions(metaHolder.getCommandIgnoreExceptions())
.executionType(metaHolder.getExecutionType())
.build();
}
builder中的commandActions是通过metaHolder生成的:
private CommandActions createCommandActions(MetaHolder metaHolder) {
CommandAction commandAction = createCommandAction(metaHolder);
CommandAction fallbackAction = createFallbackAction(metaHolder);
return CommandActions.builder().commandAction(commandAction)
.fallbackAction(fallbackAction).build();
}
其中createFallbackAction会通过metaHolder找到被注解方法中有没有注解上fallbackMethod(第一章节中的fallbackFunc),如果有,则传入其中。
至此,我们已经分析完了Hystrix中的fallback机制,总得来说就是通过RxJava去做异步操作,并获取注解中的fallbackMethod,在捕获异常之后调用具体的方法。