#17장 리액티브 프로그래밍

이 장의 내용

  • 리액티브 프로그래밍을 정의하고 리액티브 매니패스토를 확인함

    • 과거의 잘못된 행적을 솔직히 반성하며 새로운 미래를 위한 구체적 약속을 공개적인 방식으로 책임성을 담아 문서로서 선언하는

  • 애플리케이션 수준, 시스템 수준의 리액티브 프로그래밍

  • 리액티브 스트림, 자바 9플로 API를사용한예제코드

  • 널리 사용되는 리액티브 라이브러리 RxJava 소개

  • 여러 리액티브 스트림을 변환하고 합치는 RxJava 동작 살펴보기

  • 리액티브 스트림의 동작을 시각적으로 문서화하는 마블 다이어그램

왜 리액티브 블라블라 가 필요하게 되었을까?

오늘날 상황이 변했다.

  • 빅데이터

  • 많은 모바일 디바이스와 수천개 멀티코어 프로세서 애플리케이션 배포 환경

  • 1년 내내 가능한 고객의 서비스 요구

4가지 매니페스토 퀴즈

매핑하시오. ( Elastic Responsive Resilient Message-driven )

  • 시스템은 적시에 응답해야 한다.

  • 시스템은 느슨한 결합을 보장하기 위해 구성 요소 간에 비동기 메시지 전달을 사용해야 한다.

  • 시스템은 높은 부하에서도 응답성을 유지해야 한다.

  • 일부 구성 요소가 장애가 발생해도 시스템은 응답을 유지해야 한다.

리액티브 애플리케이션 vs 리액티브 시스템

잘 이해안됨

링크arrow-up-right

  • 리액티브 시스템

    • 아키텍처 스타일의 한 종류

    • 애플리케이션을 조립하고 상호소통을 조절

    • 위의 매니페스토를 따라, 작업 부하가 있으면 유동적으로 scale-up 하는 성질을 가짐

    • 주요 속성: 메시지 주도

  • 리액티브 애플리케이션

    • 비교적 짧은 시간 동안만 유지되는 데이터 스트림에 기반한 연산을 수행하며 보통 이벤트 주도로 분류된다.

  • 요약

    • 리액티브 애플리케이션이 모여 시스템이 되는 것 같다.

앞으로 배울 리액티브 프로그래밍 필요한 이유와 일반적 사실들

  • 스레드는 비싸고 귀한 자원이다. (할당하고 해제하는 것도 비싸다)

  • 리액티브 프로그래밍의 프레임워크나 라이브러리는 개발자가 레이스 컨디션, 데드락 같은 같은 저수준의 멀티스레드 문제를 직접 처리할 필요가 없어지면서 비즈니스 요구사항 구현에 더 집중할 수 있다.

  • 스레드 풀을 쪼갤때는 블럭 동작을 넣지 않아야 한다.

    • RxJava, Akka 같은 리액티브 프레임워크는 별도로 지정된 스레드 풀에서 블록 동작을 실행시켜 이 문제를 해결한다.

![image-20220219195249486](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220219195249486.png)

리액티브 시스템은 여러 애플리케이션이 한 개의 일관적인, 회복할 수 있는 플랫폼을 구성할 수 있게 해줄 뿐 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍처다.

리액티브 스트림과 Flow API

  • 리액티브 스트림 이란?

    • 리액티브 프로그래밍을 하기 위해 사용된다.

    • 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술

  • 역압력이란?

    • 발행-구독 프로토콜에서 구독자가 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치

    • 사용함으로써

      • 이벤트를 잃어버리는 문제 해결 가능

      • 이벤트 수신을 늦추는 것, 가능한 수신량 알림기능, 남은일 처리 예측 시간 알림과 같은 기능으로 업스트림 구독자에게 알릴수 있다.

      • 비동기 API를 이용하면 하드웨어 사용률을 극대화할 수 있지만 느린 다운스트림 컴포넌트에 너무 큰 부하를 줄 가능성도 생긴다. 역압력은 이 문제도 해결한다.

  • 회사별 리액티브 스트림 자체 구현 사례

    • 라이트밴드 - Akka Stream

    • 넷플릭스 - RxJava

    • 리액터 - Pivotal

    • 레드햇 - Vert.x

  • Flow API란?

    • 자바에서 위 4개를 기반으로 최소 기능 집합으로만 재정의한 표준 API = Flow API

