Java/Java

Java 가상 스레드(Virtual Thread)의 이해: 주의할점, Scope Value, 구조화된 동시성 -2

ysk(0soo) 2024. 3. 29. 23:38

가상 스레드 풀 사용시 주의할점.

가상 스레드가 가볍다고 해서 무조건 좋은것은 아닙니다. 다음과 같은 내용을 주의해야 합니다.

  • 가상 스레드의 스레드풀을 사용할때에는 고정 풀 을 사용하면 안된다.
  • 동시성을 제어하기 위해서 synchronized 키워드 대신 Lock을 사용하자
  • 스레드 로컬에 용량이 큰 객체를 저장하지 말자.

1. 가상 스레드의 스레드풀을 사용할때에는 고정 풀을 사용하면 안된다.

가상 스레드는 고정된 풀 (newFixedThreadPool)을 사용하면 안됩니다.

지정된 한도 이내에 더 많은 가상 스레드를 만들지 못하기 때문입니다.

또한 1회용이기 때문에 풀에 담아 사용하지 말고, 그냥 생성해서 사용하는게 좋습니다.

스레드 실행 수를 제한하고 싶으면, semaphore를 사용할 수 있습니다.

import java.util.concurrent.Semaphore;

public class ConcurrencyLimiter implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiter.class);

    private final ExecutorService executor;
    private final Semaphore semaphore;
    private final Queue<Callable<?>> queue;

    public ConcurrencyLimiter(ExecutorService executor, int limit) {
        this.executor = executor;
        this.semaphore = new Semaphore(limit);
        this.queue = new ConcurrentLinkedQueue<>();
    }

    public <T> Future<T> submit(Callable<T> callable) {
        this.queue.add(callable);
        return executor.submit(() -> executeTask());
    }

    private <T> T executeTask() {
        try {
            semaphore.acquire();

            return (T)this.queue
                .poll()
                .call();

        } catch (Exception e) {
            log.error("error", e);
        } finally {
            semaphore.release();
        }
        return null;
    }

    @Override
    public void close() throws Exception {
        this.executor.close();
    }
}


public class ConcurrencyLimitWithSemaphore {

    private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimitWithSemaphore.class);

    public static void main(String[] args) throws Exception {
        var factory = Thread.ofVirtual().name("vins", 1).factory();
        var limiter = new ConcurrencyLimiter(Executors.newThreadPerTaskExecutor(factory), 3);
      // 3회로 제한한다. 
        execute(limiter, 200);
    }

    private static void execute(ConcurrencyLimiter concurrencyLimiter, int taskCount) throws Exception {
        try(concurrencyLimiter){
            for (int i = 1; i <= taskCount; i++) {
                int j = i;
                concurrencyLimiter.submit(() -> printProductInfo(j));
            }
            log.info("submitted");
        }
    }

    // 3rd party service
    // contract: 3 concurrent calls are allowed
    private static String printProductInfo(int id){
        var product = Client.getProduct(id);
        log.info("{} => {}", id, product);
        return product;
    }
}

var limiter = new ConcurrencyLimiter(Executors.newThreadPerTaskExecutor(factory), 3);

  • 동시 호출 수를 3회로 제한한다.

2. 동시성을 제어하기 위해서 synchronized 키워드 대신 Lock을 사용하자

Pinning Thread(고정된 스레드) 문제

Pinning 쓰레드는 가상 스레드가 I/O 작업이나 기타 블로킹 연산을 수행할 때 발생하는 현상을 가리킵니다.

Virtual Thread가 플랫폼 스레드에 고정되어 장점을 활용할 수 없는 경우가 있습니다.

바로 Virtual Thread 내에서 synchronized block을 사용하거나, JNI를 통해 네이티브 메서드를 사용하는 경우입니다.

일반적으로 가상 스레드는 실행 중인 작업이 블로킹 상태가 되면, 그 스레드를 실행 중인 캐리어 스레드에서 분리하여 다른 작업을 수행할 수 있게 합니다.

