摘要: 原创出处 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
???关注**微信公众号:【芋道源码】**有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
- 认真的源码交流微信群。
1. 概述
本文主要分享 Hystrix 执行命令方法。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
在官方提供的示例中,我们看到 通过继承 抽象类,有四种调用方式:
方法 | |
---|---|
#execute() | 同步调用,返回直接结果 |
#queue() | 异步调用,返回 java.util.concurrent.Future |
#observe() | 异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
#toObservable() | 未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
- 第四种方式,点击 查看笔者补充的示例。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版,等于主动编写低级 BUG 。
- 程序猿DD ——
- 周立 ——
- 两书齐买,京东包邮。
2. 实现
// AbstractCommand.javaabstract class AbstractCommandimplements HystrixInvokableInfo , HystrixObservable { // ... 省略无关属性与方法 public Observable toObservable() { return Observable.defer(new Func0 >() { @Override public Observable call() { // .... } } } public Observable observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject subject = ReplaySubject.create(); // eagerly kick off subscription final Subscription sourceSubscription = toObservable().subscribe(subject); // return the subject that can be subscribed to later while the execution has already started return subject.doOnUnsubscribe(new Action0() { @Override public void call() { sourceSubscription.unsubscribe(); } }); }}// HystrixCommand.javapublic abstract class HystrixCommand extends AbstractCommand implements HystrixExecutable , HystrixInvokableInfo , HystrixObservable { // ... 省略无关属性与方法 public Future queue() { final Future delegate = toObservable().toBlocking().toFuture(); final Future f = new Future () { // ... 包装 delegate } // ... return f; } public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } protected abstract R run() throws Exception;}
-
#toObservable()
方法 :未做订阅,返回干净的 Observable 。这就是为什么上文说“未调用” 。 -
#observe()
方法 :调用#toObservable()
方法的基础上,向 Observable 注册rx.subjects.ReplaySubject
发起订阅 。- ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 。
-
#queue()
方法 :调用#toObservable()
方法的基础上,调用:Observable#toBlocking()
方法 :将 Observable 转换成阻塞的rx.observables.BlockingObservable
。BlockingObservable#toFuture()
方法 :返回可获得#run()
抽象方法执行结果的 Future 。#run()
方法 :子类实现该方法,执行正常的业务逻辑。
- BlockingObservable 在 详细解析。
-
#execute()
方法 :调用#queue()
方法的基础上,调用Future#get()
方法,同步返回#run()
的执行结果。 -
整理四种调用方式如下:
FROM
3. BlockingObservable
本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable
的实现,所以你可以选择:
- 1 ) 跳过本小节,不影响对本文的理解。
- 2 ) 选择阅读 ,理解 BlockingObservable 的原理。
- 3 ) 选择阅读本小节,理解 BlockingObservable 的原理以及实现。
666. 彩蛋
第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。
胖友,分享一波朋友圈可好!