
7.1 What Cold and Hot Mean

  • The terms follow familiar usage: Cold Booting, Hot Deploy, ...
  • Cold: starts over from the beginning
  • Hot: does not start over

7.2 Cold Sequence

  • A Sequence whose data flow restarts from the beginning every time a Subscriber subscribes
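    import java.util.Arrays;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;

    // The class name and the SLF4J logger declaration are assumptions; the original shows only main().
    public class ColdSequenceExample {
        private static final Logger log = LoggerFactory.getLogger(ColdSequenceExample.class);

        public static void main(String[] args) throws InterruptedException {
            Flux<String> coldFlux =
                    Flux
                        .fromIterable(Arrays.asList("KOREA", "JAPAN", "CHINESE"))
                        .map(String::toLowerCase);

            // First subscription: the source is emitted from the first element.
            coldFlux.subscribe(country -> log.info("# Subscriber1: {}", country));
            System.out.println("----------------------------------------------------------------------");
            Thread.sleep(2000L);
            // Second subscription, two seconds later: the same elements are emitted again from the start.
            coldFlux.subscribe(country -> log.info("# Subscriber2: {}", country));
        }
    }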
    21:27:48.790 [main] INFO - # Subscriber1: korea
    21:27:48.791 [main] INFO - # Subscriber1: japan
    21:27:48.791 [main] INFO - # Subscriber1: chinese
    ----------------------------------------------------------------------
    21:27:50.798 [main] INFO - # Subscriber2: korea
    21:27:50.798 [main] INFO - # Subscriber2: japan
    21:27:50.798 [main] INFO - # Subscriber2: chinese
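
  • The "restart per subscription" behavior can be modeled without Reactor. The following is a minimal plain-Java sketch, not Reactor's implementation; `ColdSequenceSketch`, `ColdSource`, and `run()` are hypothetical names introduced only for illustration. The source holds just the list of items, and each `subscribe` call walks that list again from the first item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdSequenceSketch {

    // Hypothetical stand-in for a cold publisher: it stores only the items
    // and replays them from the first one for every subscriber.
    static final class ColdSource<T> {
        private final List<T> items;

        ColdSource(List<T> items) {
            this.items = List.copyOf(items);
        }

        // Every call walks the full list again, independent of earlier subscribers.
        void subscribe(Consumer<? super T> subscriber) {
            for (T item : items) {
                subscriber.accept(item);
            }
        }
    }

    // Subscribes twice and records what each subscriber receives, in order.
    static List<String> run() {
        ColdSource<String> coldSource = new ColdSource<>(List.of("korea", "japan", "chinese"));
        List<String> received = new ArrayList<>();
        coldSource.subscribe(country -> received.add("Subscriber1: " + country));
        coldSource.subscribe(country -> received.add("Subscriber2: " + country));
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    Both subscribers receive all three items, matching the log above where Subscriber2 still sees korea, japan, and chinese two seconds later.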

7.3 Hot Sequence 

  • A Subscriber does not receive data emitted before it subscribed; it receives only data emitted after the point of subscription
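    import java.time.Duration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;

    // The class name and the SLF4J logger declaration are assumptions; the original shows only main().
    public class HotSequenceExample {
        private static final Logger log = LoggerFactory.getLogger(HotSequenceExample.class);

        public static void main(String[] args) throws InterruptedException {
            String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};

            log.info("# Begin concert:");
            Flux<String> concertFlux =
                    Flux
                        .fromArray(singers)
                        .delayElements(Duration.ofSeconds(1))
                        .share(); // Operator that makes the Flux behave as a Hot Sequence

            concertFlux.subscribe(
                    singer -> log.info("# Subscriber1 is watching {}'s song", singer)
            );

            // Subscriber2 joins 2.5 seconds in, after Singer A and Singer B have already been emitted.
            Thread.sleep(2500);

            concertFlux.subscribe(
                    singer -> log.info("# Subscriber2 is watching {}'s song", singer)
            );

            // Keep the main thread alive while the remaining elements are emitted on the parallel threads.
            Thread.sleep(3000);
        }
    }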
    21:30:14.426 [parallel-1] INFO - # Subscriber1 is watching Singer A's song
    21:30:15.444 [parallel-2] INFO - # Subscriber1 is watching Singer B's song
    21:30:16.451 [parallel-3] INFO - # Subscriber1 is watching Singer C's song
    21:30:16.452 [parallel-3] INFO - # Subscriber2 is watching Singer C's song
    21:30:17.458 [parallel-4] INFO - # Subscriber1 is watching Singer D's song
    21:30:17.459 [parallel-4] INFO - # Subscriber2 is watching Singer D's song
    21:30:18.470 [parallel-5] INFO - # Subscriber1 is watching Singer E's song
    21:30:18.470 [parallel-5] INFO - # Subscriber2 is watching Singer E's song

7.4 How Cold and Hot Sequences Behave in HTTP Requests and Responses

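    import java.net.URI;
    import com.jayway.jsonpath.DocumentContext;
    import com.jayway.jsonpath.JsonPath;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.web.reactive.function.client.WebClient;
    import org.springframework.web.util.UriComponentsBuilder;
    import reactor.core.publisher.Mono;

    // The class name and the SLF4J logger declaration are assumptions; the original shows only the two methods.
    public class HttpHotSequenceExample {
        private static final Logger log = LoggerFactory.getLogger(HttpHotSequenceExample.class);

        public static void main(String[] args) throws InterruptedException {
            URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
                    .host("worldtimeapi.org")
                    .port(80)
                    .path("/api/timezone/Asia/Seoul")
                    .build()
                    .encode()
                    .toUri();

            // The cache() Operator makes the Cold Sequence behave like a Hot Sequence
            Mono<String> mono = getWorldTime(worldTimeUri).cache();

            // Two subscriptions occur, but because of cache() only the first one triggers an HTTP request;
            // the second Subscriber receives the cached result (without cache(), each subscription would send a new request)
            mono.subscribe(dateTime -> log.info("# dateTime 1: {}", dateTime));
            Thread.sleep(2000);
            mono.subscribe(dateTime -> log.info("# dateTime 2: {}", dateTime));

            // Keep the main thread alive until the asynchronous response has been logged
            Thread.sleep(2000);
        }

        private static Mono<String> getWorldTime(URI worldTimeUri) {
            return WebClient.create()
                    .get()
                    .uri(worldTimeUri)
                    .retrieve()
                    .bodyToMono(String.class)
                    .map(response -> {
                        // Extract the "datetime" field from the JSON response body
                        DocumentContext jsonContext = JsonPath.parse(response);
                        String dateTime = jsonContext.read("$.datetime");
                        return dateTime;
                    });
        }
    }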
    INFO - # dateTime 1: 2023-07-04T21:38:10.257430+09:00
    INFO - # dateTime 2: 2023-07-04T21:38:10.257430+09:00
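
  • The identical timestamps above come from the request running once and its result being reused. A minimal plain-Java sketch of that difference follows; it is an analogy, not Reactor's `cache()` implementation, and `CachedRequestSketch`, `cached`, `fakeRequest`, and `run()` are hypothetical names. `fakeRequest` stands in for the HTTP call and numbers each invocation, so repeated requests are visible in the result.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class CachedRequestSketch {

    // Wraps a supplier so that the underlying call runs at most once;
    // every later get() returns the stored result.
    static <T> Supplier<T> cached(Supplier<T> source) {
        return new Supplier<T>() {
            private boolean resolved;
            private T value;

            @Override
            public synchronized T get() {
                if (!resolved) {
                    value = source.get();
                    resolved = true;
                }
                return value;
            }
        };
    }

    // Hypothetical stand-in for the HTTP call: each invocation bumps the counter.
    static Supplier<String> fakeRequest(AtomicInteger requestCount) {
        return () -> "response #" + requestCount.incrementAndGet();
    }

    static List<String> run() {
        // Without caching: two reads trigger two requests.
        Supplier<String> plain = fakeRequest(new AtomicInteger());
        String plain1 = plain.get();
        String plain2 = plain.get();

        // With caching: two reads share the result of a single request.
        Supplier<String> shared = cached(fakeRequest(new AtomicInteger()));
        String cached1 = shared.get();
        String cached2 = shared.get();

        return List.of(plain1, plain2, cached1, cached2);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    The uncached reads yield two different responses, while both cached reads return the first response, which mirrors dateTime 1 and dateTime 2 being identical.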