Table of contents
Open Table of contents
什么是响应式编程?
如今,所有人都在讨论响应式编程(Reactive Programming),而且你想学习这个叫做响应式编程的新东西。可能你已经在一些地方使用过TA了,但你还是有些疑惑要阐明清楚。
在这篇文章里,我们将要学习关于响应式编程的基本概念。下一篇文章开始我们要真正的编程,还有学习如何把RxJava运用在Android程序开发中。
先要明白我们遇到的问题。为什么我们需要响应式编程?如果没有问题,那我们也不需要解决方法了。
为什么我们需要异步
最简单的答案就是我们想要提升用户体验。我们想要我们的应用更具有响应性。我们不想阻塞主线程,不想让应用运行缓慢,不想向用户提供糟糕的用户体验,想给用户平滑的用户体验。
为了让主线程自由运行,我们要把我们想要做的大量耗时的工作放在后台线程中。因为我们的移动设备不能做繁重的工作,所以我们想把繁重的工作和复杂的计算放在服务器进行。所以我们需要异步来进行网络操作。
评价矩阵
我们需要从库中得到些什么来处理所有的异步工作。你可以把下面四点当做异步库的评价矩阵。
- 执行清晰:如果我们把一大堆工作放在新线程执行,我们应该能控制TA。如果你将进行一些后台任务,你就要收集信息,做些准备。一旦你准备好了,你将可以立即开始后台任务。
- 线程管理简单:在异步任务中,线程管理是关键。我们经常要在后台线程执行的中间或结束时在主线程更新UI。因此,我们需要把工作从一个线程(后台线程)传递到另一个线程(这里是主线程)。因此,要能简单切换线程还有在需要的时候传递数据。
- 容易组合:理想情况下,后台线程在运行时,TA不应该依赖任何其他线程(尤其是UI线程)而且应该与其他线程保持独立,直到TA运行结束。但实际情况,我们有很多事情需要让线程相互依赖,如更新UI、改变数据库等。所以异步库应容易组合,给错误留下更少的发生空间。
- 副作用最小:当多线程同时工作时,一个线程应该对其他线程产生最小的副作用。这让你的代码容易阅读和理解,让错误更容易被追踪。
什么是响应式编程?
根据维基百科:
响应式编程是一种面向数据流动和改变传递的编程范式。这意味着使用编程语言能简单地传递静态和动态的数据流,而且这个基础执行模型会自动地通过数据流传递改变。
简而言之,在响应式编程中数据流由一个成分发送,然后响应式编程库提供的基础结构会把改变传递给那些注册了要接受这些数据改变的成分。长话短说:Rx由下面这3个关键点组成。
RX = OBSERVABLE + OBSERVER + SCHEDULERS
我们将逐一详细讨论这三点。
- Observable:Observable就是数据流。Observable把数据打包,使其能从一个线程传递到另一个线程。在TA的生命周期中,TA们基本上是周期性的发送数据或是只发送一次数据,这要看TA们的配置。有各种各样的操作符可以帮Observable发送某些特定类型的数据——根据具体的事件,但我们要在以后讨论TA。
- Observers:Observer消耗Observable发送的数据流。Observer通过
subscribeOn()
方法订阅Observable,来接收Observable发送的数据。当observable发送数据时,所有注册了的Observer会在onNext()
方法中接收数据。在这里可以进行各种操作,如解析JSON或更新UI。如果有错误从observable被抛出,observer会在onError()
中接收到。 - Schedulers:记住Rx是用来进行异步编程的,所以我们需要线程管理,这就是schedulers发挥作用的地方。Schedulers在Rx中是告诉Observable和Observer运行在哪个线程的成分。你可以使用
observeOn()
方法告诉observer在哪个线程观察。同样地,你可以用scheduleOn()
方法告诉observable应该运行在哪个线程。RxJava中已经有些默认的线程,像Schedulers.newThread()
会在后台创建一个新线程,Schedulers.io()
会在主线程执行代码。
3个简单的步骤来吧Rx运用在你的app中
让我们看一个简单的示例。TA解释了3个简单的步骤把响应式编程运用在你的app中。
Observable<String> database = Observable //Observable. This will emit the data
.just(new String[]{"1", "2", "3", "4"}); //Operator
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
//...
}
@Override
public void onError(Throwable e) {
//...
}
@Override
public void onNext(String s) {
//...
}
};
database.subscribeOn(Schedulers.newThread()) //Observable runs on new background thread.
.observeOn(AndroidSchedulers.mainThread()) //Observer will run on main UI thread.
.subscribe(observer); //Subscribe the observer
步骤-1 创建发送数据的observable:
这里数据库是一个observable,TA发送数据。在我们的例子中,TA发送字符串。just()
是一个操作符,这让在参数中提供的数据一个一个地发送。(我们将在接下来的文章中介绍操作符。所以不用担心。)
步骤-2 创建消耗数据的observer:
在上面的代码中,observer是一个消耗数据库observable发送的数据的observer。TA处理接收的的数据,也处理错误。
步骤-3 管理并发
在最后一步,我们定义了Schedulers来管理并发。subscribeOn(Schedulers.newThread())
告诉数据库observable运行在后台线程,observeOn(AndroidSchedulers.mainThread())
告诉observer运行在主线程。这是响应式编程的基本代码。
所以至此,你应该明白了为什么我们需要响应式编程,为什么我们需要TA们,还有怎样实现TA。在接下来的文章中,我们将学习如何使用RxJava和详细了解其操作符。
https://medium.com/@kevalpatel2106/code-your-next-android-app-using-rxjava-d1db30ac9fcc
把RxJava用在你的下一个Android应用中
RxJava在全世界的Android开发世界都是一个热门话题。唯一的问题就是TA们难以理解,特别是你从面向对象编程转为函数响应式编程。所以我写了一系列文章帮助你理解响应式编程中的基本概念。
在第一部分,我们知道了响应式编程的基本概念。如果你还没读过,推荐你去看一下,了解基本概念。
正如我们在第一部分讨论的,Rx由下面3个关键点组成。
RX = OBSERVABLE + OBSERVER + SCHEDULERS
让我们一个个创建出来,但首先要进行项目集成。
集成RxAndroid到项目
RxAndroid基本上是对RxJava关于Android的特性的包装,提供一些明确作用于Android而RxJava不能提供的。所以,如果你只想把Rx集成到你的Java工程中,你就不用引入RxAndroid的库。
这是RxJava和RxAndroid的Gradle依赖。
dependencies {
compile 'io.reactivex:rxandroid:1.2.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex:rxjava:1.1.6'
}
Marble Diagram
Rx使用marble diagram解释操作符怎么工作。Marble diagram很方便且容易理解。
- 方框上的线表示原始数据。这些数据需要observable发送。不同类型的对象由不同的形状表示。
- 中间的方框代表操作符。还有其他很多操作符,TA们基本上就是控制observable何时、怎样发送数据。我们在将来再看这些操作符。
- 方框下面的线代表observable发送的的数据。RxJava中的observer接收这些发送的数据。
创建observable
就像你知道的,observable就是用来发送数据流的。下面的observable把1-5一个一个发送。
这里just()
就是操作符。TA只是发送参数中的值。(这就是为什么TA叫做just。)
Observable.just(1, 2, 3, 4, 5)
有时我们想提炼被observable发送的数据。比如上面一个例子,我们只想要observable发送的奇数数字。我们用filter()
操作符实现。就像名字那样,filter操作符过滤observable发送的项目。
Observable
.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//check if the number is odd? If the number is odd return true, to emmit that object.
return integer % 2 != 0;
}
});
创建observer
observer消耗observable发送的数据。当observable发送数据时,所有注册了的observer都会接收到数据。
在RxJava中,在接收数据时有3个回调方法。
onNext()
:新的数据被发送的时候,observer会回调这个方法。observable发送的数据能在这个回调方法的参数中得到。onError()
:observable发生错误的时候,会回调这个方法。(毕竟这个世界并不完美。)onComplete()
:当observable发送完所有数据时,这个方法会被回调,表示没有数据要被发送了。
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("All data emitted.");
}
@Override
public void onError(Throwable e) {
System.out.println("Error received: " + e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("New data received: " + integer);
}
};
在很多情况下用不到onComplete()
和onError()
方法。所以我们可以用一个简化的类Action1
,来定义我们在onNext()
中的操作,而不是使用Observer<T>
类。
Action1<Integer> onNextAction = new Action1<Integer>() {
@Override
public void call(Integer s) { //This is eqivelent to onNext()
System.out.println(s);
}
};
这里,onCall()
等价于第一种方法中的onNext()
。
使用Scheduler管理并发
scheduler在响应式编程中用来管理并发。
在Android中异步任务最常见的操作就是在主线程观察任务的结果,从而来更新UI。Android原生提供了AsyncTask来实现。在RxJava中,可以用schedulers来实现。
有两个方法来管理线程。
subscribeOn()
:指定observable运行的线程。observerOn()
:指定observer运行的线程。
RxJava和RxAndroid预置了一些scheduler。比如,Schedulers.io()
代表IO线程。而Schedulers.newThread()
将会创建一个新线程来运行observer/observable。你可以在这里找到各种schedulers。
最后,我们将用subscribe()
来订阅observer,让observer来接收observable发出的数据。这将返回Subscription
对象,这个Subscription
对象持有连接observer和observable的引用。
Observable<Integer> observable = Observable
.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//check if the number is odd? If the number is odd return true, to emmit that object.
return integer % 2 != 0;
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("All data emitted.");
}
@Override
public void onError(Throwable e) {
System.out.println("Error received: " + e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.println("New data received: " + integer);
}
};
Subscription subscription = observable
.subscribeOn(Schedulers.io()) //observable will run on IO thread.
.observeOn(AndroidSchedulers.mainThread()) //Observer will run on main thread.
.subscribe(observer); //subscribe the observer
让我们看下上面程序的运行结果。
我们可以从结果看出,只有奇数被observable发送出来了。最后,当所有数据发送完后,onComplete()
方法将会执行。
取消订阅
如果你想从observable取消observer的订阅,你可以调用unsubscribe。
subscription.unsubscribe();
在Android中,在onDestory()
中取消订阅来释放observer和observable的连接是至关重要的,否则可能会导致内存泄露。
如果你在类中有多个subscription,你可以使用CompositeSubscription
来一次取消所有的订阅。下面是一个实例:
public class MainActivity extends BaseActivity {
private CompositeSubscription mSubscription = new CompositeSubscription();
@Override
protected void onCreate(Bundle savedInstanceState) {
//...
//...
//...
mSubscription.add(subscription1); //Add subscription 1
mSubscription.add(subscription2); //Add subscription 2
}
@Override
public void onDestroy() {
super.onDestroy();
//Unsubscribe both subscriptions.
mSubscription.unsubscribe();
}
}
下一部分讲一些操作符。
https://medium.com/@kevalpatel2106/what-should-you-know-about-rx-operators-54716a16b310
关于Rx操作符你需要知道的所有
RxJava在Android开发中被谈得火热。不论在新手开发者还是有经验的开发者中都是一个热门话题。如果你不知道什么是响应式编程,不知道TA有什么用,我强烈推荐你看一下这个系列的第一篇文章:什么是响应式编程?
响应式编程在两方面有用:
- TA提供了超级简单的接口来处理你应用程序中的并发和线程调度。而且,TA让你的代码更加简洁易读。如果你不知道怎样在Android工程中使用RxJava,你可以看下把RxJava用在你的下一个Android应用中。
- 使用Rx操作符。Rx操作符的功能就是定义observable,怎样、何时发送数据流。在RxJava中有数百种操作符可用。你可以这里找到以字母排序的所有操作符的列表。// …TODO
有趣的事实
- 大多数作用在Observable上的操作符会返回Observable。这允许你一个接一个地链式地使用操作符。每一个操作符都会修改前一个操作符作用后的Observable。
- 链式的操作符不是独立作用于原始的Observable的,而是轮流作用。每一个操作符都会作用于前一个操作符作用后的Observable。
看第一个操作符。
有很多重复的操作符,有着几乎相同的功能。我会尽量同时讲到TA们。
创建Observable
1.just()
就像名字讲的那样,just操作符发送参数里的值。没什么其他的了。
Observable.just(1, 2, 3, 4, 5)
这里的Observable使用了just操作符。所以,observable将发送1-5的整数,一个接一个。
-
有另一个操作符
from()
,TA接收一个对象数组作为输入,然后就像just()
操作符一样一个一个发送对象。下面的代码片段使用from()
操作符发送1-5的整数。Observable.from(new Integer[]{1, 2, 3, 4, 5});
过滤操作符
- 过滤操作符基于一些表达式过滤数据流,只发送符合条件的数据。
- 这里,我将解释其中的一些。你可以在这里找到完整列表。
1.filter()
:
有时我们想精炼observable发出的数据。就像上面的例子,我们只想发送奇数数字。我们可以使用filter()
操作符来实现。
就像名字讲的那样,filter操作符过滤observable发送的项目。你要做的就是告诉操作符,根据情况发送还是不发送对象。
Observable
.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//check if the number is odd? If the number is odd return true, to emmit that object.
return integer % 2 != 0;
}
});
2.skip()
skip(n)不让Observable发送前面n项,而是发送n后的元素。所以skip(2)
会跳过前两个元素,从第三个元素开始发送。
- 还有个操作符
skipLast()
。这个操作符只会发送数据流中的最后一个元素。
Observable<Integer> observable = Observable.from(new Integer[]{1,2,3,4,5}) //emit 1 to 5
.skip(2); //Skip first two elements
observable
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("Observer", "Output:" + integer);
}
});
//Output will be : 3, 4, 5
3.take()
与skip()
相反。
takeLast()
takeFirst()
联合操作符
1.concat()
Observable<Integer> observer1 = Observable.from(new Integer[]{1, 2, 3, 4, 5}); //Emit 1 to 5
Observable<Integer> observer2 = Observable.from(new Integer[]{6, 7, 8, 9, 10}); //Emit 6 to 10
Observable.concat(observer1, observer2) //Concat the output of both the operators.
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("Observer", "Output:" + integer);
}
});
2.merge()
3.zip()
//Class that combines both data streams
class ZipObject {
int number;
String alphabet;
}
Observable<Integer> observable1 = Observable.from(new Integer[]{1, 2, 3, 4, 5}); //Emits integers
Observable<String> observable2 = Observable.from(new String[]{"A", "B", "C", "D", "F"}); //Emits alphabets
Observable<ZipObject> observable = Observable.zip(observable1, observable2,
//Function that define how to zip outputs of both the stream into single object.
new Func2<Integer, String, ZipObject>() {
@Override
public ZipObject call(Integer integer, String s) {
ZipObject zipObject = new ZipObject();
zipObject.alphabet = s;
zipObject.number = integer;
return zipObject;
}
});
observable
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.subscribe(new Action1<ZipObject>() {
@Override
public void call(ZipObject zipObject) {
Log.d("Observer", "Output:" + zipObject.number + " " + zipObject.alphabet);
}
});
}
变换
map()
flatMap()
待