熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. #applyHystrixSemantics(...)

  • 3. TryableSemaphore

  • 4. #executeCommandAndObserve(...)

  • 5. #executeCommandWithSpecifiedIsolation(...)

  • 6. #getUserExecutionObservable(...)

  • 7. #getExecutionObservable()

  • 8. CommandState

  • 9. ThreadState

  • 666. 彩蛋


熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表

  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址

  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢

  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  5. 认真的源码交流微信群。


1. 概述

本文主要分享 Hystrix 命令执行(一)之正常执行逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」
熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

  • 框 :Hystrix 命令执行的过程。

  • 圈 :本文分享的部分 —— 正常执行逻辑。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. #applyHystrixSemantics(...)

在 《Hystrix 源码解析 —— 执行结果缓存》 里,我们看到 #toObservable() 方法里的第 11 至 19 行,当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。

创建 applyHystrixSemantics 变量,代码如下 :

  1. // `AbstractCommand#toObservable()` 方法

  2.  1: final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {

  3.  2:     @Override

  4.  3:     public Observable<R> call() {

  5.  4:         // commandState 处于 UNSUBSCRIBED 时,不执行命令

  6.  5:         if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {

  7.  6:             return Observable.never();

  8.  7:         }

  9.  8:         // 获得 执行Observable

  10.  9:         return applyHystrixSemantics(_cmd);

  11. 10:     }

  12. 11: };

  • 第 5 至 7 行 :当 commandState 处于 UNSUBSCRIBED 时,不执行命令。

  • 第 9 行 :调用 #applyHystrixSemantics(...) 方法,获得执行 Observable 。


#applyHystrixSemantics(...) 方法,代码如下 :

  1.  1: private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

  2.  2:     // TODO 【2003】【HOOK】

  3.  3:     // mark that we're starting execution on the ExecutionHook

  4.  4:     // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent

  5.  5:     executionHook.onStart(_cmd);

  6.  6:

  7.  7:     /* determine if we're allowed to execute */

  8.  8:     if (circuitBreaker.attemptExecution()) {

  9.  9:         // 获得 信号量

  10. 10:         final TryableSemaphore executionSemaphore = getExecutionSemaphore();

  11. 11:

  12. 12:         // 信号量释放Action

  13. 13:         final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);

  14. 14:         final Action0 singleSemaphoreRelease = new Action0() {

  15. 15:             @Override

  16. 16:             public void call() {

  17. 17:                 if (semaphoreHasBeenReleased.compareAndSet(false, true)) {

  18. 18:                     executionSemaphore.release();

  19. 19:                 }

  20. 20:             }

  21. 21:         };

  22. 22:

  23. 23:         // TODO 【2011】【Hystrix 事件机制】

  24. 24:         final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {

  25. 25:             @Override

  26. 26:             public void call(Throwable t) {

  27. 27:                 eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);

  28. 28:             }

  29. 29:         };

  30. 30:

  31. 31:         // 信号量 获得

  32. 32:         if (executionSemaphore.tryAcquire()) {

  33. 33:             try {

  34. 34:                 // 标记 executionResult 调用开始时间

  35. 35:                 /* used to track userThreadExecutionTime */

  36. 36:                 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

  37. 37:

  38. 38:                 // 获得 执行Observable

  39. 39:                 return executeCommandAndObserve(_cmd)

  40. 40:                         .doOnError(markExceptionThrown)

  41. 41:                         .doOnTerminate(singleSemaphoreRelease)

  42. 42:                         .doOnUnsubscribe(singleSemaphoreRelease);

  43. 43:             } catch (RuntimeException e) {

  44. 44:                 return Observable.error(e);

  45. 45:             }

  46. 46:         } else {

  47. 47:             return handleSemaphoreRejectionViaFallback();

  48. 48:         }

  49. 49:     } else {

  50. 50:         return handleShortCircuitViaFallback();

  51. 51:     }

  52. 52: }

  • 第 5 行 :TODO 【2003】【HOOK】

  • 第 8 行 :TODO 【2012】【链路健康度】

  • 第 10 行 :调用 #getExecutionSemaphore() 方法,获得信号量( TryableSemaphore )对象,在 「3. TryableSemaphore」 详细解析。

  • 第 13 至 21 行 :信号量释放 Action ,用于下面【执行命令 Observable】的 #doOnTerminate(Action) 和 #doOnUnsubscribe(Action) 方法( 见第 41 至 42 行 )。

  • 第 24 至 29 行 :TODO 【2011】【Hystrix 事件机制】

  • 第 32 行 :调用 TryableSemaphore#tryAcquire() 方法,信号量( TryableSemaphore )使用成功,在 「3. TryableSemaphore」 详细解析。

  • 第 36 行 :标记 executionResult 的调用开始时间。

  • 第 39 行 :调用 #executeCommandAndObserve() 方法,获得【执行命令 Observable】。在 「4. #executeCommandAndObserve(...)」 详细解析。

  • 第 43 至 45 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。

  • 第 46 至 48 行 :信号量( TryableSemaphore )使用失败,调用 #handleSemaphoreRejectionViaFallback() 方法,处理信号量拒绝的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。

  • 第 49 至 51 行 :链路处于熔断状态,调用 #handleShortCircuitViaFallback() 方法,处理链路熔断的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。

