博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
熔断器 Hystrix 源码解析 —— 执行命令方式
阅读量:6194 次
发布时间:2019-06-21

本文共 2901 字,大约阅读时间需要 9 分钟。

hot3.png

摘要: 原创出处 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


???关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Hystrix 执行命令方法

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

在官方提供的示例中,我们看到 通过继承 抽象类,有四种调用方式:

方法
#execute() 同步调用,返回直接结果
#queue() 异步调用,返回 java.util.concurrent.Future
#observe() 异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable() 未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
  • 第四种方式,点击 查看笔者补充的示例。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG
  • 程序猿DD ——
  • 周立 ——
  • 两书齐买,京东包邮。

2. 实现

// AbstractCommand.javaabstract class AbstractCommand
implements HystrixInvokableInfo
, HystrixObservable
{ // ... 省略无关属性与方法 public Observable
toObservable() { return Observable.defer(new Func0
>() { @Override public Observable
call() { // .... } } } public Observable
observe() { // us a ReplaySubject to buffer the eagerly subscribed-to Observable ReplaySubject
subject = ReplaySubject.create(); // eagerly kick off subscription final Subscription sourceSubscription = toObservable().subscribe(subject); // return the subject that can be subscribed to later while the execution has already started return subject.doOnUnsubscribe(new Action0() { @Override public void call() { sourceSubscription.unsubscribe(); } }); }}// HystrixCommand.javapublic abstract class HystrixCommand
extends AbstractCommand
implements HystrixExecutable
, HystrixInvokableInfo
, HystrixObservable
{ // ... 省略无关属性与方法 public Future
queue() { final Future
delegate = toObservable().toBlocking().toFuture(); final Future
f = new Future
() { // ... 包装 delegate } // ... return f; } public R execute() { try { return queue().get(); } catch (Exception e) { throw Exceptions.sneakyThrow(decomposeException(e)); } } protected abstract R run() throws Exception;}
  • #toObservable() 方法 :做订阅,返回干净的 Observable 。这就是为什么上文说“未调用”

  • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅

    • ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 。
  • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:

    • Observable#toBlocking() 方法 :将 Observable 转换成阻塞rx.observables.BlockingObservable
    • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。
      • #run() 方法 :子类实现该方法,执行正常的业务逻辑
    • BlockingObservable 在 详细解析。
  • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。

  • 整理四种调用方式如下:

    FROM

3. BlockingObservable

本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:

  • 1 ) 跳过本小节,不影响对本文的理解。
  • 2 ) 选择阅读 ,理解 BlockingObservable 的原理。
  • 3 ) 选择阅读本小节,理解 BlockingObservable 的原理以及实现。

666. 彩蛋

第一篇 Hystrix 正式的源码解析。

梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。

胖友,分享一波朋友圈可好!

转载于:https://my.oschina.net/sword4j/blog/1575449

你可能感兴趣的文章
CSS3 Border属性介绍
查看>>
inotify 文件同步整理
查看>>
centos编译安装squid3.4
查看>>
2012年7月的主要目标
查看>>
加密的类型及其相关算法--证书机构
查看>>
sqoop的导入工具使用
查看>>
mysql笔记1(转)
查看>>
Unity开发了一款开心消消乐
查看>>
《第一行代码》2day~Activity
查看>>
Juniper 基于路由的×××
查看>>
关闭sql server 2005远程连接
查看>>
***测试03------windows日常巡检,应急响应等总结
查看>>
delphi xe 绘图
查看>>
字符集修改、Linux时间同步、调整文件描述符
查看>>
OSI七层模型03——数据封装
查看>>
centos安装vsftp
查看>>
查看Nginx、apache、MySQL和PHP的编译参数
查看>>
cs4.1 编译与安装
查看>>
BitmapFactory。Options.inSampleSize用法
查看>>
2017.12.27 3周3次课
查看>>