이전에 Observable의 수많은 변형을 다루었다.
주어진 데이터를 발행하기위해 just()나 fromArray() 함수를 사용할 때도 있었고
PublishSubject, AsyncSubject 클래스 등 처럼 Observable이면서 옵서버도 되는 Subject 클래스도 살펴봤다.
이번에 살펴볼 ConnectableObservable 클래스는
Subject 클래스처럼 차가운 Observable을 뜨거운 Observable로 변환한다.
Observable을 여러 구독자에게 공유할 수 있으므로
원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용한다.
특이한 점은 subscribe() 함수를 호출해도 아무 동작이 일어나지 않는다는 점이다.
그럼 어떻게 발행해??
바로 connect() 함수를 호출한 시점 부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행한다.
ConnectableObservable 객체를 생성하려면 먼저 publish() 함수를 호출해야한다.
이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지
데이터 발행을 유예하는 역할을 하게 된다.
아래의 그림은 ConnectableObservable.publish() 함수의 마블 다이어그램이다.
보통은 Observable에 subscribe() 함수를 호출해야 데이터 발행을 시작하는데
이제는 아무런 일도 일어나지 않는다
오로지, connect() 함수를 호출해야지 그때까지 구독했던 구독자 모두에게 데이터를 발행한다.
그렇기에 connect() 함수 이후에 구독한 구독자에게는 구독 이후에 발생한 '파란색 원' 데이터부터 발행한다.
위 구조의 마블 다이어그램은 아래와 같은 소스 코드로 작성될 수 있다.
public void marbleDiagram() throws InterruptedException {
String[] dt = {"red","green","blue"};
Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(i -> dt[i])
.take(dt.length);
ConnectableObservable<String> source = balls.publish();
source.subscribe(data -> System.out.println("Subscriber #1 => " + data ));
source.subscribe(data -> System.out.println("Subscriber #2 => " + data ));
source.connect();
Thread.sleep(250L);
source.subscribe(data -> System.out.println("Subscriber #3 => " + data ));
Thread.sleep(100L);
}
[결과값]
Subscriber #1 => red
Subscriber #2 => red
Subscriber #1 => green
Subscriber #2 => green
Subscriber #1 => blue
Subscriber #2 => blue
Subscriber #3 => blue
배우지 않은 부분들이 있어서 코드가 조금 어렵다.
ConnectableObservable의 동작 방식만 이해하고 넘어가도록 하자.
일단 발행하려는 데이터는 "red", "green", "blue"이다.
Observable.interval()은 인자 2개를 받는데, 시간과 시간 단위이다.
즉, 100ms마다 데이터를 발행한다는 것이다.
그리고 map() 함수를 이용하여 0부터 발생하는 데이터를 바탕으로 각 데이터를 발행한다.
take()는 observable이 방출한 몇개의 데이터까지 발행할 것인지를 정해준다. (dt.length 만큼 발행한다)
ConnectableObservable 객체를 생성하기 위해서 publish() 함수를 호출한다.
그리고 첫번째와 두번째의 구독자가 추가된다.
connect() 함수를 호출하게 되면 데이터 발생을 시작하게 된다.
그다음 세번째 구독자가 나오기 전까지 sleep() 함수를 이용해서 250ms를 기다리고
세번째 구독자를 추가한다.
connect() 함수를 호출했으므로 이후에는 구독하면 다음의 데이터를 바로 수신이 가능하다.
sleep() 함수를 이용해서 100ms를 기다린 후
balls 객체의 데이터를 모두 발행하게 된다.
그리고 모두 구독 해지된다.
'프로그래밍 > RxJava' 카테고리의 다른 글
[RxJava] 데이터 발행자와 수신자 (0) | 2020.02.25 |
---|---|
[RxJava] Subject 클래스 - ReplaySubject 클래스 (0) | 2020.02.25 |
[RxJava] Subject 클래스 - PublishSubject 클래스 (0) | 2020.02.25 |
[RxJava] Subject 클래스 - BehaviorSubject 클래스 (0) | 2020.02.25 |
[RxJava] Subject 클래스 - AsyncSubject 클래스 (0) | 2020.02.20 |