/*
 * The Future returned by Observable.toBlocking().toFuture() does not implement the
 * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
 * thus, to comply with the contract of Future, we must wrap around it.
 */
// Blocking Future over this command's Observable chain. toFuture() subscribes, which is what
// triggers the deferred work inside toObservable().
// NOTE(review): presumably wrapped by the enclosing method (not visible here) as described above.
final Future<R> delegate = toObservable().toBlocking().toFuture();
// AbstractCommand.java (body of toObservable(); the enclosing method header is not shown here)
// The Func0 below is only invoked when the returned Observable is subscribed to
// (for example after toBlocking().toFuture()), so none of this runs eagerly.
return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        /* this is a stateful object so can only be used once */
        // A command may only be executed once, so callers must create a new command object per
        // execution. The state starts out as NOT_STARTED.
        if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
            IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
            //TODO make a new error type for this
            throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
        }

        // Timestamp at which the command started.
        commandStartTimestamp = System.currentTimeMillis();

        // Request log
        if (properties.requestLogEnabled().get()) {
            // log this command execution regardless of what happened
            if (currentRequestLog != null) {
                currentRequestLog.addExecutedCommand(_cmd);
            }
        }

        final boolean requestCacheEnabled = isRequestCachingEnabled();
        final String cacheKey = getCacheKey();

        // Request cache
        /* try from cache first */
        if (requestCacheEnabled) {
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
            if (fromCache != null) {
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            }
        }

        // Build the Observable from the applyHystrixSemantics callback.
        Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics)
                .map(wrapWithAllOnNextHooks);

        Observable<R> afterCache;

        // Request cache: put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
            HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
            if (fromCache != null) {
                // another thread beat us so we'll use the cached value instead
                toCache.unsubscribe();
                isResponseFromCache = true;
                return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
            } else {
                // we just created an ObservableCommand so we cast and return it
                afterCache = toCache.toObservable();
            }
        } else {
            afterCache = hystrixObservable;
        }

        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});
