Java/Java

Java 가상 스레드(Virtual Thread) : SpringBoot에서 사용하기 -3

ysk(0soo) 2024. 3. 29. 23:49

SpringBoot With Virtual Thread

springboot 3.2 + 자바 21 버전부터 virtual thread를 사용할 수 있습니다.

property 설정으로 AutoCofngiruation을 통해 활성화 할 수 있습니다.

# virtual thread enabled/disabled
spring.threads.virtual.enabled=true

위 조건을 활성화하게되면, SpringBoot Webserver AutoConfiguration에서 기본 스레드풀 대신 가상 스레드 풀을 이용한 톰캣, 제티 등이 활성화가 됩니다.

@AutoConfiguration
@ConditionalOnNotWarDeployment
@ConditionalOnWebApplication
@EnableConfigurationProperties(ServerProperties.class)
public class EmbeddedWebServerFactoryCustomizerAutoConfiguration {

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass({ Tomcat.class, UpgradeProtocol.class })
    public static class TomcatWebServerFactoryCustomizerConfiguration {

        @Bean
        public TomcatWebServerFactoryCustomizer tomcatWebServerFactoryCustomizer(Environment environment,
            ServerProperties serverProperties) {
            return new TomcatWebServerFactoryCustomizer(environment, serverProperties);
        }

        @Bean
        @ConditionalOnThreading(Threading.VIRTUAL)
        TomcatVirtualThreadsWebServerFactoryCustomizer tomcatVirtualThreadsProtocolHandlerCustomizer() {
            return new TomcatVirtualThreadsWebServerFactoryCustomizer();
        }

    }

}
  • Redis, Kafka등 많은 AutoConfigurationClass도 활성화 여부에 따라 가상 스레드를 풀로 사용해요.
  • LettuceConnectionConfiguration, KafkaAnnotationDrivenConfiguration

스레드 모델 조건에 따라 빈을 다르게 생성하기 위한 Condition 어노테이션과 구현체가 추가되어서, 해당 Condition으로 확인하고 스레드풀을 다르게 등록합니다.

/**
 * {@link Conditional @Conditional} that matches when the specified threading is active.
 *
 * @author Moritz Halbritter
 * @since 3.2.0
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Conditional(OnThreadingCondition.class)
public @interface ConditionalOnThreading {

    /**
     * The {@link Threading threading} that must be active.
     * @return the expected threading
     */
    Threading value();

}

그리하여 다음처럼 Virutal Thread Executor 전용 Bean을 선언할 수 있게되었습니다.

@Configuration
public class ExecutorServiceConfig {

    @Bean
    @ConditionalOnThreading(Threading.VIRTUAL)
    public ExecutorService virtualThreadExecutor(){
        return Executors.newVirtualThreadPerTaskExecutor();
    }

    @Bean
    @ConditionalOnThreading(Threading.PLATFORM)
    public ExecutorService platformThreadExecutor(){
        return Executors.newCachedThreadPool();
    }

}

만약 가상 스레드 이름을 지정하고 싶다면?

 @Bean
 @ConditionalOnThreading(Threading.VIRTUAL)
 public ExecutorService virtualThreadExecutor() {
     ThreadFactory factory = Thread.ofVirtual().name("my-virtual").factory();

     return Executors.newThreadPerTaskExecutor(factory);
 }

가상 스레드 예외 핸들링

가상 스레드를 이용해서 실행한 코드에서 발생한 예외가 전파되지 않고 핸들링 하고 싶은 경우 다음처럼 이용할 수 있습니다.

1. 실행할 코드에서 핸들링하기

Thread.ofVirtual().start(() -> {
    try {
        예외가 발생할 수 있는 로직 
    } catch (Exception e) {
        // 예외 처리 로직
    }
});

2. UncaughtExceptionHandler 이용하기

UncaughtExceptionHandler 객체 인자를 받는 메소드를 제공해요.

해당 객체는 아래 인터페이스에요

/**
 * {@code Thread}가 잡히지 않은 예외로 인해 갑자기 종료될 때 호출되는 핸들러를 정의합니다.
 * 스레드가 잡히지 않은 예외로 인해 종료될 때, 자바 가상 머신은 스레드에 대한 {@code UncaughtExceptionHandler}를
 * 를 사용하여 조회하고 핸들러의 {@code uncaughtException} 메소드를 호출하며, 스레드와 예외를
 * 인자로 전달합니다.
 * 스레드가 {@code UncaughtExceptionHandler}를
 * 명시적으로 설정하지 않은 경우, 해당 스레드의 {@code ThreadGroup} 객체가 그 역할을 합니다. 
 * {@code ThreadGroup} 객체가 예외를 다루는 특별한 요구사항이 없으면, getDefaultUncaughtExceptionHandler로 호출을 전달할 수 있습니다.
 */

@FunctionalInterface
public interface UncaughtExceptionHandler {
  /**
  * 주어진 스레드가 주어진 잡히지 않은 예외로 인해 종료될 때 호출되는 메소드입니다.
  * @param t 스레드
  * @param e 예외
  */
  void uncaughtException(Thread t, Throwable e);  
}

아래처럼 핸들링하면 됩니다.

Thread.ofVirtual().unstarted(() -> System.out.println("Virtual thread"))
        .setUncaughtExceptionHandler((t, e) -> System.err.println("Uncaught exception in thread " + t.getName() + ": " + e.getMessage()));

// or

Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
    // 예외가 발생할 수 있는 경우 
});

