ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kotlin Coroutines for Backend
    Java 2022. 2. 21. 22:09
    반응형

    최근 코틀린으로 Spring Webflux 기반의 서비스를 만들며 suspend, withContext 등등 다양한 코틀린 코루틴들을 살펴보게 되었습니다. 코틀린의 기본적인 문법은 익숙해졌지만, 코드를 살펴봐도 명확히 이해되지 않는 부분들이 바로 코루틴과 관련된 부분들이었는데요.

     

    이번 글에서는 [1]을 기반으로, 코틀린의 코루틴이 무엇인지 알아보도록 하겠습니다:

     

    • 왜 코루틴을 사용할까?
    • Sequence builder
    • Suspension은 어떻게 동작하는가?
    • Coroutine의 내부구조

    왜 코루틴을 사용할까?

    JVM 상에서는 이미 잘 갖춰젼 RxJava 또는 Reactor와 같은 라이브러리가 있습니다. 더욱이, 자바는 멀티쓰레딩을 지원하고 일부는 오래된 callback 형태를 선호하기도 합니다. 비동기 연산을 위해서 다양한 옵션들이 이미 존재합니다. 

     

    하지만, 코루틴은 그것들 이상의 장점을 제공하는데요. 코루틴의 개념은 1963년에 정립되었으나, 프로덕션 레벨로 구현되기까지 오랜 시간이 걸렸습니다. 코틀린 코루틴은 다양한 코틀린 플랫폼(JVM, JS, iOS 등)에서 실행될 수 있으며, 코드를 많이 변형하지 않고도 쉽게 비동기로 수행될 수 있게 해줍니다.

     

    기존의 비동기 연산은 어떤가?

     먼저 callback 형태는 아래와 같습니다:

    fun showNews() {
        getConfigFromApi { config ->
            getNewsFromApi(config) { news ->
                getUserFromApi { user ->
                    view.showNews(user, news)
                }
            }
        }
    }

     

    • 뉴스와 유저 데이터 요청 부분이 병렬화 될 수 있으나 안되어 있고, 콜백 형태로는 어려움
    • cancellation 지원이 복잡하고 어려움
    • 많은 indentation이 가독성을 떨어뜨림

     

    그렇다면, 또 다른 대안인 RxJava나 다른 reactive stream은 어떨까요?

     

    RxJava로 구현하면 아래와 같습니다:

     

    fun showNews() {
        disposables += Observable.zip(
            getConfigFromApi().flatMap { getNewsFromApi(it) },
            getUserFromApi(),
            Function2 { news: List<News>, config: Config ->
                Pair(news, config)
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { (news, config) ->
                view.showNews(news, config)
            }
    }

     

    위와 같은 형태는 콜백보다는 훨씬 개선된 모습입니다. 메모리 유출도 없고, cancellation도 지원되며, 쓰레드도 적절히 사용되고 있습니다. 한 가지 문제점은 복잡하다는 점입니다. 

     

    코틀린을 사용한다면?

    코틀린 코루틴이 도입한 핵심 기능은 특정 시점에 코루틴을 일시중지(suspend)하고, 미래의 시점에 코루틴을 다시 시작할 수 있는 점입니다. 그러한 기능 덕분에, 코드를 메인 쓰레드에서 실행하다가 API 요청을 하게 되는 경우 suspend할 수 있습니다. 코루틴이 suspend되면 쓰레드는 blocked되지 않으며 다른 곳에 사용될 수 있습니다. 이후 API 요청을 응답받아 데이터가 준비되면, 코루틴은 메인 쓰레드를 기다렸다가 쓰레드를 받게되면 suspend했던 지점 이후부터 재실행되게 됩니다. 

     

    fun showNews() {
        viewModelScope.launch {
            val config = async { getConfigFromApi() }
            val news = async { getNewsFromApi(config) }
            val user = async { getUserFromApi() }
            view.showNews(user.await(), news.await())
        }
    }

     

    위 코드는 간결하며, async/await를 통해 비동기/병렬적으로 수행되게 됩니다. 

     

    코틀린 코루틴을 통해, 위와 같이 간결하면서도 효율적인 코드를 쉽게 작성할 수 있습니다. 특히, 백엔드 코드를 작성할 때 이러한 간결성이 더욱 잘 적용될 수 있는데요. 기존의 쓰레드 기반의 코드를 코루틴으로 변경하기 위해서는 suspend를 추가하기만 하면 됩니다.

     

    코루틴의 위와 같은 특징 말고도, 쓰레드는 코루틴에 비해 비싼 자원입니다. 쓰레드는 생성되고, 유지되며, 각각 할당된 메모리도 가지게 되는데요 (대부분 쓰레드 스택의 디폴트 사이즈는 1 MB, 자바 최적화에 의해 정확히 일치하지는 않음). 많은 유저가 서버에 요청을 하고 데이터베이스나 다른 서비스 응답을 위해 쓰레드가 대기하고 있다면, 이는 큰 비용으로 발생하게 됩니다 (memory, processor). 

     

    아래의 코드를 실행해보면, 쓰레드와 코루틴의 비용 차이를 명확히 보여 줍니다:

     

    fun main() {
        repeat(100_000) {
            thread {
                 Thread.sleep(1000L)
                 print(".")
             }
        }
    }
    
    fun main() = runBlocking {
        repeat(100_000) {
            launch {
                delay(1000L)
                print(".")
            }
        }
    }

     

    Sequence builder

    Python, Rust 또는 JavaScript와 같은 다른 언어에서도 특정한 형태의 코루틴을 내부적으로 사용하는 것을 볼 수 있습니다:

     

    • async function
    • generator functions

    코틀린은 generator function 대신 sequence builder를 제공합니다. 그렇기에 코틀린의 sequence builder에도 내부적으로 코루틴이 사용되고 있습니다. 

     

    코틀린의 sequence는 콜렉션과 유사한 개념이나 lazy evaluated 됩니다.  즉, 다음번의 요소(element)는 항상 필요로 하는 시점에 온디맨드로 계산되게 됩니다. 그 결과 sequence는 아래와 같은 장점을 가집니다:

     

    • 필요한 최소한의 연산만 수행
    • inifinite할 수 있음
    • 더욱 메모리 효율적임

    아래의 코드와 같이 sequence를 iterate하며 필요한 시점에 추가적으로 yield하게 됩니다:

     

    val seq = sequence {
        println("Generating first")
        yield(1)
        println("Generating second")
        yield(2)
        println("Generating third")
        yield(3)
        println("Done")
    }
    
    fun main() {
        for (num in seq) {
            println("Next number is $num")
        }
    }
    
    // Generating first
    // Next number is 1
    // Generating second
    // Next number is 2
    // Generating third
    // Next number is 3
    // Done

     

    Suspendsion은 어떻게 동작하는가?

    Suspending 함수는 코틀린 코루틴의 중심입니다. Suspension 기능은 하나의 가장 본질적인 기능으로 다른 개념의 기반이 됩니다. 

     

    코루틴을 suspending하는 것은 실행 중 일시정지하는 것을 의미합니다. 비디오게임을 중지하는 것과 같이, 이후에 원하는 시점에 다시 중지한 시점부터 코루틴 실행을 이어갈 수 있습니다. 

     

    그렇기에 코루틴은 쓰레드와 매우 다릅니다. 코루틴이 suspend되면, 어떠한 자원도 소모하지 않습니다. 하나의 코루틴은 다른 쓰레드에서 재실행될 수 있고, 이론상으로 continuation은 직렬화, 비직렬화되고 재실행될 수 있습니다.

     

    Resume 예시

    실행해보기 위해서 우리는 코루틴 빌더(runBlocking 또는 launch)를 사용해 코루틴을 시작합니다. Suspend 함수는 하나의 코루틴을 suspend할 수 있는 함수인데요. 그렇기에 suspend 함수는 반드시 코루틴 상에서 호출되어야 함을 의미합니다. 최종적으로 suspend 함수는 suspend할 무언가가 필요하기 때문입니다. 아래에서 main 함수가 시작점으로, 실행 시 코틀린은 해당 함수를 하나의 코루틴 안에서 시작하게 됩니다:

     

    suspend fun main() {
        println("Before")
        
        suspendCoroutine<Unit> {}
        
        println("After")
    }
    // Before

     

    위 코드를 실행하면, After가 출력되지 않는 것을 볼 수 있습니다. 코드가 실행을 멈춘 것이 아니라, 코루틴이 "Before" 이후에 suspend 되어서 그렇습니다. 

     

    suspendCoroutine 호출 시, 람다표현식({})으로 끝나는 것을 알 수 있습니다. 아규먼트로 전달되는 함수는 suspension 직전에 호출되게 됩니다:

     

    suspend fun main() {
        println("Before")
        
        suspendCoroutine<Unit> { continuation ->
            println("Before too")
        }
    
        println("After")
    }
    // Before
    // Before too

     

    아규먼트로 전달되는 함수가 suspendsion 직전에 호출되기에, 이 함수는 continuation을 다른 곳에 저장하거나 재실행 여부를 계획하기 위해 사용됩니다. 그렇기에 continuation을 통해 아래와 같이 바로 재실행을 할 수 도 있습니다. 

     

    suspend fun main() {
        println("Before")
        
        suspendCoroutine<Unit> { continuation ->
            continuation.resume(Unit)
        }
        
        println("After")
    }
    // Before
    // After
    
    
    inline fun <T> Continuation<T>.resume(value: T): Unit =
        resumeWith(Result.success(value))
        
    inline fun <T> Continuation<T>.resumeWithException(
        exception: Throwable
    ): Unit = resumeWith(Result.failure(exception))

     

    suspendCoroutine은 결과값을 리턴하는 경우에도 사용될 수 있으며, 앞에서 언급한 바와 같이 데이터베이스 접근이나 API 응답 요청하는 경우에 사용될 수 있습니다:

     

    suspend fun main() {
        println("Before")
        
        val user = suspendCoroutine<User> { cont ->
            requestUser { user ->
                cont.resume(user)
            }
        }
        println(user)
        println("After")
    }
    // Before
    // (1 second delay)
    // User(name=Test)
    // After

     

    대부분의 라이브러리가 suspend 함수를 지원하기에 직접 콜백 형태의 suspendCoroutine을 작성할 필요는 드뭅니다. 그러나 필요하다면, suspendCoroutine 대신, suspendCancellableCoroutine을 사용해 아래와 같이 함수로 쪼개어 작성할 수 있습니다:

     

    suspend fun requestUser(): User {
        return suspendCancellableCoroutine<User> { cont ->
            requestUser { user ->
                cont.resume(user)
            }
        }
    }
    
    suspend fun main() {
        println("Before")
        val user = requestUser()
        println(user)
        println("After")
    }

     

     

    Coroutine의 내부구조

    실제로 코루틴 내부구조를 suspension, continuation과 연결하여 자세히 살펴보겠습니다. 

     

    • suspending 함수는 state machine과 같습니다. 함수의 시작 전 상태와, 각 suspend 함수 호출 마다의 상태를 가집니다.
    • 각 단계별 상태를 식별하는 숫자와 로컬 데이터가 continuation 객체에 저장됩니다.
    • 특정 함수의 continuation은 해당 함수가 호출하는 함수의 continuataion을 decorate합니다. 그 결과, 모든 continuation은 함수 재실행 시의 콜 스택을 나타내게 됩니다.

     

    Coninuation-passing style

    suspending 함수를 구현하는 방법에는 몇 가지가 있습니다. 코틀린 팀은 continuation-passing style이라는 방법을 사용하였는데요. 그 방법은 continuation을 함수에서 함수로 아규먼트 형태로 전달하는 방법입니다. 컨벤션으로 continuation은 마지막 파라미터에 위치합니다.

     

    suspend fun getUser(): User?
    suspend fun setUser(user: User)
    
    // under the hood
    fun getUser(continuation: Continuation<*>): Any?
    fun setUser(user: User, continuation, Continuation<*>): Any

     

    위에서 보듯이, 리턴값도 User? -> Any?로 변경되었는데요. Suspending 함수는 suspend 되기에 초기에 선언된 타입을 리턴하지 않습니다. 그렇게 일시정지된 경우에는 특별한 마커인 COROUTINE_SUSPENDED를 리턴합니다. 그렇기에 getUser는 User? 또는 COROUTINE_SUSPENDED를 리턴하며, 두 객체의 수퍼타입인 Any?가 사용된 것입니다.

     

     

    아래와 같이 간단한 suspend 함수를 살펴보겠습니다:

     

    suspend fun myFunction() {
        println("Before")
        delay(1000) // suspending
        println("After")
    }

     

    이제 실제 구조는 아래와 같다는 것을 이해하셨을텐데요:

    fun myFunction(continuation: Continuation<*>): Any

     

    각 함수는 상태를 저장하기 위해 고유의 continuation이 필요합니다. (실제 continuation은 object expression이라 이름이 없지만, 설명을 위해 있다고 하면) MyFunctionContinuation은 함수 시작점에서 아규먼트로 받은 continuation을 사용해 아래와 같이 생성됩니다:

     

    val continuation = continuation as? MyFunctionContinuation 
        ?: MyFunctionContinuation(continuation)

     

    이제 myFunction 내부를 살펴보면, 중간에 delay(1000)가 있기 때문에, 함수는 2곳에서 실행을 시작할 수 있습니다. 가장 첫 줄의 println("Before")와 delay(1000) 이후의 println("After")가 2 지점인데요. 그러한 (재)실행 시작 지점을 표시하기 위해 아래와 같이 continuation의 label을 활용할 수 있습니다:

     

    fun myFunction(continuation: Continuation<Unit>): Any {
        val continuation = continuation as? MyFunctionContinuation
            ?: MyFunctionContinuation(continuation)
            
        if (continuation.label == 0) {
            println("Before")
            continuation.label = 1
            if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        if (continuation.label == 1) {
            println("After")
            return Unit
        }
        error("Impossible")
    }

     

    delay를 통해서 함수가 suspend되었을 때, COROUTINE_SUSPENDED를 리턴하는 것을 볼 수 있습니다. myFunction에서 COROUTINE_SUSPENDED를 리턴하면, myFunction을 호출한 함수도 suspend되어 COROUTINE_SUSPENDED를 리턴하고, 이러한 전파(propagation)는 콜 스택 꼭대기까지 이어집니다. 이렇게 suspension은 해당되는 함수를 모두 일시정지시키고, 쓰레드를 free시키게 됩니다. 

     

    본질적인 요소들만 남겨서 작성한 MyFunctionContinuation의 형태는 아래와 같습니다:

     

    class MyFunctionContinuation(
        val completion: Continuation<Unit>
    ) : Continuation<Unit> {
        override val context: CoroutinContext
            geet() = completion.context
          
        var label = 0
        var result: Result<Any>? = null
        var userId: String? = null
        
        override fun resumeWith(result: Result<String>) {
            this.result = result
            val res = try {
                val r = myFunction(token, this)
                if (r == COROUTINE_SUSPENDED) return
                Result.success(r as Unit)
            } catch (e: Throwable) {
                Result.failure(e)
            }
            completion.resumeWith(res)
        }
    }

     

    IntelliJ에서 실제 코틀린의 바이트 코드를 보며 확인해 볼 수도 있습니다.

     

     

     

    반응형
Kaden Sungbin Cho