// AbstractCommand.java private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd){ // mark that we're starting execution on the ExecutionHook // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent // 这个类是ExecutionHookDeprecationWrapper,是内部类 executionHook.onStart(_cmd);
// 断路器是否打开,打开的话直接走降级逻辑 /* determine if we're allowed to execute */ if (circuitBreaker.attemptExecution()) { // 这里会拿到信号量,如果不是使用SEMAPHORE的话,这里拿到的是一个什么都不干的TryableSemaphoreNoOp.DEFAULT final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override publicvoidcall(){ if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } };
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override publicvoidcall(Throwable t){ eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } };
/** * Get the TryableSemaphore this HystrixCommand should use for execution if not running in a separate thread. * * @return TryableSemaphore */ // 如果不是信号量的话,返回TryableSemaphoreNoOp.DEFAULT,啥也不干 protected TryableSemaphore getExecutionSemaphore(){ // 用SEMAPHORE if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { if (executionSemaphoreOverride == null) { TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name()); if (_s == null) { // we didn't find one cache so setup executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests())); // assign whatever got set (this or another thread) return executionSemaphorePerCircuit.get(commandKey.name()); } else { return _s; } } else { return executionSemaphoreOverride; } } else { // return NoOp implementation since we're not using SEMAPHORE isolation // 不用SEMAPHORE return TryableSemaphoreNoOp.DEFAULT; } }
// AbstractCommand.java /** * This decorates "Hystrix" functionality around the run() Observable. * * @return R */ private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd){ final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
final Action1<R> markEmits = new Action1<R>() { @Override publicvoidcall(R r){ if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } if (commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); circuitBreaker.markSuccess(); } } };
final Action0 markOnCompleted = new Action0() { @Override publicvoidcall(){ if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); circuitBreaker.markSuccess(); } } };
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t){ circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } elseif (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } elseif (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { /* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */ if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); }
return handleFailureViaFallback(e); } } };
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override publicvoidcall(Notification<? super R> rNotification){ setRequestContextIfNeeded(currentRequestContext); } };
// AbstractCommand.java private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd){ // 默认就是线程隔离的 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call(){ executionResult = executionResult.setExecutionOccurred(); // 命令状态从OBSERVABLE_CHAIN_CREATED切换到USER_CODE_EXECUTED,不是花就报错 if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } // 监控 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { // the command timed out in the wrapping thread so we will return immediately // and not increment any of the counters below or other such logic return Observable.error(new RuntimeException("timed out before executing run()")); } // 判断线程的状态,从NOT_USING_THREAD,切换到STARTED if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { //we have not been unsubscribed, so should proceed HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); // store the command that is being run // 将要执行的一个命令,压入一个栈中 endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); /** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */ try { // 这几行代码也没干啥 executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); // run方法最终执行的地方 return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error(new RuntimeException("unsubscribed before executing run()")); } } }).doOnTerminate(new Action0() { @Override publicvoidcall(){ if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { //if it was never started and received terminal, then no need to clean up (I don't think this is possible) } //if it was unsubscribed, then other cleanup handled it } }).doOnUnsubscribe(new Action0() { @Override publicvoidcall(){ if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { //if it was never started and was cancelled, then no need 
to clean up } //if it was terminal, then other cleanup handled it } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call(){ return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call(){ executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); }
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); // semaphore isolated // store the command that is being run endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw } catch (Throwable ex) { //If the above hooks throw, then use that as the result of the run method return Observable.error(ex); } } }); } }
// Fragment: obtains the user's execution Observable; the enclosing method is not shown here.
// NOTE(review): presumably part of getUserExecutionObservable(_cmd) — confirm.
try {
    // For HystrixCommand, the Func0.call() produced here invokes the command's run() method.
    userObservable = getExecutionObservable();
} catch (Throwable ex) {
    // the run() method is a user provided implementation so can throw instead of using Observable.onError
    // so we catch it here and turn it into Observable.error
    userObservable = Observable.error(ex);
}
// HystrixCommand.java @Override finalprotected Observable<R> getExecutionObservable(){ return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call(){ try { // 这里就是执行HystrixCommand的run方法了 return Observable.just(run()); } catch (Throwable ex) { return Observable.error(ex); } } }).doOnSubscribe(new Action0() { @Override publicvoidcall(){ // Save thread on which we get subscribed so that we can interrupt it later if needed executionThread.set(Thread.currentThread()); } }); }
// AbstractCommand.java privatestatic HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults){ if (fromConstructor == null) { // get the default implementation of HystrixThreadPool return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } }
// HystrixThreadPool.java // 维护了一个map,key就是threadPoolKey,一个key就对应了一个线程池 /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder){ // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work String key = threadPoolKey.name();
// this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; }
// if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { // 创建线程池 threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); }
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties){ // ThreadFactory这个就是为了给线程起名字,Hystrix开头的名字 final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
finalboolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); finalint dynamicCoreSize = threadPoolProperties.coreSize().get(); finalint keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); finalint maxQueueSize = threadPoolProperties.maxQueueSize().get(); final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize); // 默认是false if (allowMaximumSizeToDivergeFromCoreSize) { finalint dynamicMaximumSize = threadPoolProperties.maximumSize().get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); returnnew ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } else { returnnew ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { returnnew ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } }
// 得到线程池队列 public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize){ /* * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted). * <p> * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want. * <p> * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues * and rejecting is the preferred solution. */ // hystrix.threadpool.ServiceA.maxQueueSize = -1,直接返回SynchronousQueue,这是一个同步队列,也就是收到请求后直接创建线程,不会去排队 if (maxQueueSize <= 0) { returnnew SynchronousQueue<Runnable>(); } else { // 就会返回LinkedBlockingQueue,优先用core-size的线程数量去处理,如果满了就去排队,如果排队的也满了,就会增加core-size到maximumSize,还不够就拒绝掉了 returnnew LinkedBlockingQueue<Runnable>(maxQueueSize); } }
@Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit){ if (threadPool != null) { // 判断队列是否还有空间,没有就拒绝了 if (!threadPool.isQueueSpaceAvailable()) { thrownew RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); } } // HystrixContexSchedulerAction包含了回调HystrxCommand.run方法的逻辑 return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit); }
@Override publicbooleanisQueueSpaceAvailable(){ if (queueSize <= 0) { // we don't have a queue so we won't look for space but instead // let the thread-pool reject or not returntrue; } else { return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); } }
// NOTE(review): only the beginning of schedule(Action0) is shown; the rest of the method is not visible here.
// Schedules {@code action} on this worker unless the worker's subscription is already unsubscribed.
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    // This is internal RxJava API but it is too useful.
    ScheduledAction sa = new ScheduledAction(action);
// NOTE(review): only the beginning of tick() is shown; the rest of the method (including the body of
// the Runnable opened on the last line) is not visible here.
// Timer callback: if the command has not finished yet, mark it TIMED_OUT and start the timeout handling.
@Override
public void tick() {
    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
    // otherwise it means we lost a race and the run() execution completed or did not start
    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
        // report timeout failure
        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

        // shut down the original request
        s.unsubscribe();

        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur // 会监听拒绝、异常、超时等数据,从统计信息metrics里拿的数据 Subscription s = subscribeToStream(); activeSubscription.set(s); }
private Subscription subscribeToStream(){ /* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream */ return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override publicvoidonCompleted(){
}
@Override publicvoidonError(Throwable e){
}
// 最近10秒钟的统计信息 @Override publicvoidonNext(HealthCounts hc){ // 在最近的一个时间窗口以内(10秒),totalRequests(总请求数量)小于circuitBreakerRequestVolumeThreshold(默认是20),那就什么都不干 // check if we are past the statisticalWindowVolumeThreshold if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { // 进入下一步的尝试 // 如果最近一个时间窗口(默认是10s)内的异常请求次数所占的比例(25次请求,5次,20%) < circuitBreakerErrorThresholdPercentage(默认是50%)什么都不干 if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else // 反之,如果最近一个时间窗口(默认是10s)内的异常请求次数所占的比例(25次请求,20次,80%) >= circuitBreakerErrorThresholdPercentage(默认是50%),此时就会打开熔断开关 // our failure rate is too high, we need to set the state to OPEN if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); }
@Override publicbooleanattemptExecution(){ if (properties.circuitBreakerForceOpen().get()) { returnfalse; } if (properties.circuitBreakerForceClosed().get()) { returntrue; } // -1 就是可以执行请求,断路器没有打开 if (circuitOpened.get() == -1) { returntrue; } else { if (isAfterSleepWindow()) { // 状态搞为半打开,让一个请求执行试一下 // 如果失败了,那么还是OPEN,handleFallback -> circuitBreaker.markNonSuccess();同时会更新熔断的时间戳 // 如果请求成功,markEmits/markOnCompleted,circuitBreaker.markSuccess(),关闭熔断器。会变成CLOSED if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { //only the first request after sleep window should execute returntrue; } else { returnfalse; } } else { returnfalse; } } } // 过了一个circuitBreakerSleepWindowInMilliseconds时间以后,这个时间默认是5秒 privatebooleanisAfterSleepWindow(){ finallong circuitOpenTime = circuitOpened.get(); finallong currentTime = System.currentTimeMillis(); finallong sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + sleepWindowTime; }
@Override publicvoidmarkSuccess(){ if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { //This thread wins the race to close the circuit - it resets the stream to start it over from 0 metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } }
@Override publicvoidmarkNonSuccess(){ if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //This thread wins the race to re-open the circuit - it resets the start time for the sleep window circuitOpened.set(System.currentTimeMillis()); } }