ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 맵리듀스란? (Hadoop MapReduce)
    Data 2021. 8. 22. 21:44
    반응형

    맵리듀스는 프로그래밍 모델임과 동시에 구현체를 부르는 말로, 그 구현체는 '분산처리엔진' 역할을 하는 하둡의 중심 모듈 중 하나입니다. 

     

    사용자는 키/값 쌍을 처리하는 map 함수를 설정하여 중간 결과물 형태의 키/값 쌍 데이터를 만들고, reduce 함수를 설정하여 앞선 중간 결과물의 키를 통해 같은 키를 가진 값들을 합쳐서 최종 결과물을 만듭니다 [1]. 

     

    이렇게 함수형 스타일로 작성된 프로그램은 자동적으로 병렬화되어 처리되며, 일반적인 상용 기기에서(특별한 고성능 기기가 아닌) 대용량으로 처리를 가능하게 합니다. 그렇기에 이러한 프로그램을 사용하여 분산병렬 애플리케이션을 손쉽게 개발하고 사용할 수 있게 합니다. 

     

    이 글에서는 '맵리듀스'와 관련해 아래와 같은 내용을 다루겠습니다:

     

    • 맵리듀스 프로그래밍 모델
    • 맵리듀스 구현체의 특징들
    • 하둡 2.0 이후의 맵리듀스 아키텍쳐

     


    맵리듀스 프로그래밍 모델

    맵리듀스 컴퓨팅은 input인 키/값 쌍을 받아서, ouput인 키/값 쌍을 만듭니다. 그것을 위해서 맵리듀스 라이브러리 사용자는 그러한 컴퓨팅을 Map과 Reduce라는 2가지 함수로 표현하게 됩니다. 

     

    Image from [2]

     

    먼저 Map은 사용자가 작성하며 input 쌍을 받아서 중간결과물의 키/값 쌍을 만듭니다. 맵리듀스 라이브러리는 같은 키를 가진 모든 중간결과물들을 모아서 Reduct 함수로 넘깁니다. 

     

    Reduce 함수 역시도 사용자가 작성하며 Map이 넘긴 중간결과물인 키/값 쌍을 받습니다. 이후 그러한 쌍들을 합쳐서 보통 더 작은 값들을 만듭니다. 중간결과물은 사용자가 작성한 reduce에 'iterator'를 통해서 전달되는데, 이것은 메모리에 적재하기 너무 큰 값들을 처리할 수 있도록 도와줍니다. 

     

    예시

    하나의 예시로, 문서에서 각 단어가 발생하는 빈도를 계산하는 프로그램을 들 수 있습니다. 그러한 프로그램의 map과 reduce는 pseudo-code로 아래와 같은데요:

     

    map(String key, String value):
        // key: document name
        // value: document contents
        for each word w in value:
            EmitIntermediate(w, "1");
    
    reduce(String key, Iterator values):
        // key: a word
        // values: a list of counts
        int result = 0;
        for each v in values:
            result += ParseInt(v);
        Emit(AsString(result));

     

    map 함수는 하나의 단어와 발생빈도(위 예시에서는 1)를 발생시킵니다. reduce 함수는 특정 단어 키에 발생한 모든 발생빈도를 합산합니다. 사용자는 input, output 파일이름과 같이 추가적인 튜닝 파라미터가 포함되는 mapreduce specification 객체를 설정할 수 있습니다. 이후 사용자가 specification 객체를 넣고 MapReduce 함수를 실행하게 됩니다. 이후 사용자의 코드는 C++로 구현된 MapReduce 라이브러리와 연결되며 실행됩니다. 

     

    타입

    위에서는 문자열 input, output이었지만, 개념적으로 사용자가 작성하는 map과 reduce 함수는 각각의 타입을 가집니다.

     

    map        (k1, v1)        -> list(k2, v2)
    reduce     (k2, list(v2))  -> list(v2)

     

    즉, input의 키와 값은 output의 키와 값과 다른 원천에서 결정됩니다. 더욱이, 중간결과물의 키와 값은 output의 키와 값과 같은 원천에서 결정됩니다.

     

    맵리듀스 프로그래밍 모델이 사용될 수 있는 곳들

    아래와 같은 사례들은 손쉽게 맵리듀스 컴퓨팅으로 나타내어질 수 있습니다.

     

    • 분산그래프: map 함수는 제공받은 패턴에 매칭되면 하나의 줄을 발생시키니다. reduce 함수는 식별함수로 제공받은 중간결과물을 output으로 복사합니다.
    • URL 접근 빈도 계산: map 함수는 웹 페이지 요청 로그를 처리하여 (URL, 1)의 결과를 발생시킵니다. reduce 함수는 같은 URL에 대한 모든 값을 더하여 (URL, 총 빈도수)의 결과물을 만듭니다.
    • Inverted Index: map 함수는 각 문서를 파싱하고 (단어, 문서 ID)의 중간결과물을 발생시킵니다. reduce 함수는 주어진 단어에 대한 모든 값을 받아서 값인 문서 ID들을 정렬하고 (단어, list(문서 ID))를 만듭니다. 이러한 결과물은 간단한 inverted index를 형성합니다. 

     

     

    맵리듀스 구현체의 특징들

    맵리듀스 프로그래밍 모델은 다양한 방식의 맵리듀스 인터페이스로 구현될 수 있습니다. 적절한 선택은 항상 환경에 의존하게 되는데요. 예로 작은 공유-메모리에 적합한 구현체가 있는 반면, 대용량의 NUMA 멀티-프로세서에 적합한 것이 있을 수 있습니다. 

     

    Hadoop MapReduce의 모체인 Google의  MapReduce는 아래와 같은 환경에 적합하게 만들어 졌습니다:

     

    • 머신은 보통 2-4 GB 메모리를 가지며, 듀얼 x86 프로세서로 리눅스를 사용함
    • 보통의 상용 네트워크 하드웨어를 사용함. 머신 레벨의 성능은 100 MB/s ~ 1 GB/s이며 평균은 좀 더 낮을 수 있음
    • 클러스터는 수백 ~ 수천개의 머신으로 이뤄져 있으며 머신의 실패는 흔함  
    • 저장소는 각 머신에 직접 연결된 비싸지 않은 IDE 디스크로 제공됨. 인하우스로 개발된 분산파일시스템이 이러한 디스크 관리를 위해 사용됨. 파일시스템은 안정적이지 않은 하드웨어에 상에서 가용성과 안정성을 제공하기 위해 복제를 사용함.
    • 사용자는 스케쥴링 시스템에 잡을 제출함. 각 잡은 task들로 구성되며 스케쥴러에 의해 가용한 클러스터 내 머신들에 매핑됨.

     

    실행 오버뷰

    map 실행은 자동적으로 input  데이터를 M개로 나뉘어져(split) 여러 머신들에 분산되게 됩니다. 그렇기에 input 조각들은 다른 머신들에서 병렬적으로 처리될 수 있습니다. Reduce 실행은 파티셔닝 함수를 사용해 중간결과물의 키 공간을 R개로 파티셔닝하여 분산될 수 있습니다. 파티션 수(R)와 파티셔닝 함수는 사용자에 의해 결정됩니다. 

     

    실행 구조 - Image from [1]

    위 이미지는 전체적인 MapReduce 실행 흐름을 보여주고 있습니다. 사용자가 MapReduce 함수를 호출할 때 아래와 같은 일들이 발생하게 됩니다:

     

    1. 사용자 프로그램의 맵리듀스 라이브러리는 먼저 input 파일들을 보통 16 ~ 64 MB 단위로 M개의 조각으로 나눕니다. 이후 클러스터의 머신들에서 프로그램의 여러 복제본들을 시작합니다. 
    2. 그러한 프로그램 복제본들 중 하나는 마스터로 특별한 역할을 합니다. 나머지는 워커로 마스터로부터 일을 할당 받습니다. 총 M개의 map tasks와 R개의 reduce tasks가 존재합니다. 마스터는 놀고 있는 워커를 선택하고 각 task를 워커에 할당합니다.
    3. map task를 할당받은 워커는 상응하는 input 조각을 읽어들입니다. 이후 input 데이터의 키/값 쌍을 파싱하고 각 쌍은 사용자가 정의한 map 함수에 넘깁니다. map에 의해 생성되는 중간결과물인 키/값 쌍은  메모리에 버퍼됩니다. 
    4. 버퍼된 쌍은 파티셔닝 함수에 의해 R개로 나뉘어져 주기적으로 로컬 디스크에 쓰여집니다. 로컬 디스크 상의 버퍼된 쌍의 위치는 마스터에게 전달되고 마스터는 이러한 위치를 reduce 워커에 포워딩합니다. 
    5. reduce 워커가 마스터로부터 이러한 위치에 대한 알림을 받으면, 해당 워커는 remote procedure call을 사용하여  map 워커의 로컬 디스크에 존재하는 버퍼된 데이터를 읽어들입니다. reduce 워커가 모든 중간결과물을 읽어들였을 때, 같은 키들의 값이 그룹될 수 있도록 중간결과물을 키로 정렬합니다. 이러한 정렬은 보통 매우 다른 키들이 같은 reduce task로 매핑되기에 필요합니다. 만약 중간결과물의 크기가 메모리에 올리기에 너무 크다면, 외부의 정렬이 사용됩니다.
    6. reduce 워커는 정렬된 중간결과물 데이터를 순회하면서 유니크한 중간결과물 키를 발견할 때마다 그 키와 연결된 중간결과물 값들을 사용자가 정의한 Reduce 함수로 넘깁니다. Reduce 함수의 결과물은 해당 reduce 파티션을 위한 최종 output 파일에 추가됩니다.
    7. 모든 map과 reduce task들이 완료되면, 마스터는 사용자 프로그램을 깨웁니다. 이 시점에서 사용자 프로그램의 MapReduce 호출은 다시 사용자 코드로 돌아갑니다. 

    성공적인 완료 이후에, 맵리듀스 실행의 output은 R개의 output 파일들에 존재합니다. 보통 사용자는 이런 R개의 output 파일들을 하나로 합칠 필요가 없습니다. 이러한 파일들은 새로운 맵리듀스 작업의 input으로 넣을 수도 있고 다른 분산 애플리케이션에 사용하더라도 흔히 파티션된 여러 파일들을 한번에 읽어들일 수 있습니다. 

     

    마스터 내부의 데이터 구조

    마스터는 몇 가지 데이터 구조를 가집니다. 각 map과 reduce task를 위해서, 마스터는 상태(idle, in-progress 또는 종료), 워커 머신의 식별자(non-idle task를 위해)를 저장합니다. 

     

    마스터는 중간결과물의 위치가 map task로부터 reduce task로 전달되는 통로입니다. 그러므로, 완료된 각 map task를 위해서 마스터는 map task에 의해 생성된 R개의 중간결과물 파일의 위치와 크기를 저장합니다. 이러한 위치와 크기에 대한 정보는 map task들이 종료되면서 저장되게 됩니다. 이러한 정보는 실행중(in-progress)인 reduce task를 가진 워커로 점진적으로 보내지게 됩니다. 

     

    Fault Tolerance

    맵리듀스 라이브러리는 대용량의 데이터를 수백, 수천개의 머신을 사용해 처리하도록 설계되었기에 라이브러리는 반드시 머신 실패를 극복할 수 있어야 합니다. 

     

    워커 실패

    마스터는 각각의 워커를 주기적으로 ping합니다. 만약 워커로부터 특정 시간 동안 응답을 받지 못하는 경우, 마스터는 워커를 실패로 마킹합니다. 워커에 의해 완료된 map task들은 처음 상태인 idle로 돌아가며, 그러므로 다른 워커에 스케쥴링될 수 있는 대상이 됩니다. 유사하게, 실패한 워커의 map, reduce task는 idle로 reset되고 재-스케쥴링에 대상이 됩니다.

     

    완료된 map task들은 실패시 재실행되는데 그 이유는 map의 output들이 실패한 머신의 로컬 디스크에 저장되어 있기에 접근이 불가능하기 때문입니다. 실패한 머신의 완료된 reduce task들은 그 결과물이 글로벌 파일 시스템에 저장되기 때문에 재실행될 필요가 없습니다.

     

    map task가 워커 A에 의해 먼저 실행되고 이후 A가 실패하여 워커 B에 의해 실행되면, reduce task들을 실행하는 모든 워커들은 재실행을 알림 받습니다. 아직 워커 A로부터 데이터를 읽지 않은 reduce task는 워커 B로부터 데이터를 읽습니다. 

     

    맵리듀스는 대규모의 워커 실패를 회복할 수 있습니다. 예로, 한 맵리듀스 실행 중 네트워크 작업이 80개의 머신 그룹에 대한 접근을 몇 분간 불가능하게 만들었던 때에도, 그 작업을 완료할 수 있었습니다. 

     

    마스터 실패

    위에서 언급된 마스터의 데이터 구조를 주기적인 체크포인트로 write해두는 것은 간단합니다. 마스터가 죽으면, 가장 최근의 체크포인트에서부터 시작할 수 있는데요. 그러나 마스터가 하나만 존재하기 때문에 그러한 방식으로 작업을 이어갈 수 없습니다. 현재 맵리듀스 구현체는 마스터 실패 시 맵리듀스의 실행도 실패하게 됩니다. 

     

    실패 시의 Semantics

    사용자가 작성한 map과 reduce 오퍼레이터가 input 값의 deterministic 함수일 때에, 실패가 존재하는 프로그램도 전체 프로그램 실행에 실패가 없을 때와 완전히 동일한 결과물을 생성하게 됩니다. 

     

    이러한 특성을 이루기 위해서 맵리듀스는 map과 reduce task의 atomic 커밋에 의존합니다. 실행중인 각 task는 결과물을 private 임시 파일들로 써둡니다. 하나의 reduce task는 한 개의 파일을, map task는 R개의 파일들을 만들게 되는데요. map task가 완료되면, 워커는 마스터에 메시지를 보내고 R개의 임시 파일명들을 메시지에 넣습니다. 만약 마스터가 이미 완료된 map task에 대한 완료 메시지를 받게 되면, 마스터는 메시지를 무시합니다. 아니라면, R개의 파일명들을 마스터 데이터 구조에 저장합니다.

     

    reduce task가 완료되면 reduce 워커는 atomically 임시 결과물 파일을 최종 결과물 파일로 이름을 변경합니다. 만약 같은 reduce task가 여러 머신에서 실행되고 있다면, 같은 최종 결과물 파일에 대해 여러 번의 rename 호출이 실행되게 됩니다. 맵리듀스는 기반하는 파일시스템에서 제공하는 atomic rename 연산에 의존하여 최종 파일 시스템의 상태가 한 번의 reduce task 실행으로 생성된 데이터만 가지고 있도록 보장합니다. 

     

    대부분의 task과 reduce 오퍼레이터는 deterministic하며 맵리듀스의 semantics이 순차적인 실행과 비슷하다는 점은 프로그래머가 프로그램의 동작방식을 쉽게 이해하는 데 큰 도움이 됩니다. map가 reduce 오퍼레이터가 non-deterministic하게 되면, 조금 복잡하나 그래도 충분히 이해할 수 있는 semantics를 제공합니다. Non-deterministic 오퍼레이터가 존재하는 경우, 특정 reduce task의 결과물 R1은 non-deterministic 프로그램의 순차적 실행으로 생성된 결과물 R1과 같습니다. 그러나, 다른 reduct task의 결과물 R2가 다른 non-deterministic 프로그램의 순차적 실행의 결과물인 R2와 같을 수도 있습니다. 

     

    로컬리티

    컴퓨팅 환경에서 보통 네트워크 대역폭은 상대적으로 희소한 자원입니다. input 데이터가 클러스터를 구성하는 머신들의 로컬 디스크에 저장되어 있다는 점을 이용하여 이러한 네트워크 대역폭을 아낄 수 있는데요.  GFS는 각 파일을 64MB의 블록으로 나누고 각 블록의 복제본들을 다른 머신들에 저장합니다. 맵리듀스 마스터는 input 파일들의 위치 정보를 고려해서 map task를 input 파일의 복제본이 존재하는 머신에 스케쥴할 수 있습니다. 만약 그것이 실패한다면, 마스터는 map task를 task의 input 데이터 복제본이 있는 머신과 가까이에 존재하는 머신에 task를 스케쥴링 합니다. 

     

    Task Granularity

    위에서 언급한 것처럼 map phase는 M개의 조각으로, reduce phase는 R개의 조각으로 나뉩니다. 이상적으로는 M과 R은 워커 머신 수보다 커야합니다. 각 워커가 여러 다른 task들을 실행할 때에 dynamic 로드밸런싱을 개선할 수 있고 워커 실패 시 회복도 빠르게 할 수 있습니다. 

     

    그러나 마스터가 O(M + R) 스케쥴링 결정을 하고 O(M + R)개의 상태를 메모리에 저장해야 하기에 M과 R의 크기를 제한하기도 합니다 (상태 정보 크기는 그닥 크지는 않습니다). 

     

    더욱이 R은 주로 사용자에 의해 제한되는데,  그 이유는 각 reduce task의 결과물은 다른 파일로 생성되기 때문입니다. 보통 2000개의 워커 머신을 사용할 때에 M = 200,000으로 R = 5,000으로 설정하였습니다. 

     

    Backup Tasks

    맵리듀스 수행 시간을 길게 만드는 원인 중 하나는 몇몇 map과 reduce 수행이 매우 느린 머신이 전체 속도를 늦추기 때문입니다(straggler). 이러한 straggler는 호스트의 여러가지 이유로 발생하게 되는데요. 예로, 불량한 디스크를 가진 머신은 잦은 correctable 에러를 마주하고 읽기 성능을 30 MB/s에서 1 MB/s로 낮추기도 합니다. 스케쥴링 시스템이 다른 task를 머신에 할당하면, 맵리듀스 코드가 

    CPU, memory, 로컬 디스크, 네트워크 등 자원 사용을 위해 경쟁하게 되면서 느려지는 경우도 존재합니다. 

     

    이러한 straggler에 대한 일반적은 방비책은 마스터가 남은 실행중(in-progress)인 task를 백업하기 위한 task를 실행하는 것입니다. task는 원래 실행되던 task 또는 새롭게 만들어진 백업 task 둘 중 하나가 끝나는 시점에 완료될 수 있습니다. 

     

     

    하둡 2.0 이후의 맵리듀스 아키텍쳐

     

    MapReduce Architecture - Image from  Author modified from [5]

     

    Image MRAppMaster and HDFS, YARN - Image from [4]

    Reference

    [1] MapReduce: Simplified Data Processing on Large Clusters

    [2] https://www.tutorialspoint.com/map_reduce/map_reduce_quick_guide.htm

    [3] https://hadoop.apache.org/docs/r3.3.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

    [4] https://www.researchgate.net/publication/333119590_SHIYF_A_Secured_and_High-Integrity_YARN_Framework

    [5] https://www.section.io/engineering-education/understanding-map-reduce-in-hadoop/

    반응형
Kaden Sungbin Cho