synchronized 블록 내부에서 블로킹 I/O 작업을 수행하면, 해당 작업이 완료될 때까지 가상 스레드가 캐리어 스레드에서 분리되지 않고 고정되어 처리 능력에 영향을 줄 수 있습니다.

따라서 가상 스레드를 사용할 때는 블로킹 I/O 작업이나 synchronized 블록의 사용을 신중하게 고려해야 하며, 가능한 경우 논블로킹 I/O 작업을 수행하거나, 동시성을 관리하기 위해 java.util.concurrent 패키지의 도구들을 활용하는 것이 좋습니다.

SpringBoot와 자바 21을 이용한다면, 실제 내가 끌어다 쓰는 프레임 워크 내부 구현이 Synchronized로 선언되어있는지 확인하고 사용하는것이 좋습니다.

가상 스레드 사용시 타 라이브러리가 이렇게 막혀있다면 어떻게 미리 탐지하여 pinning을 막을 수 있을까?

리액터의 BlockHound처럼 가상스레드에서도 미리 pinning을 방지할 수 있습니다.

public class Synchronization {

    // Use this to check if virtual threads are getting pinned in your application
    static {
        System.setProperty("jdk.tracePinnedThreads", "short");
    }
}

아래와같이 로깅이 되서 추적해서 잡을 수 있게됩니다.

