前两篇文章我们已经讲解了Hystrix的一些基本概念,并举了一些demo说明如何使用Hystrix,这篇文章我们更深一步,通过阅读一些源码来看下Hystrix是怎么工作的。我们主要根据官方文档上的一个流程图,对其中几个主要的过程从源码层面来研究下。
创建HystrixCommand(HystrixObservableCommand)
|
|
上面我们通过继承HystrixCommand实现run,getFallback,getCacheKey等方法实现了一个自己的Command
Execute the Command
执行一个command可以有以下四种方式(前两种是HystrixCommand独有的):
- execute():阻塞方法,返回一个单个的response(或者异常)
- queue():非阻塞方法,返回一个与单个response关联的future
- observe():返回一个Observable,非延时方法,不管有没有订阅者,都会立即执行命令(不用担心后订阅的订阅者接收不到事件,因为会把事件放到一个RelaySubject里面)
- toObservable():同样是返回一个Observable,延时方法。只有订阅了这个Observable,才会执行命令。
|
|
上述代码,我们通过GetUserAccountCommand.execute来执行命令,debug源码可以看到,execute命令其实也是调用了上面的queue方法返回一个future,然后通过future.get阻塞获取response。继续深入future方法,我们可以先不用管那个代理的future(主要用来实现中断的),final Future
Is the Response Cached
AbstractCommand
上述代码中有个判断final boolean requestCacheEnabled = isRequestCachingEnabled();可以看到如果我们的Command实现了getCacheKey方法,并且requestCacheEnabled(这个属性默认是true,可以通过调用HystrixCommand的构造方法传入一个setter对象修改默认属性)这样就不会执行后续的run方法,就会直接返回一个缓存的Observable。(上一篇文章我们已经提到,必须是同一个request context里面的两个command才能用到缓存)
|
|
Is the Circuit Open?
可以看到在缓存逻辑过了之后,就会判断断路器(Circuit)的状态是否是open,如果是open状态,就会直接调用fallback方法;如果不是就继续后面的流程,这里断路器要重点说一下,我们在第一篇文章中就说了Hystrix可以解决在我们依赖的外部服务异常造成瀑布式报错,就是通过这个断路器来控制的,所以我们很有必要了解它的工作过程。
Circuit Breaker
从上面的图我们可以看到在HystrixCommand(HystrixObservableCommand)在执行过程中会与HystrixCircuitBreaker交互,执行之前会根据断路器的状态来决定后续流程,命令执行成功/失败/超时又会向断路器上报数据,断路器根据这些数据来改变状态。下面是断路器的一个流程图
- 如果通过当前断路器请求达到了阈值HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
- 如果当前的错误率达到了阈值HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
- 那么就会把断路器的状态从CLOSED变成OPEN
- OPEN状态时候,就会断路次断路器上所有的请求(直接返回fallback方法)
- 经过一段时间(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()默认5s)允许一个请求进来,此时断路器状态变为HALF-OPEN,如果这个请求还是失败,那么状态就还是OPEN ,继续等待一个时间;如果此次请求成功,就把状态变更为CLOSED,然后继续循环1的过程。
下面通过源码,来看一下上述断路器的执行过程。
AbstractCommand
commad在request cache逻辑之后,run方法之前都会运行circuitBreaker.attemptExecution(),官方注释说明这个方法不是一个幂等方法,会改变内部的状态。
HystrixCircuitBreaker
上述代码,就可以说明一开始我们解释的断路器状态变迁过程。如果状态为OPEN,返回false;CLOSED返回true;经过了sleepWindow时间后,允许一个请求进来,此时断路器状态从OPEN变为HALF_OPEN,这个status是一个原子性的操作。最终这个放进来的请求完成根据成功失败会调用对应的markSuccess或者markNonSuccess,通过一个CAS操作,将HALF_OPEN状态变为CLOSED或者OPEN,如果没有成功,会circuitOpened.set(System.currentTimeMillis())更新此次失败的时间;成功更新circuitOpened.set(-1L),这样后续请求就都可以进来了。
Is the Thread Pool/Queue/Semaphore Full?
之前说过Hystrix通过为每个单独的外部服务创建一个线程池来达到隔离外部服务的目的,这个很明显的一个好处是外部服务挂了,不会影响我们。但是如果依赖过多的服务,会不会造成创建的线程池过多,这是一个问题。Hystix官方有个压测的结果,可以参考下。如果依赖的服务是一些内存级别的很快的操作,那么创建线程池带来的上下文切换的消耗可能会过大,这个时候Hystrix也提供了Semaphore,可以控制同时并发的请求数,直接在当前线程运行。上述不管是线程池还是Semaphore如果满了,就会执行fallback方法。并且我们可以通过设置execution.isolation.strategy动态调整策略
AbstractCommand
可以看到,如果配置的策略是SEMAPHORE,会创建一个许可是properties.executionIsolationSemaphoreMaxConcurrentRequests()的SEMAPHORE,可以看到tryAcquire方法会根据设置的许可直接返回true或者false,不会阻塞。
|
|
如果配置的不是SEMAPHORE,则返回TryableSemaphoreNoOp.DEFAULT,这个其实不是Semaphore,因为下面源码可以看到都是返回true。
|
|
HystrixObservableCommand.construct() or HystrixCommand.run()
AbstractCommand
上述代码可以看到首先判断是不是ExecutionIsolationStrategy.THREAD模式,如果不是,说明是Semaphore模式,就走到最后面的逻辑,直接调用getUserExecutionObservable;如果是THREAD模式,可以看到最终会对返回的Observable.subscribeOn(Func0
继续来看getUserExecutionObservable方法
这个方法会调用到getExecutionObservable()方法,这里会根据你是实现的HystrixCommand还是HystrixObservableCommand调用对应的方法。
HystrixCommand
并且这里通过doOnSubscribe方法设置了回调保存了当前执行订阅的线程以便后续需要的时候可以interrupt。
HystrixObservableCommand
可以看到通过调用getExecutionObservable最终都会调用到你实现的run方法或者construct方法
Calculate Circuit Health
从整个Hystrix flow chart看到步骤5和6的执行结果不管成功与否都会上报Metrics,然后断路器通过这些上报的数据来计算当前的状态,上面Is the Circuit Open?里面已经说过。
Get the Fallback
从整个Hystrix flow chart看到步骤4断路器状态OPEN,步骤5Semphore或者Threadpool reject,步骤6执行失败或者超时都会执行fallback方法,这里就不详细看源码了。
Return the Successful Response
如果整个流程执行成功,那么就会返回调用者一个Observable,你可以通过同步方式获取结果也可以subscribe这个Observable通过异步方式获取最终结果,整个获取Observable的流程如下: