博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava2学习笔记(3)
阅读量:6526 次
发布时间:2019-06-24

本文共 2368 字,大约阅读时间需要 7 分钟。

接继续,今天来学习下zip(打包)操作

一、zip操作

@Test    public void zipTest() {        Observable.zip(Observable.create(emitter -> {            for (int i = 0; i < 10; i++) {                emitter.onNext(100 + i);            }        }), Observable.create(emitter -> {            for (int i = 0; i < 5; i++) {                emitter.onNext(new Character((char) (65 + i)));            }        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

100A101B102C103D104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

 

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

@Test    public void zipTest1() throws InterruptedException {        Observable.zip(Observable.create(emitter -> {            for (int i = 0; ; i++) { //一直不停的发                emitter.onNext(i);            }        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {            for (int i = 0; i < 5; i++) {                emitter.onNext(0); //这里技巧性的处理:发1个0过去            }        }).subscribeOn(Schedulers.newThread()),                (BiFunction
) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值 .subscribe(integer -> System.out.println(integer)); Thread.sleep(200); }

  输出:

01234

  如果是字符串,可以参考下面这样处理:

Observable.zip(Observable.create(emitter -> {                    for (int i = 0; ; i++) {                        emitter.onNext("A" + i);                    }                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {                    for (int i = 0; i < 5; i++) {                        emitter.onNext("");                    }                }).subscribeOn(Schedulers.newThread()),                (BiFunction
) (i1, i2) -> (String) i1 + (String) i2) .subscribe(s -> System.out.println(s)); Thread.sleep(200);

  输出:

A0A1A2A3A4

 

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来

作者:
出处:
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
你可能感兴趣的文章
产品经理学习总结之技术和设计篇
查看>>
23种设计模式(15):备忘录模式
查看>>
java基础学习总结——IO流
查看>>
iOS获取APP ipa 包以及资源文件
查看>>
CentOS 7 关闭启动防火墙
查看>>
Vue-选项卡切换
查看>>
linux网络命令
查看>>
nodejs ejs 请求路径和静态资源文件路径
查看>>
4.1 State Snapshot Transfer
查看>>
C++小代码
查看>>
记一次思维转变的时刻
查看>>
远程桌面无法复制粘贴
查看>>
bzoj2754
查看>>
redis liunx下安装和配置
查看>>
Asp.Net MVC 学习心得 之 View
查看>>
STL - Map - 运行期自定义排序
查看>>
Oil Deposits
查看>>
poj3984 迷宫问题(简单搜索+记录路径)
查看>>
Linux 服务器buff/cache清理
查看>>
算法试题 及其他知识点
查看>>