본문 바로가기

Android 공부/rxJava 공부

RxJava 공부 2 - AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject

-1. 이전 글


2018/12/10 - [rxJava 공부] - RxJava 공부 1 - just, create, fromArray, interval, range, fromIterable, filter, map



0. 참고 사이트


http://reactivex.io/documentation/ko/subject.html



1. AsyncSubject


AsyncSubject는 소스 Observable로부터 배출된 마지막 값(만)을 배출하고 소스 Observalbe의 동작이 완료된 후에야 동작한다. (만약, 소스 Observable이 아무 값도 배출하지 않으면 AsyncSubject 역시 아무 값도 배출하지 않는다.)


1-1. 코드:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MainActivity : AppCompatActivity() {
    var count = 0
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        executeAsyncSubject()
    }
 
    //print: check#1 : 3
    //print: check#2 : 3
    //onComplete()를 기준으로 값이 출력되는 것을 볼 수 있고
    //AsyncSubject의 특징은 마지막 onNext의 값을 볼 수 있어서 최종 결과 값을 보기에 용이함.
    private fun executeAsyncSubject(){
        val asyncSubject = AsyncSubject.create<String>()
        asyncSubject.subscribe { data -> Log.d("check#1",data)}
        asyncSubject.onNext("0")
        asyncSubject.onNext("1")
        asyncSubject.onNext("2")
        asyncSubject.subscribe{data->Log.d("check#2",data)}
        asyncSubject.onNext("3")
        asyncSubject.onComplete()
    }
}
 
cs


1-2. 결과:



1-3. 해석:


onComplete()를 기준으로 asyncSubject객체에 들어 있는 next의 값을 보여주는 것을 알 수 있음.

그래서 최종적인 결과 값을 출력하기에 용이하게 사용될 것이라고 생각함


2. BehaviorSubject


옵저버가 BehaviorSubject를 구독하기 시작하면, 옵저버는 소스 Observable이 가장 최근에 발행한 항목(또는 아직 아무 값도 발행되지 않았다면 맨 처음 값이나 기본 값)의 발행을 시작하며 그 이후 소스 Observable(들)에 의해 발행된 항목들을 계속 발행한다.


2-1. 코드:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class MainActivity : AppCompatActivity() {
    var count = 0
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        //val myObserver = MyObserver(this, lifecycle)
        setContentView(R.layout.activity_main)
        executeBehaviorSubject()
 
    }
 
 
    //----------------default//----------------
    //----------------subscribe check1----------------
    //print : check#1:start
    //print : check#1:0
    //print : check#1:1
    //print : check#1:2
    //----------------subscribe check2-------------------
    //print : check#2:2
    //print : check#1:3
    //print : check#2:3
    //print : check#1:4
    //print : check#1:4
    private fun executeBehaviorSubject(){
        //default 값이 없는 BehaviorSubject
        //val behaviorSubject = BehaviorSubject.create()
        
        // BehaviorSubject.createDefault 를 통해서 값이 없을 때 특정 값을 출력할 수 있음.
        // behaviorSubject는 onNext의 값이 변경될 때마다 subscribe를 출력함
        // 그렇기 때문에 각종 변화 값을 용이하게 캐치함과 동시에 default값을 통해서 각종 
        // null값 또한 캐치할 수 있을 것이라고 생각함
        val behaviorSubject = BehaviorSubject.createDefault("start")
        behaviorSubject.subscribe { data->Log.d("check#1",data)}
        behaviorSubject.onNext("0")
        behaviorSubject.onNext("1")
        behaviorSubject.onNext("2")
        behaviorSubject.subscribe{data->Log.d("check#2",data)}
        behaviorSubject.onNext("3")
        behaviorSubject.onNext("4")
    }
}
cs


2-2. 결과:



2-3. 해석:


onComplete()가 없더라도 onNext의 값이 변경될 때마다 subscribe가 호출 되고

onNext안의 데이터 값을 출력할 수 있다. 그에 따라서 네트워크를 통해서 데이터를 받아오는 코드를 넣게 되면

꾸준하게 데이터 값을 받아오고 UI를 변경시키는 코드를 작성할 수 있을 것이라고 생각한다.



3. PublishSubject


PublishSubject는 구독 이후에 소스 Observable(들)이 배출한 항목들만 옵저버에게 배출한다.

