Today I Learned

[스프링5를 활용한 리액티브 프로그래밍] 02 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (2) 본문

카테고리 없음

[스프링5를 활용한 리액티브 프로그래밍] 02 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (2)

하이라이터 2022. 9. 5. 18:20
728x90

리액티브 프레임워크 RxJava

RxJava 라이브러리는 Reactive Extensions(ReativeX)의 자바 구현체이다.

ReactiveX는 동기식 또는 비동기식 스트림과 관계없이 명령형 언어를 이용해 데이터 스트림을 조작할 수 있는 일련의 도구이다.

 

관찰자 + 반복자 = 리액티브 스트림

다음 코드를 통해 관찰자 패턴을 다시 요약해보자.

public interface Observer<T> {
  void notify(T event);
}

public interface Subject<T> {
  void registerObserver(Observer<T> observer);
  void unregisterObserver(Observer<T> observer);
  void notifyObservers(T event);
}

이 접근법은 무한한 데이터 스트림에선 매력적이었지만, 데이터 스트림의 끝을 알리는 기능이 있다면 더 좋았을 것이다.

또한 컨슈머가 준비하기 전에 프로듀서가 이벤트 생성하는 문제점도 있었다.

동기식 코드에선 반복자 패턴을 통해 해결할 수 있었다.

public interface Iterator<T> {
  T next();
  boolean hasNext();
}

관찰자 패턴에 의한 비동기식 실행과 이 아이디어를 혼합하면 어떻게 될까?

public interface RxObserver<T> {
  void onNext(T next);
  void onComplete();
  void onError(Exception e);
}

RxObserver는 Iterator와 매우 비슷하지만, next() 대신 onNext() 콜백에 의해 RxObserver에 새로운 값이 통지된다.

onComplete()를 통해 스트림의 끝을 알리고 오류를 전파하기 위해 콜백 onError()를 추가할 수 있다.

 

Observer 인터페이스는 관찰자 패턴의 Observer와 유사하며, 리액티브 스트림의 모든 컴포넌트 사이에 데이터가 흐르는 방법을 정의한다. 

리액티브 Observable 클래스는 관찰자 패턴의 Subject와 일치한다. Observable은 이벤트를 발생시킬 때 이벤트 소스 역할을 수행하며, 리액티브 스트림을 초기화하는 팩토리 메서드와 스트림 변환 메서드들이 있다.

Subcriber 추상 클래스는 Observer 인터페이스를 구현하고 이벤트를 소비한다.

RxJava는 이벤트 생성에 대한 규칙을 정의한다. 연결된 각 구독자에 대한 Observable은 onNext()를 호출해 0을 포함해 일정 개수의 이벤트를 보낼 수 있다. 그런 다음 onComplete()로 성공을 알리거나 onError()로 오류를 발생시켜 실행 종료를 알린다.

 

스트림의 생산과 소비

먼저 Observable 클래스로 표현되는 스트림을 정의해보자.

Observable<String> observale = Observable.create(
  new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> sub) { // (1)
      sub.onNext("Hello, reactive world!"); // (2)
      sub.onCompleted(); // (3)
    }
  }
);

현재 Observable은 구독하는 즉시 구독자에게 이벤트를 전파하는 일종의 이벤트 생성기라 할 수 있다.

  • (1) 구독자가 나타나자마자 적용될 콜백
  • 콜백이 호출되면 하나의 문자열(2)을 생성한 후 스트림 끝을 구독자에게 알림(3)

자바8의 람다식을 통해 개선하면 다음과 같다.

Observable<String> observable = Observable.create(
  sub -> {
    sub.onNext("Hello, reactive world!");
    sub.onCompleted();
  }
);
RxJava 1.2.7부터 Observable을 생성하는 방식은 더이상 사용되지 않는다. 이 방식은 생성하는 것들이 너무 많고 구독자에게 과도한 부하를 줄 수 있어 안전하지 않다. 즉, 이 방법은 배압(backpressure)을 지원하지 않는다.

 

그리고 다음과 같이 구독자가 필요하다.

Subscriber<String> subscriber = new Subscriber<String>() {
  @Override
  public void onNext(String s) { // (1)새 이벤트 발생
    System.out.println(s);
  }
  
  @Override
  public void onCompleted() { // (2)스트림 완료
    System.out.println("Done!");
  }

  @Override
  public void onError(Throwable e) { // (3)오류 처리
    System.err.println(e);
  }
};

이제 observable 인스턴스와 subscriber 인스턴스를 연결해보자.

observable.subscribe(subscriber);

위 코드를 실행하면 결과는 다음과 같다.

이렇게 해서 간단한 리액티브 애플리케이션을 작성했다.

람다 표현식을 사용하면 다음과 같이 작성할 수 있다.

Observable.create(
  sub -> {
    sub.onNext("Hello, reactive world!");
    sub.onCompleted();
  }
).subscribe(
  System.out::println,
  System.err::println,
  () -> System.out.println("Done!")
);

