ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spark SQL이란?
    Data 2021. 6. 6. 13:35
    반응형

     

     

    Apache Spark RDDs(Resilient Distributed Datasets)의 이점과 특징 (feat. DSM)

    아파치 스파크의 중심에는 데이터 모델인 Resilient Distributed Datasets (RDDs)가 존재합니다. 스파크 연구의 연혁을 살펴보더라도 핵심인 RDDs에 대한 설계 및 구현이 끝난 이후(2012년 4월, [1]) 폭발적으

    kadensungbincho.tistory.com

     

    Spark SQL은 정형 데이터 처리를 위한 스파크 모듈입니다 [3]. 기본적인 Spark RDD API와 다르게, 스파크 SQL 인터페이스는 데이터와 수행되는 컴퓨팅과 관련한 구조에 대한 정보를 제공합니다. 내부적으로 스파크 SQL은 이러한 추가 정보를 사용하여 최적화를 수행하게 되는데요. 

     

    이전 시스템과 비교해, 스파크 SQL은 2가지 주요 기능을 가지고 있습니다. 첫 번째로, 절차형 스파크 코드를 통합하는 declarative DataFrame API를 통해 관계형 및 절차형 처리 간 더욱 강력한 연동을 제공합니다. 두 번째로, Scala 프로그래밍 언어의 기능을 사용한 매우 확장성 있는 Catalyst 옵티마이저를 포함하며 composable rules, control code generation 추가와 확장을 쉽게 제공합니다. 

     

    이번 글에서는 위의 2가지 주요 기능을 중점으로 아래와 같은 부분들을 살펴보겠습니다:

     

    • Spark SQL Programming Interface
    • Catalyst Optimizer

    Spark SQL Programming Interface

    스파크 SQL은 스파크 코어에 기반한 라이브러리 형태로 실행됩니다. 스파크 SQL은 SQL 인터페이스를 제공하며 이것은 1) JDBC/ODBC, 2) 커맨드라인 콘솔, 3) 스파크가 지원하는 언어의 DataFrame API 3가지를 통해 접근할 수 있습니다:

     

    Interfaces to Spark SQL, and interaction with Spark - Image from [1]

    DataFrame API

    Spark SQL API의 주요 추상화는 데이터프레임으로, 데이터프레임은 동일한 스키마를 가진 rows들의 분산 콜렉션입니다. 데이터프레임은 관계형 데이터베이스에서 하나의 테이블에 대응되며, RDDs와 유사한 형태로 조작될 수 있습니다. RDDs와는 달리, 데이터프레임은 스키마 정보를 계속가지며 좀 더 최적화된 실행을 도출하는 다양한 관계형 연산을 지원합니다. 

     

    데이터프레임은 시스템 카탈로그 상의 테이블이나 기존에 존재하던 native Java/Python 객체의 RDDs로부터 생성될 수 있습니다. 일단 생성되면 데이터프레임은 domain-specific language(DSL)의 where, groupBy와 같은 다양한 관계형 연상으로 조작할 수 있습니다. 각 데이터프레임은 Row 객체들로 된 RDD로 볼 수 있으며 사용자는 map과 같은 절차형 스파크 API를 호출할 수도 있습니다. 

     

    전통적인 데이터프레임 API들과는 달리, 스파크 데이터프레임은 lazy합니다. 그렇기에 각 데이터프레임 객체는 하나의 데이터셋을 처리하기 위한 논리적 플랜을 나타내며 사용자가 save와 같은 action을 호출하기 전까지 어떠한 실제 실행도 발생하지 않습니다. 이러한 lazy한 특성은 데이터프레임을 만드는데 사용되는 모든 연산에 효과적인 최적화를 가능하게 합니다. 

     

    ctx = new HiveContext()
    users = ctx.table("users")
    young = users.where(users("age") < 21)
    println(young.count())

    위의 코드에서, users와 young은 데이터프레임입니다. users("age") < 21 부분은 데이터프레임 DSL의 expression으로 스파크 코어의 API에서 Scala 함수를 나타내는 것과 달리 추상화된 syntax tree로 처리되게 됩니다. 데이터프레임들은 순전히 논리적인 플랜을 나타내고, 사용자가 count를 호출할 때야 비로소 스파크 SQL은 물리적 플랜은 만들어 최종 결과를 산출하게 됩니다. 

     

     

    데이터 모델

    스파크 SQL은 테이블과 데이터프레임에 대해 하이브에 기반한 nested data model을 사용합니다. 또한, boolean, integer, double, decimal, string, date, timestamp와 complex data types(structs, arrays, maps, unions)와  같은 SQL의 주요 데이터 타입을 지원합니다. 

     

     

    데이터프레임 연산

    사용자는 R의 데이터프레임 또는 Python Pandas와 유사하게 DSL을 통해 데이터프레임에 연산을 수행할 수 있습니다. 데이터프레임은 projection(select), filter(where), join, aggregations(groupBy)와 같은 일반적인 관계형 연산을 모두 지원합니다. 이러한 연산자는 limited DSL 상의 expression 객체를 취하여 스파크가 expression의 구조를 감지하도록 합니다. 

     

    exployees
    	.join(dept , employees ("deptId") === dept ("id"))
    	.where( employees ("gender") === "female")
    	.groupBy(dept ("id"), dept ("name"))
    	.agg(count ("name"))

     

    위에서 employees는 데이터프레임이고 exployees("deptId")는 expression으로 deptId 컬럼을 나타냅니다. Expression 객체들은 새로운 expression을 리턴하는 여러 연산자를 가지고 있습니다. 이러한 다양한 연산자는 expression의 abstract syntax tree를 만들고 Catalyst에 전달되어 최적화를 진행하게 됩니다. 

     

     

    데이터프레임 vs 관계형 쿼리 언어

    표면상으로 데이터프레임은 SQL, Pig와 같은 관계형 쿼리 언어와 같은 연산을 제공하지만, 스파크 SQL은 절차형 및 관계형 인터페이스를 통합하여 제공하기에  사용자가 매우 편리하게 작업을 진행할 수 있도록 합니다. 사용자는 Scala, Java, Python으로 된 함수로 데이터프레임을 전달하여 논리적 플랜을 만들고, 최종적으로 전체 플랜에 있어 최적화의 이점도 누릴 수 있습니다. 또한, if 문장과 같은 control 구조를 통해 SQL로만은 작성하기 까다로운 프로그램도 손쉽게 작성할 수 있게 됩니다. 

     

    인메모리 캐싱

    스파크 SQL은 이전의 Shark와 같이 자주 사용되는 데이터를 컬럼형 스토리지를 사용해 메모리에 materilize(주로 캐시로 언급되는)할 수 있습니다. 스파크의 native 캐시는 데이터를 단순히 JVM 객체로 저장하는 반면, 컬럼형 캐시는 dictionary encoding과 run-length 인코딩과 같은 컬럼 압축 방식을 적용하기에 메모리 소비량이 매우 적습니다. 

     

    옵션명 필요 공간 CPU 시간 메모리 저장 디스크 저장
    MEMORY_ONLY high low Yes No
    MEMORY_ONLY_SER low high Yes No
    MEMORY_AND_DISK high medium Some(prior) Some
    MEMORY_AND_DISK_SER low high Some(prior) Some
    DISK_ONLY low low No Yes

     

    User-Defined Functions

    UDFs는 데이터베이스 시스템에서 매우 중요한 확장 포인트가 되었습니다. 예로, MySQL은 JSON 데이터에 대한 기본적인 지원을 UDFs에 기반하고 있습니다. 그러나, 데이터베이스 시스템은 주로 UDFs가 주요 쿼리 인터페이스와 다른 분리된 프로그래밍 환경에 정의되도록 합니다. 반면 스파크 SQL 데이터프레임 API는 inline UDFs 정의를 지원하여 데이터베이스 시스템에서와 같은 복잡한 패키징이나 등록과정이 필요하지 않습니다. 

     

     

    Catalyst Optimizer

    스파크 SQL의 핵심에는 advanced 프로그래밍 언어의 기능(Scala의 패턴매칭과 quasi quotes)을 이용하여 확장성 있는 쿼리최적화도구인 Catalyst 옵티마이저가 존재합니다. Catalyst는 Scala의 함수형 프로그래밍 constructs에 기반하여 아래와 같은 2가지 목적을 위해 디자인 되었습니다:

     

    • 스파크 SQL에 새로운 최적화 테크닉과 기능을 쉽게 더할 수 있도록
    • 외부 개발자가 옵티마이저를 확장할 수 있도록

    Catalyst는 tree를 나타내고 tree를 조작하기 위한 룰을 적용하기 위한 라이브러리입니다. 이러한 핵심 프레임워크에 기반해, 관계형 쿼리 프로세싱에 특화된 라이브러리와 쿼리 실행에서의 여러 페이즈(analysis, logical optimization, physical planning, 쿼리를 자바 바이트 코드로 컴파일 code generation)에서의 처리를 위한 여러 룰을 가지고 있습니다. 후자를 위해서 Scala의 기능인 quasiquotes를 사용하여 composable expressions으로부터 런타임 시 코드 생성을 쉽게 만들어 줍니다. 

     

    또한, Catalyst는 외부 데이터 소스와 user-defined 타입과 같은 몇몇의 public extension 포인트를 제공해줍니다. 그리고 룰 기반 및 코스트-기반 최적화를 지원합니다 [7]. 

     

    트리

    Catalyst의 메인 데이터 타입은 node 객체로 구성된 tree입니다. 각 노드는 노드 타입과 0개 또는 그 이상의 children을 가집니다. 새로운 노드 타입은 Scala에서 TreeNode 클래스의 서브타입으로 정의되게 됩니다. 이러한 객체들은 immutable하고 이후에 나올 함수형 transformation을 통해 조작될 수 있습니다. 

     

    간단한 예시를 살펴보면, 간단한 expression 언어를 위해 아래와 같은 3가지 노드 클래스를 가지고 있다고 가정하면,

     

    • Literal(value: Int): constant 값
    • Attribute(name: String): input row로부터의 attribute
    • add(left: TreeNode, right: TreeNode): 두 expression의 sum

    이러한 클래스들은 아래와 같은 트리를 만드는데에 사용될 수 있습니다:

     

    Add(Attribute(x), Add(Literal(1), Literal(2)))

     

    Catalyst tree for the expression x+(1+2) - Image from [1]

    Rules

    트리는 룰을 사용해서 조작될 수 있습니다. 룰은 하나의 트리에서 다른 트리로의 함수입니다. 룰이 input 트리에 대해 임의적인 코드를 실행할 수 있지만, 가장 일반적인 접근은 패턴매칭 세트를 사용하여 서브트리의 일부분을 찾고 특정 구조로 바꾸는 것입니다. 

     

    패턴매칭은 algebraic 데이터 타입의 nested 구조들로부터 값을 추출하는 것을 가능하게하는 여러 함수형 언어들의 기능입니다. Catalyst에서 트리는 transform 메소드를 제공합니다. 이것은 트리의 모든 노드에 recursive하게 패턴매칭 함수를 적용하고 패턴에 맞는 각각을 결과로 transform합니다. 예로, constants간 Add 연산을 수행하는 부분을 간단하게 하는 룰을 아래와 같이 구현할 수 있습니다:

     

    tree.transform {
        case Add(Literal(c1), Literal(c2)) -> Literal(c1+c2)
    }

     

    이러한 룰을 x+(1+2)와 같은 트리에 적용하면, 새로운 트리인 x+3가 만들어지게 됩니다. 

     

    transform에 전달되는 패턴 매칭 expression은 partial funcation으로 가능한 모든 input 트리의 서브셋에 매칭된다는 것을 의미합니다. Catalyst는 주어진 룰이 input 트리에 어떤 부분에 적용될지 테스트하고, 자동적으로 매칭되지 않는 서브트리를 스킵합니다.  이러한 능력은 룰이 주어진 조건에 매칭되는 트리에서만 처리될 필요가 있게 만들기에, 시스템에 더해지는 룰은 새로운 연산자의 타입으로 변경될 필요가 없습니다. 

     

    룰은 같은 transform 호출에서 여러 패턴을 매칭할 수 있어서, 간결하게 여러 transform을 한 번에 구현할 수 있습니다:

     

    tree. transform {
        case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
        case Add(left , Literal (0)) => left
        case Add(Literal (0), right) => right
    }

     

    실제로, 룰들은 완전히 트리를 변환하기 위해 여러번 실행될 필요가 있습니다. Catalyst는 룰들을 배치로 그룹하여 트리가 fixed point(트리가 더이상 변하지 않을 때)에 도달할 때까지 각 배치를 실행합니다. fixed point까지 룰들을 실행한다는 점은 각 룰이 간단하고 self-contained하지만 결국 트리에 큰 글로벌 영향을 미친다는 점을 의미합니다. 각 배치 이후에 개발자는 새로운 트리에 검증을 위한 체크를 실행할 수 있으며 주로 recursive 매칭을 통해 작성됩니다. 

     

    마지막으로 룰의 조건들과 바디는 임의의 Scala 코드로 이뤄져 있기에, Catalyst에 domain specific한 언어보다 더욱 간결하고 강력한 기능을 제공합니다. 

     

     

    Spark SQL에서의 Catalyst

    Catalyst의 일반적인 트리 transformation 프레임워크는 4가지 페이즈에 사용됩니다. 1) 참조 resolve를 위한 논리적 계획 분석, 2) 논리적 계획 최적화, 3) 물리적 계획, 4) 쿼리를 Java 바이트코드로 컴파일하는 코드 생성 시가 그 4가지에 해당됩니다. 물리적 계획 단계에서 Catalyst는 여러 계획을 생성하고 cost에 기반해 비교합니다. 다른 3가지 페이즈는 순수하게 룰 기반으로 작성되어 있습니다. 각 페이즈는 다른 종류의 트리 노드를 사용합니다. 그리고 Catalyst는 expressions, 데이터 타입, 논리적 및 물리적 연산자를 위한 노드로 이뤄진 라이브러리를 포함합니다. 

     

    Phases of query planning in Spark SQL. Rounded rectangles represent Catalyst trees - Image from [1]

     

    분석(Analysis)

    스파크 SQL은 컴퓨팅될 relation으로부터 시작합니다. 이것은 SQL parser가 리턴하는 Abstract syntax tree (AST) 또는 API를 사용해 만들어진 DataFrame 객체로부터 전달되게 됩니다. 2가지 케이스 모두에서, relation은 unresolved 어트리뷰트 참조나 relation을 포함합니다. 예로, SQL 쿼리 "SELECT col FROM sales"에서 col의 타입, valid한 컬럼명인지 등은 테이블 sales을 살펴보기 전까지 알 수 없습니다. 스파크 SQL은 Catalyst 룰과 Catalog 객체를 사용하여 이러한 어트리뷰트는 resolve하기 위해서 모든 데이터 소스의 테이블들을 추적합니다. 이 부분에서 unbound 어트리뷰터와 데이터 타입이 있는 "unresolved 논리적 계획" 트리를 만들며 단계를 시작하고, 이후 아래와 같은 룰들을 적용합니다:

     

    • 카탈로그로부터 relation을 이름을 통해 찾음
    • col과 같은 이름이 있는 어트리뷰트를 주어진 연산자의 children의 input에 매핑함
    • 같은 값을 참조하는 어트리뷰트들에 유니트 ID를 부여함 (추후 col = col과 같은 expression 최적화에 사용)
    • expressions 전반에 타입을 전파하고 연쇄적으로 규정함. 예호, 1 + col의 타입은 col의 타입을 알기 전에는 resolve할 수 없음.

     

    논리적 최적화

    논리적 최적화 페이즈는 논리적 계획에 룰베이스 최적화를 적용합니다.  이러한 최적화에는 constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression 단순화 등과 같은 룰들이 포함되어 있습니다. 일반적으로, 스파크 SQL 개발자가 여러 상황에 적용되는 룰을 추가하는 것은 매우 쉬웠습니다. 예로, fixed-precision DECIMAL 타입을 스파크 SQL에 더할 때, small precision의 DECIMAL의 합계과 평균과 같은 집계를 최적화하려 하였습니다. 그러한 부분을 위해 12 줄의 코드가 추가되었고 단순히 decimals의 합계와 평균 expression에 unscaled 64-bit LONG으로 캐스트해주는 것이 전부였습니다. 합계 부분만 구현하여 단순화한 버젼은 아래와 같습니다:

     

    object DecimalAggregates extends Rule[ LogicalPlan ] {
        /** Maximum number of decimal digits in a Long */
        val MAX_LONG_DIGITS = 18
    
        def apply(plan: LogicalPlan ): LogicalPlan = {
            plan transformAllExpressions {
                case Sum(e @ DecimalType . Expression (prec , scale ))
                   if prec + 10 <= MAX_LONG_DIGITS =>
                       MakeDecimal (Sum( LongValue (e)), prec + 10, scale)
        }
    }

     

     

    물리적 계획

    물리적 계획 단계에서, 스파크 SQL은 논리적 계획을 받아 스파크 실행 엔진에 맞는 물리적 연산자를 사용하여 하나 이상의 물리적 계획을 생성합니다. 이후 여러 물리적 계획 중 cost에 기반해 하나를 선택하게 됩니다. 이 부분에서 코스트 기반 최적화는 join 알고리즘을 선택하기 위해서만 사용됩니다(작다고 알려진 relation에는 브로드캐스트 조인을 사용). 그러나 코스트 기반 최적화는 룰을 사용해 전체 트리에 recursive하게 적용될 수 있기에 추후 더욱 많은 코스트 기반 최적화를 구현할 수 있습니다.

     

    물리적 플래너는 또한 파이프라인 프로젝션 또는 맵 연산에 필터 주입과 같은 룰 기반의 물리적 최적화도 수행합니다. 추가적으로 논리적 계획에서의 연산을 predicate 또는 프로젝션 pushdown을 지원하는 데이터 소스에 push할 수 있습니다. 

     

    Code Generation

    쿼리 최적화의 마지막 페이즈는 각 머신에서 실행될 Java 바이트코드를 생성하는 것에 관련이 있습니다. 스파크 SQL은 보통 프로세싱이 CPU 바운드한 인메모리 데이터셋에서 처리되기에 코드 제너레이션을 지원하여 많은 최적화를 이룰 수 있습니다. 그럼에도 불구하고 코드 제너레이션 엔진은 보통 만들기가 컴파일러만큼 복잡합니다. Catalyst는 코드 제너레이션을 간단하게 하기 위해 , Scala 언어의 특별한 기능인 quasiquotes에 의존합니다. Quasiquotes는 Scala 언어의 abstract syntax trees (ASTs)를 프로그래밍을 통한 구축을 가능하게 하고, ASTs는 이후 런타임 시에 Scala 컴파일러에 들어가 바이트코드를 생성합니다. 스파크 SQL은 Scala 코드가 expression을 evaluate하기 위해 SQL에 있는 expression을 나타내는 트리를 AST로 변형하고, 이후 컴파일하고 바이트코드를 실행하게 됩니다. 

     

    간단한 예시를 살펴보면, 위에서 이용한 Add, Attribute, Literal 노드 타입으로 이뤄진 (x + y) + 1와 같은 트리가 있습니다. 코드 제너레이션 없이는 그러한 expression은 Add, Attribute, Literal 노드로 이뤄진 트리를 단계적으로 내려가며 각 데이터 row를 번역하여야 합니다. 이러한 방법은 많은 양의 브랜치와 가상 함수 콜을 발생시키고 실행을 느리게 만듭니다. 

     

    코드 제너레이션을 통해, 특정 expression 트리를 Scala AST로 변형하는 함수를 아래와 같이 작성할 수 있습니다:

     

    def compile(node: Node ): AST = node match {
        case Literal(value) => q"$value"
        case Attribute (name) => q"row.get($name )"
        case Add(left , right) =>
        q"${compile(left )} + ${compile(right )}"
    }

     

    q로 시작하는 quasiquotes로 비록 생긴게 String과 비슷하게 생겼으나, 이것들은 컴파일 시에 Scala 컴파일러에 의해 파싱되어 코드 안의 ASTs를 나타냅니다. Quasiquotes는 변수들과 그 안에 이어지는 다른 ASTs($로 나타냄)들을 가질 수 있습니다.  예로, Literal(1)은 1을 위한 Scala AST가 되는 반면 Attribute("x")는 row.get("x")가 됩니다. 최종적으로 Add(Literal(1), Attribute("x"))와 같은 트리는 Scala expression 1+row.get("x")와 같은 AST가 됩니다.

     

    Quasiquotes는 오직 적절한 ASTs 또는 Literals만 대체되는 것을 확실히 하기 위해 컴파일 시에 타입 체킹을 진행하며, 이 점은 Quasiquotes가 string concatenation보다 훨씬 유용하게 만들어 주고 런타임 시 Scala 파서를 실행하는 대신 직접적으로 Scala AST로 나타낼 수 있게 합니다. 

     

    더욱이, 각 노드에 대한 코드 제너레이션 룰은 children이 리턴하는 트리가 어떻게 만들어졌는지 알 필요가 없기 때문에 매우 composable합니다. 마지막으로, Catalyst가 놓친 expression 레벨의 최적화가 남아 있다면 Scala 컴파일러에 의해 추가적으로 최적화될 수 있습니다. 

     

    Quasiquotes는 코드 제너레이션을 위해 사용되기에 매우 명확하며 많은 스파크 SQL  contributors가 빠르게 새로운 expressions 타입에 룰을 더하는 것을 목격해 왔습니다. Quasiquotes는 또한, native Java 객체에 대한 실행이라는 목적에도 잘 부합하여 이러한 객체들의 필드에 접근할 때 객체를 스파크 Row에 복사하여 Row의 접근 메소드를 사용하는 것 대신 필요한 필드에 직접 접근하는 코드를 생성할 수 있습니다. 

     

     

    Reference

    [1] Spark SQL: Relational Data Processing in Spark

    [2] Shark: SQL and Rich Analytics at Scale

    [3] https://databricks.com/glossary/what-is-spark-sql

    [4] https://spark.apache.org/docs/latest/sql-programming-guide.html

    [5] https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

    [6] https://databricks.com/session_eu19/dynamic-partition-pruning-in-apache-spark

    [7] https://spoddutur.github.io/spark-notes/deep_dive_into_storage_formats.html

    [8] https://databricks.com/glossary/catalyst-optimizer

    [9] https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

     

     

     

    Apache Spark(아파치 스파크): Dynamic Partition Pruning이란?

    스파크 3.0 Release [2]에서 추가된 Dynamic Partition Pruning(DPP)은 아래와 같이 전반적인 성능 향상과 더불어, 특정 쿼리에서는 x100에 가까운 최적화를 가능하게 해주었습니다. 이번 글에서는 이 DPP에 대

    kadensungbincho.tistory.com

     

    반응형
Kaden Sungbin Cho