Flow 클래스 소개

Flow 클래스가 포함하는 4개 인터페이스

  • 언제 Subscriber의 onSubscribe()와 onNext()가 사용될까?

    • onSubscribe(): Subscriber가 Publisher에 자신을 등록할 때 Publisher는 처음으로 onSubscribe 메서드를 호출해 Subscription 객체를 전달할 수 있다.

    • onNext(): Publisher가 새로운 이벤트를 생성할 때마다 호출된다.

  • Subscription의 request()와 cancel() 메서드의 역할은?

    • request(): Publisher에게 주어진 개수의 이벤트를 처리할 준비가 되었음을 알리는데 사용

    • cancel(): 구독 취소 요청

  • 작동 요약

    • https://www.baeldung.com/java-9-reactive-streams

샘플 애플리케이션 만들어보기

온도 정보 주기적으로 받아볼 수 있게 하는 애플리케이션을 만들어 보자

리액티브 애플리케이션 생명 주기 확인하기

![image-20220219203502517](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220219203502517.png)

  • 여기 코드에선

    • Publisher: 팩토리 메소드로 바로 생성

    • Subscriber: TempSubscriber

    • Subscription: TempSubscription

    • 기타

      • TempInfo: 온도 정보 담는 객체

특정 도시 온도 정보 구독 하기

온도 정보를 받아볼 예정인 구독자

온도 정보를 받아볼 수 있는 구독권

온도 정보 객체

결과 - 뉴욕의 온도를 네 번 성공적으로 전달했지만 다섯 번째에 에러가 발생

지금까지 개발한 코드에 작은 문제가 있다. 퀴즈를 보고 생각해보자.

퀴즈

지금까지 개발한 코드에 작은 문제가 있다. 하지만 이 문제는 Tempinfo 팩토리 메서드 내에서 에러를 임의로 발생시키는 코드 때문에 감춰진 상태다. 임의로 에러를 발생시키는 코드를 없앤 다음 main을 오래 실행하면 어떤 일이 일어날까?

답: TempSubscriber가 새로운 요소를 onNext 메서드로 받을 때마다 TempSubscription으로 새 요청을 보내면 request 메서드가 TempSubscriber 자신에게 또 다른 요소를 보내는 문제가 있다. 이런 재귀 호출은 스택이 오버플로 될때까지 반복해서 일어난다.

해결 방법

  • Executor를 TempSubscription으로 추가한 다음 다른 스레드에서 TempSubscriber로 세 요소를 전달한다.

  • 이것 이해 잘 안된다.

Processor 데이터 변환하기

Processor 인터페이스의 사용법도 확인하자

  • 목적: Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것

  • Subscriber 인터페이스를 구현하는 다른 모든 메서드는 단순히 수신한 모든 신호를 업스트림 Subscriber로 전달하며 Publisher의 subscribe 메서드는 업스트림 Subscriber를 Processor로 등록하는 동작을 수행한다.

Main: Publisher를 만들고 TempSubscriberB 구독시킴

실행 코드

왜 Flow API 구현을 제공하지 않는가?

  • Flow API가 구현을 재공하지 않아, 우리가 직접 인터페이스를 구현했다.

  • 다른 사례를 보면, 자바는 List의 구현인 ArrayList를 제공한다.

  • 왜 구현 제공을 안할까?

    • 이유: API를 만들 당시 Akka, RxJava 등 다양한 리액티브 스트림의 자바 코드 라이브러리가 이미 존재했기 때문이다

    • 즉, 이 모든 기능의 공통 부분만 봅아내 표준화 작업을 한 것이다.