Thread[#80,ForkJoinPool-1-worker-10,5,CarrierThreads]
    com.ys.example.ioTask(Synchronization.java:47) <== monitors:1

syncronized 대신 동시 접근을 방지하는법

synchronized보다는 ReentrantLock을 권장합니다.

ReentrantLock과 달리, synchronized 키워드는 내부적으로 스레드를 OS 레벨의 락에 묶어버리는데, 이렇게되면 pinning이 발생하기 때문에 캐리어 스레드가 묶여 스케줄링 하지 못하기 때문입니다.

스레드 핀닝의 영향을 받지 않게됩니다.

ReentrantLock의 장점은 다음과 같습니다.

  • synchronized보다 유연성을 제공합니다.
  • 공정성을 지원합니다.
    • 더 오래 기다린 스레드가 잠금을 획득할 기회를 얻습니다.
  • 타임아웃이 있는 tryLock 을 지원합니다.
    • 스레드가 잠금을 획득하기 위해 대기할 수 있는 최대 시간을 설정할 수 있습니다.

아래처럼 Lock을 사용하면 동시 접근에 대한 정합성이 보장됩니다.

public class ReentrantLock {

    private static final Logger log = LoggerFactory.getLogger(ReentrantLock.class);
    private static final Lock lock = new ReentrantLock();
    private static final List<Integer> list = new ArrayList<>();

    public static void main(String[] args) {

        demo(Thread.ofVirtual());

        CommonUtils.sleep(Duration.ofSeconds(2));

        log.info("list size: {}", list.size());
    }

    private static void demo(Thread.Builder builder){
        for (int i = 0; i < 50; i++) {
            builder.start(() -> {
                log.info("Task started. {}", Thread.currentThread());
                for (int j = 0; j < 200; j++) {
                    inMemoryTask();
                }
                log.info("Task ended. {}", Thread.currentThread());
            });
        }
    }

    private static void inMemoryTask(){
        try{
            lock.lock();
            list.add(1);
        }catch (Exception e){
            log.error("error", e);
        }finally {
            lock.unlock();
        }
    }

}

3. 스레드 로컬에 용량이 큰 객체를 저장하지 말자.

가상 스레드와 스레드 로컬 그리고 Scope Value

가상 스레드도 스레드 로컬을 사용할 수 있으며 가상 스레드의 자식 가상 스레드도 스레드 로컬이 전파됩니다.

public class ThreadLocal {

    private static final Logger log = LoggerFactory.getLogger(ThreadLocal.class);
    private static final ThreadLocal<String> SESSION_TOKEN = new ThreadLocal<>();

    public static void main(String[] args) {

        Thread.ofVirtual().name("virtual-1").start( () -> processIncomingRequest());
        Thread.ofVirtual().name("virtual-2").start( () -> processIncomingRequest());

        CommonUtils.sleep(Duration.ofSeconds(1));
    }

    private static void processIncomingRequest(){
        authenticate();
        controller();
    }

    private static void authenticate(){
        var token = UUID.randomUUID().toString();
        log.info("token={}", token);
        SESSION_TOKEN.set(token);
    }

    private static void controller(){
        log.info("controller: {}", SESSION_TOKEN.get());
        service();
    }

    private static void service(){
        log.info("service: {}", SESSION_TOKEN.get());
        var threadName = "child-of-" + Thread.currentThread().getName();
        Thread.ofVirtual().name(threadName).start(ThreadLocal::callExternalService);
    }

    // This is a client to call external service
    private static void callExternalService(){
        log.info("preparing HTTP request with token: {}", SESSION_TOKEN.get());
    }

}

결과

// here
00:11:01.874 [virtual-1] INFO com.ys.ThreadLocal -- token=03dca3e7-9c5e-4b8c-a535-0cf4b7a8655f

00:11:01.874 [virtual-2] INFO com.ys.ThreadLocal -- token=b56e4b9a-3569-4525-a5fc-bf1d1cf11807
00:11:01.876 [virtual-2] INFO com.ys.ThreadLocal -- controller: b56e4b9a-3569-4525-a5fc-bf1d1cf11807

// here
00:11:01.876 [virtual-1] INFO com.ys.ThreadLocal -- controller: 03dca3e7-9c5e-4b8c-a535-0cf4b7a8655f
00:11:01.876 [virtual-2] INFO com.ys.ThreadLocal -- service: b56e4b9a-3569-4525-a5fc-bf1d1cf11807

//here
00:11:01.876 [virtual-1] INFO com.ys.ThreadLocal -- service: 03dca3e7-9c5e-4b8c-a535-0cf4b7a8655f

//here
00:11:01.877 [child-of-virtual-1] INFO com.ys.ThreadLocal -- preparing HTTP request with token: 03dca3e7-9c5e-4b8c-a535-0cf4b7a8655f
00:11:01.877 [child-of-virtual-2] INFO com.ys.ThreadLocal -- preparing HTTP request with token: b56e4b9a-3569-4525-a5fc-bf1d1cf11807

자식스레드에서 스레드 로컬에 무슨 짓을 하더라도 부모 스레드로는 전파되지 않습니다. 자식스레드로 사본을 제공하기 때문입니다.

그런데, 문제는 너무 복제가 많아지면 메모리나 관리적에서 힘들 수 있어 가상 스레드를 사용한 코드에서 스레드 로컬은 지양하는것이 좋습니다.

이 문제를 자바에서는 Scope Value란것을 도입해서 해결하려고 합니다.


Scope Value

스코프 벨류란, 범위가 지정된 값을 사용하는 것입니다.

스레드로컬의 문제에서 보면, 복제된 스레드가 상위 스레드의 스레드 로컬 정보를 가지고 있습니다.

서로 다른 스레드에는 서로 다른 데이터가 필요할 수 있으며 다른 스레드가 소유한 데이터에 액세스하거나 재정의할 수 없어야 하는데, 스레드 로컬은 가능합니다.

이로인한 스레드로컬의 문제는 다음과 같습니다.

  • 첫째, 모든 스레드-로컬 변수는 변경 가능하며, 어떤 코드에서든 언제든지 setter 메소드를 호출할 수 있습니다. 따라서, 데이터는 컴포넌트 사이에서 어떤 방향으로든 흐를 수 있어, 어떤 컴포넌트가 공유 상태를 업데이트하는지와 그 순서를 이해하기 어렵게 만듭니다.
  • 둘째, set 메소드를 사용하여 스레드의 인스턴스를 작성할 때, 데이터는 스레드의 전체 수명 동안 혹은 스레드가 remove 메소드를 호출할 때까지 유지됩니다. 사용을 다 하고, remove 메소드를 호출하는 것을 잊어버리면, 데이터는 필요 이상으로 메모리에 유지됩니다.
  • 마지막으로, 부모 스레드의 스레드로컬 변수는 자식 스레드에 의해 상속될 수 있습니다. 부모 스레드로컬 변수를 상속하는 자식 스레드를 생성할 때, 새 스레드는 모든 부모 스레드로컬 변수에 대한 추가 저장 공간을 할당해야 합니다.

Scope Value를 사용하면 메서드 인수를 사용하지 않고도 변경할 수 없는 데이터를 안전하고 효율적으로 공유 할 수 있습니다

사용법 측면에서는 스레드 로컬과 비슷합니다.

scope value는 스레드당 하나씩 여러 형태로 사용합니다. 스레드로컬 변수와 유사하게, Scope Values은 스레드마다 하나씩 여러 incarnation을 사용합니다.

그리고 보통 public static 필드로 선언되어 많은 컴포넌트에서 쉽게 접근할 수 있습니다:

public final static ScopedValue<User> LOGGED_IN_USER = ScopedValue.newInstance();

반면에, 스코프 값은 한 번 작성되면 변경할 수 없습니다. 스코프 값은 스레드 실행의 제한된 기간 동안만 사용할 수 있습니다:

ScopedValue.where(LOGGED_IN_USER, user.get()).run(
  () -> service.getData()
);

where 메소드는 Scope Value값을 받아 바인딩될 객체를 필요로 합니다.

run 메소드를 호출할 때, 스코프 값은 바인딩되어 현재 스레드에 고유한 인카네이션을 생성한 다음, 람다 함수가 실행됩니다.

run 메소드의 수명 동안, 표현식에서 직접적으로나 간접적으로 호출된 어떤 메소드라도 스코프 값을 읽을 수 있게되는데, , run 메소드가 끝나면 바인딩은 해제됩니다.

즉 Scope Value의 제한된 수명과 불변성은 스레드 동작에 대한 추론을 단순화하는 데 도움을 줍니다.

결국 여러 컴포넌트에서 이리저리 데이터가 흐르는 것이 아닌, 데이터는 한 방향으로만 전달되어 관리 및 추적을 용이하게 해요.

scope value의 바인딩 문제

ScopedValue의 바인딩은 runWhere에 전달된 람다 표현식의 실행 컨텍스트 내에서만 유효하며, 이 컨텍스트를 벗어난 후에는 더 이상 유효하지 않습니다. 범위 밖에서 사용하면 예외가 발생하게 됩니다.

아래 예제를 봅시다

public class ScopedValues {

    private static final Logger log = LoggerFactory.getLogger(ScopedValues.class);
    private static final ScopedValue<String> SESSION_TOKEN = ScopedValue.newInstance();

    public static void main(String[] args) {

        log.info("isBound={}", SESSION_TOKEN.isBound());
        log.info("value={}", SESSION_TOKEN.orElse("default value"));

        Thread.ofVirtual().name("1").start( () -> processIncomingRequest());
        // Thread.ofVirtual().name("2").start( () -> processIncomingRequest());

        CommonUtils.sleep(Duration.ofSeconds(1));
    }

    private static void processIncomingRequest(){
        var token = authenticate();

        ScopedValue.runWhere(SESSION_TOKEN, token, () -> controller());

        System.out.println("good : " + SESSION_TOKEN.get()); // 예외 발생
         //controller(); // 여기서도 예외 발생 
    }

    private static String authenticate(){
        var token = UUID.randomUUID().toString();
        log.info("token={}", token);
        return token;
    }

    // @Principal
    private static void controller(){
        log.info("controller: {}", SESSION_TOKEN.get());
        service();
    }

    private static void service(){
        log.info("service: {}", SESSION_TOKEN.get());
        ScopedValue.runWhere(SESSION_TOKEN, "new-token-" + Thread.currentThread().getName(), () -> callExternalService());
        System.out.println("service end");
    }

    // This is a client to call external service
    private static void callExternalService(){
        log.info("preparing HTTP request with token: {}", SESSION_TOKEN.get());
    }

}

ScopedValue는 특정한 스레드에서만 잠시 동안 사용할 수 있는 변수입니다. 이 변수는 설정한 스레드 내에서만 값을 가지고 있고, 그 스레드의 작업이 끝나면 그 값은 사라지게 됩니다.

  • 여기서 ScopedValue.runWhere라는 메서드를 사용하면, 그 메서드 안에서만 SESSION_TOKEN 변수에 특정한 값을 "임시로" 할당할 수 있습니다
  • ScopedValue.runWhere 메서드를 사용하여 SESSION_TOKEN의 범위를 지정된 람다 표현식(() -> controller()) 실행 동안에만 바인딩합니다
ScopedValue.runWhere(SESSION_TOKEN, token, () -> controller());

이 바인딩은 runWhere 메서드에 의해 생성된 람다 표현식의 실행 컨텍스트 내에서만 유효합니다.

즉, controllerservice 메서드 내에서 SESSION_TOKEN에 접근할 수 있으나, runWhere 메서드 호출이 완료되고 나면 해당 바인딩은 해제됩니다.

runWhere 메서드 내부에서 SESSION_TOKEN에 값을 할당했지만, 그 메서드가 끝나는 순간 그 할당한 값은 사라지기 때문에, 메서드 밖에서 그 값을 호출하려고 하면 값을 찾을 수 없게 되는 거죠.

유효한 범위 내에서 사용하지 않으면 noSuchElementException이 발생하게 됩니다.

이런 문제를 두고, 이 값을 상속시키기 위해 구조화된 동시성(StructuredConcorrency)라는 개념이 나오게 됩니다.

Inheriting Scoped Value (스코프 벨류 상속 )

범위 지정된 값은 StructuredTaskScope를 사용하여 생성된 모든 자식 스레드에 자동으로 상속됩니다. 자식 스레드는 부모 스레드에서 설정된 범위 지정된 값에 대한 바인딩을 사용할 수 있습니다:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<Optional<Data>> internalData = scope.fork(
      () -> internalService.getData(request)
    );
    Future<String> externalData = scope.fork(externalService::getData);
    try {
        scope.join();
        scope.throwIfFailed();

        Optional<Data> data = internalData.resultNow();
        // 응답에서 데이터를 반환하고 적절한 HTTP 상태를 설정
    } catch (InterruptedException | ExecutionException | IOException e) {
        response.setStatus(500);
    }
}

