ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 분산처리엔진이란? (Distributed Execution Engine)
    SE Concepts 2021. 8. 29. 15:11
    반응형

    분산처리엔진(Distributed Execution Engine)은 네트워크로 연결된 컴퓨터들로 이뤄진 클러스터에서 대용량의 데이터 처리를 위해 실행되는 소프트웨어 시스템입니다. 분산처리엔진은 하나의 안정적인 머신과 같은 대용량의 컴퓨팅과 I/O 성능이라는 '추상화'를 제공하기에 사용자는 하나의 머신에서 작업을 수행하는 것과 유사한 형태를 통해 작업을 수행할 수 있는데요. 이렇게 네트워크로 연결된 시스템의 각 머신은 데이터에 의존하는 컨트롤 흐름을 처리합니다 [1]. 

     

    그렇기에 분산 병렬 컴퓨팅에서 어려운 부분인 task 스케쥴링, 데이터 동기화, 데이터 송신, 네트워크에서의 실패와 같은 부분들을 구현해 놓았고, 사용자는 손쉽게 분산 병렬 데이터 프로세싱 애플리케이션을 작성할 수 있습니다. 

     

    분산처리엔진은 분산 컴퓨팅의 일종으로 '데이터에 의존'한다는 부분이 분산처리엔진을 구분짓는 특징이라고 할 수 있습니다. 이번 글에서는 아래와 같은 사항들을 알아보도록 하겠습니다:

     

    • 분산 컴퓨팅과 분산처리엔진
    • 분산처리엔진의 특징
    • 다양한 분산처리엔진들

    분산 컴퓨팅과 분산처리엔진

    분산 컴퓨팅은 분산 시스템을 공부하는 분야입니다. 분산 시스템은 시스템을 구성하는 컴포넌트들이 네트워크로 연결된 다른 컴퓨터로 이뤄져 서로 또는 다른 시스템과 메시지를 주고 받으며 수행에 대한 커뮤니케이션과 조정을 하는 시스템입니다.

     

    이러한 분산 시스템은 컴포넌트들의 concurrency, 글로벌 lock의 부재, 컴포넌트의 실패가 독립적이라는 특징을 가지는데요. 클라이언트-서버, 3-티어, n-티어, peer-to-peer와 같은 여러 아키텍쳐가 존재합니다. 실제 적용된 사례도 인터넷 네트워크, WWW 등과 같이 대규모의 통신, 네트워크 애플리케이션부터 distributed cache, 분산 데이터베이스 등과 같이 비교적 작은 규모로도 존재합니다 [4].

     

    분산 애플리케이션을 좀 더 단순하게 구분하기 위해서, 아래에서와 같이 프로그래밍 모델에 기반한 관점으로 나눌 수 있습니다 [7]:

     

      프로그래밍 모델 상태 Fault tolerance Systems
    Bulk Synchronous Parallel [10] Iterative Computation stateless Checkpoint, Lineage MapReduce, Hadoop, Spark
    Task Parallel Functional Programming stateless Lineage TensorFlow, Dask, CIEL
    Communicating
    Processes
    Actors, Conroutines,
    Message Passing
    stateful, but no shared state Custom Orleans, Erlang, Akka, MPI
    Distributed Shared Memory Threads fully stateful None Unix Processes, Unified Parallel C

     

    병렬 컴퓨팅의 가장 간단한 방법은 주어진 함수를 여러 데이터 묶음에 병렬적으로 수행하고 결과를 저장하는 것입니다. 이 방법이 바로 SIMD(single instruction multiple data) 모델인데요. 결과의 집계가 이뤄질 수 없기에 부족한 부분이 많습니다. SIMD에 집계(aggregation)을 더한 것이 Bulk Synchronous Parallel (BSP) 모델입니다. 이 BSP 패턴을 임의의 함수와 데이터 의존성 부분에서 일반화하나, '순수 함수'라는 부분과 'stateful 컴퓨팅'을 지원하지 않는다는 부분을 보존한 것이 Task Parallel 모델입니다. 

     

    강화학습이나 iteractive serving systems의 경우 프로그래밍 모델이 stateful processes를 가지도록 확장하게 됩니다. Communicating Processes 모델에서 상태는 여전히 파티션되어 있고, 프로세스들은 오직 명시적인 커뮤니케이션을 통해서만 상태를 교환할 수 있습니다. 이러한 제한을 좀 더 완화한 형태가 바로 distributed shared memory 모델로 완전한 stateful을 지원합니다. 

     

     

    각 프로그래밍 모델 상세

    분산 컴퓨팅의 프로그래밍 모델에서 어떤 부분이 분산처리엔진과 관련이 있는지 알아보기 전에, 각 프로그래밍 모델을 좀 더 구체적으로 살펴보겠습니다.

     

    Bulk Synchronous Parallel Model

    BSP 모델은 웹사이트를 크롤링하거나 대량의 데이터를 처리하기 위해 웹 초창기에 매우 인기있는 모델이었습니다. MapReduce와 Hadoop의 구현체는 실패를 걱정하지 않고 일반 하드웨어로 이뤄진 클러스터에서 대용량의 데이터 처리를 가능하게 해주었는데요. 그러한 프로그래밍 모델은 간단한 구현이 가능하게 해주었지만, 사실 제한적인 부분이 많았습니다. 프로그램의 로직은 이러한 패러다임에 맞게 다시 설계되어야 했습니다. 

     

    문서에서 inverted index를 생성하는 아래 예시에서, 3가지 매퍼가 존재하고 매퍼는 문서를 토큰으로 나누고 인덱스에 들어가야 할지 여부를 결정합니다. 

    Building an inverted index with MapReduce - Image from [7]

    이후 페이지를 토큰에 연결하고 A-L로 시작하는 모든 토큰을 첫 번째 리듀서로, M-Z로 시작하는 모든 토큰은 두 번째 리듀서로 보냅니다. 리듀서는 각 매퍼로부터 토큰을 받아서, 정렬하고, inverted index로 만듭니다. 

     

    이러한 처리를 Spark와 같은 시스템에서 아래와 같이 간단하게 구현할 수 있습니다:

    rdd.flatMap(lambda (document, contents): 
            [(token, [document] for token in contents.lower().split()])
        .reduceByKey(lambda a, b: a+b)

     

    그러한  몇개의 맵리듀스 페이즈(phases)를 체이닝하여, 이러한 모델에서 iterative한 컴퓨팅을 표현할 수 있습니다. 컴퓨팅의 리니지를 추적하여, 이 모델은 fault-tolerant할 수 있습니다. 그러나 이러한 컴퓨팅 모델은 조금 제한적입니다. 상태를 지원하지 않으며 맵과 리듀스로 표현되기 어려운 애플리케이션을 병렬화하기 매우 어렵습니다. 머신러닝 훈련이 이러한 모델로 표현될 수 있는 반면에, 성능과 유연성 요구사항을 고려하면 병렬 모델 훈련을 위해서 이 모델보다는 확장한 Task Parallel 모델을 고민하게 만듭니다. 

     

    Task Parallel Model

    TP 모델은 임의의 사이드 이펙트가 없는 함수를 분산 형태로 실행할 수 있도록 해줍니다. 각 함수는 input 아규먼트를 가지며 output을 생산합니다. 함수의 모든 input이 가용해지면, 함수는 프로세서들 중 하나에서 실행될 수 있으며 output을 생산하고, 이러한 생산은 이 ouptut에 의존하는 다른 모든 함수들의 실행을 촉발합니다. 맵리듀스 컴퓨팅은 TP 컴퓨팅의 특별한 케이스로, 각 매퍼는 데이터를 변형하는 함수이고 각 리듀서는 변형된 데이터를 받아 output으로 합치는 함수입니다. 일반적인 TP 모델에서, 각 함수는 다를 수 있고 데이터는 임의로 그 함수들 간에 전달될 수 있습니다. 

     

    아래 이미지는 NN 컴퓨팅이 어떻게 TP 모델에 부합하는지 보여줍니다. 아랫쪽에 input 데이터가 있고, 그 input은 재구조화된 후 affine layer로 전달되고, 여러 task 단계를 거쳐 처리됩니다. 

     

    Neural network task graph - Image from [7]

     

    위와 같은 처리는 Tensorflow 코드로 아래와 같이 표현될 수 있습니다:

     

    weights1 = tf.Variable(...)
    biases1 = tf.Variable(...)
    hidden1 = tf.nn.relu(tf.matmul(images, weights1) + biases1)
    
    weights2 = tf.Variable(...)
    biases2 = tf.Variable(...)
    hidden2 = tf.matmul(hidden1, weights2) + biases2
    
    result = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

     

    이러한 프로그램은 분산 TensorFlow에서 각 함수 invocation을 실행될 기기를 명시하여 병렬화가 가능합니다. 병렬적으로 실행될 수 있는 함수가 있다면 속도를 매우 개선할 수 있습니다. TP 모델은 BSP 모델보다 더욱 강력하고 serial 프로그램이 보통 표현되는 방식과 더욱 잘 호환됩니다. TP 역시도 체크포인팅이나 리니지를 기록하여 fault-tolerant 합니다. 하지만 TP도 많은 상용 애플리케이션이 요구하는 상태를 지원하지는 않습니다.

     

    Communicating Processes Model

    상태를 지원하는 첫 번째 분산 컴퓨팅 모델은 CP 모델입니다. CP 모델은 TP 모델을 일반화한 것으로, TP 모델에서 함수를 프로세스들로 적절한 순서로 스케쥴링 하여 CP 모델 형태로 실행될 수 있습니다. CP 모델의 실제 사례로는 MPI와 같은 메시지 패싱 구현체 또는 Erlang, Akka, Orleans와 같은 액터 시스템이 존재합니다. 여기서는 액터 시스템이 강력한 동시에 좀 더 구조화된 프로그래밍 모델을 가지고 있기에, 액터 시스템을 중심으로 알아보겠습니다. 

     

    액터 모델은 원래 AI(Artificial Intelligence)의 컨텍스트에서 공식적으로 제안되었습니다. 액터 모델에서는 분산 시스템을 서로 원격에서 메소드를 호출할 수 있는 객체들(액터들)의 집합으로 바라봅니다. 각 액터는 상태와 요청받은 메소드들에 대한 리스트를 가지고 있습니다. 보통 이러한 메소드들은 도착한 순서대로 실행됩니다. 본직적으로 액터는 OOP에서의 객체들이 분산된 버젼입니다. 분산 액터 애플리케이션의 예시로는 간단한 채팅 서비스를 들 수 있습니다(WhatsApp은 실제로 Erlang으로 구현됨). 아키텍쳐는 아래와 같이, 매니저 액터가 연결된 유저들을 추적하고, 유저 당 하나의 클라이언트 프록시 액터가 메시지를 유저의 기기로 포워딩합니다. 

    A chatroom implementation inf the actor framework - Image from [7]

     

    코드는 아래와 같습니다.

     

    listen(Port) ->
        Pid = spawn(fun() -> manage_clients([]) end,
        register(client_manager, Pid),
        {ok, Listen} = gen_tcp:listen(Port, ?TCP_OPTIONS),
        accept(Listen).
        
    accept(Listen) ->
        {ok, Socket} = gen_tcp:accept(Listen),
        spawn(fun() -> handle_client(Socket) end),
        client_manager ! {connect, Socket},
        accept(Listen).
    
    manage_clients(Sockets) ->
        receive
            {connect, Socket} ->
                NewSockets = [Socket | Sockets];
            {disconnect, Socket} ->
                NewSockets = lists:delete(Socket, Sockets);
            {data, Data} ->
                send_data(Sockets, Data),
                NewSockets = Sockets
        end,
        manage_clients(NewSockets).
    
    send_data(Sockets, Data) ->
        SendData = fun(Socket) -> gen_tcp:send(Socket, Data) end,

     

    listen 함수는 클라이언트와 통신 시 사용하는 소켓 아규먼트를 받습니다. 이후 listen 함수는 spawn 콜과 같이 매니저 액터를 시작하고 매니저 액터를 client_manager로 등록합니다. 이후 프로그램은 새로운 클라이언트를 accept 메소드로 받습니다. 새로운 클라이언트 각각마다 새로운 클라이언트 프록시가 spawn과 같이 시작되고 매니저와 같이 등록됩니다. 느낌표는 메시지를 다른 액터에 보내는 Erlang의 문법입니다. 이 케이스에서, connect 메시지는 Socket 아규먼트와 같이 client_manager로 전달됩니다. manage_clients는 프로그램의 메인 함수로 매니저 액터가 실행합니다. Sockets 아규먼트 안에 존재하는 액터의 상태는 클라이언트 sockets들로 이뤄져 있습니다. 매니저 액터가 2번째 줄에서 spawn 시에는 빈 리스트를 가지고 시작합니다. 새로운 클라이언트가 연결될 때마다, 클라이언트 소켓은 리스트에 추가되고 클라이언트 종료 시 리스트에서 제거됩니다. 하나의 클라이언트에서 새로운 데이터를 매니저로 보내면, 데이터는 20번째 줄의 send_data 함수를 통해 모든 클라이언트에 포워딩됩니다. 

     

    위의 예시는 분산 프로그램을 작성하는 한 가지 간단한 예시로 클라이언트-서버 모델로 이뤄져 있으며 CP 모델에 속합니다. 일반적으로, 특별하게 할당된 서버 없이 액터들은 서로 메소드를 임의의 패턴으로 호출할 수 있습니다. 액터도 fault-tolerant하도록 구성할 수 있습니다. Erlang에서 fault-tolerance는 supervision tree로 가능한데, 액터는 트리 형태로 구조화되고 만약 액터가 실패하면 해당 액터의 부모가 알림을 받고 액터를 재시작하고 상태를 리셋합니다. 

     

    Distributed Shared Memory Model

    가장 강력한 분산 프로그래밍 모델은 DSM 모델입니다. 이상적인 버젼에서, DSM은 전체 클러스터를 단일한 대규모 멀티코어 기기로 보여주어 공유 메모리에서 데이터를 읽고 쓰는 여러 실행 쓰레드를 사용해 프로그램 될 수 있도록 합니다. 실제로는 이러한 이상적인 버젼은 구현이 불가능합니다. 네트워크를 통해 원격의 메모리에 접근하는 지연속도는 로컬 메모리에 접근하는 지연속도보다 월등히 높기 때문입니다. 그러므로 만약 프로그래머가 클러스터의 topology를 고려하지 않고 DMS을 비-구조화된 형식으로 사용한다면 매우 비효율적인 프로그램이 될 수 있습니다. 게다가 DSM 아키텍쳐는 보통 fault-tolerant하지 않습니다. 

     

    그러나, 특별한 네트워크 하드웨어와 연결된 특화된 슈퍼컴퓨터는 특정 작업에 매우 효율적인 성능을 보여줄 수 있습니다. 일반적인 하드웨어가 사용되는 클라우드 상에서, 이러한 프로그래밍 패러다임을 성공적으로 적용하기란 매우 어렵습니다.

     

    분산처리엔진의 프로그래밍 모델

    분산 처리 엔진은 분산과 동시에 병렬성을 특징으로 가집니다. 그렇기에 Task Parallel 모델과 그에 속하는 Bulk Synchronous Parallel 모델이 분산처리엔진에 사용되는 것을 볼 수 있습니다. 

     

    분산처리엔진의 특징

    • Task Scheduling
    • Data Distribution
    • Load Balancing
    • Transparent Fault Tolerance
    • Control Flow
    • Tracking Dependencies

     

    초창기(2011) DEE들의 특징 비교:

    Analysis of the features provided by existing distributed execution engines - Image from [6]

     

    다양한 분산처리엔진들

    A Lineage of DEE BSP - Image from Author inspired by [2]

     

     

    Reference

    [1] What are Distributed Execution Engines? Characteristics and types of Distributed Execution Engines

    [2] Foundations for Architecting Data Solutions

    [3] https://www.splunk.com/en_us/data-insider/what-are-distributed-systems.html

    [4] https://en.wikipedia.org/wiki/Distributed_computing

    [5] DtCraft: A High-Performance Distributed Execution Engine at Scale

    [6] CIEL: a universal execution engine for distributed data-flow computing

    [7] Ray: A Distributed Execution Engine for the Machine Learning Ecosystem 

    [8] Distributed Computing: Principles, Algorithms, and Systems

    [9] Pregel: a system for large-scale graph processing

    [10] A bridging model for parallel computation

    반응형
Kaden Sungbin Cho