3. TryableSemaphore

com.netflix.hystrix.AbstractCommand.TryableSemaphore ,Hystrix 定义的信号量接口。代码如下 :

  1. interface TryableSemaphore {

  2.    boolean tryAcquire();

  3.    void release();

  4.    int getNumberOfPermitsUsed();

  5. }

  • 从 API 上,Java 自带的 java.util.concurrent.Semaphore 都能满足,为什么不使用它呢?继续一起往下看。

TryableSemaphore 共有两个子类实现 :

  • TryableSemaphoreNoOp

  • TryableSemaphoreActual

3.1 TryableSemaphoreNoOp

com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp无操作的信号量。代码如下 :

  1. /* package */static class TryableSemaphoreNoOp implements TryableSemaphore {

  2.    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

  3.    @Override

  4.    public boolean tryAcquire() {

  5.        return true;

  6.    }

  7.    @Override

  8.    public void release() {

  9.    }

  10.    @Override

  11.    public int getNumberOfPermitsUsed() {

  12.        return 0;

  13.    }

  14. }

  • 从实现上看, #tryAcquire() 方法,每次都返回的是 true ; #release() 方法,无任何操作。这个是为什么?在 Hystrix 里提供了两种执行隔离策略 :

    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp ,这样每次调用 #tryAcquire() 都能返回 true 。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》 详细解析该方式。

    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual ,这样每次调用 #tryAcquire() 根据情况返回 true/false 。在 「3.2 TryableSemaphoreActual」 详细解析。

3.2 TryableSemaphoreActual

com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual真正的的信号量实现。不过实际上,TryableSemaphoreActual 更加像一个计数器。代码如下 :

  1. /* package */static class TryableSemaphoreActual implements TryableSemaphore {

  2.    protected final HystrixProperty<Integer> numberOfPermits;

  3.    private final AtomicInteger count = new AtomicInteger(0);

  4.    public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {

  5.        this.numberOfPermits = numberOfPermits;

  6.    }

  7.    @Override

  8.    public boolean tryAcquire() {

  9.        int currentCount = count.incrementAndGet();

  10.        if (currentCount > numberOfPermits.get()) {

  11.            count.decrementAndGet();

  12.            return false;

  13.        } else {

  14.            return true;

  15.        }

  16.    }

  17.    @Override

  18.    public void release() {

  19.        count.decrementAndGet();

  20.    }

  21.    @Override

  22.    public int getNumberOfPermitsUsed() {

  23.        return count.get();

  24.    }

  25. }

  • numberOfPermits 属性,信号量上限。 com.netflix.hystrix.strategy.properties.HystrixProperty是一个接口,当其使用类似 com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty 动态属性的实现时,可以实现动态调整信号量的上限,这就是上文提到的为什么不使用 java.util.concurrent.Semaphore 的原因之一。

  • count 属性,信号量使用数量。🙂,这是为什么说 TryableSemaphoreActual 更加像一个计数器 的原因。

  • 另一个不使用 java.util.concurrent.Semaphore 的原因,TryableSemaphoreActual 无阻塞获取信号量的需求,使用 AtomicInteger 可以达到更轻量级的实现。

3.3 #getExecutionSemaphore()

