선언형 프로그래밍
무엇을 할 것인가를 명시하는 방식으로 프로그래밍 접근법이며 주요 특징은 다음과 같다.
- 목표 지향적: 해결 하고자 하는 문제의 목표나 결과를 명확하게 정의하고 이를 어떻게 구현할지는 시스템에 맡긴다.
- 과정에 대한 신경을 덜 씀: 구현 세부 사항이나 처리 흐름보다는 결과만을 기술한다.
- 고수준의 추상화: 구현 세부 사항은 보통 프레임워크나 라이브러리, 혹은 시스템 내부에서 처리되므로, 프로그래머는 세부적인 구현에 대한 부담을 덜 수 있다.
대표적인 선언 형 프로그램으로는, SQL, HTML, 리액티브 프로그래밍(React, Vue)등이 있다.
프로그램 예시
명령형 프로그램 예시
reault = 0
for i in range(1,6)
result += i
print(result)
선언형 프로그램 예시
SELECT SUM(value) FROM numbers WHERE value > 0;
선언적 프로그래밍의 장단점
장점
- 가독성 향상: 선언형 코드가 보통 명령형 코드보다 더 직관적이고 간결하다.
- 추상화: 프로그램의 로직을 추상화하여, 세부 구현을 신경 쓰지 않고 비즈니스 로직에 집중할 수 있다.
- 오류 감소: 세부적인 구현을 시스템에 맡기기 때문에, 개발자가 실수할 여지가 적다.
단점
- 제어력 부족: “무엇을 할 것인가”는 정의할 수 있지만, “어떻게 할 것인가”에 대한 세부 제어는 시스템에 맡겨지므로, 때로는 특정한 방식으로 동작하도록 세부적으로 조정하는 데 한계가 있을 수 있다.
- 성능 최적화: 자동화된 추상화가 성능 최적화를 방해할 수 있다. 특정 상황에서는 세부적인 구현을 제어하는 것이 중요할 수 있다.
Reactive 프로그래밍 코드 구성
Reactive 프로그래밍은 비동기 데이터 흐름과 변경 가능한 데이터를 처리하는 프로그래밍 패러다임이다. 이 방식은 데이터 스트림을 처리하고, 비동기로 흐르는 데이터를 처리하는 데 초점을 맞춘다. Reactive 프로그래밍은 주로 불변성, 비동기 처리, 데이터 흐름을 강조한다.
주요 개념
- 스트림(Stream) or 스토어(Store): 데이터의 흐름을 나타낸다. 스트림은 지속적으로 발생하는 데이터 요소들의 시퀀스로, 주로 비동기 적인 데이터 처리를 위해 사용된다.
- 옵저버(Observer): 스트림에서 발생하는 데이터 이벤트를 구독하고 처리하는 객체다. 옵저버는 스트림에서 방출된 데이터를 처리하는 로직을 포함한다.
- 발행자(Publisher): 데이터를 방출하는 객체다. Reactive 스트림에서 데이터를 생산하는 역할을 한다.
- 구독(Subscription): 옵저버가 스트림을 구독하여 데이터의 흐름을 처리하도록 연결하는 과정이다.
- 연산자(Operators): 스트림을 변환하거나 처리할 수 있는 함수적 방법들을 제공한다. 예를 들어, map, filter, merge 등의 연산자를 사용하여 데이터 스트림을 조작할 수 있다.
설계 원칙
- 비동기 메시지 기반 통신으로 동작해야 한다.
- 탄력적이고 회복성을 지녀야 한다.
- 높은 응답성을 지녀야 한다.
- 유지보수와 확장이 용이해야 한다.
Reactive Streams
Reactive Streams는 비동기 데이터 흐름을 처리하고, 스트림을 통해 데이터를 효율적으로 처리하는 표준을 정의한 규격이다. 이 규격은 Backpressure를 관리하고, 비동기적으로 흐르는 데이터 스트림을 처리하기 위해 설계되었다. Reactive Streams는 주로 Reactive 프로그래밍에서 사용되며, Publisher, Subscriber, Subscription, Processor와 같은 네 가지 주요 구성 요소로 이루어져 있다.
Publisher
- Publisher는 데이터를 방출하는 객체이다. 데이터 스트림을 생성하고, 이를 Subscriber에게 전달한다. 데이터는 비동기적으로 방출되며, Publisher는 스트림이 끝날 때까지 데이터를 지속적으로 방출할 수 있다.
- Publisher는 subscribe() 메서드를 사용하여 데이터를 Subscriber에게 전달한다.
import reactor.core.publisher.Mono;
Mono<String> publisher = Mono.just(“Hello, Reactive Streams!”);
Subscriber
- Subscriber는 Publisher가 방출한 데이터를 처리하는 객체이다. Subscriber는 Publisher가 데이터를 방출할 때마다 이를 받아 처리하며, 각 데이터 항목에 대해 적절한 처리를 수행한다.
- Subscriber는 onNext(), onComplete(), onError()와 같은 메서드를 통해 이벤트를 처리한다.
- Subscriber는 Subscription 객체를 통해 Publisher와의 연결을 관리하고, 데이터를 요청할 수 있다.
publisher.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE); // 데이터를 요청
}
@Override
public void onNext(String item) {
System.out.println(item); // 받은 데이터 처리
}
@Override
public void onError(Throwable throwable) {
System.err.println(“Error: ” + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(“Completed”);
}
});
Subscription
- Subscription은 Publisher와 Subscriber 간의 연결을 나타내며, Subscriber가 Publisher에 대해 데이터를 요청할 수 있는 방법을 제공한다. Subscription은 데이터 요청을 관리하고, Subscriber가 처리할 수 있는 양만큼 데이터를 방출하도록 한다.
- Subscription 객체는 request(long n) 메서드를 사용하여 데이터를 요청한다. 이 메서드는 Publisher가 얼마나 많은 데이터를 방출할지 제어한다.
- Subscription은 cancel() 메서드를 통해 구독을 취소할 수 있다.
subscription.request(10); // 10개의 데이터 항목을 요청
subscription.cancel(); // 구독 취소
Processor
- Processor는 Publisher이자 Subscriber로, 데이터를 처리하면서 스트림을 변환하는 역할을 한다. Processor는 Subscriber로서 데이터를 받아 처리하고, 처리된 데이터를 다른 Subscriber에게 전달하는 Publisher 역할도 수행한다.
- Processor는 transform과 같은 변환 작업을 수행할 수 있으며, 주로 스트림을 필터링하거나 변경하는 데 사용된다.
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Flux<Integer> processed = Flux.range(1, 5)
.map(x -> x * 2); // 스트림 변환
Reactive Streams의 흐름
- Publisher는 데이터를 생성하고 방출한다.
- Subscriber는 Publisher의 데이터를 구독하고 처리한다.
- Subscription은 구독 중인 데이터 흐름을 관리하고, Subscriber가 요청하는 데이터의 양을 제어한다.
- Processor는 데이터를 변환하거나 필터링하는 데 사용된다.
Backpressure
Reactive Streams의 핵심 개념 중 하나는 Backpressure이다. Backpressure는 데이터가 너무 많이 발생하거나 소비되는 속도에 따라 흐름을 제어하는 방법을 의미한다. Subscriber는 request() 메서드를 통해 자신이 처리할 수 있는 데이터 양을 제한함으로써 과도한 데이터 방출을 방지할 수 있다. 이렇게 함으로써, 시스템의 안정성과 성능을 유지할 수 있다.
Non-Blocking
Multi-Threading
Blocking 되는 프로그램의 단점을 보완하기 위하여 CPU의 개수보다 많은 Thread를 생성하여 처리하는 방안이 만들어 졌다. 하지만, 해당 방법은 다음과 같은 단점을 가지게 된다.
컨텍스트 스위칭으로 인한 문제
멀티 스레딩을 처리하기 위한 기법으로, CPU가 처리할 수 있는 양보다 더 많은 요청을 처리하기 위하여 사용하는 방법이다. 컨텍스트 스위칭은 다음과 같은 과정을 수행하며 쓰레드를 전환하여 다수의 쓰레드 처리를 지원하게 된다.
-
현재 스레드의 상태 저장:
- 스레드가 CPU에서 실행 중일 때, CPU는 해당 스레드의 레지스터 값(예: 프로그램 카운터, 스택 포인터 등)과 기타 상태 정보를 저장한다. 이 정보를 컨텍스트(Context)라고 부른다.
- 이 상태 정보는 보통 프로세스의 PCB(Process Control Block)나 스레드의 TCB(Thread Control Block)에 저장된다.
-
새로운 스레드의 상태 로드:
- 새로운 스레드가 실행될 때, 운영 체제는 새로 실행될 스레드의 컨텍스트를 메모리에서 읽고 CPU에 로드 한다.
- 이때, 새로운 스레드는 이전에 저장된 상태 정보를 바탕으로 실행을 재개한다.
-
스케줄링:
- 운영 체제는 스레드 또는 프로세스를 스케줄링하여 CPU에서 실행할 순서를 결정한다.
- 각 스레드는 일정 시간 동안 실행되고, 시간이 다되거나, 다른 이벤트가 발생하면 다음 스레드로 전환된다.
위와 같은 행위를 통해 다음과 같은 비용이 발생하게 된다.
- 저장 및 로드: 스레드 간의 상태를 저장하고 로드하는 데 시간이 걸린다. 이 과정에서 CPU 레지스터나 메모리 접근이 필요하다.
- 캐시 미스: 각 스레드가 실행될 때마다 CPU 캐시를 다시 채워야 할 수 있다. 이는 캐시 미스를 초래하며 성능에 영향을 줄 수 있다.
- 스케줄링 오버헤드: 스레드의 실행을 제어하는 스케줄러도 추가적인 오버헤드를 발생시킨다.
Context 전환 비용이 발생한 만큼 CPU는 작업을 하지 못하게 되고, 이런 컨텍스트 스위칭이 많이 발생할수록 CPU의 전체 대기시간은 길어지기 때문에 성능이 저하된다.
과다한 메모리 사용으로 오버헤드가 발생
새로운 스레드가 실행되면 JVM에서는 해당 스레드를 위한 정보를 스택 메모리에 저장하게 된다. 기본 쓰레드 정보는 1024KB(64bit) 정도를 소모하며, 단순한 계산으로 동시 사용자 1만명이라고 하면, 대략적으로 10GB 정도의 메모리를 소모하게 된다.
스레드 풀에서의 응답 지연
스레드 생성/제거의 단점을 보완하기 위해 Thread Pool을 사용하게 된다고 하더라도, 유휴 쓰레드가 없다면, 이로 인해 응답 지연이 발생할 수 있다.
함수형 인터페이스 (Functional Interface)
Java 8부터 추가된 기능으로, 단 하나의 추상 메소드만을 가진 인터페이스를 의미한다. 함수형 인터페이스는 람다 표현식이나 메서드 참조와 함께 사용되며, 주로 클로저나 콜백 함수의 형태로 자주 활용된다.
@FunctionalInterface 어노테이션의 경우, 함수형 인터페이스임을 명시적으로 표현하기 위하여 선택적으로 사용된다.
예시 코드
@FunctionalInterface
public interface MyFunctionalInterface {
// 추상 메서드
void doSomething();
// default 메서드 (선택적)
default void defaultMethod() {
System.out.println(“This is a default method.”);
}
// static 메서드 (선택적)
static void staticMethod() {
System.out.println(“This is a static method.”);
}
}
사용 코드 – Lambda 식으로 사용된 예시
public class Main {
public static void main(String[] args) {
// 람다 표현식으로 함수형 인터페이스 구현
MyFunctionalInterface myFunc = () -> System.out.println(“Doing something!”);
// 람다 표현식 호출
myFunc.doSomething(); // 출력: Doing something!
// default 메서드 호출
myFunc.defaultMethod(); // 출력: This is a default method.
// static 메서드 호출
MyFunctionalInterface.staticMethod(); // 출력: This is a static method.
}
}
사용 코드 – Clouser로 사용된 예시
public class ClosureExample {
public static void main(String[] args) {
int outerValue = 20; // 외부 변수
// 익명 클래스는 외부 변수 ‘outerValue’를 캡처하여 사용할 수 있다.
MyFunctionalInterface closureExample = new MyFunctionalInterface() {
@Override
public void printValue() {
System.out.println(“Outer value: ” + outerValue); // 외부 변수 ‘outerValue’를 사용
}
};
closureExample.printValue();
}
}
interface MyFunctionalInterface {
void printValue(); // 함수형 인터페이스 정의
}
Callback으로 사용된 예시
public class CallbackExample {
public static void main(String[] args) {
TaskExecutor taskExecutor = new TaskExecutor();
// 람다 표현식으로 콜백 구현
taskExecutor.executeTask(result -> System.out.println(“Callback received: ” + result));
}
}
Lambda 표현식
Lambda 표현식(Lambda Expression)은 자바 8에서 도입된 기능으로, 익명 함수(Anonymous Function)를 정의하는 간결한 방법이다. 이는 주로 함수형 프로그래밍(Functional Programming) 스타일의 코드를 작성할 때 사용된다. 람다 표현식을 통해 메서드를 간단하게 표현하거나 객체를 생성할 수 있으며, 코드의 가독성이나 유지보수성도 향상시킬 수 있다.
(parameters) -> expression
(String a, String b) -> a.equals(b);
Method Reference
Method Reference는 자바 8에서 도입된 기능으로, 람다 표현식을 좀 더 간결하게 사용할 수 있도록 돕는 문법이다. 람다 표현식이 특정 메서드를 호출하는 경우, 해당 메서드를 직접 참조하는 방식으로 코드를 간결하게 작성할 수 있다. 즉, 메서드를 람다 표현식처럼 호출하되, 메서드 이름을 직접 참조하는 방법이다.
ClassName::methodName
정적 메소드 참조
// 람다 표현식
Function<Integer, Integer> squareLambda = (x) -> MathUtil.square(x);
// Method Reference
Function<Integer, Integer> squreMethodRef = MathUtil:squre;
인스턴스 메소드 참조
StringUtil util = new StringUtil();
// 람다 표현식
Function<String, String> toUpperCaseLambda = (str) -> util.toUppserCase(str);
// 메서드 레퍼런스
Function<String, String> toUpperCaseMethodRef = util::toUpperCase;
함수 디스크립터
함수 디스크립터(Function Descriptor)는 함수의 매개변수와 반환 타입을 설명하는 데 사용된다. 일반적으로 함수형 인터페이스나 람다 표현식을 다룰 때, 함수의 시그니처(signature), 즉 어떤 타입의 입력을 받고 어떤 타입의 출력을 반환하는지에 대한 정보를 정의하는 데 사용된다.
Function<T, R> (단일 입력, 출력 함수)
- 설명: Function<T, R>는 하나의 입력 매개변수 T를 받아서 결과값 R을 반환하는 함수형 인터페이스입니다.
- 함수 디스크립터: T -> R
- Function<Integer, String>에서 함수 디스크립터는 Integer -> String이다.
import java.util.function.Function;
public class FunctionDescriptorExample {
public static void main(String[] args) {
// Function<T, R> 인터페이스를 사용하는 예시
Function<Integer, String> intToString = (i) -> “Number: ” + i;
System.out.println(intToString.apply(5)); // “Number: 5”
}
}
BiFunction<T, U, R> (두 입력, 출력 함수)
- 설명: BiFunction<T, U, R>는 두 개의 입력 매개변수 T와 U를 받아서 결과값 R을 반환하는 함수형 인터페이스입니다.
- 함수 디스크립터: (T, U) -> R
- BiFunction<Integer, Integer, Integer>에서 함수 디스크립터는 (Integer, Integer) -> Integer이다.
import java.util.function.BiFunction;
public class BiFunctionDescriptorExample {
public static void main(String[] args) {
// BiFunction<T, U, R> 인터페이스를 사용하는 예시
BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
System.out.println(sum.apply(5, 3)); // 8
}
}
Predicate<T> (조건 검사 함수)
- 설명: Predicate<T>는 하나의 입력 T를 받아서 boolean 값을 반환하는 함수형 인터페이스입니다. 주로 조건을 체크하는 데 사용됩니다.
- 함수 디스크립터: T -> Boolean
- Predicate<String>에서 함수 디스크립터는 String -> boolean이다.
import java.util.function.Predicate;
public class PredicateDescriptorExample {
public static void main(String[] args) {
// Predicate<T> 인터페이스를 사용하는 예시
Predicate<String> isNotEmpty = str -> !str.isEmpty();
System.out.println(isNotEmpty.test(“Hello”)); // true
}
}
Consumer<T> (입력만 받는 함수)
- 설명: Consumer<T>는 하나의 입력 T를 받아서 반환값 없이 작업을 수행하는 함수형 인터페이스입니다. 주로 출력 작업이나 부수 효과(side effect)를 수행하는 데 사용됩니다.
- 함수 디스크립터: T -> void
- Consumer<String>에서 함수 디스크립터는 String -> void이다.
import java.util.function.Consumer;
public class ConsumerDescriptorExample {
public static void main(String[] args) {
// Consumer<T> 인터페이스를 사용하는 예시
Consumer<String> printMessage = msg -> System.out.println(msg);
printMessage.accept(“Hello, world!”); // “Hello, world!”
}
}
Supplier<T> (출력만 하는 함수)
- 설명: Supplier<T>는 입력 없이 결과값 T를 반환하는 함수형 인터페이스입니다. 주로 생성이나 값 제공의 역할을 합니다.
- 함수 디스크립터: () -> T
- Supplier<String>에서 함수 디스크립터는 () -> String이다.
import java.util.function.Supplier;
public class SupplierDescriptorExample {
public static void main(String[] args) {
// Supplier<T> 인터페이스를 사용하는 예시
Supplier<String> getMessage = () -> “Hello, world!”;
System.out.println(getMessage.get()); // “Hello, world!”
}
}
UnaryOperator<T> (단일 입력, 동일 타입 출력 함수)
- 설명: UnaryOperator<T>는 T 타입을 입력받아 같은 T 타입을 반환하는 함수형 인터페이스입니다. 즉, 입력과 출력의 타입이 동일한 함수입니다.
- 함수 디스크립터: T -> T
- 실행 함수: apply(T)
- UnaryOperator<Integer>에서 함수 디스크립터는 Integer -> Integer이다.
import java.util.function.UnaryOperator;
public class UnaryOperatorDescriptorExample {
public static void main(String[] args) {
// UnaryOperator<T> 인터페이스를 사용하는 예시
UnaryOperator<Integer> doubleValue = n -> n * 2;
System.out.println(doubleValue.apply(5)); // 10
}
}
BinaryOperator<T> (두 입력, 동일 타입 출력 함수)
- 설명: BinaryOperator<T>는 두 개의 입력 T를 받아서 동일한 T 타입을 반환하는 함수형 인터페이스입니다. 즉, 두 개의 입력이 동일한 타입일 때 사용됩니다.
- 함수 디스크립터: (T, T) -> T
- 실행 함수: apply(T, T)
- BinaryOperator<Integer>에서 함수 디스크립터는 (Integer, Integer) -> Integer이다.
import java.util.function.BinaryOperator;
public class BinaryOperatorDescriptorExample {
public static void main(String[] args) {
// BinaryOperator<T> 인터페이스를 사용하는 예시
BinaryOperator<Integer> add = (a, b) -> a + b;
System.out.println(add.apply(5, 3)); // 8
}
}
Runnable (입력 없이 실행만 하는 함수)
- 설명: Runnable은 입력 없이 출력 없이 실행만 하는 함수형 인터페이스입니다. 주로 쓰레드 실행에서 사용됩니다.
- 함수 디스크립터: () -> void
- 실행 함수: run()
- Runnable에서 함수 디스크립터는 () -> void이다.
public class RunnableDescriptorExample {
public static void main(String[] args) {
// Runnable 인터페이스를 사용하는 예시
Runnable printHello = () -> System.out.println(“Hello, world!”);
printHello.run(); // “Hello, world!”
}
}
마블 다이어그램
마블 다이어그램(Marble Diagram)은 주로 리액티브 프로그래밍에서 데이터 흐름이나 시간 기반의 연산을 시각적으로 표현하는 도구이다. 마블 다이어그램은 리액티브 스트림의 연산 결과가 시간에 따라 어떻게 변하는지, 그리고 데이터가 흐르는 과정을 쉽게 이해할 수 있도록 돕는다. 이 다이어그램은 주로 RxJS(Reactive Extensions for JavaScript)나 Project Reactor(Java의 리액티브 라이브러리)에서 많이 사용된다.
마블 다이어그램은 다음과 같은 요소들로 구성된다:
-
마블(Marble): 데이터 스트림에서 흐르는 각 이벤트나 값을 나타내는 기호. 보통 문자나 숫자로 표시되며, 이들은 시간의 흐름에 따라 나열된다.
예: –1–2–3–|는 1, 2, 3이라는 값들이 순차적으로 발행되고 스트림이 종료되는 상태를 나타낸다.
- 타임축(Time axis): 시간이 흐르는 방향을 나타내는 선. 이 선을 따라 데이터가 흐르고, 연산이 어떻게 적용되는지 보여준다.
- 연산자(Operator): 스트림에서 발생하는 변환, 필터링, 결합 등을 나타내는 연산을 시각적으로 나타낸다. 연산자는 특정 시점에서 값들을 어떻게 변환할지 정의한다.
- 구독(Subscription): 데이터 스트림을 구독하는 소비자를 나타낸다. 이 구독자는 스트림에서 발행된 값을 수신하고 이를 처리한다.
예시 Filter 연산자
–1–2–3–4–5–| (filter x -> x % 2 == 0)
—–2—–4—–|
Sequence
Cold Sequence와 Hot Sequence는 리액티브 프로그래밍에서 데이터 스트림의 특성을 설명하는 개념이다. 이들은 데이터 흐름의 생성 시점과 구독 시점에 따른 동작 방식을 구분하는 데 사용된다. 간단히 말하면, Cold Sequence는 구독 시점에 데이터를 생성하는 반면, Hot Sequence는 이미 데이터가 존재하며 구독자와 상관없이 데이터를 제공하는 방식이다.
Cold Sequence (콜드 시퀀스)
설명: Cold Sequence는 구독이 시작될 때마다 데이터를 새로 생성하는 스트림을 말한다. 즉, 각 구독자는 해당 스트림의 데이터 흐름을 처음부터 끝까지 독립적으로 받는다.
특징:
- 구독자가 구독을 시작할 때마다, 데이터는 다시 처음부터 생성된다.
- 스트림이 구독자에 의해 “시작”되며, 각 구독자는 독립적인 데이터 흐름을 갖는다.
- 스트림에 대한 시작점은 구독이 일어날 때마다 초기화된다.
예시: 파일 읽기, 네트워크 요청 등.
Observable<Integer> coldObservable = Observable.create(emitter -> {
System.out.println(“Emitting values”);
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
});
coldObservable.subscribe(value -> System.out.println(“Subscriber 1: ” + value)); // “Emitting values” 출력 후, 값 발행
coldObservable.subscribe(value -> System.out.println(“Subscriber 2: ” + value)); // “Emitting values” 출력 후, 값 발행
동작
위 코드에서 coldObservable는 각 구독자에게 독립적으로 데이터를 새로 생성한다. 즉, 첫 번째 구독자가 데이터를 받을 때마다 Emitting values 메시지가 출력되고, 두 번째 구독자도 데이터를 새로 받는다.
Hot Sequence (핫 시퀀스)
설명: Hot Sequence는 구독 여부와 관계없이 데이터를 미리 생성하는 스트림이다. 즉, 데이터를 이미 생성해 놓고, 구독자가 구독할 때 데이터를 지속적으로 제공한다. 구독자는 이미 존재하는 데이터 흐름에 참여하는 형태이다.
특징:
- 구독자가 구독을 시작하더라도, 데이터가 이미 발행되었거나 발행 중일 수 있다.
- 스트림은 구독자와 관계없이 계속해서 데이터를 방출하고, 구독자는 실시간으로 데이터의 일부만 받을 수 있다.
- 구독자가 나중에 구독을 하더라도, 이미 지나간 데이터는 받지 못한다.
예시: 실시간 데이터 스트림, 웹소켓, 주식 거래 데이터, 센서 데이터 등.
ConnectableObservable<Integer> hotObservable = Observable.create(emitter -> {
System.out.println(“Emitting values”);
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).publish();
hotObservable.subscribe(value -> System.out.println(“Subscriber 1: ” + value)); // “Emitting values” 출력 후, 값 발행
hotObservable.connect(); // 스트림 시작
hotObservable.subscribe(value -> System.out.println(“Subscriber 2: ” + value)); // Subscriber 2는 이미 발행된 값 중에서만 수신
동작
hotObservable은 connect()를 호출하여 구독자들과 관계없이 데이터를 발행한다. 두 번째 구독자는 이미 발행된 값을 받을 수 없으며, 실시간으로 방출되는 데이터만 받는다.
Backpressure
Backpressure는 리액티브 스트림에서 구독자가 처리할 수 있는 속도보다 더 빠르게 데이터가 방출될 때 발생하는 문제를 다루는 개념이다. 이는 특히 리액티브 프로그래밍에서 중요한 문제로, 데이터를 너무 많이 보내서 소비자가 처리할 수 없는 상황을 방지하기 위해 사용된다.
해결 방법
리액티브 스트림에서 Backpressure를 다루는 주요 방법은 구독자가 처리할 수 있는 양을 알려주고, 생산자가 이에 맞춰 데이터를 보낼 수 있도록 조정하는 것이다. 이를 위해서는 Publisher와 Subscriber가 상호작용할 수 있는 Backpressure 메커니즘이 필요하다.
리액티브 스트림은 **Reactive Streams**라는 표준을 따르며, 이를 통해 Backpressure를 제어할 수 있다. 구체적으로, onBackpressureXXX 메서드를 사용하여 스트림의 흐름을 제어한다.
Reactive Streams 표준에서 Backpressure
Reactive Streams는 구독자와 발행자 간의 백프레셔를 처리하는 표준 인터페이스를 정의하고 있다. 이 표준은 Publisher, Subscriber, Subscription, Processor를 사용하며, onBackpressureXXX 메서드를 통해 Backpressure를 다룬다.
- Subscriber는 onNext, onComplete, onError 메서드를 통해 데이터를 처리하고, request(n) 메서드를 통해 데이터 처리량을 제어한다.
- Publisher는 onSubscribe 메서드로 구독자와 연결되며, 구독자가 처리할 수 있는 데이터 양을 제어하는 방식으로 데이터를 발행한다.
Backpressure 처리 방법
-
onBackpressureBuffer(): 데이터가 구독자의 처리 속도를 초과할 때, 버퍼에 데이터를 저장하고 구독자가 처리할 수 있을 때까지 대기한다. 또한 그래도 계속 데이터가 들어올 경우, 다음 규칙에 따라 데이터를 Drop 시킨다.
-
DROP_OLDEST
- 버퍼가 가득 차면 가장 오래된 데이터를 버리고 새로운 데이터를 추가한다.
- 최신 데이터를 우선시하며, 시스템의 메모리 사용을 최적화할 수 있다.
-
DROP_LATEST
- 버퍼가 가득 차면 가장 최근의 데이터를 버리고 기존 데이터를 계속 보존합니다.
- 이 전략은 기존의 데이터를 우선시하며, 최신 데이터를 버리게 됩니다.
-
BLOCK
- 버퍼가 가득 차면 구독자가 데이터를 처리할 때까지 기다립니다.
- 처리할 공간이 생길 때까지 스트림을 일시적으로 차단합니다.
-
IGNORE
- 버퍼가 가득 차면 새로운 데이터를 무시합니다. 구독자는 데이터 흐름이 끊기게 됩니다.
- onBackpressureDrop(): 구독자가 처리할 수 없는 데이터를 버리도록 설정한다. 즉, 구독자가 처리할 수 없으면 그 데이터를 무시하고 계속 진행한다.
- onBackpressureLatest(): 구독자가 처리할 수 없을 때, 가장 최신 데이터만 유지하고 이전 데이터를 버린다.
- onBackpressureError(): 구독자가 처리할 수 없는 데이터를 발견하면 오류를 발생시킨다. 이 방법은 Backpressure를 해결하기 위해 더 이상 데이터를 발행할 수 없다는 의미로 사용된다.
코드 예시
- 아래 예제에서는 Flux.create()를 통해 계속해서 데이터를 생성합니다.
- onBackpressureBuffer(10)을 사용하여 구독자가 처리할 수 없으면 최대 10개의 데이터까지 버퍼에 저장하도록 설정합니다.
- doOnNext()를 사용하여 각 데이터에 대해 100ms의 딜레이를 주어 처리 속도를 늦추는 효과를 시뮬레이션합니다.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class BackpressureController {
@GetMapping(“/backpressure”)
public Flux<Integer> handleBackpressure() {
// 데이터를 방출하는 Flux
return Flux.create(emitter -> {
for (int i = 1; i <= 100; i++) {
emitter.next(i); // 계속해서 데이터를 생성
}
emitter.complete();
})
.onBackpressureBuffer(10) // 구독자가 처리할 수 없는 경우 최대 10개의 데이터를 버퍼에 저장
.doOnNext(data -> {
try {
Thread.sleep(100); // 처리 시간이 긴 작업 시뮬레이션
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
Sinks
Sinks는 데이터를 발행하거나 수신하는 역할을 하는 리액티브 스트림의 Producer와 Consumer 사이의 중간 객체로 동작한다. 이는 Flux나 Mono와 같은 리액티브 타입에 데이터를 전송하는 데 중요한 역할을 하며, 여러 스트림(Flux, Mono)을 구독하거나 이벤트를 발행할 수 있다.
Sinks는 Sink 인터페이스를 사용하여 여러 가지 방식으로 데이터를 다룰 수 있는 다양한 구현을 제공한다. 주로 Sinks.Many와 Sinks.One 두 가지 주요 구현이 있다.
-
Sinks.Many: 여러 개의 데이터를 발행할 수 있는 Sink이다. 이 타입은 Flux처럼 여러 개의 항목을 비동기적으로 발행할 수 있다.
-
UnicaseSpec
- 설명: unicast()는 하나의 구독자에게만 데이터를 발행할 수 있도록 설정하는 방식입니다. 즉, 구독자가 하나일 때만 데이터를 전달하며, 다른 구독자들은 해당 데이터를 받지 못합니다. 구독자가 없으면 데이터를 발행하지 않으며, 구독자가 추가되면 그때부터 데이터를 발행합니다.
-
특징
- 하나의 구독자에게만 데이터를 발행
- 구독자 개별적 처리가 필요할 때 사용
- 구독자가 없으면 데이터가 발행되지 않음
- 단일 구독자에 적합
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
-
Multicase()
- 설명: multicast()는 데이터를 여러 구독자에게 동시에 발행할 수 있도록 설정하는 방식입니다. 데이터를 한 번 발행하면, 그 데이터를 모든 구독자에게 동시에 전달합니다. 이 방식은 여러 구독자가 같은 데이터를 처리해야 할 때 유용합니다.
-
특징
- 여러 구독자에게 데이터를 동시에 발행
- 모든 구독자가 동일한 데이터를 수신
- 구독자가 추가되더라도, 이미 발행된 데이터는 받지 않음 (단, 구독 시작 이후 발행된 데이터는 모두 받음)
- 멀티 구독자 환경에 적합
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
- Sinks.One: 하나의 데이터만 발행할 수 있는 Sink이다. 이 타입은 Mono처럼 한 번의 데이터 스트림을 처리하는 데 사용된다.
Sinks 클래스는 데이터를 발행하고 완료 처리하는 다양한 메서드를 제공한다. 대표적인 메서드는 다음과 같다:
- emitNext(T value, EmitFailureHandler failureHandler): 데이터를 발행한다. 주어진 값을 스트림에 방출한다.
- emitComplete(EmitFailureHandler failureHandler): 스트림을 완료 상태로 설정한다. 더 이상 데이터가 발행되지 않음을 나타낸다.
- emitError(Throwable error, EmitFailureHandler failureHandler): 에러를 발생시킨다.
주요 예제
AsFlux를 통한 생성
// Sinks.Many 생성
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
// Flux로 데이터 발행하기
Flux<String> flux = sink.asFlux();
Schedule
Reactive 프로그래밍에서 Scheduler는 비동기 작업이나 스트림의 실행 타이밍과 실행 스레드를 제어하는 중요한 역할을 한다. Reactive 시스템에서는 여러 작업이 비동기적으로 처리되므로, 작업이 언제 실행될지, 어떤 스레드에서 실행될지를 명시적으로 관리해야 한다. 이를 위해 Reactive 프로그래밍은 Scheduler라는 개념을 사용하여 작업 흐름을 제어한다.
subscribeOn, publishOn, runOn등의 메소드를 통해 제어할 수 있다.
목적
- Reactive 프로그래밍에서는 여러 작업이 병렬로 실행될 수 있다. 이를 제어하기 위해 스케줄링을 사용한다.
- 스케줄링을 통해 각 작업이 실행될 타이밍과 스레드를 제어하며, 데이터 흐름을 관리한다.
- 이를 통해 스레드 안전을 보장하고, 백프레셔(backpressure)와 같은 문제를 해결할 수 있다.
역할
- 작업의 실행 위치를 결정: 작업이 메인 스레드, I/O 스레드, 또는 다른 스레드에서 실행될지를 지정한다.
- 작업의 실행 시점을 결정: 예를 들어, 특정 시간 이후에 작업을 실행하거나, 바로 실행되도록 스케줄링할 수 있다.
- 비동기 처리의 흐름을 제어: 비동기 작업들이 서로 충돌하지 않도록 실행 타이밍을 조절한다.
Scheduler의 종류
Immediate Scheduler
작업을 현재 스레드에서 즉시 실행하도록 스케줄링한다.
예를 들어, Schedulers.immediate()는 작업을 즉시 실행하며, 현재 스레드에서 실행된다.
Mono.just(“Hello”)
.subscribeOn(Schedulers.immediate()) // 즉시 실행
.doOnNext(System.out::println)
.subscribe();
Single Scheduler
모든 작업을 단일 스레드에서 실행하도록 스케줄링한다.
예를 들어, Schedulers.single()은 작업을 단일 스레드에서 처리하게 되며, 주로 UI 스레드나 단일 쓰레드 처리가 필요한 경우 사용된다.
Mono.just(“Hello”)
.subscribeOn(Schedulers.single()) // 단일 스레드에서 실행
.doOnNext(System.out::println)
.subscribe();
Elastic Scheduler
유연한 스레드 풀을 사용하여 작업을 스케줄링한다. 이 스케줄러는 I/O 작업이나 비동기적인 작업에 적합하다.
Schedulers.elastic()은 백그라운드 스레드를 사용할 때 유용하며, 여러 작업을 유연하게 분산시킬 수 있다.
Mono.just(“Hello”)
.subscribeOn(Schedulers.elastic()) // I/O 처리를 위한 유연한 스레드 풀 사용
.doOnNext(System.out::println)
.subscribe();
Bounded Elastic Scheduler
Schedulers.boundedElastic()은 Elastic 스케줄러의 개선된 버전입니다. 이 스케줄러는 동적으로 스레드를 생성하되, 최대 스레드 수를 제한하여 자원 소비를 효율적으로 관리합니다.
스레드 풀의 크기 제한: boundedElastic()은 스레드를 동적으로 생성하지만, 최대 스레드 수를 제한합니다. 이를 통해 과도한 스레드 생성으로 인한 자원 낭비를 방지할 수 있습니다.
블로킹 작업 처리: 주로 파일 I/O, 네트워크 I/O, 데이터베이스 쿼리와 같이 블로킹 작업을 처리하는 데 유용합니다.
성능 최적화: 기본적으로 200개의 스레드를 생성하고, 이를 초과하면 새 작업을 대기시키며 자원 관리가 이루어집니다. 이 방식은 스레드 풀이 과도하게 커지는 것을 방지합니다.
Mono.fromCallable(() -> {
// 블로킹 작업 예시 (예: 파일 읽기, DB 쿼리 등)
return “Hello from blocking operation”;
})
.subscribeOn(Schedulers.boundedElastic()) // 블로킹 작업에 적합한 스케줄러
.doOnNext(System.out::println)
.subscribe();
Parallel Scheduler
멀티 스레드에서 작업을 실행할 때 사용한다. 작업을 여러 스레드에서 병렬로 처리하려면 Schedulers.parallel()을 사용한다.
이 스케줄러는 병렬 처리를 위해 스레드 풀을 사용하고, CPU 바운드 작업에 적합하다.
Mono.just(“Hello”)
.subscribeOn(Schedulers.parallel()) // 병렬 스레드에서 실행
.doOnNext(System.out::println)
.subscribe();
NewThread Scheduler
새로운 스레드에서 작업을 실행할 때 사용된다. Schedulers.newSingle()은 새로운 단일 스레드를 생성하여 작업을 실행한다. 두번째 인자를 통해 Daemon Thread (Main Thread와 Lifecycle을 같이 하는)형태로 생성이 가능하다.
Mono.just(“Hello”)
.subscribeOn(Schedulers.newSingle(“new-thread”, true)) // 새로운 스레드에서 실행
.doOnNext(System.out::println)
.subscribe();
New XXX Scheduler
이외의 New로 시작되는 모두 새로운 쓰레드 타입들을 생성하여 스케줄러를 실행한다. (newSingle, newBoundedElastic, newParallel)
ExecutorService Scheduler
주어진 ExecutorService를 기반으로 작업을 실행하는 스케줄러를 생성하는 메소드이다. 이 메소드는 사용자 정의 스레드 풀이나 기존의 스레드 풀을 활용하여 작업을 실행하고자 할 때 유용하다.
기존의 ExecutorService 사용: 이 메소드는 사용자가 제공하는 ExecutorService를 기반으로 작업을 실행한다. 따라서 스레드 풀을 이미 설정하고 있다면, 이를 Reactor의 스케줄링에 통합할 수 있다.
스레드 풀 관리: ExecutorService의 스레드 풀 크기와 관리 정책을 그대로 사용할 수 있다. 예를 들어, CachedThreadPool, FixedThreadPool, SingleThreadExecutor와 같은 다양한 ExecutorService 구현체를 사용할 수 있다.
동시성 및 비동기 처리: ExecutorService에서 제공하는 비동기 작업 실행 및 동시성 기능을 그대로 활용하여 작업을 처리한다.
// 사용자 정의 ExecutorService (예: FixedThreadPool)
ExecutorService executorService = Executors.newFixedThreadPool(4);
Mono.just(“Hello, Reactor!”)
.subscribeOn(Schedulers.fromExecutorService(executorService)) // 사용자 정의 ExecutorService 사용
.doOnNext(System.out::println)
.subscribe();
// ExecutorService 종료 (리소스 해제)
executorService.shutdown();
스케줄러 동작 타입
subscribeOn()
- 스트림의 구독 시점에 스케줄러를 지정한다.
- 스트림 전체가 구독될 때, 구독을 처리하는 스레드를 제어한다.
- 첫 번째 스케줄링에 영향을 미친다.
publishOn()
- 중간 연산자에서 스케줄러를 변경한다.
- 연산자 실행 이후의 스레드를 제어한다.
- 후속 연산에 영향을 미친다.
parallel().runOn()
- 특정 연산을 실행할 스케줄러를 지정한다.
스케줄러에 따른 동작
기본 동작
Flux.fromArray(…) main thread
.filter(…) main thread
.map(…) main thread
.subscribe(…) main thread
PublishOn
Flux.fromArray(…) main thread
.publishOn(…)
.filter(…) A thread
.map(…) A thread
.subscribe(…) A thread
2 PublishOn
Flux.fromArray(…) main thread
.publishOn(…)
.filter(…) A thread
.publishOn(…)
.map(…) B thread
.subscribe(…) B thread
SubscribeOn
Flux.fromArray(…) A thread
.subscribeOn(…) A thread
.filter(…) A thread
.map(…) A thread
.subscribe(…) A thread
SubscribeOn with PublishOn
Flux.fromArray(…) A thread
.subscribeOn(…) A thread
.filter(…) A thread
.publishOn(…)
.map(…) B thread
.subscribe(…) B thread
Context
컨텍스트(Context)는 프로그래밍에서 주로 상태나 환경 정보를 관리하는 객체나 개념을 의미한다. 특히 리액티브 프로그래밍에서는 각각의 비동기 흐름이나 연산이 수행될 때, 해당 흐름의 상태나 정보를 유지하는 데 사용된다. 간단히 말하면, 프로그램의 실행 중에 특정한 정보나 상태를 관리하고 전달하는 방법이다.
Context의 주요 역할
- 상태 전달: 비동기 작업에서 연산이 서로 다른 스레드에서 실행될 때, 해당 작업에 필요한 상태나 값을 전달하는 역할을 한다.
- 디버깅 및 추적: 각 작업의 상태를 추적하거나, 로그를 남길 때, 컨텍스트를 활용하여 작업 간의 연관된 정보를 추적할 수 있다.
- 보안 정보 전달: 인증된 사용자의 정보나 보안 토큰을 연산 흐름에 전달할 때 사용될 수 있다.
- 성능 최적화: 작업 간의 상태를 컨텍스트에서 관리하여, 필요할 때만 해당 정보를 로딩하거나 전달하도록 최적화할 수 있다.
코드 예시
Mono.deferContextual(context -> {
String user = context.get(“user”); // Context에서 ‘user’ 정보를 가져옴
return Mono.just(“Hello, ” + user);
}).contextWrite(context -> context.put(“user”, “John Ddoe”));
// 컨텍스트에 ‘user’ 추가
Context에 데이터 쓰기
contextWrite()메소드를 통해서 Context에 데이터를 쓸 수 있다.
.contextWrite(context -> context.put(“lastName”, “Jobs”)
Context에서 데이터 읽기
Context에서 데이터를 읽는 방식은 두가지가 있다. 원본 데이터 소스 레벨에서 읽는 방식과, Operator 체인의 중간에서 읽는 방식이 있다.
원본 데이터 소스 레벨
원본 데이터 소스 레벨에서 데이터를 읽으려면, deferContextual()를 통해 읽으면 된다.
Operator 체인의 중간 레벨
중간 레벨에서 접근 하려면, transformDefferedContexual()을 통해 접근할 수 있다.
Spring boot에서의 Web Flux
Spring Boot에서는 다음과 같은 과정을 거쳐 Web Flux 비동기 Controller를 생성할 수 있다.