ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Apache Spark(아파치 스파크): Adaptive Query Execution이란?
    Data 2021. 6. 9. 22:29
    반응형

    스파크 SQL의 쿼리 옵티마이저는 룰베이스 또는 코스트 기반의 2가지 방식을 가지며, 꾸준하게 개선이 이뤄져 왔습니다.  특히 코스트 기반의 최적화는 적절한 join 타입을 선택하거나, join 순서를 조정하는 등을 다양한 최적화를 담당합니다.

     

    기존의 Cost-based optimization (CBO)는 아래와 같은 한계가 존재합니다:

     

    • Stale 또는 잃어버린 통계치는 부정확한 예측을 발생
    • 통계치 수집에 많은 비용이 들어감
    • Predicates이 UDFs를 포함한 경우 통계치(selectivity, cardinality) 수집이 불가능함
    • 데이터가 빠르게 변화하는 경우에는 하드코딩된 hints가 잘 동작하지 않음

    Adaptive Query Execution (AQE)는 런타임 시 발생하는 다양한 통계치를 수집하여, CBO가 가진 위와 같은 한계점을 극복하고 성능을 개선을 가능하게 합니다.

     

    이 글에서는 AQE와 관련해 아래와 같은 부분들을 다뤄볼 예정입니다:

     

    • AQE와 Query Stages
    • Dynamically coalescing shuffle partitions
    • Dynamically switching join strategies
    • Dynamically optimizing skew joins

     

    AQE와 Query Stages

    AQE의 구현에 있어 중요한 결정요소는 언제 재-최적화를 수행할 것인가에 관한 것입니다. 스파크 오퍼레이터는 주로 파이프라인 형태로 병렬 프로세스로 실행되게 됩니다. 그러나 셔플 또는 브로드캐스트 exchange는 이러한 파이프라인을 끊습니다. 이 부분은 materialization points라고 불리며 "query stages"라는 용어로 불립니다. 

     

    SELECT x, AVG(y)
    FROM t
    GROUP BY x
    ORDER BY avg(y)

    예로, 위와 같은 쿼리는 아래와 같은 쿼리 계획과 materialization point(Pipeline Breadk Point) 그리고 Query Stage를 가집니다:

    Query plan and points - Image from Author inspired by [3]

    각 query stage는 중간 결과물을 materialize하고 이후의 stage는 반드시 해당 query stage의 모든 병렬 처리가 materialize 된 이후에만 진행이 가능합니다. 모든 파티션에 대한 통계치가 존재하고 이후의 연산은 아직 시작되지 않는 시점이기에 이 meterialization point는 자연적으로 매우 좋은 재-최적화 지점이 됩니다. 

     

    쿼리 수행이 시작되면 AQE는 처음에는 모든 leaf stages를 시작합니다. 이러한 stages들이 materialization을 끝내면, 물리적 쿼리 계획에서 종료된 것으로 표시하고, 완료된 stages로부터 얻은 통계치를 가지고 이후의 논리적 계획을 업데이트합니다. 또한 이러한 새로운 통계치에 기반해서 프레임웍은 옵티마이저, 물리적 플래너, 물리적 최적화 룰(일반적인 물리적 룰과 AQE의 여러 룰을 포함 - 아래)을 수행하게 됩니다. 

     

    스파크 3.0의 AQE는 아래에서 하나씩 살펴볼 3가지 기능을 제공합니다.

     

     

    Dynamically coalescing shuffle partitions

    셔플 파티션 숫자와 사이즈는 쿼리 성능에 매우 직결됩니다. 파티션의 크기게 너무 크거나, 작으면 아래와 같은 문제가 발생할 수 있습니다:

     

    파티션이 너무 작을 때 파티션이 너무 클  때
    비효율적인 I/O
    스케쥴러의 오버헤드
    Task 셋업 오버헤드
    GC 부하
    Disk Spilling

    기존의 정적인 파티션 수 설정 방식은 아래와 같은 이유로 인해 위의 2가지 경우의 문제를 피하기가 어려웠습니다:

     

    • 전체 쿼리 실행에서의 하나의 글로벌한 파티션 숫자 설정만 가능하였음
    • 쿼리 실행 시간대에 따라 데이터의 사이즈가 변경됨

     

    'Dynamically coalescing shuffle partitions' 기능은 동적으로 셔플 파티션 수를 줄일 수 있도록 하여,  기존의 정적 파티션 숫자 설정 기능에서 발생하는 것과 같이 너무 크거나 작은 파티션 사이즈의 문제를 피하며 성능을 개선합니다. 최초의 파티션 숫자는 큰 데이터 사이즈를 감당할 수 있도록 크게 설정하고, 쿼리 stage에서 필요하다면 자동적으로 파티션 수를 줄이게 됩니다. 

     

    Dynamically coalesce shuffle partitions - Image from [3]

     

    기존의 정적 파티션 숫자 설정은 아래와 같이 초기에 설정된 파티션 숫자를 끝까지 사용하기에 작은 파티션 사이즈가 존재하게 됩니다:

     

    Regular shuffle, no coalescing - Image from [3]

    하지만, AQE가 적용된 경우에는 Coalescing이 자동적으로 수행되어 파티션 숫자가 동적으로 설정되어 낮아지며 작은 파티션 사이즈의 문제를 피할 수 있게 됩니다:

    AQE-coalesced shuffle - Image from [3]

     

     

    Dynamically switching join strategies

    스파크는 join의 대상인 2개 데이터 중 하나 이상이 메모리에 로드되기에 충분히 작다면  Broadcast Hash Join (BhJ)을 선택합니다. 하지만, 수행 초기 시점에 행한 데이터 크기에 대한 예측이 틀릴 수 있으며 이는 BHJ로 수행될 수 있는 기회를 놓치게 만듭니다. 그러한 틀린 예측은 아래와 같은 이유로 발생하게 됩니다:

     

    • cardinality 또는  selectivity 예측을 위한 통계가 부정확할 수 있음
    • 대상 데이터가 여러 오퍼레이터의 복잡한 서브트리일 수 있음
    • UDFs와 같이 블랙박스 predicates이여서 초기 시점에 통계치 계산이 불가능할 수 있음

    AQE의 'Dynamically switching join strategies' 기능은 런타임 시의 정보를 바탕으로 join 전략을 다시 계획할 수 있도록 합니다. 

     

    Dynamically switch join strategies - Image from [3]

     

    위와 같이 초기의 Sort Merge Join 전략이, 런타임 시 정보가 업데이트되어 특정 데이터 대상이 충분히 작다는 사실을 인지하게 되고 BHJ로 변경되게 됩니다. 

     

     

    Dynamically optimizing skew joins

    AQE의 3번째 기능은 동적으로 skew가 존재하는 join을 최적화하는 기능입니다. 데이터 skew는 셔플 시에 특정 키에 값이 치우쳐져서 많이 존재하고 셔플 시에는 해당 셔플의 병렬 처리가 모두 종료되고 나서 다음 단계의 처리를 수행할 수 있기에 모든 병렬 처리가 빨리 끝나도 다음 단계로 넘어가기 위해서 가장 늦은 처리를 기다릴 수밖에 없게 만듭니다. 그리고 이 부분은 성능을 매우 저하시키게 됩니다. 

     

    AQE의 'Dynamically optimizing skew joins' 기능은 skew join을 런타임 통계치를 사용하여 1) 파티션 사이즈로부터 skew를 디텍팅하고, 2) skew 파티션을 더 작은 서브파티션들로 나누어 줍니다. 

     

    Dynamically optimize skew joins - Image from [3]

    위와 같은 Join 시에, 오른쪽과 같이 skew reader가 skew 파티션의 존재여부를 파악하고 존재한다면 서브파티션을 생성하게 됩니다.

     

    파티션의 사이즈를 중점적으로 살펴보면, 일반적인 경우 아래와 같이 A0 파티션에 더욱 상대적으로 많은 데이터가 존재하고, A1, A2, A3 파티션의 처리가 모두 끝나도 다음 단계의 수행을 위해서 A0 처리 종료를 기다리게 됩니다. 

    Regular sort merge join, no skew optimization - Image from [3]

     

    AQE가 적용되게 되면 아래와 같이 동적으로 skew 파티션을 쪼개어 병렬처리하게 되면서, 더욱 빠르게 셔플 단계의 처리가 수행되게 됩니다:

    Skew-optimized sort merge join - Image from [3]

     

     

    Reference

    [1] https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

    [2] https://docs.databricks.com/_static/notebooks/aqe-demo.html

    [3] https://databricks.com/session_na20/adaptive-query-execution-speeding-up-spark-sql-at-runtime

     

     

     

     

     

     

    반응형
Kaden Sungbin Cho