RxJava 사용하기

  • RxJava: 리액티브 라이브러리의 한 종류

  • RxJava 는 Flow.Publisher 를 구현하는 두 클래스를 제공

    • io.reactivex.Flowable

      • Pull 기반 역압력 기능 포함

      • 이미 Flow API를 통해 역압력을 확인했으니 아래선 생략

    • io.reactivex.Observable

      • 역압력 지원 X

      • 단순한 마우스 움직임 같은 사용자 인터페이스 이벤트에 적합

        • 이유: 마우스 움직임을 느리게 하거나 멈출 수 없듯이 역압력을 적용할 수 없기 때문

  • Flow API 와 비교

    • Publisher 역할: Observable

      • 옵저버블은 역압력 지원을 안하므로, request() 메서드를 사용할 필요 없다.

      • Subscription 클래스 이용이 필요 없다는거 같음.

  • Subscriber 역할: Observer

    • 옵저버는 추가로 Disposable 인수를 갖는다.

    • Observer 인터페이스

  • 좋은 시스템 아키텍처를 위한 TIP

    • 세부 사항을 노출하지 말자.

      • 즉, Observable의 추가 구조가 필요한 상황에서만 Observable 을 사용하고 그렇지 않으면 Publisher의 인터페이스를 사용하는 것이 좋다.

      • 비슷한 사례

        • 전달하는 값이 ArrayList 임을 알지만 파라미터 형식을 List로 설정함으로 구현 세부사항을 밖으로 노출하지 않을 수 있다.

        • 이럼으로써, LinkedList로 적용 가능하다.

        • 다형성

  • 마블 다이어그램

    • 모형들을 수평선에 표시해서 리액티브 스트림의 흐름을 보여준다.

![image-20220220002256504](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220220002256504.png)

지금부터는 RxJava의 리액티브 스트림의 구현을 이용해서 온도 보고 시스템을 정의해보자.

Observable 만들고 사용하기

Observable, Flowable 클래스는 다양한 종류의 리액티브 스트림을 편리하게 만들 수 있도록 여러 팩토리 메서드를 제공한다.

  • just(): 한 개 이상의 요소를 이용해 이를 방출하는 Observable로 변환한다.

  • interval(): 특정 속도로 이벤트를 방출하는 상황에 유용하다.

just() 사용한 간단한 예제

interval() 사용 예

  • 이 코드는 프린팅이 안되고 종료되버린다.

    • 이유: 매 초마다 정보를 발행하는 Observable이 RxJava의 연산 스레드 풀 즉 데몬 스레드에서 실행되기 때문프린팅이 안된다.

    • 해결: blockingSubscribe() 사용하기

예제의 난이도를 높여보자.

온도를 직접 출력하지 않고 사용자에게 팩토리 메서드를 제공해 매 초마다 온도를 방출하는 Observable을 반환해보자.

main 코드

  • 순서 정리

    • Observer를 소비하는 함수로부터 Observable 만들기

    • 매초 마다 무한으로 증가하는 일련의 long 값을 방출하는 Observerble

    • 소비된 Observer가 폐기되지 않았으면 어떤 작업을 수행 (이전 에러)

    • 온도를 다섯번 보고 했으면 옵저버를 완료하고 스트림 종료

    • 아니면 온도를 옵저버로 보고

수신한 온도를 출력하는 Observer

  • 역압력을 제공하지 않으니, request() 메서드가 필요 없어 단순하다.

결과

RxJava 예제를 조금 더 발전시켜서 한 개 이상의 리액티브 스트림을 다루는 방법을 살펴보자.

RxJava 고급지게 활용해보기

Observable이 방출하는 요소를 조작하는 다양한 방법을 확인해보자.

RxJava의 장점: 스트림을 합치고, 만들고, 거르는 등의 풍부한 기능 사용 가능

map() 사례

filter() 사례

세 도시의 온도를 출력하는 Main 클래스

  • getCelsiusTemperatures(): 여러 도시에서 온도를 방출하는 Observable을 가질 수 있도록 처리되었다.

  • merge 메서드는 Iterable을 인수로 받아 마치 한 개의 Observable 처럼 동작하도록 결과를 합친다.

결과

merge 마블 다이어그램

![image-20220220002712153](D:\0 Google Drive\03 스터디 모임\모던자바\10주차\17장.assets\image-20220220002712153.png)

정리

  • 리액티브 프로그래밍의 사상은 이미 오래전에 나왔지만 최근에서야 각광받는 중이다.

  • 리액티브 소프트웨어는 4 가지 속성 (반응성, 회복성, 탄력성, 메시지 주도) 을 가져야 한다.

  • 리액티브 시스템과 리액티브 애플리케이션의 개념 차이를 알자.

  • 리액티브 스트림에서 역압력은 중요하다. (구독자-발행자 속도 차이해결)

  • RxJava는 리액티브 프로그래밍 도구 중 대표적이다. (강력한 연산자들 - filter, map 등)

  • 표준화된 자바 Flow API 가 있다.

Last updated