RxJava 라이브러리는 요소를 직접 등록하거나 배열이나 Iterable 컬렉션을 사용해서 Observable 인스턴스를 만들 수 있다.

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

다음 코드와 같이 Callable 또는 Future를 활용할 수도 있다.

Observable<String> hello = Observable.fromCallable(() -> "Hello "); // (1)Callable
Future<String> future = Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future); // (2)Future

단순한 생성 기능과 함께 Observable 스트림은 다른 Observable 인스턴스를 결합해 생성할 수 있으므로 복잡한 워크플로를 쉽게 구현할 수 있다.

다음은 concat()을 사용한 예제이다.

Observable.concat(hello, world, Observable.just("!"))
  .forEach(System.out::print);

이 코드는 다른 인수를 사용하는 Observable을 간단히 조합하고 forEach() 메서드를 사용해 결과를 반복 처리한다.

출력은 다음과 같다.


비동기 시퀀스 생성하기

RxJava는 하나의 이벤트 뿐만 아니라, 주기적으로 비동기 이벤트 시퀀스를 생성할 수 있다.

Observable.interval(1, TimeUnit.SECONDS)
  .subscribe(e -> System.out.println("Received: " + e));
Thread.sleep(5000);  // (1)

실행 결과는 다음과 같다.

Thread.sleep(...)(1)을 제거하면 아무것도 출력하지 않고 종료된다. 이는 이벤트가 생성되는 것과는 별개의 스레드에서 사용되기 때문이다. 그래서 메인 스레드가 실행을 끝내지 못하도록 sleep()이나 다른 방법을 이용해 종료를 지연시켜야 한다.

 

관찰자-구독자 협력을 제어하기 위한 다른 방법으로 Subscription 인터페이스를 사용할 수있다.

interface Subscription {
  void unsubscribe();
  boolean isUnsubscribed();
}

unsubscribe() 메서드를 사용하면 구독자는 Observable에게 새 이벤트를 보낼 필요가 없음, 즉 가입 취소를 알릴 수 있다.

반면 Observable은 isUnsubscribed()를 호출해 구독자가 여전히 이벤트를 기다리고 있는지 확인한다.

 

구독 취소 기능을 이해하기 위해 구독자가 이벤트가 관심 있는 유일한 당사자인 경우를 생각해보자.

CountDownLatch externalSignal = ...; // (1)

Subscription subscription = Observable // (2)
  .interval(100, MILLISECONDS) // (3)
  .subscribe(System.out::println);

externalSignal.await();
subscription.unsubscribe(); // (4)
  • 외부 신호가 CountDownLatch(1)에 의해 전파될 때까지 이벤트를 소비한다.
  • Subscription(2) 인스턴스를 참조하며, 입력 스트림은 100ms마다 0,1,2,3,... 시퀀스를 무한히 생성하는 이벤트(3)를 생성한다.
  • 이후 externalSignal 호출이 발생하고 구독을 취소(4)한다.

스트림 변환과 마블 다이어그램

Observable과 구독자만으로도 다양한 워크플로를 구현할 수 있지만, RxJava의 모든 기능은 연산자에 의해 구현된다고 할 수 있다.

연산자는 스트림의 원소를 조정하거나 스트림 구조 자체를 변경할 수 있다.

 

Map 연산자

<R> Observable<R> map(Func1<T, R> func)

이 선언은 func 함수가 타입 <T>를 타입 <R>로 변환하고, map을 통해 Observable<T>를 Observable<R>로 변화할 수 있음을 나타낸다.

마블 다이어그램으로 나타내면 다음과 같다.

map을 통해 원소가 하나씩 변환된다. 따라서 출력 스트림은 입력 스트림과 똑같은 개수의 원소를 가진다.

 

Filter 연산자

map 연산자와 달리 필터는 받는 것보다 적은 수의 원소를 생성할 수도 있다. 다음 다이어그램과 같이 조건부 테스트를 성공적으로 통과한 원소만 재발행한다.

 

Count 연산자

입력 스트림의 개수를 반환한다. 그러나 원본 스트림이 완료될 때까지 카운트가 발행되므로 스트림이 무한대일 때는 count 연산자가 완료되지 않거나 아무것도 반환하지 않을 것이다.

Zip 연산자

Zip 함수를 적용해 두 개의 병렬 스트림 값을 결합하기 때문에 동작이 복잡하다. 예상되는 결과의 일부가 다른 출처에서 발행될 때 데이터를 결합하는 데 자주 사용된다.

다음 코드를 통해 두 개의 문자열 값 스트림을 압축해보자.

Observable.zip(
  Observable.just("A", "B", "C"),
  Observable.just("1", "2", "3"),
  (x, y) -> x + y
).forEach(System.out::println);

출력은 다음과 같다.

리액티브 프로그래밍에서 일반적으로 사용되는 연산자에 대한 자세한 내용은 http://rxmarbles.com을 참고하면 된다.

 

RxMarbles: Interactive diagrams of Rx Observables

 

rxmarbles.com


RxJava 사용의 전제 조건과 이점

 

 

728x90
Comments