调用 #getExecutionSemaphore() 方法,获得信号量对象,代码如下 :

  1. /**

  2. * 执行命令(正常执行)信号量映射

  3. * KEY :命令名 {@link #commandKey}

  4. */

  5. /* each circuit has a semaphore to restrict concurrent fallback execution */

  6. protected static final ConcurrentHashMap<String, TryableSemaphore> executionSemaphorePerCircuit = new ConcurrentHashMap<String, TryableSemaphore>();

  7. protected TryableSemaphore getExecutionSemaphore() {

  8.    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {

  9.        if (executionSemaphoreOverride == null) {

  10.            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());

  11.            if (_s == null) { // 不存在时,创建 TryableSemaphoreActual

  12.                // we didn't find one cache so setup

  13.               executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));

  14.                // assign whatever got set (this or another thread)

  15.                return executionSemaphorePerCircuit.get(commandKey.name());

  16.            } else {

  17.                return _s;

  18.            }

  19.        } else {

  20.            return executionSemaphoreOverride;

  21.        }

  22.    } else {

  23.        // return NoOp implementation since we're not using SEMAPHORE isolation

  24.        return TryableSemaphoreNoOp.DEFAULT;

  25.   }

  26. }

  • 根据执行隔离策略不同获取不同的信号量实现 :

    • 相同的 commandKey ,使用相同的 TryableSemaphoreActual 。

    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp 。

    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual 。

4. #executeCommandAndObserve(...)

调用 #executeCommandAndObserve(...) 方法,获得【执行命令 Observable】。代码如下 :

  1.  1: private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {

  2.  2:     // TODO 【】

  3.  3:     final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

  4.  4:

  5.  5:     // TODO 【2007】【executionResult】用途

  6.  6:     final Action1<R> markEmits = new Action1<R>() {

  7.  7:         @Override

  8.  8:         public void call(R r) {

  9.  9:             if (shouldOutputOnNextEvents()) {

  10. 10:                 executionResult = executionResult.addEvent(HystrixEventType.EMIT);

  11. 11:                 eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);

  12. 12:             }

  13. 13:             if (commandIsScalar()) {

  14. 14:                 long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();

  15. 15:                 eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);

  16. 16:                 executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);

  17. 17:                 eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());

  18. 18:                 circuitBreaker.markSuccess();

  19. 19:             }

  20. 20:         }

  21. 21:     };

  22. 22:

  23. 23:     // TODO 【2007】【executionResult】用途

  24. 24:     final Action0 markOnCompleted = new Action0() {

  25. 25:         @Override

  26. 26:         public void call() {

  27. 27:             if (!commandIsScalar()) {

  28. 28:                 long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();

  29. 29:                 eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);

  30. 30:                 executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);

  31. 31:                 eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());

  32. 32:                 circuitBreaker.markSuccess();

  33. 33:             }

  34. 34:         }

  35. 35:     };

  36. 36:

  37. 37:     // 失败回退逻辑 Func1

  38. 38:     final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {

  39. 39:         @Override

  40. 40:         public Observable<R> call(Throwable t) {

  41. 41:             circuitBreaker.markNonSuccess();

  42. 42:             Exception e = getExceptionFromThrowable(t);

  43. 43:             executionResult = executionResult.setExecutionException(e);

  44. 44:             if (e instanceof RejectedExecutionException) {

  45. 45:                 return handleThreadPoolRejectionViaFallback(e);

  46. 46:             } else if (t instanceof HystrixTimeoutException) {

  47. 47:                 return handleTimeoutViaFallback();

  48. 48:             } else if (t instanceof HystrixBadRequestException) {

  49. 49:                 return handleBadRequestByEmittingError(e);

  50. 50:             } else {

  51. 51:                 /*

  52. 52:                  * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.

  53. 53:                  */

  54. 54:                 if (e instanceof HystrixBadRequestException) {

  55. 55:                     eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);

  56. 56:                     return Observable.error(e);

  57. 57:                 }

  58. 58:

  59. 59:                 return handleFailureViaFallback(e);

  60. 60:             }

  61. 61:         }

  62. 62:     };

  63. 63:

  64. 64:     // TODO 【2008】【请求缓存】

  65. 65:     final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {

  66. 66:         @Override

  67. 67:         public void call(Notification<? super R> rNotification) {

  68. 68:             setRequestContextIfNeeded(currentRequestContext);

  69. 69:         }

  70. 70:     };

  71. 71:

  72. 72:     Observable<R> execution;

  73. 73:     if (properties.executionTimeoutEnabled().get()) {

  74. 74:         execution = executeCommandWithSpecifiedIsolation(_cmd)

  75. 75:                 .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); // 超时

  76. 76:     } else {

  77. 77:         execution = executeCommandWithSpecifiedIsolation(_cmd);

  78. 78:     }

  79. 79:

  80. 80:     return execution.doOnNext(markEmits)

  81. 81:             .doOnCompleted(markOnCompleted)

  82. 82:             .onErrorResumeNext(handleFallback)

  83. 83:             .doOnEach(setRequestContext);

  84. 84: }

  • 第 3 行 :TODO 【2012】【请求上下文】

  • 第 6 至 21 行 :TODO 【2007】【executionResult】用途

  • 第 24 至 35 行 :TODO 【2007】【executionResult】用途

  • 第 38 至 62 行 :失败回退逻辑 Func1 ,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。

  • 第 65 至 70 行 :TODO 【2012】【请求上下文】

  • 第 72 至 78 行 :调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】,在「5. #executeCommandWithSpecifiedIsolation(...)」 详细解析。

    • 若执行命令超时特性开启,调用 Observable#lift(HystrixObservableTimeoutOperator) 方法,实现执行命令超时功能。在 《Hystrix 源码解析 —— 命令执行(三)之执行超时》 详细解析。

  • 第 80 至 83 行 :返回【执行命令 Observable】。

