본문 바로가기
스프링부트

@Async 를 통해 비동기 호출을 해보자

by pius712 2024. 11. 10.

스프링에서는 Async 어노테이션을 통해 간편하게 메서드 호출을 비동기 방식으로 만들 수 있다.

즉, @Async 어노테이션을 달면, 해당 메서드가 별도의 스레드에서 호출된다.

@EnableAsync

우선, 스프링의 비동기 메서드 실행을 활성화하기 위해서는 @EnableAsync 어노테이션을 추가해줘야한다.

스프링 부트의 AutoConfigure 에 의해서 비동기와 관련된 빈들을 등록해주어야하기 때문이다.

해당 Configuration 을 작성하지 않는 경우, 비동기 프로세스로 실행되지 않는다.

@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer

우선 아무런 추가 설정을 추가하지 않고, 아래의 두 코드를 추가해보자.

@Service
class AsyncDemoService(
    private val asyncDemo: AsyncDemo
) {
    private val log = LoggerFactory.getLogger(javaClass)
    fun demo() {
        log.info("Demo service called")
        asyncDemo.demoMethod()
    }
}

@Component
class AsyncDemo {
    private val log = LoggerFactory.getLogger(javaClass)
    @Async
    fun demoMethod() {
        log.info("Demo method called")
    }
}

잘 동작하는지 테스트 코드를 작성해보았다.

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class AsyncDemoServiceTest(
    private val asyncDemoService: AsyncDemoService
) {

    private val log = LoggerFactory.getLogger(javaClass)
    @Test
    fun demo() {
        asyncDemoService.demo()
        log.info("test")
    }
}

작성된 테스트 코드의 로그를 간략히 호출해보니, 아래와 같다. 비동기적으로 실행된 것을 알 수 있다.

[    Test worker]: Demo service called Test worker
[    Test worker]: test
[         task-1]: Demo method called task-1

Executor 설정

기본적으로 스프링은 비동기 호출을 SimpleAsyncTaskExecutor 를 통해서 실행한다.

이 경우, 스레드를 재사용하지 않고 매 호출시 스레드를 생성하기 때문에 스레드 생성과 관련된 비용 (성능과 자원) 문제가 발생할 수 있다.

비동기 호출에 스레드 풀 사용하기

앞서 작성한 AsyncConfig 을 통해 Executor 와 ExceptionHandler 설정을 할 수 있다.

이 설정을 통해서 비동기 호출에 스레드 풀을 사용할 수 있다.

Executor 의 구현체는 다양한데, 일반적으로 비동기처리에는 ThreadPoolTaskExecutor 구현체를 자주 사용하는 것으로 보인다.

@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer{
    override fun getAsyncExecutor(): Executor {
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = 10
        executor.maxPoolSize = 10
        executor.queueCapacity = 10_000
        executor.setWaitForTasksToCompleteOnShutdown(true)
        executor.setAwaitTerminationSeconds(10)
        executor.initialize()
        return executor
    }
}

corePoolSize, maxPoolSize, queueCapacity 와 같은 설정의 경우, Executor 프레임워크에 대해 배경지식이 필요하다.

정확하지는 않지만 쉽게 설명하자면, 아래와 같다.

  • corePoolSize: 스레드 풀에 유지되는 최소한의 스레드 개수이다.
  • maxPoolSize: 만약 스레드 풀에서 추가로 스레드가 필요한 경우, 추가 생성하여 만들 수 있는 전체 스레드 수를 말한다.
  • queueCapacity: 스레드 풀에 처리되기를 대기하는 큐이다.

Async 와 MDC 연결하기

Async 처리와 Context

Async 를 통해 비동기 처리를 하는 경우, MDC 와 같은 ThreadLocal 에 값을 저장하는 Context 값들을 비동기 스레드에서 가져올 수 없다. 그 이유는 Async 를 통해 처리되는 메서드 호출은 별개의 스레드에서 처리되기 때문이다.

TaskDecorator 를 통해 MDC 연결

TaskDecorator 는 실행될 Runnable 에 데코레이터를 추가하여, 실제 태스크 실행 동작의 앞뒤에 추가적인 동작을 할 수 있게 해준다.

모니터링, 로깅, 컨텍스트 추가 등의 작업에 사용할 수 있다.

class AsyncTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        val map = MDC.getCopyOfContextMap()
        return Runnable {
            try {
                if (map != null) {
                    MDC.setContextMap(map)
                }
                runnable.run()
            } finally {
                MDC.clear()
            }
        }
    }
}
  • 실행 스레드: caller 의 스레드
  • runnable 인자: async annotation 이 붙은 메서드
  • runnable 반환 값: 데코 실제 executor 에서 실행되는 runnable

참고로, 아래 코드에서 runnable.run() 호출은 스레드를 통해 실행한 것이 아니라, 직접 메서드를 호출하기 때문에, 별도의 스레드를 생성하지 않는다. 따라서, 반환되는 Runnable 을 실행하는 스레드에서 호출된다.

return Runnable {
    try {
        if (map != null) {
            MDC.setContextMap(map)
        }
        runnable.run()
    } finally {
        MDC.clear()
    }
}

그리고 MDC 를 clear 하지 않는 경우, 스레드 풀에 의해 스레드가 재활용되기 때문에 주의해야한다.

의도하는 경우가 아니라면, 아래처럼 호출 이후에 MDC 를 지워주자.

finally {
    MDC.clear()
}

실제 task 가 실행되는 executor 에 task 를 제출하기 전에, 해당 로직이 호출된다.

ThreadPoolTaskExecutor 실행 코드는 아래처럼 구성되어있다.

// ThreadPoolTaskExecutor 의 실행 코드
@Override
public void execute(Runnable command) {
	Runnable decorated = command;
	if (taskDecorator != null) {
	// 여기서 AsyncTaskDecorator 가 호출된다.
		decorated = taskDecorator.decorate(command);
		if (decorated != command) {
			decoratedTaskMap.put(decorated, command);
		}
	}
	super.execute(decorated);
}

위 데코레이터는 아래의 코드를 통해 설정할 수 있다.

@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer{
    override fun getAsyncExecutor(): Executor {

		// 생략
		executor.setTaskDecorator(AsyncTaskDecorator())
		// 생략
	}
}

예외 처리하기

기본적으로 스레드에서 발생한 예외는 전파되지 않는다.

예외 발생시 일반적으로 Advice를 통해 Exception 을 처리하지만, 비동기 메서드에서 예외가 발생하면 별도의 스레드에서 동작하여 Advice 에 예외가 포착되지 않는다.

비동기 처리에 대한 예외를 다루기 위해 스프링은 AsyncUncaughtExceptionHandler 를 추가할 수 있도록 해준다.

위 MDC 추가를 통해 에러 핸들러에도 MDC 값이 저장되어있기 때문에, 로그를 통해서 예외 상황에 대해서 적절한 문맥 정보를 획득할 수 있게 된다.

// config 추가 
@Configuration
@EnableAsync
class AsyncConfig : AsyncConfigurer{
	override fun getAsyncUncaughtExceptionHandler(): AsyncUncaughtExceptionHandler? {
        return AsyncExceptionHandler()
    }
}

// 에러 핸들러 추가
// 상황에 맞게 개발 필요
class AsyncExceptionHandler: AsyncUncaughtExceptionHandler {
    private val logger = LoggerFactory.getLogger(javaClass)
    override fun handleUncaughtException(ex: Throwable, method: Method, vararg params: Any?) {
        if(ex is ApiException) {
            when(ex.level) {
                Level.ERROR -> logger.error("Error occurred in async method", ex)
                Level.WARN -> logger.warn("Error occurred in async method", ex)
                Level.INFO -> logger.info("Error occurred in async method", ex)
                Level.DEBUG -> logger.debug("Error occurred in async method", ex)
                Level.TRACE -> logger.trace("Error occurred in async method", ex)
            }
            return
        }
        logger.error("Unexpected error occurred in async method", ex)
    }
}

주의사항

@Async 어노테이션 또한 스프링의 AOP 기술을 통해 구현되는 것이기 때문에 주의해야할 부분이 있다.

IDE 에서 이런 부분을 알려주긴 하지만, 스프링 AOP 는 런타임에 적용되는 기술이기 때문에 컴파일 에러가 발생하지는 않기 때문에 주의해야한다.

  • public 메서드에 적용되어야한다.
  • 내부 호출(self invocation) 을 해서는 안된다.
  • void 혹은 future 를 반환해야한다