Skip to content

译·RxJava

Posted on:March 16, 2020 at 10:30:09 GMT+8

Table of contents

Open Table of contents

什么是响应式编程?

如今,所有人都在讨论响应式编程(Reactive Programming),而且你想学习这个叫做响应式编程的新东西。可能你已经在一些地方使用过TA了,但你还是有些疑惑要阐明清楚。

在这篇文章里,我们将要学习关于响应式编程的基本概念。下一篇文章开始我们要真正的编程,还有学习如何把RxJava运用在Android程序开发中。

先要明白我们遇到的问题。为什么我们需要响应式编程?如果没有问题,那我们也不需要解决方法了。

为什么我们需要异步

最简单的答案就是我们想要提升用户体验。我们想要我们的应用更具有响应性。我们不想阻塞主线程,不想让应用运行缓慢,不想向用户提供糟糕的用户体验,想给用户平滑的用户体验。

为了让主线程自由运行,我们要把我们想要做的大量耗时的工作放在后台线程中。因为我们的移动设备不能做繁重的工作,所以我们想把繁重的工作和复杂的计算放在服务器进行。所以我们需要异步来进行网络操作。

评价矩阵

1_cbi_HDYYHDSp7tm4IzVlQg.png

我们需要从库中得到些什么来处理所有的异步工作。你可以把下面四点当做异步库的评价矩阵。

什么是响应式编程?

根据维基百科:

响应式编程是一种面向数据流动和改变传递的编程范式。这意味着使用编程语言能简单地传递静态和动态的数据流,而且这个基础执行模型会自动地通过数据流传递改变。

简而言之,在响应式编程中数据流由一个成分发送,然后响应式编程库提供的基础结构会把改变传递给那些注册了要接受这些数据改变的成分。长话短说:Rx由下面这3个关键点组成。

RX = OBSERVABLE + OBSERVER + SCHEDULERS

我们将逐一详细讨论这三点。

3个简单的步骤来吧Rx运用在你的app中

1_-N6sUYNrO615PQCeKtP0Kw.png

让我们看一个简单的示例。TA解释了3个简单的步骤把响应式编程运用在你的app中。

Observable<String> database = Observable      //Observable. This will emit the data
                .just(new String[]{"1", "2", "3", "4"});    //Operator

 Observer<String> observer = new Observer<String>() {
           @Override
            public void onCompleted() {
                //...
            }

            @Override
            public void onError(Throwable e) {
                //...
            }

            @Override
            public void onNext(String s) {
                //...
            }
        };

database.subscribeOn(Schedulers.newThread())          //Observable runs on new background thread.
        .observeOn(AndroidSchedulers.mainThread())    //Observer will run on main UI thread.
        .subscribe(observer);                         //Subscribe the observer

步骤-1 创建发送数据的observable:

这里数据库是一个observable,TA发送数据。在我们的例子中,TA发送字符串。just()是一个操作符,这让在参数中提供的数据一个一个地发送。(我们将在接下来的文章中介绍操作符。所以不用担心。)

步骤-2 创建消耗数据的observer:

在上面的代码中,observer是一个消耗数据库observable发送的数据的observer。TA处理接收的的数据,也处理错误。

步骤-3 管理并发

在最后一步,我们定义了Schedulers来管理并发。subscribeOn(Schedulers.newThread())告诉数据库observable运行在后台线程,observeOn(AndroidSchedulers.mainThread())告诉observer运行在主线程。这是响应式编程的基本代码。

所以至此,你应该明白了为什么我们需要响应式编程,为什么我们需要TA们,还有怎样实现TA。在接下来的文章中,我们将学习如何使用RxJava和详细了解其操作符。


https://medium.com/@kevalpatel2106/code-your-next-android-app-using-rxjava-d1db30ac9fcc

把RxJava用在你的下一个Android应用中

RxJava在全世界的Android开发世界都是一个热门话题。唯一的问题就是TA们难以理解,特别是你从面向对象编程转为函数响应式编程。所以我写了一系列文章帮助你理解响应式编程中的基本概念。

在第一部分,我们知道了响应式编程的基本概念。如果你还没读过,推荐你去看一下,了解基本概念。