5. #executeCommandWithSpecifiedIsolation(...)

调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】。代码如下 :

  1.  1: private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {

  2.  2:     if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {

  3.  3:         // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)

  4.  4:         return Observable.defer(new Func0<Observable<R>>() {

  5.  5:             @Override

  6.  6:             public Observable<R> call() {

  7.  7:

  8.  8:                 // 标记 executionResult 执行已发生

  9.  9:                 executionResult = executionResult.setExecutionOccurred();

  10. 10:

  11. 11:                 // 设置 commandState 为 USER_CODE_EXECUTED

  12. 12:                 if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {

  13. 13:                     return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

  14. 14:                 }

  15. 15:

  16. 16:                 // TODO 【2002】【metrics】

  17. 17:                 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

  18. 18:

  19. 19:                 // TODO 【2009】【执行超时】

  20. 20:                 if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {

  21. 21:                     // the command timed out in the wrapping thread so we will return immediately

  22. 22:                     // and not increment any of the counters below or other such logic

  23. 23:                     return Observable.error(new RuntimeException("timed out before executing run()"));

  24. 24:                 }

  25. 25:

  26. 26:                 // 设置 线程状态 为 ThreadState.STARTED

  27. 27:                 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

  28. 28:                     // TODO 【2002】【metrics】

  29. 29:                     //we have not been unsubscribed, so should proceed

  30. 30:                     HystrixCounters.incrementGlobalConcurrentThreads();

  31. 31:                     threadPool.markThreadExecution();

  32. 32:

  33. 33:                     // TODO 【2010】【endCurrentThreadExecutingCommand】

  34. 34:                     // store the command that is being run

  35. 35:                     endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

  36. 36:

  37. 37:                     // 标记 executionResult 使用线程执行

  38. 38:                     executionResult = executionResult.setExecutedInThread();

  39. 39:                     /**

  40. 40:                      * If any of these hooks throw an exception, then it appears as if the actual execution threw an error

  41. 41:                      */

  42. 42:                     try {

  43. 43:                         // TODO 【2003】【HOOK】

  44. 44:                         executionHook.onThreadStart(_cmd);

  45. 45:                         executionHook.onRunStart(_cmd);

  46. 46:                         executionHook.onExecutionStart(_cmd);

  47. 47:

  48. 48:                         // 获得 执行Observable

  49. 49:                         return getUserExecutionObservable(_cmd);

  50. 50:                     } catch (Throwable ex) {

  51. 51:                         return Observable.error(ex);

  52. 52:                     }

  53. 53:                 } else {

  54. 54:                     //command has already been unsubscribed, so return immediately

  55. 55:                     return Observable.empty();

  56. 56:                 }

  57. 57:             }

  58. 58:         }).doOnTerminate(new Action0() {

  59. 59:             @Override

  60. 60:             public void call() {

  61. 61:                 if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {

  62. 62:                     handleThreadEnd(_cmd);

  63. 63:                 }

  64. 64:                 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {

  65. 65:                     //if it was never started and received terminal, then no need to clean up (I don't think this is possible)

  66. 66:                 }

  67. 67:                 //if it was unsubscribed, then other cleanup handled it

  68. 68:             }

  69. 69:         }).doOnUnsubscribe(new Action0() {

  70. 70:             @Override

  71. 71:             public void call() {

  72. 72:                 if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {

  73. 73:                     handleThreadEnd(_cmd);

  74. 74:                 }

  75. 75:                 if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {

  76. 76:                     //if it was never started and was cancelled, then no need to clean up

  77. 77:                 }

  78. 78:                 //if it was terminal, then other cleanup handled it

  79. 79:             }

  80. 80:         }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { // TODO 芋艿:Scheduler

  81. 81:             @Override

  82. 82:             public Boolean call() {

  83. 83:                 return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;

  84. 84:             }

  85. 85:         }));

  86. 86:     } else {

  87. 87:         return Observable.defer(new Func0<Observable<R>>() {

  88. 88:             @Override

  89. 89:             public Observable<R> call() {

  90. 90:                 // 标记 executionResult 执行已发生

  91. 91:                 executionResult = executionResult.setExecutionOccurred();

  92. 92:

  93. 93:                 // 设置 commandState 为 USER_CODE_EXECUTED

  94. 94:                 if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {

  95. 95:                     return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

  96. 96:                 }

  97. 97:

  98. 98:                 // TODO 【2002】【metrics】

  99. 99:                 metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);

  100. 100:

  101. 101:                 // TODO 【2010】【endCurrentThreadExecutingCommand】

  102. 102:                 // semaphore isolated

  103. 103:                 // store the command that is being run

  104. 104:                 endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

  105. 105:                 try {

  106. 106:                     // TODO 【2003】【HOOK】

  107. 107:                     executionHook.onRunStart(_cmd);

  108. 108:                     executionHook.onExecutionStart(_cmd);

  109. 109:

  110. 110:                     // 获得 执行Observable

  111. 111:                     return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw

  112. 112:                 } catch (Throwable ex) {

  113. 113:                     //If the above hooks throw, then use that as the result of the run method

  114. 114:                     return Observable.error(ex);

  115. 115:                 }

  116. 116:             }

  117. 117:         });

  118. 118:     }

  119. 119: }

  • 根据执行隔离策略不同,创建不同的【执行命令 Observable】。仔细对比下,大体逻辑都是相同的,差别在于执行隔离策略Thread 时,使用 RxJava Scheduler 以及对线程的处理。


  • 第 2 至 85 行 :执行隔离策略Thread


    • RxJava Scheduler ,在 《RxJava 源码解析 —— Scheduler》 有详细解析。

    • Observable#subscribeOn(Scheduler) ,在 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 有详细解析。

    • 调用 ThreadPool#getScheduler(Func0<Boolean>) 方法,获得 Hystrix 自定义实现的 RxJava Scheduler ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》 详细解析。

    • 第 30 至 31 行 :TODO 【2002】【metrics】

    • 第 35 行 :TODO 【2010】【endCurrentThreadExecutingCommand】

    • 第 38 行 :标记 executionResult 使用线程执行。

    • 第 44 至 46 行 :TODO 【2003】【HOOK】

    • 第 49 行 :调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。

    • 第 50 至 52 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。

    • 第 9 行 :标记 executionResult 执行已发生。

    • 第 12 至 14 行 :设置 commandState 为 USER_CODE_EXECUTED 。若设置失败,调用 Observable#error(Exception) 方法返回 Observable 。

    • 第 17 行 :TODO 【2002】【metrics】

    • 第 20 至 24 行 :TODO 【2009】【执行超时】

    • 第 27 行 :设置 threadState 为 ThreadState.STARTED 成功。

    • 第 53 至 56 行 :设置 threadState 为 ThreadState.STARTED 失败,执行命令此时已经被取消,调用 Observable#empty() 方法返回 Observable 。

    • 第 58 至 68 行 :调用 Observable#doOnTerminate(...) 方法,添加 Action0 。 #handleThreadEnd(...) 方法,点击 链接 查看。

    • 第 69 至 79 行 :调用 Observable#doOnUnsubscribe(...) 方法,添加 Action0 。

    • 第 80 至 85 行 :调用 Observable#subscribeOn(Scheduler) 方法,指定 Observable 自身在哪个调度器上执行。

  • 第 86 至 118 行 :执行隔离策略SEMAPHORE




    • 第 91 行 :[ 与第 9 行相同 ]。

    • 第 94 至 96 行 :[ 与第 12 至 14行相同 ]。

    • 第 99 行 :[ 与第 17 行类似 ]。

    • 第 104 行 :[ 与第 35 行相同 ]。

    • 第 107 至 108 行 :[ 与第 45 至 46 行相同 ]。

    • 第 111 行 :[ 与第 49 行相同 ]。

    • 第 112 至 115 行 :[ 与第 50 至 52 行相同 ]。

