= 29
本文出自:【InTheWorld的博客】 (欢迎留言、交流)
  
 
Spring Cloud“全家桶”风头正劲,Hystrix作为服务容错保护组件也是挺有名气。最近我有在看一些Spring Cloud的内容,其中就包括Hystrix。这里我打算从宏观理论和微观实现两个部分来分析Hystrix。
首先是宏观理论了,先抛出两个问题。Hystrix的设计目的是什么?应该怎么完成这些目标?针对第一个问题,我们首先需要明确的是微服务架构应该是有一定的容错性的,而服务不可用的问题是客观存在的。而且这些服务错误常常会恶化和扩散,结果造成更严重的负面影响。所以在无法绝对保证服务可用性的前提下,我们需要一种机制来保护服务错误。
Hystrix的作用主要体现在一下几个方面,
  
 
Hystrix是怎么完成这些需求呢?个人觉得Hystrix的实现有以下几个关键点;
Hystrix使用了“命令模式”,每一个依赖请求都是HystrixCommand或者HystrixObservableCommand。Hystrix的命令执行流程图如下:
  
 
HystrixCommand和HystrixObservableCommand的区别主要体现在异步执行的返回值句柄不同。它们分别对应传统型的Future、和RxJava中的Observable。虽然看似用法略有不同,但是内部实现都是通过RxJava的形式完成的。HystrixCommand的简单使用方法如下:
  @HystrixCommand(fallbackMethod = "findByIdFallback")
  @GetMapping("/user/{id}")
  public User findById(@PathVariable Long id) {
    return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class);
  }
  User findByIdFallback(Long id, Throwable throwable) {
    LOGGER.error("进入回退方法,异常:", throwable);
    User user = new User();
    user.setId(-1L);
    user.setName("默认用户");
    return user;
  } 
 HystrixCommand这个注解相当于把findById这个函数包装成了一个HystrixCommand。对这个api端点的请求都会引发一个对应的HystrixCommand执行。HystrixCommand注解除了fallbackMethod,还有很多其他的参数,这些参数会用来构造HystrixCommand命令。
HystrixCommand命令是如何执行的呢?接下来,我们就通过对Hystrix源代码的分析来理解它!无论是HystrixCommand还是HystrixObservableCommand,都是AbstractCommand的子类。所以它包含了很多Hystrix命令执行的细节。以HystrixCommand类为例,它的执行其实是通过调用execute()方法完成的。这个方法的实现如下:
    public R execute() {
        try {
            return this.queue().get();
        } catch (Exception var2) {
            throw this.decomposeException(var2);
        }
    }
    public Future<R> queue() {
        final Future<R> delegate = this.toObservable().toBlocking().toFuture();
        Future<R> f = new Future<R>() {
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };
        if(f.isDone()) {
        /* */
        } else {
            return f;
        }
    } 
 可以看到execute()方法实际上是通过调用queue()方法实现的,而queue()方法的目的就是构造出一个Future f。这个f其实仅仅是封装了delegate这个Futrue。所以一番分析之后,我们知道了问题的关键所在,其实就是这行this.toObservable().toBlocking().toFuture()。那就从toObservable()开始分析!
    public Observable<R> toObservable() {
        final Action0 terminateCommandCleanup = new Action0() {
        /* */
        };
        final Action0 unsubscribeCommandCleanup = new Action0() {
        /* */             
        };
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            public Observable<R> call() {
                return AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
            }
        };
        final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
            /* */
        };
        final Action0 fireOnCompletedHook = new Action0() {
        /* */
        };
        return Observable.defer(new Func0<Observable<R>>() {
            public Observable<R> call() {
                if(!AbstractCommand.this.commandState.compareAndSet(AbstractCommand.CommandState.NOT_STARTED, AbstractCommand.CommandState.OBSERVABLE_CHAIN_CREATED)) {
                } else {
                    Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
                    Observable afterCache;
                    if(requestCacheEnabled && cacheKey != null) {
            /* */
                        afterCache = toCache.toObservable();
                    } else {
                        afterCache = hystrixObservable;
                    }
                    return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
                }
            }
        });
    } 
 虽然这个方法的代码不少,但还是可以很快的定位到关键点——applyHystrixSemantics(); 这个函数完成了Hystrix的基本语义。首先是熔断器的语义,如果熔断器允许请求就去执行请求,反之则直接执行fallback函数。在正常的请求通路下,最终会调用executeCommandWithSpecifiedIsolation()方法来完成请求的执行。从这个函数名就可以看出,这个方法完成了Hystrix隔离的语义。由于这个方法代码比较长,就不贴完整代码了。但是方法中的一个subscribeOn()运算符非常显眼,如下所示:
subscribeOn(this.threadPool.getScheduler(new Func0<Boolean>() {
            public Boolean call() {
                return Boolean.valueOf(((Boolean)AbstractCommand.this.properties.executionIsolationThreadInterruptOnTimeout().get()).booleanValue() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT);
            }
        })); 
 这个threadPool就是构造HystrixCommand的时候设定的线程池,现在大家应该可以理解Hystrix如何实现隔离的。回到那行代码this.toObservable().toBlocking().toFuture()。toBlocking()仅仅是为了构造一个BlockingObservable,然后通过toFuture()构造出一个Future。其实构造这个future的过程也很简单,只是订阅上游的Observable,然后通知和更新Future,具体过程如下:
public final class BlockingOperatorToFuture {
    public static <T> Future<T> toFuture(Observable<? extends T> that) {
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference();
        final AtomicReference<Throwable> error = new AtomicReference();
        final Subscription s = that.single().subscribe(new Subscriber<T>() {
            public void onCompleted() {
                finished.countDown();
            }
            public void onError(Throwable e) {
                error.compareAndSet((Object)null, e);
                finished.countDown();
            }
            public void onNext(T v) {
                value.set(v);
            }
        });
        return new Future<T>() {
            private volatile boolean cancelled;
            public boolean isDone() {
                return finished.getCount() == 0L;
            }
            public T get() throws InterruptedException, ExecutionException {
                finished.await();
                return this.getValue();
            }
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                if(finished.await(timeout, unit)) {
                    return this.getValue();
                } else {
                    throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
                }
            }
            private T getValue() throws ExecutionException {
                Throwable throwable = (Throwable)error.get();
                if(throwable != null) {
                    throw new ExecutionException("Observable onError", throwable);
                } else if(this.cancelled) {
                    throw new CancellationException("Subscription unsubscribed");
                } else {
                    return value.get();
                }
            }
        };
    }
} 
 这个返回的Future就是前面的delegate。讲到这里,Hystrix command的大致回路,算是跑马观花的看了一遍。具体的一些细节实现,这里就先略过了!
对于Hystrix的具体使用,可以参考这个 https://github.com/eacdy/spring-cloud-study 。我也有看这个demo项目学习Spring Cloud。