티스토리 뷰

  • Reactor Sequence에서 사용되는 스레드를 관리해 주는 역할

10.1 스레드(Thread)의 개념 이해

  • 물리적인 스레드(Pysical Thread) : CPU 코어를 논리적으로 나눈 것
    • 듀얼코어 4 스레드 (작업관리자 [성능]에서 확인 가능)
  • 논리적인 스레드(Logical Thread) : 프로세스 내에서 실행되는 세부 작업의 단위
    • 이론적으로는 제한이 없지만 실제로는 물리적인 스레드의 가용 범위내에서 생성 가능
  • 병렬성(Parallelism) : 물리적인 스레드가 실제로 동시에 실행됨
  • 동시성(Concurrency) : 동시에 실행되는 것처럼 보이지만 실제로는 아님
    • 무수히 많은 논리적인 스레드가 물리적인 스레드를 아주 빠른 속도로 번갈아 가면서 사용

10.2 Scheduler란?

  • 어떤 스레드에서 무엇을 처리할지 제어(경쟁조건 등 고려)를 Scheduler가 개발자 대신 수행
    • 코드가 간결해짐
    • 개발자 부담 적어짐

10.3 Scheduler를 위한 전용 Operator

  • subscribeOn()
    • 구독 발생 직후 실행될 스레드 지정
    • 원본 Publisher 동작 수행을 위한 스레드
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }
    08:35:03.363 [main] INFO - # doOnSubscribe // 최초 실행 스레드 : main
    08:35:03.371 [boundedElastic-1] INFO - # doOnNext: 1 // 구독 발생 직후 스레드 : boundedElastic
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 1
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 3
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 3
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 5
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 5
    08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 7
    08:35:03.373 [boundedElastic-1] INFO - # onNext: 7
  • publishOn()
    • Downstream으로 Signal을 전송할 때 실행되는 스레드 제어
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[] {1, 3, 5, 7})
                .doOnNext(data -> log.info("# doOnNext: {}", data))
                .doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> log.info("# onNext: {}", data));

        Thread.sleep(500L);
    }
    08:40:35.020 [main] INFO - # doOnSubscribe
    08:40:35.024 [main] INFO - # doOnNext: 1
    08:40:35.027 [main] INFO - # doOnNext: 3
    08:40:35.027 [parallel-1] INFO - # onNext: 1 // Downstream 실행 스레드 변경
    08:40:35.027 [main] INFO - # doOnNext: 5
    08:40:35.027 [main] INFO - # doOnNext: 7
    08:40:35.027 [parallel-1] INFO - # onNext: 3
    08:40:35.027 [parallel-1] INFO - # onNext: 5
    08:40:35.027 [parallel-1] INFO - # onNext: 7
  • parallel()
    • 병렬성을 가지는 물리적인 스레드
      • cf. subscribeOn(), publishOn()은 동시성을 가지는 논리적인 스레드
    • 라운드 로빈 방식으로 논리적인 코어(물리적인 스레드) 개수만큼의 스레드를 병렬로 실행
    • Example10_3

subscribeOn(), publishOn()를 활용하여 Publisher에서 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있다.

10.5 Scheduler의 종류

  • Schedulers.immediate() : 별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업 처리
  • Schedulers.single() : 스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용
  • Schedulers.newSingle() : 매번 새로운 스레드 하나 생성
  • Schedulers.boundedElastic() : ExecutorService 기반의 스레드 풀 생성한 후, 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업 종료된 스레드는 반납하여 재사용
    • 기본적으로 CPU 코어 수 x 10만큼의 스레드 생성
    • 최대 100,000개의 작업 큐에서 대기 가능
    • Blocking I/O 작업을 효과적으로 처리하기 위한 방식
    • 다른 Blocking I/O 작업이 Non-Blocking 처리에 영향을 주지 않도록 전용 스레드 할당
  • Schedulers.parallel() : Non-Blocking I/O에 최적화되어 있는 Scheduler로서 CPU 코어 수만큼의 스레드 생성
  • Schedulers.fromExecutorService() : 기존에 이미 사용하고 있는 ExecutorService가 있다면 이 ExecutorService로부터 Scheduler를 생성하는 방식
  • Schedulers.newXXXX() : 스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드의 유효 시간, 데몬 스레드로의 동작 여부 등 직접 지정해서 커스텀 스레드 풀 생성

'공부 > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글

Chapter 12 | Debugging  (0) 2023.07.21
Chapter 11 | Context  (0) 2023.07.21
Chapter 09 | Sinks  (0) 2023.07.21
Chapter 08 | Backpressure  (0) 2023.07.21
Chapter 07 | Cold Sequence와 Hot Sequence  (0) 2023.07.21
댓글