이 경우, fork 메소드를 통해 생성된 자식 스레드에서 실행 중인 서비스에서도 범위 지정된 값을 여전히 접근할 수 있습니다. 하지만, 스레드로컬 변수와 달리 부모 스레드에서 자식 스레드로 범위 지정된 값이 복사되지는 않습니다.

3.3. 범위 지정된 값의 재바인딩

범위 지정된 값은 변경 불가능하기 때문에 저장된 값을 변경하기 위한 set 메소드를 지원하지 않습니다. 하지만, 제한된 코드 섹션의 호출에 대해 범위 지정된 값을 재바인딩할 수 있습니다.

예를 들어, run에서 호출된 메소드로부터 범위 지정된 값을 숨기기 위해 where 메소드를 사용하여 null로 설정할 수 있습니다:

ScopedValue.where(Server.LOGGED_IN_USER, null).run(service::extractData);

하지만, 해당 코드가 종료되는 즉시 원래 값이 다시 사용 가능해집니다. run 메소드의 반환 타입이 void인 이유를 잘 봐야합니다. 만약 값을 반환해야하는 경우, 반환된 값들을 처리할 수 있도록 call 메소드를 사용할 수 있습니다.

구조화된 동시성(StructuredConcorrency)

구조화된 동시성은 동시성 프로그래밍에서의 한 패턴으로, 코드의 복잡성을 줄이고, 버그를 쉽게 찾을 수 있도록 돕는 방식입니다. 동시에 실행되는 작업들을 더 잘 관리하고, 코드의 흐름을 이해하기 쉽게 만드는 데 중점을 둡니다. 주요 목표 중 하나는 프로그램에서 생성된 모든 병렬 작업이 명확하게 구조화되고, 제어될 수 있도록 하는 것입니다.