6. #getUserExecutionObservable(...)

调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。代码如下 :

  1.  1: private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {

  2.  2:     Observable<R> userObservable;

  3.  3:

  4.  4:     try {

  5.  5:         userObservable = getExecutionObservable();

  6.  6:     } catch (Throwable ex) {

  7.  7:         // the run() method is a user provided implementation so can throw instead of using Observable.onError

  8.  8:         // so we catch it here and turn it into Observable.error

  9.  9:         userObservable = Observable.error(ex);

  10. 10:     }

  11. 11:

  12. 12:     return userObservable

  13. 13:             .lift(new ExecutionHookApplication(_cmd)) // TODO 【2003】【HOOK】

  14. 14:             .lift(new DeprecatedOnRunHookApplication(_cmd)); // 已废弃

  15. 15: }

  • 第 5 行 :调用 #getExecutionObservable() 方法,创建【执行命令 Observable】。 #getExecutionObservable() 是个抽象方法,代码如下 :

    
    


    • HystrixCommand 实现了该方法,在 「7. #getExecutionObservable」 详细解析。

    1. protected abstract Observable<R> getExecutionObservable();

  • 第 6 至 10 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。


  • 第 12 至 14 行 :返回【执行命令 Observable】。




    • 第 13 行 :TODO 【2003】【HOOK】

