#17장 리액티브 프로그래밍
이 장의 내용
리액티브 프로그래밍을 정의하고 리액티브 매니패스토를 확인함
과거의 잘못된 행적을 솔직히 반성하며 새로운 미래를 위한 구체적 약속을 공개적인 방식으로 책임성을 담아 문서로서 선언하는 것
애플리케이션 수준, 시스템 수준의 리액티브 프로그래밍
리액티브 스트림, 자바 9플로 API를사용한예제코드
널리 사용되는 리액티브 라이브러리 RxJava 소개
여러 리액티브 스트림을 변환하고 합치는 RxJava 동작 살펴보기
리액티브 스트림의 동작을 시각적으로 문서화하는 마블 다이어그램
왜 리액티브 블라블라 가 필요하게 되었을까?
오늘날 상황이 변했다.
빅데이터
많은 모바일 디바이스와 수천개 멀티코어 프로세서 애플리케이션 배포 환경
1년 내내 가능한 고객의 서비스 요구
4가지 매니페스토 퀴즈
매핑하시오. ( Elastic Responsive Resilient Message-driven )
시스템은 적시에 응답해야 한다.
시스템은 느슨한 결합을 보장하기 위해 구성 요소 간에 비동기 메시지 전달을 사용해야 한다.
시스템은 높은 부하에서도 응답성을 유지해야 한다.
일부 구성 요소가 장애가 발생해도 시스템은 응답을 유지해야 한다.
리액티브 애플리케이션 vs 리액티브 시스템
잘 이해안됨
리액티브 시스템
아키텍처 스타일의 한 종류
애플리케이션을 조립하고 상호소통을 조절
위의 매니페스토를 따라, 작업 부하가 있으면 유동적으로 scale-up 하는 성질을 가짐
주요 속성: 메시지 주도
리액티브 애플리케이션
비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다.
요약
리액티브 애플리케이션이 모여 시스템이 되는 것 같다.
앞으로 배울 리액티브 프로그래밍 필요한 이유와 일반적 사실들
스레드는 비싸고 귀한 자원이다. (할당하고 해제하는 것도 비싸다)
리액티브 프로그래밍의 프레임워크나 라이브러리는 개발자가 레이스 컨디션, 데드락 같은 같은 저수준의 멀티스레드 문제를 직접 처리할 필요가 없어지면서 비즈니스 요구사항 구현에 더 집중할 수 있다.
스레드 풀을 쪼갤때는 블럭 동작을 넣지 않아야 한다.
RxJava, Akka 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜 이 문제를 해결한다.

리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처다.
리액티브 스트림과 Flow API
리액티브 스트림 이란?
리액티브 프로그래밍을 하기 위해 사용된다.
무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술
역압력이란?
발행-구독 프로토콜에서 구독자가 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치
사용함으로써
이벤트를 잃어버리는 문제 해결 가능
이벤트 수신을 늦추는 것, 가능한 수신량 알림기능, 남은일 처리 예측 시간 알림과 같은 기능으로 업스트림 구독자에게 알릴수 있다.
비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다. 역압력은 이 문제도 해결한다.
회사별 리액티브 스트림 자체 구현 사례
라이트밴드 - Akka Stream
넷플릭스 - RxJava
리액터 - Pivotal
레드햇 - Vert.x
Flow API란?
자바에서 위 4개를 기반으로 최소 기능 집합으로만 재정의한 표준 API = Flow API
Flow 클래스 소개
Flow 클래스가 포함하는 4개 인터페이스
언제 Subscriber의 onSubscribe()와 onNext()가 사용될까?
onSubscribe(): Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달할 수 있다.
onNext(): Publisher가 새로운 이벤트를 생성할 때마다 호출된다.
Subscription의 request()와 cancel() 메서드의 역할은?
request(): Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알리는데 사용
cancel(): 구독 취소 요청
작동 요약
https://www.baeldung.com/java-9-reactive-streams
샘플 애플리케이션 만들어보기
온도 정보 주기적으로 받아볼 수 있게 하는 애플리케이션을 만들어 보자
리액티브 애플리케이션 생명 주기 확인하기

여기 코드에선
Publisher: 팩토리 메소드로 바로 생성
Subscriber: TempSubscriber
Subscription: TempSubscription
기타
TempInfo: 온도 정보 담는 객체
특정 도시 온도 정보 구독 하기
온도 정보를 받아볼 예정인 구독자
온도 정보를 받아볼 수 있는 구독권
온도 정보 객체
결과 - 뉴욕의 온도를 네 번 성공적으로 전달했지만 다섯 번째에 에러가 발생
지금까지 개발한 코드에 작은 문제가 있다. 퀴즈를 보고 생각해보자.
퀴즈
지금까지 개발한 코드에 작은 문제가 있다. 하지만 이 문제는 Tempinfo 팩토리 메서드 내에서 에러를 임의로 발생시키는 코드 때문에 감춰진 상태다. 임의로 에러를 발생시키는 코드를 없앤 다음 main을 오래 실행하면 어떤 일이 일어날까?
답: TempSubscriber가 새로운 요소를 onNext 메서드로 받을 때마다 TempSubscription으로 새 요청을 보내면 request 메서드가 TempSubscriber 자신에게 또 다른 요소를 보내는 문제가 있다. 이런 재귀 호출은 스택이 오버플로 될때까지 반복해서 일어난다.
해결 방법
Executor를 TempSubscription으로 추가한 다음 다른 스레드에서 TempSubscriber로 세 요소를 전달한다.
이것 이해 잘 안된다.
Processor 데이터 변환하기
Processor 인터페이스의 사용법도 확인하자
목적: Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것
Subscriber 인터페이스를 구현하는 다른 모든 메서드는 단순히 수신한 모든 신호를 업스트림 Subscriber로 전달하며 Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다.
Main: Publisher를 만들고 TempSubscriberB 구독시킴