구조화된 동시성의 핵심 원칙:

  1. 범위 지정: 구조화된 동시성에서는 모든 작업이 명시적인 scope 안에서 생성되어야 합니다. 이는 작업이 시작되고 종료되는 생명주기가 명확히 정의되어 있음을 의미합니다.
  2. 자원 관리: 구조화된 동시성을 사용하면 생성된 자원(예: 스레드, 핸들 등)이 적절히 관리되고 해제됩니다. 메모리 누수나 리소스 고갈같은 문제를 방지하는 데 도움이 됩니다.
  3. 오류 처리: 오류를 효과적으로 캐치하고 처리할 수 있게합니다. 구조화된 동시성에서는 범위 내에서 발생한 오류를 범위를 관리하는 상위 코드 블록으로 전파하여 적절히 처리할 수 있습니다.
  4. 가독성과 유지보수성 향상: 코드의 동시적 부분이 명확하게 구조화되어 있으면, 프로그램의 흐름을 더 쉽게 이해할 수 있습니다. 즉 가독성을 향상시키고, 버그를 더 쉽게 찾아내고 수정할 수 있게 합니다.

테스크가 여러 작은 작업으로 나뉠때, 스레드마다 서로 다른 결과가 나올 수 있습니다.

아래 케이스에 따라 동시성을 성공과 실패로 나누어 예외를 관리할 수 있게됩니다.

