#15장 CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
이 장의 내용
Thread, Future, 자바가 풍부한 concurrency API를 제공하도록 강요하는 진화의 힘
Asynchronous API
동시 컴퓨팅의 boxes와 channels view
CompletableFuture와 콤비네이터로 박스를 동적으로 연결
리액티브 프로그래밍용 자바 9 Flow API의 기초를 이루는 pub-sub 프로토콜
리액티브 프로그래밍과 리액티브 시스템
들어가기
병렬성이 아니라 동시성을 필요로 하는 상황에서, 스레드를 볼록하여 자원을 낭비하는 일은 피해야 한다
해결 도구 2가지
Future 인터페이스 (CompletableFuture 구현)
Flow API
이 장에서는 코드 보다 개념을 설명한다. 코드를 보고 싶으면, 16과 17장으로
스레드, 고수준 추상화, 스레드 풀, Future와 같은 동시성의 진화를 살펴보자.
15.1 concurrency를 구현하는 자바 지원의 진화
초기에는 Runnable과 Thread 이용하여 lock을 하였다.
자바 5에서 나온, ExecutorService
Runnable과 Callable 둘 다 실행 가능
병렬 프로그래밍 가능
자바 7에서, RecursiveTask - fork/join 구현이 가능하게 하여 동시성 지원
자바 9에서, pub-sub 프로토콜 지원하는 Flow API
15.1.1 스레드와 높은 수준의 추상화
스트림으로 외부 반복 대신 내부 반복을 통해 쉽게 병렬성 달성 = 높은 추상화
15.1.2 Executor와 스레드 풀
스레드 추상화의 기반에 해당하는 ExecutorService를 보자.
ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공
하드웨어에 상응하는 태스크 수를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다
TASK = Runnable, Callable
스레드풀 단점
태스크가 길어지면, 큐가 길어질 수 있다
block 상황에 들어가는 태스크는 스레드풀에 안넣는게 좋지만, 지킬 수 없는 상황도 있따.
15.1.3 스레드의 다른 추상화: 중첩되지 않은 메서드 호출
비동기 메서드: 메서드가 반환된 후에도 안에서 만들어진 태스크 실행이 계속되는 메서드
스레드는 setDaemon() 메서드를 이용해 Deamon또는 Non-Deamon으로 구분시킬 수 있다
Deamon: 애플리케이션이 종료될 때 강제 종료
Non-Deamon: 앱이 종료되지 않고 비대몬이 끊날때 까지 기다리게 한다.
15.1.4 스레드에 무엇을 바라는가?
목표
병렬성의 장점을 극대화
작은 태스크 단위로 구조화
e.g. 이전 장에서 봤던, 병렬 스트림 처리와 포크/조인
16,17장에서는 스레드를 조작하는 복잡한 코드를 구현하지 않고 메서드를 호출하는 방법을 살펴본다.
15.2 Synchronous와 asynchronous APIs
아래 f와 g가 오랜 시간이 걸린다고 가정하자
복잡한 코드 예시
Runnable 대신 Future API 인터페이스를 이용해 코드를 조금 더 단순화
하지만 여전히 명시적인 submit 메서드 호출 같은 불필요한 코드로 오염되었다.
Asynchronous API를 이용해 해결할 수 있다.
15.2.1 Future-style API
1번째 대안: Future를 리턴하게
다음과 같이 호출한다.
15.2.2 Reactive-style API
2번째 대안: 콜백 형식의 프로그래밍한다.
결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것
하지만, 호출 합계를 정확하지 않다.
이유: 락을 사용하지 않으므로 값을 두 번 출력할수 있을 뿐더러 때로는 +에 제공된 두 피연산자가 println이 호출되기 전에 업데이트될 수도 있다.
해결법
if-then-else으로 적절한 락 이용하여, 두 콜백이 모두 호출되었는지 확인한 다음 printin을 호출한다.
리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로, 대안1인 Future를 이용하는 것이 더 적절하다. (Future 형식의 API는 일회성의 값을 처리하는데 적합하다)
15.2.3 재우는 것이나 다른 blocking 동작은 자제하자
sleep(): 여전히 자원을 점유한다. 잠자기 뿐만 아니라 블로킹도 마찬가지다
블록 동작 예시
Future에 get(): 즉, 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작
키보드 입력 같은 사람의 상호작용을 기다림
DB 서버에서 읽기 작업을 대기
아래는 둘다 작업 1이 끝난 후 10초 대기후 작업 2를 시작하는 코드이다. 둘다 스레드 풀에서 실행된다고 할때, 차이가 무엇일까?
코드 A
코드 B
결론적으로, 코드 B가 더 좋다.
코드 A는 워커 스레드를 점유한 상태에서 아무것도 하지 않고 10 초를 잔다. (=자원 낭비) 반면, B는 다른 작업이 실행될 수 있도록 쓰레드를 양보했다가 10초뒤에 다시 가져온다.
15.2.4 현실 확인
지금까지 배운걸로 보았을때, 태스크를 분할하고, 비동기 호출을 이용한다면 최상의 병렬성을 낼 수 있을 것이다. 하지만, 현실에서는 제약이 많다.
개선된 Concurrancy API를 사용하길 권장한다.
15.2.5 비동기 API에서 예외는 어떻게 처리하는가?
Future를 구현한 CompletableFuture에서는 런타임 메서드인 get()에 예외를 처리할 수 있는 기능을 제공하며 예외에서 회복할 수 있도록 exceptionally() 같은 메서드도 제공한다. (16장에서 자세히)
리액티브 형식의 비동기 API에서는 return 대신 기존 콜백이 호출되므로 예외가 발생했을 때 실행될 추가 콜백을 만들어 인터페이스를 바꿔야 한다.
콜백이 여러 개면 이를 따로 제공하는 것보다는 한 객체로 이 메서드를 감싸는 것이 좋다.
다양한 콜백을 위해, 한객체에 묶어서 사용하는, Subscriber를 이용할 수 있다.
키보드 장치에서 숫자를 읽는 작업으로 치면,
onNext: 번호 입력 됬음
onError: 잘못된 형식 입력
onComplete: 더 이상 처리할 데이터 없음
f에 적용하면 다음과 같이 바뀐다.
15.3 박스와 채널 모델
동시성 모델을 잘 설계하고 개념화하기 위한 그림이 박스와 채널
아래처럼 감싸는 형식이 박스와 같아서 이름이 그런것 같다. (메서드 안에 메서드)
위는 q1, q2를 차례로 호출하기에 병렬성 활용이 아니다.
future를 이용하면 해결.
하지만 내부 중첩적으로 박스가 계속 생기면서 future로 감싸다 보면, 문제가 생겨날 수 있다. (e.g., 병렬성 저하나 데드락)
CompletableFuture와 Combinators를 이요하면 문제를 해결 할 수 있다.
박스와 채널 모델은 병렬성을 직접 프로그래밍하는 관점을 콤비네이터를 이용해 내부적으로 작업을 처리하는 관점으로 바꿔준다
퀴즈
f(x), g(x)를 실행하는 두 개의 활성 스레드가 있는데 한 스레 드는 다른 스레드가 return 문을 실행해 종료될 때까지 기다렸다가 시작한다. 이상황에서, 스레드를 완벽하게 활용할 수 있는 태스크를 어떻게 구현할 수 있을까?
정답은 Future를 조합해, f(x)를 실행하는 한 태스크, g(x)를 실행하는 두 번째 태스크, 합계를 계산하는 세 번째 태스크 세 개를 이용한다.
15.4 CompletableFuture와 콤비네이터를 이용한 동시성
아래는 두 코드는 f(x)와 g(x) 동시에 실행해 합계를 구하는 코드이다. 문제는 무엇일까?
문제: 각각 g(x)나 f(x)가 끝날때를 기다린다 (자원낭비)
CompletableFuture를 이용하면 이 상황을 해결할 수 있다.
CompletableFuture = Future 인터페이스의 구현체
위 퀴즈에서 나온 개념처럼 적용해보자.
CompletableFuture에 thenCombine 메서드를 사용함으로 두 연산 결과를 더 효과적으로 더 할 수 있다. (16장에서 자세히)
결론: 많은 수의 Future를 사용해야 하는 경우, Future 대신 CompletableFuture 와 콤비네이터(thenCombine)를 이용해 get()에서 블록하지 않을 수 있고 그렇게 함으로 병렬 실행의 효율성은 높이고 데드락은 피하는 최상의 해결책을 구현할 수 있다.
15.5 Publish-subscribe와 리액티브 프로그래밍
Future와 CompletableFuture은 독립적 실행과 병렬성이라는 정식적 모델에 기반한다. 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 따라서 Future는 한 번만 실행해 결과를 제공한다.
반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다.
자바 9에서는 java.util.concurrent.Flow의 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다.
Flow API는 구독자가 구독할수 있는 발행자이며, 이 연결을 이용해 Message(또는 Event)를 전송한다.
15.5.1 두 flow 를 합치는 예제
두 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예
Cell 초기화
c1 이나 c2의 값이 바뀌었을 때 c3이 두 값을 더하도록 어떻게 지정할 수 있을까?
아래 인터페이스가 필요
사실 Cell은 Publisher 이며 동시에 Subscriber 이다. 따라서 다음과 같이 코드가 나온다.
C3는 직접 C1을 구독하므로 다음과 같은 결과가 나온다.
'C3=C1+C2'은 어떻게 구현할까? 아래와 같이 왼쪽 오른쪽 연산을 저장하는 클래스가 필요하다.
결과
C3이 즉시 반응해 자신의 값을 갱신한다는 사실을 확인할 수 있다.
(pub-sub의 장점으로) 그래프 처럼 아래와 같이 'C5=C3+C4'을 추가할 수도 있다.
결과
15.5.2 역압력 (Backpressure)
Flow 인터페이스의 개념을 복잡하게 만든 2 가지 기능은 압력과 역압력이다.
매 초마다 수천 개의 메시지가 onNext로 전달되는 상황을 압력에 비유할 수 있다.
이를 해결하기 위해, 역압력 같은 기법이 필요하다. 예를 들어, 구독자가 수동적으로 받는게 아닌 능동적으로 Publisher로 부터 정보를 할 수 있다. (일종의 pull-based)
Publisher는 Subscription 객체를 만들어 Subscriber로 전딜하면 Subscriber는 이를 이용해 Publisher로 정보를 보낼 수 있다.
15.5.3 실제 Backpressure의 간단한 형태
한 번에 한 개의 이벤트를 처리하도록 pub-sub 연결을 구성하려면 다음과 작업이 필요하다.
Subscriber가 OnSubscribe로 전달된 Subscription 객체를 로컬로 저장한다.
Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동 작에 channel.request(1)을 추가해 오직 한 이벤트만 요청한다.
요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다 (보통 여러 Subscriber가 자신만의 속도를 유지할 수 있도록 Publisher는 새 Subscription을 만들어 각 Subscriber와 연결한다).
15.6 Reactive 시스템 vs. reactive 프로그래밍
둘은 상당히 다르다. 리액티브 프로그래밍을 이용해 리액티브시스템을 구현할 수 있다.
요약
스레드 풀은 보통 유용하지만 블록되는 태스크가 많아지면 문제가 발생한다
메서드를 비동기로 만들면 병렬성을 확보 있으며 부수적으로 루프를 최적화한다.
박스와 채널 모델을 이용해 비동기 시스템을 시각화할 수 있다.
CompletableFuture 클래스는 한 번의 비동기 연산을 표현한다. 콤비네이터로 비동기 연 산을 조합함으로 Future를 이용할 때 발생했던 기존의 블로킹 문제를 해결할 수 있다.
Flow API는 발행-구독 프로토콜, 역압력을 이용하면 자바의 리액티브 프로그래밍의 기 초를 제공한다.
Last updated