正如我们在第一部分讨论的,Rx由下面3个关键点组成。

RX = OBSERVABLE + OBSERVER + SCHEDULERS

让我们一个个创建出来,但首先要进行项目集成。

集成RxAndroid到项目

RxAndroid基本上是对RxJava关于Android的特性的包装,提供一些明确作用于Android而RxJava不能提供的。所以,如果你只想把Rx集成到你的Java工程中,你就不用引入RxAndroid的库。

这是RxJava和RxAndroid的Gradle依赖。

最新版本

dependencies {
    compile 'io.reactivex:rxandroid:1.2.1'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex:rxjava:1.1.6'
}

Marble Diagram

Rx使用marble diagram解释操作符怎么工作。Marble diagram很方便且容易理解。

1_-RggxXyfw1M3CrYkzGTRDw.png

创建observable

就像你知道的,observable就是用来发送数据流的。下面的observable把1-5一个一个发送。

这里just()就是操作符。TA只是发送参数中的值。(这就是为什么TA叫做just。)

img

Observable.just(1, 2, 3, 4, 5)

有时我们想提炼被observable发送的数据。比如上面一个例子,我们只想要observable发送的奇数数字。我们用filter()操作符实现。就像名字那样,filter操作符过滤observable发送的项目。

1_1FlWy5mNEKsgpzwVDARX6A.png

 Observable
    .just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            //check if the number is odd? If the number is odd return true, to emmit that object.
            return integer % 2 != 0;
        }
    });

创建observer

observer消耗observable发送的数据。当observable发送数据时,所有注册了的observer都会接收到数据。

在RxJava中,在接收数据时有3个回调方法。

Observer<Integer> observer = new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("All data emitted.");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error received: " + e.getMessage());
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("New data received: " + integer);
    }
};

在很多情况下用不到onComplete()onError()方法。所以我们可以用一个简化的类Action1,来定义我们在onNext()中的操作,而不是使用Observer<T>类。

Action1<Integer> onNextAction = new Action1<Integer>() {
    @Override
    public void call(Integer s) { //This is eqivelent to onNext()
        System.out.println(s);
    }
};

这里,onCall()等价于第一种方法中的onNext()

使用Scheduler管理并发

scheduler在响应式编程中用来管理并发。

在Android中异步任务最常见的操作就是在主线程观察任务的结果,从而来更新UI。Android原生提供了AsyncTask来实现。在RxJava中,可以用schedulers来实现。

有两个方法来管理线程。

RxJava和RxAndroid预置了一些scheduler。比如,Schedulers.io()代表IO线程。而Schedulers.newThread()将会创建一个新线程来运行observer/observable。你可以在这里找到各种schedulers。

最后,我们将用subscribe()来订阅observer,让observer来接收observable发出的数据。这将返回Subscription对象,这个Subscription对象持有连接observer和observable的引用。

Observable<Integer> observable = Observable
  .just(1, 2, 3, 4, 5)
  .filter(new Func1<Integer, Boolean>() {
      @Override
      public Boolean call(Integer integer) {
          //check if the number is odd? If the number is odd return true, to emmit that object.
          return integer % 2 != 0;
      }
  });

Observer<Integer> observer = new Observer<Integer>() {
  @Override
  public void onCompleted() {
      System.out.println("All data emitted.");
  }

  @Override
  public void onError(Throwable e) {
      System.out.println("Error received: " + e.getMessage());
  }

  @Override
  public void onNext(Integer integer) {
      System.out.println("New data received: " + integer);
  }
};

Subscription subscription = observable
  .subscribeOn(Schedulers.io())       //observable will run on IO thread.
  .observeOn(AndroidSchedulers.mainThread())      //Observer will run on main thread.
  .subscribe(observer);               //subscribe the observer

让我们看下上面程序的运行结果。

1_6ZDEDXeWTWsRAsYbXIpjVw.png

我们可以从结果看出,只有奇数被observable发送出来了。最后,当所有数据发送完后,onComplete()方法将会执行。

取消订阅

如果你想从observable取消observer的订阅,你可以调用unsubscribe。

subscription.unsubscribe();

在Android中,在onDestory()中取消订阅来释放observer和observable的连接是至关重要的,否则可能会导致内存泄露。