7. #getExecutionObservable()

调用 HystrixCommand#getExecutionObservable() 方法,创建【执行命令 Observable】。代码如下 :

  1.  1: @Override

  2.  2: final protected Observable<R> getExecutionObservable() {

  3.  3:     return Observable.defer(new Func0<Observable<R>>() {

  4.  4:         @Override

  5.  5:         public Observable<R> call() {

  6.  6:             try {

  7.  7:                 return Observable.just(run());

  8.  8:             } catch (Throwable ex) {

  9.  9:                 return Observable.error(ex);

  10. 10:             }

  11. 11:         }

  12. 12:     }).doOnSubscribe(new Action0() {

  13. 13:         @Override

  14. 14:         public void call() {

  15. 15:             // 记录 执行线程

  16. 16:             // Save thread on which we get subscribed so that we can interrupt it later if needed

  17. 17:             executionThread.set(Thread.currentThread());

  18. 18:         }

  19. 19:     });

  20. 20: }

  21. 21:

  22. 22: protected abstract R run() throws Exception;

  • 第 3 至 11 行 :调用 Observable#defer(Func0<Observable<R>) 方法,创建【执行命令 Observable】。

    • 第 7 行 :调用 #run() 方法,运行正常执逻辑。通过 Observable#just(...) 方法,返回创建【执行命令 Observable】。

  • 第 12 至 19 行 :调用 #doOnSubscribe(...) 方法,添加 Action 。该操作记录执行线程( executionThread ) 。 executionThread 用于 HystrixCommand#queue() 方法,返回的 Future 结果,可以调用 Future#cancel(Boolean) 方法,点击 链接 查看该方法。

  • 第 22 行 : #run() 抽象方法,实现该方法,运行正常执逻辑

8. CommandState

com.netflix.hystrix.AbstractCommand.CommandState ,命令状态,代码如下 :

  1. protected enum CommandState {

  2.    NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL

  3. }

状态变迁如下图 :

熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

9. ThreadState

com.netflix.hystrix.AbstractCommand.ThreadState ,线程状态,代码如下 :

  1. protected enum ThreadState {

  2.   NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL

  3. }

状态变迁如下图 :

熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

666. 彩蛋

对 Hystrix 和 RxJava 慢慢更有感觉了。

柳暗花明又一村。

继续加油!

胖友,分享一波朋友圈可好!


发表评论