티스토리 뷰
- Reactor Sequence에서 사용되는 스레드를 관리해 주는 역할
10.1 스레드(Thread)의 개념 이해
- 물리적인 스레드(Pysical Thread) : CPU 코어를 논리적으로 나눈 것
- 듀얼코어 4 스레드 (작업관리자 [성능]에서 확인 가능)
![물리적인 스레드](https://img1.daumcdn.net/thumb/R1280x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2Fckw6VT%2FbtrLovaspwR%2Fdbvn4JZXSZK379LwjgfFH0%2Fimg.png)
- 논리적인 스레드(Logical Thread) : 프로세스 내에서 실행되는 세부 작업의 단위
- 이론적으로는 제한이 없지만 실제로는 물리적인 스레드의 가용 범위내에서 생성 가능
![논리적인 스레드](https://img1.daumcdn.net/thumb/R1280x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2F8Ws7p%2FbtrLr3dqm8x%2FhW7mIaQCZ4DkewtOKWr2MK%2Fimg.png)
- 병렬성(Parallelism) : 물리적인 스레드가 실제로 동시에 실행됨
- 동시성(Concurrency) : 동시에 실행되는 것처럼 보이지만 실제로는 아님
- 무수히 많은 논리적인 스레드가 물리적인 스레드를 아주 빠른 속도로 번갈아 가면서 사용
10.2 Scheduler란?
- 어떤 스레드에서 무엇을 처리할지 제어(경쟁조건 등 고려)를 Scheduler가 개발자 대신 수행
- 코드가 간결해짐
- 개발자 부담 적어짐
10.3 Scheduler를 위한 전용 Operator
- subscribeOn()
- 구독 발생 직후 실행될 스레드 지정
- 원본 Publisher 동작 수행을 위한 스레드
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
08:35:03.363 [main] INFO - # doOnSubscribe // 최초 실행 스레드 : main
08:35:03.371 [boundedElastic-1] INFO - # doOnNext: 1 // 구독 발생 직후 스레드 : boundedElastic
08:35:03.373 [boundedElastic-1] INFO - # onNext: 1
08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 3
08:35:03.373 [boundedElastic-1] INFO - # onNext: 3
08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 5
08:35:03.373 [boundedElastic-1] INFO - # onNext: 5
08:35:03.373 [boundedElastic-1] INFO - # doOnNext: 7
08:35:03.373 [boundedElastic-1] INFO - # onNext: 7
- publishOn()
- Downstream으로 Signal을 전송할 때 실행되는 스레드 제어
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[] {1, 3, 5, 7})
.doOnNext(data -> log.info("# doOnNext: {}", data))
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
08:40:35.020 [main] INFO - # doOnSubscribe
08:40:35.024 [main] INFO - # doOnNext: 1
08:40:35.027 [main] INFO - # doOnNext: 3
08:40:35.027 [parallel-1] INFO - # onNext: 1 // Downstream 실행 스레드 변경
08:40:35.027 [main] INFO - # doOnNext: 5
08:40:35.027 [main] INFO - # doOnNext: 7
08:40:35.027 [parallel-1] INFO - # onNext: 3
08:40:35.027 [parallel-1] INFO - # onNext: 5
08:40:35.027 [parallel-1] INFO - # onNext: 7
- parallel()
- 병렬성을 가지는 물리적인 스레드
- cf.
subscribeOn()
,publishOn()
은 동시성을 가지는 논리적인 스레드
- cf.
- 라운드 로빈 방식으로 논리적인 코어(물리적인 스레드) 개수만큼의 스레드를 병렬로 실행
- Example10_3
- 병렬성을 가지는 물리적인 스레드
subscribeOn()
,publishOn()
를 활용하여 Publisher에서 데이터를 emit하는 스레드와 emit된 데이터를 가공 처리하는 스레드를 적절하게 분리할 수 있다.
10.5 Scheduler의 종류
- Schedulers.immediate() : 별도의 스레드를 추가적으로 생성하지 않고, 현재 스레드에서 작업 처리
- Schedulers.single() : 스레드 하나만 생성해서 Scheduler가 제거되기 전까지 재사용
- Schedulers.newSingle() : 매번 새로운 스레드 하나 생성
- Schedulers.boundedElastic() : ExecutorService 기반의 스레드 풀 생성한 후, 정해진 수만큼의 스레드를 사용하여 작업을 처리하고 작업 종료된 스레드는 반납하여 재사용
- 기본적으로 CPU 코어 수 x 10만큼의 스레드 생성
- 최대 100,000개의 작업 큐에서 대기 가능
- Blocking I/O 작업을 효과적으로 처리하기 위한 방식
- 다른 Blocking I/O 작업이 Non-Blocking 처리에 영향을 주지 않도록 전용 스레드 할당
- Schedulers.parallel() : Non-Blocking I/O에 최적화되어 있는 Scheduler로서 CPU 코어 수만큼의 스레드 생성
- Schedulers.fromExecutorService() : 기존에 이미 사용하고 있는 ExecutorService가 있다면 이 ExecutorService로부터 Scheduler를 생성하는 방식
- Schedulers.newXXXX() : 스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드의 유효 시간, 데몬 스레드로의 동작 여부 등 직접 지정해서 커스텀 스레드 풀 생성
'공부 > 스프링으로 시작하는 리액티브 프로그래밍' 카테고리의 다른 글
Chapter 12 | Debugging (0) | 2023.07.21 |
---|---|
Chapter 11 | Context (0) | 2023.07.21 |
Chapter 09 | Sinks (0) | 2023.07.21 |
Chapter 08 | Backpressure (0) | 2023.07.21 |
Chapter 07 | Cold Sequence와 Hot Sequence (0) | 2023.07.21 |
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크