시나리오 설명
성공/실패 서브태스크들이 다른 스레드에서 실행됩니다. 각각의 성공 또는 실패 상태가 될 수 있습니다. (Executor Service 사용과 유사)
모두 성공 모든 서브태스크가 성공해야 합니다. 어느 하나라도 실패하면, 다른 실행 중인 서브태스크들을 취소합니다.
첫번째만 성공 첫 번째로 성공한 응답을 얻고 나머지는 취소합니다.

구조적 동시성이 이러한 다양한 실행 시나리오를 보다 명확하게 관리하고, 효과적으로 실행하기 위한 메커니즘을 제공하기 때문입니다.

구조적 동시성이 없는 경우, 동시성 프로그래밍에서 다음과 같은 여러 문제가 발생할 수 있습니다:

  1. 자원 관리의 어려움: 개별 스레드나 작업을 수동으로 관리해야 하기 때문에, 사용한 자원을 적절히 해제하지 않으면 메모리 누수나 자원 고갈과 같은 문제가 발생할 수 있습니다. 구조적 동시성은 자동으로 자원을 관리하고 해제하여 이러한 문제를 예방합니다.
  2. 오류 처리 복잡성: 복수의 스레드나 태스크에서 발생하는 오류를 효율적으로 관리하고 처리하는 것이 어렵습니다. 오류가 발생했을 때, 모든 관련 태스크를 적절히 취소하거나 오류를 상위로 전파하는 로직을 수동으로 구현해야 합니다. 구조적 동시성은 이를 단순화하여 오류 처리를 더 용이하게 만듭니다.
  3. 코드 복잡성 증가: 개별 스레드의 생명주기를 수동으로 관리하면 코드가 복잡해지고, 이해하기 어려워집니다. 이로 인해 버그가 발생하기 쉬워지고, 유지보수가 어려워집니다. 구조적 동시성은 코드의 구조를 명확하게 하여 이러한 문제를 줄입니다.
  4. 동기화 문제: 여러 스레드가 공유 자원에 접근할 때 동기화를 적절히 관리하지 못하면, 데이터 무결성 문제나 경쟁 상태(race condition)가 발생할 수 있습니다. 구조적 동시성을 사용하면, 이러한 동기화 문제를 더 쉽게 관리할 수 있는 패턴을 제공합니다.
  5. 작업 취소 및 종료의 어려움: 복수의 스레드나 태스크가 실행 중일 때, 특정 조건에서 모든 작업을 취소하거나 안전하게 종료시키는 것이 어려울 수 있습니다. 구조적 동시성은 작업의 범위를 명확하게 정의하고, 범위 내의 모든 작업을 쉽게 제어할 수 있는 메커니즘을 제공합니다.