virtualThread.setUncaughtExceptionHandler((t, e) -> {
    // 예외 처리 로직
});

virtualThread.start();

3. CustomThreadFactory 이용하기

ExecutorService에 가상 스레드를 생성하면서 각 스레드에 대해 공통적인 UncaughtExceptionHandler를 사용하므로 일관되게 구현할 수 있어요.

public class VirtualThreadWithExceptionHandler {

    public static void main(String[] args) {
        // 커스텀 ThreadFactory 구현
        ThreadFactory customThreadFactory = task -> {
            Thread thread = Thread.ofVirtual().start(task); // 가상 스레드 생성
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    System.out.println("Uncaught exception in thread: " + t.getName() + ", error: " + e.getMessage());
                }
            });
            return thread;
        };

        // 커스텀 ThreadFactory를 사용하여 ExecutorService 생성
        ExecutorService executor = Executors.newThreadPerTaskExecutor(customThreadFactory);

        // 예외를 발생시키는 작업 제출
        executor.submit(() -> {
            System.out.println("This will throw a runtime exception");
            throw new RuntimeException("Example exception");
        });

        // ExecutorService 종료
        executor.shutdown();
    }
}

@Async 어노테이션을 이용한 비동기 작업에 가상 스레드 사용하기 - AsyncConfig

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    @Bean(name = "virtualThreadExecutor")
    public Executor getAsyncExecutor() {
        ThreadFactory factory = Thread.ofVirtual().name("virtual-thread", 1)
            .uncaughtExceptionHandler(
                (t, e) -> System.err.println("Uncaught exception in thread " + t.getName() + ": " + e.getMessage()))
            .factory(); // 1은 시작 넘버

        return Executors.newThreadPerTaskExecutor(factory);
    }

    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }

    public static class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... params) {
            System.err.println("Exception Name - " + throwable.getClass().getName());
            System.err.println("Exception message - " + throwable.getMessage());
            System.err.println("Method name - " + method.getName());

            for (Object param : params) {
                System.err.println("Parameter value - " + param);
            }

            try {
                throw (Exception) throwable;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            //  추가적인 예외 처리 로직을 구현할 수 있습니다. 예를 들어, 애플리케이션 이벤트를 발행하거나, 알림을 전송할 수 있습니다.
            // eventPublisher.publishEvent(new AsyncErrorEvent(throwable));
        }
    }
}

RestClient에 가상 스레드 팩토리 지정하기

spring 3.2에 나온 RestClient에도 다음처럼 가상 스레드를 지정하여 사용 가능합니다.

@Value("${spring.threads.virtual.enabled}")
private boolean isVirtualThreadEnabled;

private RestClient buildRestClient(String baseUrl) {
    log.info("base url: {}", baseUrl);
    var builder = RestClient.builder()
        .baseUrl(baseUrl);

    if (isVirtualThreadEnabled) {
        builder = builder.requestFactory(new JdkClientHttpRequestFactory(
            HttpClient.newBuilder()
                .executor(Executors.newVirtualThreadPerTaskExecutor())
                .build()
        ));
    }
    return builder.build();
}

여러 외부 api를 동시에 호출하기

@Service
@RequiredArgsConstructor
public class HealthCheckService {

    private static final Logger log = LoggerFactory.getLogger(HealthCheckService.class);
    private final PatientRecordServiceClient patientRecordServiceClient;
    private final AppointmentServiceClient appointmentServiceClient;
    private final MedicationServiceClient medicationServiceClient;

    @Qualifier("virtualThreadExecutor")
    private final ExecutorService executor;

    public HealthCheckReport getHealthCheckReport(String patientId){
        var patientRecords = this.executor.submit(() -> this.patientRecordServiceClient.getPatientRecords(patientId));
        var appointments = this.executor.submit(() -> this.appointmentServiceClient.getAppointments(patientId));
        var medications = this.executor.submit(() -> this.medicationServiceClient.getMedications(patientId));

        return new HealthCheckReport(
                patientId,
                getOrElse(patientRecords, Collections.emptyList()),
                getOrElse(appointments, Collections.emptyList()),
                getOrElse(medications, Collections.emptyList())
        );
    }

    private <T> T getOrElse(Future<T> future, T defaultValue){
        try {
            return future.get();
        } catch (Exception e) {
            log.error("error", e);
        }
        return defaultValue;
    }

}

건강 관리 시스템에서 여러 외부 api를 호출하는 예제에요.
가상스레드는 기존 플랫폼스레드보다 가볍기때문에, 이렇게 동시 여러 I/O작업을 하는데 용이합니다.

getOrElse라는 서포트 메소드를 이용해서 예외를 핸들링 할 수도 있고, ThreadFactory를 이용해서 공통된 ExceptionHandler를 적용할수도 있어요.

결론

자바에서 가상 스레드를 도입함으로써,멀티 스레드 프로그래밍에 더 유연해지고 처리량이 높은 애플리케이션을 구현할 수 있게 되었습니다.

그렇다고 가상 스레드가 기존 플랫폼스레드보다 처리량이 무조건 올라간다, 리액티브는 죽었다 같은 같은 무조건적인 오해는 하지 않는것이 좋다고 생각합니다.

  • CPU Bound 작업에서는 동일하다고 볼 수 있습니다.
    주의할점을 지켜가면서 개발한다면, 처리량이 높은 애플리케이션을 개발하는것에 도움이 된다고 생각합니다.
  • syncronized, 가상스레드풀링, 스레드로컬 마구 사용 등등

참조

관련 포스팅