주의할 점은, PublishSubject는 (이를 막지 않는 이상) 생성 시점에서 즉시 항목들을 배출하기 시작할 것이고 이런 특성 때문에 주제가 생성되는 시점과 옵저버가 이 주제를 구독하기 시작하는 그 사이에 배출되는 항목들을 잃어 버릴 수 있다는 단점이 있다. 따라서, 소스 Observable이 배출하는 모든 항목들의 배출을 보장해야 한다면 Create을 사용해서 명시적으로 "차가운" Observable(항목들을 배출하기 전에 모든 옵저버가 구독을 시작했는지 체크한다)을 생성하거나, PublishSubject 대신 ReplaySubject를 사용해야 한다.


3-1. 코드 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MainActivity : AppCompatActivity() {
    var count = 0
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        //val myObserver = MyObserver(this, lifecycle)
        setContentView(R.layout.activity_main)
        executePublishSubject()
 
    }   
 
    //--------------create--------------
    //--------------pass--------------
    //--------------pass--------------
    //---------subscribe check1---------
    //---------check#1 : 2---------
    //---------check#1 : 3---------
    //---------subscribe check1---------
    //---------check#1 : 4---------
    //---------check#2 : 4---------
    private fun executePublishSubject(){
        //executePublishSubject는 subject이후의 값을 트랙킹하는 객체입니다.
        //그렇기 때문에 subscribe 이전의 데이터는 생략합니다.
        //그렇기 때문에 0과 1은 check#1과 check#2가 생략합니다.
        //그리고 2,3 / 4를 출력하게 됩니다.
        val publishSubject =  PublishSubject.create<String>()
        publishSubject.onNext("0")
        publishSubject.onNext("1")
        publishSubject.subscribe{data->Log.d("check#1",data)}
        publishSubject.onNext("2")
        publishSubject.onNext("3")
        publishSubject.subscribe{data->Log.d("check#2",data)}
        publishSubject.onNext("4")
publishSubject.onComplete()
    }
}
cs

3-2. 결과:



3-3. 해석:

PublishSubject객체는 doc의 설명과 같이 subscribe 한 이후의 데이터를 출력하기에 용이하다.
그렇기 때문에 네트워크 통신을 해서 데이터를 가져온다고 한다면, 해당 네트워크 통신을 가져온 후의 데이터를 처리할 수 있는
로직을 만들 수 있을 것이라고 생각합니다. 그렇기 때문의 이전의 BehaviorSubject는 어떤 데이터 값이 필요한 통신과 어울린다면
PublishSubject는 네트워크 통신이 일어난 후에 UI를 갱신하는 로직과 어울린다고 생각한다.


4. ReplaySubject


ReplaySubject는 옵저버가 구독을 시작한 시점과 관계 없이 소스 Observable(들)이 배출한 모든 항목들을 모든 옵저버에게 배출한다.

ReplaySubject는 몇 개의 생성자 오버로드를 제공하는데 이를 통해, 재생 버퍼의 크기가 특정 이상으로 증가할 경우, 또는 처음 배출 이후 지정한 시간이 경과할 경우 오래된 항목들을 제거한다.


4-1. 코드:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MainActivity : AppCompatActivity() {
    var count = 0
 
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        //val myObserver = MyObserver(this, lifecycle)
        setContentView(R.layout.activity_main)
        executeReplaySubject()
 
    }
 
    //-----------shout 1 -------------
    //-------subscribe check#1--------
    //------replay check#1 : 1--------
    //--------check#1 : 2-------------
    //--------check#1 : 3-------------
    //-------subscribe check#2--------
    //------replay check#2 : 1--------
    //------replay check#2 : 2--------
    //------replay check#2 : 3--------
    //--------check#1 : 4-------------
    //--------check#2 : 4-------------
    private fun executeReplaySubject(){
        //replaySubject는 이름과 같이 replay, 처음부터 다시 실행해서 subscribe를
        //동기화 시키는 것처럼 보인다.
 
        val replaySubject = ReplaySubject.create<String>()
        replaySubject.onNext("1")
        replaySubject.subscribe{data->Log.d("check#1",data)}
        replaySubject.onNext("2")
        replaySubject.onNext("3")
        replaySubject.subscribe{data->Log.d("check#2",data)}
        replaySubject.onNext("4")
        replaySubject.onComplete()
    }
}
cs




4-2. 결과:



4-3. 해석:


replaySubject는 해당 객체가 했던 행위를 똑같이 처리해야 할 때 필요한 기능이라고 생각한다.

그렇기 때문에 replaySubject를 통해서 동일한 행위를 하는 객체를 만들어서 update할 때 용이할 것이라고 생각한다.