티스토리 뷰

2.1 리액티브 스트림즈(Reactive Streams)란?

  • 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리 표준 사양
    • 구현체 예시 : Reactor, RxJava, Akka Streams, Java 9 Flow API 등

2.2 리액티브 스트림즈 구성요소

  • Publisher : 데이터 생성 및 통지
  • Subscriber : 구독한 Publisher로부터 통지된 데이터를 전달 받아 처리
  • Subscription : Publisher 구독, 요청할 데이터의 개수 지정, 구독 취소
  • Processor : Publisher와 Subscriber 기능 모두

2.3 코드로 보는 리액티브 스트림즈 컴포넌트

  • Publisher
package org.reactivestreams;

public interface Publisher<T> {
    // 개념상으로는 Subscriber가 Publisher를 구독하는 것이지만,
    // 실제 코드상에서는 Publisher가 Subscriber를 등록하는 형태로 구현
    public void subscribe(Subscriber<? super T> s);
}
  • Subscriber
package org.reactivestreams;

public interface Subscriber<T> {
    // 구독 시작 시점 처리
    public void onSubscribe(Subscription s);
    // Publisher가 통지한 데이터 처리
    public void onNext(T t);
    // Publisher가 데이터 통지를 위한 처리 과정에서 에러 발생했을 때 에러 처리
    public void onError(Throwable t);
    // 데이터 통지가 정상적으로 완료될 경우 후 처리
    public void onComplete();
}
  • Subscription
package org.reactivestreams;

public interface Subscription {
    // Publisher에게 요청할 데이터의 개수 지정
    public void request(long n);
    // 구독 해지
    public void cancel();
}
  • Processor
package org.reactivestreams;

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    // 별도 구현 메서드 없음
    // Subscriber 인터페이스와 Publisher 인터페이스 상속
}

2.4 리액티브 스트림즈 관련 용어 정의

  • Signal : Publisher와 Subscriber간 주고받는 상호작용
    • Publisher → Subscriber
      • onSubscribe, onNext, onComplete, onError
    • Subscriber → Publisher
      • request, cancel
  • Demand : Subscriber가 Publisher에게 요청하는 데이터
    • Publisher가 아직 Subscriber에게 전달하지 않은 Subscriber가 요청한 데이터
  • Emit : Publisher가 Subscriber에게 데이터를 전달하는 것 (내보내는 것)
  • Upstream/Downstream : 메서드 체인에서 상위/하위 스트림
  • public static void main(String[] args) { Flux .just(1, 2, 3, 4, 5, 6) // Upstream .filter(n -> n % 2 == 0) .map(n -> n * 2) // Downstream .subscribe(System.out::println); }
  • Sequence : Publisher가 emit하는 데이터의 연속적인 흐름을 Operator 체인 형태로 정의한 것
  • Operator : 데이터 가공 연산자
    • ex. just, filter, map
  • Source : 최초에 생성된 무언가
    • ex. Data Source, Source Publisher, Source Flux

2.5 리액티브 스트림즈의 구현 규칙

  • Publisher 구현을 위한 주요 기본 규칙
    1. Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
    2. Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
    3. Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다.
    4. Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
    5. Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
    6. 일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다.
    7. 구독이 취소되면 Subscriber는 signal을 받는 것을 중지해야 한다.
  • Subscriber 구현을 위한 주요 기본 규칙
    1. Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해 Demand signal을 Publisher에게 보내야 한다.
    2. Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안 된다.
    3. Subscriber.onComplete() 및 Subscriber.onERror(Throwable t)는 signal을 수신한 후 구독이 취소된 것으로 간주해야 한다.
    4. 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel()을 호출해야 한다.
    5. Subscriber.onSubscribe()는 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다.
  • Subscription 구현을 위한 주요 기본 규칙
    1. 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다.
    2. 구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다.
    3. 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 하낟.
    4. 구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다.
    5. 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
    6. 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
    7. Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
      • Return normally : 유효한 값 이외에는 어떠한 예외도 던지지 않는다는 의미
    8. 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63-1개의 Demand를 지원해야 한다.

2.6 리액티브 스트림즈 구현체

  • Project Reactor
    • Spring Framework 팀에 의해 주도적으로 개발된 리액티브 스트림즈 구현체
  • RxJava
    • Rx : Reactive Extensions
    • .NET 환경의 리액티브 확장 라이브러리를 넷플릭스에서 Java 언어로 포팅하여 만든 JVM 기반 라이브러리
  • Akka Streams
    • JVM 상에서의 동시성과 분산 애플리케이션을 단순화해 주는 오픈소스 툴킷
  • Java Flow API
    • Reactor, RxJava, Akka Streams 같은 리액티브 스트림즈 구현체라기 보다는 표준 사양
    • JDBC와 같은 SPI(Service Provider Interface)
댓글