실행 코드
왜 Flow API 구현을 제공하지 않는가?
Flow API가 구현을 재공하지 않아, 우리가 직접 인터페이스를 구현했다.
다른 사례를 보면, 자바는 List의 구현인 ArrayList를 제공한다.
왜 구현 제공을 안할까?
이유: API를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다
즉, 이 모든 기능의 공통 부분만 봅아내 표준화 작업을 한 것이다.
RxJava 사용하기
RxJava: 리액티브 라이브러리의 한 종류
RxJava 는 Flow.Publisher 를 구현하는 두 클래스를 제공
io.reactivex.FlowablePull 기반 역압력 기능 포함
이미 Flow API를 통해 역압력을 확인했으니 아래선 생략
io.reactivex.Observable역압력 지원 X
단순한 마우스 움직임 같은 사용자 인터페이스 이벤트에 적합
이유: 마우스 움직임을 느리게 하거나 멈출 수 없듯이 역압력을 적용할 수 없기 때문
Flow API 와 비교
Publisher 역할: Observable
옵저버블은 역압력 지원을 안하므로, request() 메서드를 사용할 필요 없다.
Subscription 클래스 이용이 필요 없다는거 같음.
Subscriber 역할: Observer
옵저버는 추가로 Disposable 인수를 갖는다.
Observer 인터페이스
좋은 시스템 아키텍처를 위한 TIP
세부 사항을 노출하지 말자.
즉, Observable의 추가 구조가 필요한 상황에서만 Observable 을 사용하고 그렇지 않으면 Publisher의 인터페이스를 사용하는 것이 좋다.
비슷한 사례
전달하는 값이 ArrayList 임을 알지만 파라미터 형식을 List로 설정함으로 구현 세부사항을 밖으로 노출하지 않을 수 있다.
이럼으로써, LinkedList로 적용 가능하다.
다형성
마블 다이어그램
모형들을 수평선에 표시해서 리액티브 스트림의 흐름을 보여준다.

지금부터는 RxJava의 리액티브 스트림의 구현을 이용해서 온도 보고 시스템을 정의해보자.
Observable 만들고 사용하기
Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.
just(): 한 개 이상의 요소를 이용해 이를 방출하는 Observable로 변환한다.
interval(): 특정 속도로 이벤트를 방출하는 상황에 유용하다.
just() 사용한 간단한 예제
interval() 사용 예
이 코드는 프린팅이 안되고 종료되버린다.
이유: 매 초마다 정보를 발행하는 Observable이 RxJava의 연산 스레드 풀 즉 데몬 스레드에서 실행되기 때문프린팅이 안된다.
해결: blockingSubscribe() 사용하기
예제의 난이도를 높여보자.
온도를 직접 출력하지 않고 사용자에게 팩토리 메서드를 제공해 매 초마다 온도를 방출하는 Observable을 반환해보자.
main 코드
순서 정리
Observer를 소비하는 함수로부터 Observable 만들기
매초 마다 무한으로 증가하는 일련의 long 값을 방출하는 Observerble
소비된 Observer가 폐기되지 않았으면 어떤 작업을 수행 (이전 에러)
온도를 다섯번 보고 했으면 옵저버를 완료하고 스트림 종료
아니면 온도를 옵저버로 보고
수신한 온도를 출력하는 Observer
역압력을 제공하지 않으니, request() 메서드가 필요 없어 단순하다.
결과
RxJava 예제를 조금 더 발전시켜서 한 개 이상의 리액티브 스트림을 다루는 방법을 살펴보자.
RxJava 고급지게 활용해보기
Observable이 방출하는 요소를 조작하는 다양한 방법을 확인해보자.
RxJava의 장점: 스트림을 합치고, 만들고, 거르는 등의 풍부한 기능 사용 가능
map() 사례
filter() 사례
세 도시의 온도를 출력하는 Main 클래스
getCelsiusTemperatures(): 여러 도시에서 온도를 방출하는 Observable을 가질 수 있도록 처리되었다.
merge 메서드는 Iterable을 인수로 받아 마치 한 개의 Observable 처럼 동작하도록 결과를 합친다.
결과
merge 마블 다이어그램

정리
리액티브 프로그래밍의 사상은 이미 오래전에 나왔지만 최근에서야 각광받는 중이다.
리액티브 소프트웨어는 4 가지 속성 (반응성, 회복성, 탄력성, 메시지 주도) 을 가져야 한다.
리액티브 시스템과 리액티브 애플리케이션의 개념 차이를 알자.
리액티브 스트림에서 역압력은 중요하다. (구독자-발행자 속도 차이해결)
RxJava는 리액티브 프로그래밍 도구 중 대표적이다. (강력한 연산자들 - filter, map 등)
표준화된 자바 Flow API 가 있다.
Last updated