일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 알고리즘
- 크론 표현식
- 스택/큐
- @Getter
- Java
- K번째수
- H-index
- 모던 자바 인 액션
- @Setter
- 쿠버네티스
- 기능개발
- 코딩 테스트
- 영속 자료구조
- @EnableScheduling
- 다리를 지나는 트럭
- @Data
- 롬복 어노테이션
- @configuration
- 해시
- 스프링 스케쥴러
- 정렬
- 고차원 함수
- 완주하지 못한 선수
- 루씬 인 액션
- 가장 큰 수
- kubenetes
- 커링
- 프로그래머스
- 검색 기능 확장
- 전화번호 목록
- Today
- Total
Today I Learned
[모던 자바 인 액션] 15장. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (2) 본문
[모던 자바 인 액션] 15장. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (2)
하이라이터 2021. 10. 21. 01:2915.3 박스와 채널 모델
박스와 채널 모델은 동시성을 설계하고 계념화하기 위한 모델이다.
박스와 채널 모델을 이용하면 생각과 코드를 구조화할 수 있으며, 시스템 구현의 추상화 수준을 높일 수 있다.
박스로 원하는 연산을 표현하면 계산을 손으로 코딩한 결과보다 더 효율적일 것이다.
또한 병렬성을 직접 프로그래밍하는 관점을 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔준다.
위 태스크를 코드로 구현해보자.
int t = p(x);
System.out.println(r(q1(t), q2(t));
위 방식은 q1, q2를 차례로 호출하여 하드웨어 병렬성 활용과는 거리가 멀다.
int t = p(x);
Future<integer> a1 = executorService.submit(() -> q1(t));
Future<integer> a2 = executorService.submit(() -> q2(t));
System.out.println(r(a1.get(), a2.get());
박스와 채널 다이어그램의 모양상 p와 r을 Future로 감싸지 않았지만, 병렬성을 극대화하려면 모든 함수를 Future로 감싸야 한다.
많은 태스크가 get() 메서드를 호출해서 Future가 끝나기를 기다리게 되면 하드웨어의 병렬성을 제대로 활용하지 못하거나 데드락에 걸릴 수도 있다.
15.4 CompletableFuture와 콤비네이터를 이용한 동시성
CompletableFuture는 Future를 조합할 수 있는 기능이 있다.
ComposableFuture가 아닌 CompletableFuture라 부르는 이유는 실행할 코드 없이 Future를 만들 수 있고, complete() 메서드를 이용해 다른 스레드가 완료한 후에 get()으로 값을 얻을 수 있도록 허용하기 때문이다.
public class CFComplete {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFiexedthreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(()-> a.complete(f(x)));
int b = g(x);
System.out.println(a.get()+b);
executorService.shutdown();
}
}
f(x)와 g(x)를 동시에 실행해 합계를 구하는 위 코드에서, f(x)의 실행이 끝나지 않은 상황에서 get()을 기다리며 프로세싱 자원을 낭비할 수 있다.
ComposableFuture<T>의 thenCombine 메서드를 사용하면 연산 결과를 효과적으로 더할 수 있다.
ComposableFuture<V> thenCombine(CompletableFuture<U> other, Bifunction<T, U, V> fn)
이 메서드는 T, U 값을 받아 새로운 V값을 만든다.
public class CFComplete {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFiexedthreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> y + z);
executorService.submit(()-> a.complete(f(x)));
executorService.submit(()-> b.complete(g(x)));
System.out.println(c.get());
executorService.shutdown();
}
}
결과를 추가하는 세 번째 연산 c는 다른 두 작업이 끝날때까지 실행되지 않으므로 먼저 시작해서 블록되지 않는다.
이전 버전의 y+z 연산은 g(x)를 실행한 스레드에서 수행되어 f(x)가 완료될 때까지 블록될 여지가 있었다.
반면 thenCombine을 이용하면 f(x)와 g(x)가 끝난 다음에 덧셈 계산이 실행된다.
15.5 발행-구독 그리고 리액티브 프로그래밍
Future는 독립적 실행과 병렬성에 기반하므로, 한 번만 실행해 결과를 제공한다.
반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다.
또한 가장 최근의 결과에 대해 반응(react)하는 부분이 존재한다.
자바9에서는 java.util.concurrent.Flow 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다.
플로 API를 간단히 정리하면 다음과 같다.
- 구독자가 구독할 수 있는 발행자
- 이 연결을 구독(subscription)이라 한다.
- 이 연결을 이용해 메시지(또는 이벤트)를 전송한다.
15.5.1 두 플로를 합치는 예제
두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예제를 살펴보자.
스프레드 시트의 셀 C3에 "=C1+C2" 공식을 입력했을때 제공되는 동작이라고 볼 수 있다.
private class SimpleCell {
private int value = 0;
private String name;
public SimpleCell(String name){
this.name = name;
}
}
...
SimpleCell c2 = new SimpleCell("c2");
SimpleCell c1 = new SimpleCell("c1");
통신할 구독자를 인수로 받는 발행자 인터페이스와 구독자 인터페이스를 추가해보자.
interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
interface Subscriber<T> {
void onNext(T t);
}
이 두 개념을 합쳐보면 cell은 Publisher이며 동시에 Subscriber임을 알 수 있다.
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer> {
private int value = 0;
private String name;
private List<Subscriber> subscribers = new ArrayList<>();
public SimpleCell(String name) {
this.name = name;
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
subscribers.add(subscriber);
}
private void notifyAllSubscribers() {
subscribers.forEach(subscriber -> subscriber.onNext(this.value));
}
@Override
public void onNext(Integer newValue) {
this.value = newValue;
System.out.println(this.name + ":" + this.value);
notifyAllSubscribers();
}
}
이어서 예제를 실행해보면,
SimpleCell c3 = new SimpleCell("c3");
SimpleCell c2 = new SimpleCell("c2");
SimpleCell c1 = new SimpleCell("c1");
c1.subscribe(c3);
c1.onNext(10);
c2.onNext(20);
//결과
//c1:10
//c2:20
//c3:10
왼쪽과 오른쪽의 연산 결과를 저장할 수 있는 별도 클래스를 만들면 'C3=C1+C2'도 구현할 수 있다.
public class ArithmeticCell extends SimpleCell {
private int left;
private int right;
public ArithmeticCell(String name) {
super(name);
}
public void setLeft(int left) {
this.left = left;
onNext(left + this.right);
}
public void setRight(int right) {
this.right = right;
onNext(right + this.left);
}
}
데이터가 발행자(생산자)에서 구독자(소비자)로 흐름에 착안해 이를 업스트림 또는 다운스트림이라 부른다. 위 예제에서 newValue는 업스트림 onNext() 메서드로 전달되고 notifyAllSubscribers() 호출을 통해 다운스트림 onNext() 호출로 전달된다.
15.5.2 역압력
매 초마다 수천개의 메시지가 onNext로 전달된다면 빠르게 전달되는 이벤트를 아무 문제 없이 처리할 수 있을까? 이러한 상황을 압력(pressure)이라 부른다.
이럴때는 정보의 흐름 속도를 제어하는 역압력 기법이 필요하다. 역압력은 Subscriber가 Publisher로 정보를 요청할 때만 전달할수 있도록 한다.
void onSubscribe(Subscription subscription);
위 메서드는 Subscribe 인터페이스에서 제공하며, Subscriber와 Publisher 사이에 채널이 연결되면 첫 이벤트로 이 메서드가 호출된다.
Subscription 객체는 Subscriber와 Publisher가 통신할 수 있는 메서드를 포함한다.
interface subscription {
void cancle ();
void request (long n);
}
Publisher는 Subscription 객체를 만들어 Subscriber로 전달하고 Subscriber는 이를 통해 Publisher로 정보를 보낼 수 있다.
15.5.3 실제 역압력의 간단한 형태
한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하기 위해 다음과 같은 작업이 필요하다.
- Subscriber가 onSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.
- Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 channel, request(1)을 추가해 오직 한 이벤트만 요청한다.
- 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다.
위처럼 역압력을 구현하려면 장단점도 생각해야 한다.
- 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도 큐를 가질 것인가?
- 큐가 너무 커지면 어떻게 할까?
- Subscriber가 준비가 안되었다면 큐의 데이터를 폐기할 것인가?
소실 가능 여부 등 데이터의 성격에 따라 구현도 바뀔 것이다.
15.6 리액티브 시스템 vs 리액티브 프로그래밍
리액티브 시스템은 런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램을 가리킨다. 리액티브 시스템이 가져야할 속성은 반응성(responsive), 회복성(resilient), 탄력성(elastic)의 세가지 속성으로 요약할 수 있다.
이러한 속성을 구현하는 방법 중 하나로 리액티브 프로그래밍을 이용할 수 있다.
'JAVA & Spring > 모던 자바 인 액션' 카테고리의 다른 글
[모던 자바 인 액션] 16장. CompletableFuture : 안정적인 비동기 프로그래밍 (2) (0) | 2021.11.03 |
---|---|
[모던 자바 인 액션] 16장. CompletableFuture : 안정적인 비동기 프로그래밍 (1) (0) | 2021.10.27 |
[모던 자바 인 액션] 15장. CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (1) (0) | 2021.10.18 |
[모던 자바 인 액션] 14장. 자바 모듈 시스템 (0) | 2021.10.06 |
[모던 자바 인 액션] 13장. 디폴트 메서드 (0) | 2021.09.28 |