本文共 2368 字,大约阅读时间需要 7 分钟。
接继续,今天来学习下zip(打包)操作
一、zip操作
@Test public void zipTest() { Observable.zip(Observable.create(emitter -> { for (int i = 0; i < 10; i++) { emitter.onNext(100 + i); } }), Observable.create(emitter -> { for (int i = 0; i < 5; i++) { emitter.onNext(new Character((char) (65 + i))); } }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s)); }
zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:
100A101B102C103D104E
第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。
二、限流
生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。
@Test public void zipTest1() throws InterruptedException { Observable.zip(Observable.create(emitter -> { for (int i = 0; ; i++) { //一直不停的发 emitter.onNext(i); } }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> { for (int i = 0; i < 5; i++) { emitter.onNext(0); //这里技巧性的处理:发1个0过去 } }).subscribeOn(Schedulers.newThread()), (BiFunction
输出:
01234
如果是字符串,可以参考下面这样处理:
Observable.zip(Observable.create(emitter -> { for (int i = 0; ; i++) { emitter.onNext("A" + i); } }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> { for (int i = 0; i < 5; i++) { emitter.onNext(""); } }).subscribeOn(Schedulers.newThread()), (BiFunction
输出:
A0A1A2A3A4
三、Flowable
刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来