예시코드

  • 구조적 동시성 작업 범위 객체를 정의하고 외부 api 호출을 2개의 가상스레드로 합니다.
/*
    구조화된 태스크 스코프를 사용한 스코프드 값 상속
 */
public class StructuredTaskScopeWithValue {

    private static final Logger log = LoggerFactory.getLogger(StructuredTaskScopeWithValue.class);
    private static final ScopedValue<String> SESSION_TOKEN = ScopedValue.newInstance(); // 세션 토큰을 위한 스코프드 값 생성

    public static void main(String[] args) {

        // 세션 토큰에 "token-123" 값을 할당하고 task 메서드를 실행
        ScopedValue.runWhere(SESSION_TOKEN, "token-123", StructuredTaskScopeWithValue::task);

    }

    private static void task() {
        try (var taskScope = new StructuredTaskScope<>()) { // 가상 스레드를 생성할 수 있는 스코프 생성.

            log.info("token: {}", SESSION_TOKEN.get()); // 현재 세션 토큰 값 로깅

            // 하위 작업 생성
            var subtask1 = taskScope.fork(StructuredTaskScopeWithValue::getDeltaAirfare); // 델타 항공 운임 조회 작업
            var subtask2 = taskScope.fork(StructuredTaskScopeWithValue::getFrontierAirfare); // 프론티어 항공 운임 조회 작업

            taskScope.join(); // 모든 하위 작업이 완료될 때까지 대기

            log.info("subtask1 state: {}", subtask1.state()); // 하위 작업의 상태 (UNAVAILABLE, SUCCESS, FAIL) 반환
            log.info("subtask2 state: {}", subtask2.state());

            log.info("subtask1 result: {}", subtask1.get()); // 하위 작업의 결과 출력
            log.info("subtask2 result: {}", subtask2.get());

        } catch (Exception e) {
            throw new RuntimeException(e); // 예외 발생 시 RuntimeException을 던짐
        }
    }

    private static String getDeltaAirfare() {
        var random = ThreadLocalRandom.current()
                .nextInt(100, 1000); // 100에서 1000 사이의 임의의 값 생성
        log.info("delta: {}", random); // 생성된 무작위 값 로깅
        log.info("token: {}", SESSION_TOKEN.get()); // 현재 세션 토큰 값 로깅
        CommonUtils.sleep("delta", Duration.ofSeconds(1)); // 1초간 대기
        return "Delta-$" + random; // 델타 항공 운임 반환
    }

    private static String getFrontierAirfare() {
        var random = ThreadLocalRandom.current()
                .nextInt(100, 1000); // 100에서 1000 사이의 임의의 값 생성
        log.info("frontier: {}", random); // 생성된 무작위 값 로깅
        log.info("token: {}", SESSION_TOKEN.get()); // 현재 세션 토큰 값 로깅
        CommonUtils.sleep("frontier", Duration.ofSeconds(2)); // 2초간 대기
        failingTask(); // 예외를 발생시키는 작업 실행
        return "Frontier-$" + random; // 프론티어 항공 운임 반환 (이 코드는 실행되지 않음)
    }

    private static String failingTask() {
        throw new RuntimeException("oops"); // RuntimeException 발생
    }

}
  • UNAVIABLE: 아직 시작되지 않았거나 시작상태가 결정되지 않은상태
  • SUCCESS: 테스크 성공,
  • FAIL : 실패. 호출시 예외 발생 가능.

결과

