본문 바로가기

프로그래밍/RxJava

[RxJava] Subject 클래스 - AsyncSubject 클래스

Subject 클래스는 차가운 Observable뜨거운 Observable로 바꿔준다고 소개했다.

Subject 클래스의 특성은 Observable의 속성과 구독자의 속성이 모두 있다는 점이다.

 

즉, Observable처럼 데이터를 발행할 수 있고, 구독자처럼 발행된 데이터를 바로 처리할 수도 있다.

 

RxJava에서 제공하는 주요 Subject 클래스에는

AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있다.

AsyncSubject 클래스

AsysncSubject 클래스는 Observable에서 발행한 마지막 데이터를 얻어 올 수 있는 Subject 클래스이다.

완료되기 마지막 데이터 데이터에만 관심이 있으며, 이전 데이터들은 무시된다.

 

AsyncSubject 클래스 마블 다이어그램

위의 마블다이어그램을 보다시피, 구독자의 시간 표시줄이 여러개인것을 볼 수 있다.

처리 순서는

 

1. 처음 구독자가 subscribe() 함수를 호출한다.

2. 이후에 '빨간 원', '초록 원'이 발행된 후 두번째 구독자가 subscribe() 함수를 호출한다.

3. 마지막으로 '파란 원'이 발행된 후 데이터 발행을 완료(onComplete 이벤트)한다.

 

여기서, 완료가 되기 전까지 구독자에게 데이터를 전달하지 않다가

완료가됨과 동시에 첫 번째와 두 번째 구독자에게 마지막 데이터를 발행하고 종료한다.

 

위의 마블다이어그램을 AsyncSubject 클래스를 활용하여 코드로 나타내보자.

public void marbleDiagram(){
        AsyncSubject<String> subject = AsyncSubject.create();

        //구독 시작
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));

        subject.onNext("red");
        subject.onNext("green");

        //두번째 구독 시작
        subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
        subject.onNext("blue");
        subject.onComplete();
    }

AsyncSubject 객체인 subject는 정적 팬토리 함수인 create()로 생성한다.

Observable.create()와 같은 기능이다.

subscribe() 함수를 호출하여 구독을 시작한다.

 

그리고 onNext() 함수를 호출하여 데이터를 발행한다.

subject 변수에 String 타입으로 지정했으므로, String  타입으로 인자를 넣어야한다.

"red"와 "green" 데이터를 발행 후

 

두번째 구독자가 subscribe()를 호출한다.

"blue"라는 데이터를 onNext() 함수를 호출하여 발행 후, 마지막으로 onComplete() 함수를 호출한다.

 

이때, 마지막으로 입력된 데이터가 구독자에게 최종 전달된다.

[결과값]
Subscriber #1 => blue
Subscriber #2 => blue

 

AsyncSubject 클래스구독자로도 동작이 가능하다.

아래예제는 AsyncSubject가 Observable의 구독자로 동작하는 예제이다.

public void asSubscriber(){
        Float[] temperature = {10.1f, 13.4f, 12.5f};
        Observable<Float> source = Observable.fromArray(temperature);

        AsyncSubject<Float> subject = AsyncSubject.create();
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));

        source.subscribe(subject);
    }
[결과값]
Subscriber #1 => 12.5

먼저 10.1f 등의 Float 타입 온도 데이터를 담는 Observable을 생성한다.

Observable 타입의 변수 이름은 source이다.

subject 변수에 AsyncSubject 객체를 생성하고 data를 수신할 수 있도록 subscribe() 함수를 호출한다.

마지막으로 subject 변수는 Observable인 source를 구독한다.

 

이게 어떻게 가능할까??

그 이유는 Subject 클래스Observable을 상속하고 동시에 Observable 인터페이스를 구현하기 때문이다.

 

마지막으로, AsyncSubject 클래스에서 onComplete() 함수를 호출한 후

구독할 때 어떻게 되는지 살펴보자

 public void afterComplete(){
        AsyncSubject<Integer> subject = AsyncSubject.create();
        subject.onNext(10);
        subject.onNext(11);
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data ));
        subject.onNext(12);
        subject.onComplete();
        subject.onNext(13);
        subject.subscribe(data -> System.out.println("Subscriber #2 => " + data ));
        subject.subscribe(data -> System.out.println("Subscriber #3 => " + data ));
    }
[결과값]
Subscriber #1 => 12
Subscriber #2 => 12
Subscriber #3 => 12

'10', '11' 이라는 데이터를 발행한 후

첫번째 구독자 subscribe() 함수를 호출한다.

그리고 이어서 '12'라는 데이터를 발행 후 onComplete() 함수를 호출했다.

 

Observable과 마찬가지로, onComplete() 함수 호출 이후에는 onNext() 이벤트를 무시한다.

그 다음 두번째, 세번재 구독자가 subscribe() 함수를 호출했다.

 

실행결과로 첫번째, 두번째, 세번째 구독자 모두 마지막 값 '12'를 전달 받게 된다.

 

AsyncSubject클래스를 이용한 전체 소스 코드 보러가기