如果你在类中有多个subscription,你可以使用CompositeSubscription来一次取消所有的订阅。下面是一个实例:

public class MainActivity extends BaseActivity {
    private CompositeSubscription mSubscription = new CompositeSubscription();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
      //...
      //...
      //...

      mSubscription.add(subscription1); //Add subscription 1
      mSubscription.add(subscription2); //Add subscription 2
    }

  @Override
  public void onDestroy() {
      super.onDestroy();

      //Unsubscribe both subscriptions.
      mSubscription.unsubscribe();
  }
}

下一部分讲一些操作符。


https://medium.com/@kevalpatel2106/what-should-you-know-about-rx-operators-54716a16b310

关于Rx操作符你需要知道的所有

RxJava在Android开发中被谈得火热。不论在新手开发者还是有经验的开发者中都是一个热门话题。如果你不知道什么是响应式编程,不知道TA有什么用,我强烈推荐你看一下这个系列的第一篇文章:什么是响应式编程

响应式编程在两方面有用:

有趣的事实

看第一个操作符。

有很多重复的操作符,有着几乎相同的功能。我会尽量同时讲到TA们。

创建Observable

1.just()

就像名字讲的那样,just操作符发送参数里的值。没什么其他的了。

1_tLdm40Irt49HmeNlWS6RYQ.png

Observable.just(1, 2, 3, 4, 5)

这里的Observable使用了just操作符。所以,observable将发送1-5的整数,一个接一个。

过滤操作符

1.filter():

有时我们想精炼observable发出的数据。就像上面的例子,我们只想发送奇数数字。我们可以使用filter()操作符来实现。

1_1FlWy5mNEKsgpzwVDARX6A.png

就像名字讲的那样,filter操作符过滤observable发送的项目。你要做的就是告诉操作符,根据情况发送还是不发送对象。

 Observable
    .just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer integer) {
            //check if the number is odd? If the number is odd return true, to emmit that object.
            return integer % 2 != 0;
        }
    });

2.skip()

skip(n)不让Observable发送前面n项,而是发送n后的元素。所以skip(2)会跳过前两个元素,从第三个元素开始发送。

1_hLtkGczgXU4p2kS3LWiAJQ.png

Observable<Integer> observable = Observable.from(new Integer[]{1,2,3,4,5})  //emit 1 to 5
        .skip(2);   //Skip first two elements

observable
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.io())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("Observer", "Output:" + integer);
            }
        });
//Output will be : 3, 4, 5

3.take()

skip()相反。

1_hxLXh3oxWks8mwwWDmIybg.png

联合操作符

1.concat()

concat 2.png

Observable<Integer> observer1 = Observable.from(new Integer[]{1, 2, 3, 4, 5});  //Emit 1 to 5
Observable<Integer> observer2 = Observable.from(new Integer[]{6, 7, 8, 9, 10}); //Emit 6 to 10

Observable.concat(observer1, observer2) //Concat the output of both the operators.
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.io())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("Observer", "Output:" + integer);
            }
        });

2.merge()

1_6S4JWTiEWMOPZjHks3U-cg.png

3.zip()

1_egyq-cTJAc6y2G8KvsnAUg.png

//Class that combines both data streams
class ZipObject {
    int number;
    String alphabet;
}

Observable<Integer> observable1 = Observable.from(new Integer[]{1, 2, 3, 4, 5});  //Emits integers
Observable<String> observable2 = Observable.from(new String[]{"A", "B", "C", "D", "F"});  //Emits alphabets
Observable<ZipObject> observable = Observable.zip(observable1, observable2,
    //Function that define how to zip outputs of both the stream into single object.
    new Func2<Integer, String, ZipObject>() {
        @Override
        public ZipObject call(Integer integer, String s) {
            ZipObject zipObject = new ZipObject();
            zipObject.alphabet = s;
            zipObject.number = integer;
            return zipObject;
        }
    });

observable
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io())
    .subscribe(new Action1<ZipObject>() {
        @Override
        public void call(ZipObject zipObject) {
            Log.d("Observer", "Output:" + zipObject.number + " " + zipObject.alphabet);
        }
    });
}

变换

map()

flatMap()