22:25:48.904 [main] INFO com.ys.ScopeValueWithStructedScope -- token: token-123
22:25:48.910 [] INFO com.ys.ScopeValueWithStructedScope -- delta: 949
22:25:48.910 [] INFO com.ys.ScopeValueWithStructedScope -- frontier: 853
22:25:48.910 [] INFO com.ys.ScopeValueWithStructedScope -- detal token: token-123
22:25:48.910 [] INFO com.ys.ScopeValueWithStructedScope -- frontier token: token-123
22:25:50.918 [main] INFO com.ys.ScopeValueWithStructedScope -- subtask1 state: SUCCESS
22:25:50.919 [main] INFO com.ys.ScopeValueWithStructedScope -- subtask1 result: Delta-$949
Exception in thread "main" java.lang.RuntimeException: java.lang.IllegalStateException: Subtask not completed or did not complete successfully
  • Frontier 호출시 일부러 예외를 발생시켰습니다.
  • 그렇게되면, subtask2의 state는 FAIL이 나오며, 하위작업의 결과를 get하는 과정에서 예외가 상위 스콮으로 번져 전체 테스크는 실패하게 됩니다.
  • 그러나 subTask1에는 영향을 미치지 않았습니다. SUCCESS
  • 결과를 보면, 서브테스크로는 ScopeValue가 상속되는것을 볼 수 있습니다. 즉 StructuredTaskScope 내에서는 같은 변수를 공유하는 것이지요.
  • StructuredTaskScope를 사용하면, 개발자는 여러 작업을 동시에 실행하고, 그들이 모두 완료될 때까지 기다릴 수 있는 명확한 구조를 갖게 됩니다.

만약 한 처리가 실패시, 다른 나머지 처리도 실패하고 싶다면?

taskScope.throwIfFailed 메소드는 태스크 스코프 내에서 실패한 태스크가 있는 경우, 사용자 정의 예외(RuntimeException("something went wrong"))를 발생시키는 방법입니다.

try (var taskScope = new StructuredTaskScope.ShutdownOnFailure()) {
    var subtask1 = taskScope.fork(CancelOnFailure::getDeltaAirfare);
    var subtask2 = taskScope.fork(CancelOnFailure::failingTask);

  taskScope.join();
    taskScope.throwIfFailed(ex -> new RuntimeException("something went wrong", ex));

  log.info("subtask1 state: {}", subtask1.state());
    log.info("subtask2 state: {}", subtask2.state());
} catch (Exception e) {
    throw new RuntimeException(e);
}

// 결과
-- delta: 661
-- delta is cancelled
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: something went wrong

이 구조는 한 태스크가 실패하면 모든 태스크가 취소되고 종료되는 방식으로 작동합니다.

만약 한 처리가 실패하더라도 예외가 상위 스콮으로 던지지 않고, 나머지 테스크를 종료 시키고 성공적으로 끝내게 하려면?

StructuredTaskScope.ShutdownOnSuccess 사용: 이 클래스는 여러 태스크를 동시에 실행할 때, 첫 번째 성공적으로 완료된 태스크가 나타나면 나머지 태스크를 자동으로 셧다운하는 기능을 제공합니다. 이는 여러 대안적인 실행 경로가 있고, 그 중 하나만 성공하면 충분한 경우에 유용합니다.

try (var taskScope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
    var subtask1 = taskScope.fork(FirstSuccess::failingTask);
    var subtask2 = taskScope.fork(FirstSuccess::getFrontierAirfare);
    taskScope.join();
    log.info("subtask1 state: {}", subtask1.state());
    log.info("subtask2 state: {}", subtask2.state());
    log.info("subtask result: {}", taskScope.result(ex -> new RuntimeException("all failed", ex)));
} catch (Exception e) {
    throw new RuntimeException(e);
}

// 결과
-- frontier: 753
-- subtask1 state: FAILED
-- subtask2 state: SUCCESS
-- subtask result: Frontier-$753

두 개의 비동기 태스크를 실행하고, 첫 번째로 성공적으로 완료되는 태스크가 있을 때 나머지 태스크를 셧다운(종료)하는 패턴입니다.

만약 모든 태스크가 실패하면, 사용자 정의 예외를 던집니다

다음으로는, SpringBoot에서는 어떻게 사용하는지 알아보겠습니다.

참조

관련 포스팅