일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 커링
- @Setter
- 크론 표현식
- @Getter
- @EnableScheduling
- 알고리즘
- 고차원 함수
- 다리를 지나는 트럭
- 코딩 테스트
- 스프링 스케쥴러
- 영속 자료구조
- kubenetes
- 롬복 어노테이션
- 프로그래머스
- H-index
- 스택/큐
- Java
- 기능개발
- K번째수
- 완주하지 못한 선수
- 전화번호 목록
- 해시
- @configuration
- 정렬
- 쿠버네티스
- 가장 큰 수
- 모던 자바 인 액션
- 루씬 인 액션
- 검색 기능 확장
- @Data
- Today
- Total
Today I Learned
[스프링5를 활용한 리액티브 프로그래밍] 02 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1) 본문
[스프링5를 활용한 리액티브 프로그래밍] 02 스프링을 이용한 리액티브 프로그래밍 - 기본 개념 (1)
하이라이터 2022. 9. 5. 03:04이 장에서 다루는 내용
- 관찰자 패턴
- 스프링 서버에서 보낸 이벤트를 구현한 발행-구독(publish-Subscribe) 구현
- RxJava의 역사 및 기본 개념
- 마블(Marble) 다이어그램
- 리액티브 프로그래밍을 적용한 비즈니스 사례
- 리액티브 라이브러리의 현재 상황
리액티브를 위한 스프링 프레임워크의 초기 해법
관찰자(Observer) 패턴
관찰자 패턴은 관찰자라고 불리는 자손의 리스트를 가지고 있는 주체(Subject)를 필요로 한다.
주체는 일반적으로 자신의 메서드 중 하나를 호출해 관찰자에게 상태 변경을 알린다.
이 패턴은 이벤트 처리를 기반으로 시스템을 구현할 때 필수적이며, MVC 패턴의 중요한 부분이다.
관찰자 패턴을 사용하면 구성 요소 구현 세부 사항에 관계없이 런타임에 객체 사이에 일대다 의존성을 등록할 수 있다.
이런 유형의 통신은 일반적으로 단방향으로 이루어지며, 다음 다이어그램과 같이 시스템을 통해 효율적으로 이벤트를 배포하는데 도움이 된다.
다이어그램과 같이 관찰자 패턴은 Subject와 Observer 2 개의 인터페이스로 구성된다.
Observer는 Subject에 등록되고 Subject로부터 알림을 수신한다.
Subject 스스로 이벤트를 발생시키거나 다른 구성 요소에 의해 호출될 수 있다.
public interface Subject<T> {
void registerObserver(Observer<T> observer);
void unregisterObserver(Observer<T> observer);
void notifyObservers(T event);
}
이 제네릭 인터페이스는 타입 T를 사용해 프로그램의 타입 안정성을 향상시키고, 이벤트를 브로드캐스팅하는 구독 관리 메서드도 포함한다.
public interface Observer<T> {
void observe(T event);
}
Observer는 T 타입으로 매개변수화한 일반 인터페이스이며, 이벤트를 처리하는 observe 메서드만 가진다.
다음은 String 메시지를 수신해 출력하는 두 개의 간단한 Observer 구현체와 Subject 구현체이다.
public class ConcreteObserverA implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer A: " + event);
}
}
public class ConcreteObserverB implements Observer<String> {
@Override
public void observe(String event) {
System.out.println("Observer B: " + event);
}
}
public class ConcreteSubject implements Subject<String> {
private final Set<Observer<String>> observers = // (1)
new CopyOnWriteArraySet<>();
public void registerObserver(Observer<String> observer) {
observers.add(observer);
}
public void unregisterObserver(Observer<String> observer) {
observers.remove(observer);
}
public void notifyObservers(String event) { // (2)
observers.forEach(observer -> observer.observe(event)); // (2.1)
}
}
- notify를 받는 데 관심이 있는 Observer set(1)이 있고, registerObserver 및 unregisterObserver 메서드를 이용해 Set<Observer>의 수정(구독 또는 구독 취소)이 가능하다.
- notifyObservers 메서드(2)는 이벤트를 브로드캐스트하기 위해 각 Observer에 대해 반복적으로 observe() 메서드를 호출한다.
- 멀티 스레드 환경에서는 안정성을 위해 업데이트 작업이 발생할 때마다 새 복사본을 생성하는 set 구현체인 CopyOnWriteArraySet을 사용한다. 이는 비용이 많이 드는 작업이지만 구독자 목록은 자주 변경되지 않으므로 스레드 세이프한 구현을 위해 합리적인 선택이다.
관찰자 패턴 사용 예
간단한 JUnit 테스트를 작성해 클래스 연동을 확인해보자.
@Test
public void observersHandleEventsFromSubject() {
// given
Subject<String> subject = new ConcreteSubject();
Observer<String> observerA = Mockito.spy(new ConcreteObserverA());
Observer<String> observerB = Mockito.spy(new ConcreteObserverB());
// when
subject.notifyObservers("No listeners");
subject.registerObserver(observerA);
subject.notifyObservers("Message for A");
subject.registerObserver(observerB);
subject.notifyObservers("Message for A & B");
subject.unregisterObserver(observerA);
subject.notifyObservers("Message for B");
subject.unregisterObserver(observerB);
subject.notifyObservers("No listeners");
// then
Mockito.verify(observerA, times(1)).observe("Message for A");
Mockito.verify(observerA, times(1)).observe("Message for A & B");
Mockito.verifyNoMoreInteractions(observerA);
Mockito.verify(observerB, times(1)).observe("Message for A & B");
Mockito.verify(observerB, times(1)).observe("Message for B");
Mockito.verifyNoMoreInteractions(observerB);
}
위 테스트를 실행하면 다음과 같이 수신 메시지가 출력된다.
대기 시간이 긴 이벤트를 처리하는 관찰자가 많을 경우, notifyObservers 메서드를 다음과 같이 구현해서 스레드 풀을 사용해 메시지를 병렬로 전달할 수 있다.
private final ExecutorService executorService =
Executors.newCachedThreadPool();
public void notifyObservers(String event) {
observers.forEach(observer ->
executorService.submit(
() -> observer.observe(event)
)
);
}
그러나 이러한 개선은 비효율성 및 내재된 버그를 포함하는 파악하기 어려운 코드를 만드는 길일 수도 있다.
excutor가 현재의 작업을 마치지도 전에 클라이언트가 새로운 작업을 예약하도록 요청하는 상황에서 전점 더 많은 수의 스레드를 생성할 것이고, 스레드 풀 크기를 제한하지 않았다면 OutOfMemory가 발생될 것이다.
과도한 리소스 사용을 방지하기 위해 스레드 풀 크기를 제한하고 응용 프로그램의 라이브니스(liveness) 속성을 위반할 수도 있다.
결과적으로 다중 스레드를 위한 관찰자 패턴이 필요할 때는 검증된 라이브러리를 사용하는 것이 좋다.
java.util 패키지에 포함된 Observer 및 Observable 클래스는 JDK 1.0에서 릴리즈됐던 상당히 오래된 클래스들이다.
이러한 클래스는 자바 제네릭 이전에 도입됐기 때문에 Object 타입을 사용하고, 이로 인해 타입 안정성이 보장되지 않는다.
또한 이러한 구현 방식은 멀티 스레드 환경에서 효율적이지 않다.
이 클래스들이 자바9에서 더이상 사용되지 않는다는 점과 앞서 언급한 문제점을 고려할 때 새로운 응용프로그램을 개발할때 이 클래스들을 사용하지 말아야 한다.
@EventListener를 사용한 발행-구독 패턴
관찰자 패턴과 달리, 발행-구독 패턴에서 게시자와 구독자는 다음 그림과 같이 서로를 알 필요가 없다.
이벤트 채널은 수신 메시지를 구독자에게 배포하기 전에 필터링 작업을 할 수도 있다.
스프링 프레임워크는 이벤트 처리를 위한 @EventListener와 이벤트 발행을 위한 ApplicationEventPublisher 클래스를 제공한다.
@EventListener와 ApplicationEventPublisher가 관찰자 패턴의 변형으로 보일 수 있지만, 발행-구독 패턴을 구현한다는 것을 명확히 알 필요가 있다.
@EventListener은 토픽 기반 라우팅과 내용 기반 라우팅 모두에 사용할 수 있다. 메시지 유형은 토픽 역할을 할 수 있고, 조건 속성은 내용 기반 라우팅 이벤트 처리를 가능하게 한다.
@EventListner 활용한 응용 프로그램 개발
스프링 프레임워크에서 발행-구독 패턴을 활용하기 위해 예제로 온도 센서 애플리케이션을 구현해보자.
리액티브 디자인에 따라 애플리케이션을 만들어야 하므로 고전적인 방식의 풀링 모델(pulling model)을 사용해 데이터를 조회할 수는 없다.
대신 서버에서 클라이언트로 비동기 메시지를 전달할 수 있는 웹소켓(WebSocket) 및 SSE(Server-Sent Events)와 같은 잘 정의된 프로토콜을 사용할 수 있다.
스프링 부트 애플리케이션 만들기
예제 유스케이스를 구현하기 위해 스프링부트를 활용한 스프링 웹과 스프링 MVC를 사용하기로 한다.
스프링 이니셜라이저 웹사이트(https://start.spring.io/)에서 gradle 프로젝트를 구성하고 내려받을 수 있다.
Curl과 스프링 이니셜라이저 사이트의 HTTP API를 사용해 프로젝트를 생성할 수도 있다.
비즈니스 로직 구현하기
개발하고자 하는 대략적인 시스템 설계 내용은 다음과 같다.
이 예제에서 도메인 모델은 Temperature 클래스로만 구성되며, 온도를 저장하는 하나의 double 속성만 가진다.
final class Temperature {
private final double value;
// constructor & getter...
}
센서를 시뮬레이션하기 위한 TemperatureSensor 클래스를 구현하고 @Component를 붙여 스프링 빈으로 등록한다.
@Component
public class TemperatureSensor {
private final ApplicationEventPublisher publisher; // (1)
private final Random rnd = new Random(); // (2)
private final ScheduledExecutorService executor = // (3)
Executors.newSingleThreadScheduledExecutor();
public TemperatureSensor(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@PostConstruct
public void startProcessing() { // (4)
this.executor.schedule(this::probe, 1, SECONDS);
}
private void probe() { // (5)
double temperature = 16 + rnd.nextGaussian() * 10;
publisher.publishEvent(new Temperature(temperature));
// schedule the next read after some random delay (0-5 seconds)
executor
.schedule(this::probe, rnd.nextInt(5000), MILLISECONDS); //(5.1)
}
}
- (1) 시뮬레이션된 온도 센서는 스프링 프레임워크에서 제공하는 ApplicationEventPublisher 클래스에만 의존
- (2) 불규칙적인 온도를 생성하기 위한 난수 발생기
- (3) 별도의 이벤트 생성 프로세스
- (4) @PostConstruct 애노테이션이 붙은 startProcessing() 메서드가 빈이 생성될 때 프레임워크에 의해 호출돼 온도 시나리오의 전체 시퀀스를 시작
- (5) 모든 로직은 probe()메서드에서 정의되며, 각 이벤트 생성은 임의의 지연 시간(5.1)후에 다음 이벤트 생성을 예약
스프링 웹 MVC를 이용한 비동기 HTTP 통신
서블릿 3.0에서 추가된 비동기 지원 기능은 컨테이너 스레드를 사용하는 방식으로 구현됐다.
실행 시간이 긴 작업에 유용하며, 스프링 웹 MVC에서 @controller는 단일 타입 T 이외에도 Callable<T> 또는 DeferredResult<T>도 반환할 수 있게 됐다.
Callable<T>는 컨테이너 스레드 외부에서도 실행될 수 있지만 여전히 블로킹 호출이다.
DeferredResult<T>는 setResult(T result) 메서드를 호출해 컨테이너 외부에서도 비동기 응답을 생성하므로 이벤트 루프 안에서도 사용할 수 있다.
스프링 웹 MVC 버전 4.2부터는 DeferredResult와 비슷하게 동작하는 ResponseBodyEmitter를 반환할 수 있다.
ResponseBodyEmitter는 메시지 컨버터에 의해 개별적으로 만들어진 여러 개의 오브젝트를 전달하는 용도로 사용할 수 있다.
SSeEmitter는 ResponseBodyEmitter를 상속했으며, 하나의 수신 요청에 대해 다수의 발신 메시지를 보낼 수 있다.
또한 스프링 웹 MVC는 StreamingResponseBody 인터페이스도 지원하는데, @Controller에서 반환될 때 데이터를 비동기적으로 보낼 수 있다. 서블릿 스레드를 차단하지 않으면서 큰 파일을 스트리밍해야 하는 경우 유용하다.
SSE 엔드포인트 노출
HTTP 통신을 위한 @RestController를 추가해 TemperetureController 클래스를 만들어보자.
@RestController
public class TemperatureController {
private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();// (1)
@RequestMapping(
value = "/temperature-stream", // (2)
method = RequestMethod.GET)
public SseEmitter events(HttpServletRequest request) { // (3)
SseEmitter emitter = new SseEmitter(); // (4)
clients.add(emitter); // (5)
// Remove emitter from clients on error or disconnect
emitter.onTimeout(() -> clients.remove(emitter)); // (6)
emitter.onCompletion(() -> clients.remove(emitter)); // (7)
return emitter; // (8)
}
@Async // (9)
@EventListener // (10)
public void handleMessage(Temperature temperature) { // (11)
List<SseEmitter> deadEmitters = new ArrayList<>(); // (12)
clients.forEach(emitter -> {
try {
emitter.send(temperature, MediaType.APPLICATION_JSON); // (13)
} catch (Exception ignore) {
deadEmitters.add(emitter); // (14)
}
});
clients.removeAll(deadEmitters); // (15)
}
}
- SseEmitter 클래스는 SSE 이벤트를 보내는 목적으로만 사용된다. SseEmitter.comple() 메서드가 호출되거나 오류 발생 또는 시간 초과가 발생할 때까지 요청 처리가 계속된다.
- URI /temprature-stream(2)에 대한 하나의 요청 핸들러(3)을 제공하고 새로운 SseEmiter 인스턴스(4)를 만들어 활성 클라이언트(5) 목록에 등록하고 SseEmitter(8)을 반환한다.
- CopyOnWriteArraySet 클래스(1)를 사용하여 목록에 대한 수정과 반복을 동시에 할 수 있다. 새로운 SSE 세션을 요청하면 client 컬렉션에 새로운 emitter를 추가하고, 처리가 끝나거나 timeout(6)(7)에 도달하면 client 컬렉션에서 자신을 제거한다.
- 웹 클라이언트와의 커뮤니케이션 채널을 유지한다는 것은 온도 변화에 대한 이벤트를 수신할 수 있다는 의미이고, 이를 위한 메서드가 handleMessage()(11)이다.
- @EventListner(10)은 스프링으로 이벤트를 수신하기 위한 붙여주며 @Async(9)는 메서드를 비동기 실행으로 표시하고, 이 메서드는 별도로 구성된 스레드 풀에서 호출한다.
- (13)수신받은 온도 이벤트들은 각 이벤트에 대해 병렬로 JSON형식의 메시지를 모든 클라이언트에 비동기 전송한다.
- 실패한 메시지들은 추적하고(14) 활성 클라이언트 목록에서 제거한다(15).
- SseEmitter는 오류 처리에 대한 콜백을 제공하지 않으며 send() 메서드에서만 발생하는 오류를 처리할 수 있다.
비동기 지원 설정하기
@EnableAsync // (1)
@SpringBootApplication // (2)
public class Application implements AsyncConfigurer {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public Executor getAsyncExecutor() { // (3)
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// (4)
executor.setCorePoolSize(2);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(5); // (5)
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
return new SimpleAsyncUncaughtExceptionHandler(); // (6)
}
}
- 전체 코드를 실행하기 위해 @EnableAsync(1)에 의해 비동기 실행이 가능한 스프링 부트 애플리케이션(2)을 만든다.
- SynchronousQueue가 사용되어 동시성을 제한하기 때문에, 큐 용량(5)을 올바르게 구성하지 않으면 스레드 풀이 커질수 없다는 점에 유의해야 한다.
- 비공기 실행(6)에서 발생한 예외 처리와 비동기 처리를 위한 Exeutor를 생성하기에 적합하다.
SSE를 지원하는 UI 작성
마지막으로 서버와 통신할 수있는 간단한 HTML 페이지이다.
<body>
<ul id="events"></ul>
<script type="application/javascript">
function add(message) {
const el = document.createElement("li");
el.innerHTML = message;
document.getElementById("events").appendChild(el);
}
var eventSource = new EventSource("/temperature-stream"); // (1)
eventSource.onmessage = e => { // (2)
const t = JSON.parse(e.data);
const fixed = Number(t.value).toFixed(2);
add('Temperature: ' + fixed + ' C');
}
eventSource.onopen = e => add('Connection opened'); // (3)
eventSource.onerror = e => add('Connection closed');
</script>
</body>
- URI /temperature-stream을 가리키는 EventSource 객체(1)를 사용하며, onMessage() 함수(2)를 호출해 수신하는 메시지와 오류를 처리한다.
- 또한 동일한 방식으로 서버에 의해 수행되는 스트림 열기에 대해 반응한다(3).
기능 확인하기
코드를 실행하고 localhost로 접속하면 다음과 같은 결과를 볼수 있다.
웹 페이지는 클라이언트 및 서버 접속을 유지하며 이벤트를 수신하고, 네트워크 문제나 접속시간이 초과하는 경우 자동으로 재접속한다.
솔루션에 대한 평가
몇 줄의 코드만으로 탄력적인 리액티브 애플리케이션 구현에 성공했지만, 몇 가지 문제점이 있다.
먼저 스프링에서 제공하는 발행-구독 구조는 응용 프로그램 수명 주기 이벤트를 처리하기 위해 도입됐으며, 고부하 및 고성능 시나리오를 위한 것은 아니다.
하나의 온도 데이터 대신 수천, 수백만 개의 개별 스트림이 필요할 경우에는 적합하지 않다.
그리고 비즈니스 로직을 구현하기 위해 스프링 프레임워크의 내부 메커니즘을 사용하는 것은 위험하며, 스프링 컨텍스트를 로드하지 않고 단위 테스트를 하기 어렵다.
SseEmitter를 사용하면 스트림의 종료와 오류 처리에 대한 구현을 추가할 수 있지만, @EventListner는 그렇지 않다. 따라서 구성 요소 간의 스트림 종료와 오류 발생에 대한 별도의 구현을 추가해야돼서 해결 방법이 복잡해진다.
또 하나의 단점은 온도 이벤트를 비동기적으로 브로드캐스팅 하기 위해 스레드 풀을 사용한다는 것이다.
온도 센서는 클라이언트 수와 관계없이 하나의 이벤트 스트림만 생성하지만, 클라이언트가 하나도 없을 때도 이벤트는 발생한다. 이는 자원 낭비이고, 이벤트 발생에 많은 자원을 필요로 하다면 특히 그렇다.
이러한 문제를 해결하기 위해 이 목적만을 위해 설계된 리액티브 라이브러리가 필요하다.
'JAVA & Spring > 리액티브 프로그래밍' 카테고리의 다른 글
[스프링5를 활용한 리액티브 프로그래밍] 01 왜 리액티브 스프링인가? (0) | 2022.08.30 |
---|