본문 바로가기
대학원 공부/computer science

Big Data : Hadoop : lecture_7 : MapReduce

by 월곡동로봇팔 2019. 10. 30.

Motivation

MapReduce는 구글에서 개발한 대용량 데이터 처리를 위한 소프트웨어 프레임워크이다.

MapReduce는 복수 개의 개인용 컴퓨터에서 데이터를 분산해서 처리할 수 있도록 설계되었다.

MapReduce는 크게 Map, Reduce로 이루어져 있다. Map에서 받은 데이터를 사용자가 정의한 코드에 따라서

intermediate pair를 만들어 준다. 그 후에 가공된 pair를 Reduce 함수에서 만들어진 intermediate 키 값에 따라서 정리를 해준다. 이는 사용자 정의에 의해 다양한 방식으로 활용가능하기 때문에,  다양한 분야에서 활요하고 있다.

 

Background

MapReduce는 주어진 자원을 효율적으로 사용하고, 자유도 있게 사용할 수 있게 해주기 때문에, 빅데이터 처리에 있어서 효과적인 프레임워크라고 할 수 있다.

 

Classic MapReduce (version 1)

classic MapReduce

구성요소는 MapReduce job을 요청하는 Client, job을 run하는데 관리하는 jobtracker, task로 split 되어 들어간 job들을 task에서 run하게 도와주는 tasktracker, HDFS로 구성!

 

step 5은 jobtracker가 submitjob을 부여받으면, 나중에 job scheduler가 진행하는 internal queue에 job을 push한다.

 

tasktracker는 개별적으로 map, reduce task들을 위한 fixed_number의 slot이 정해져있다.

reduce task 선택하기 위해, jobtracker는 데이터 지역성 고려 사항이 없으므로, 아직 실행되지 않은 reduce task 목록에서 다음 작업을 수행합니다. 

그러나 map task 경우, tasktracker의 네트워크 위치를 고려하여 입력이 tasktracker와 가장 가까운 작업을 선택합니다. ( data-local, rack-local)

 

version 1 에서는 jobtracker가 job scheduling, task progress monitoring까지 둘다 다한다.

 


YARN (version 2)

제일 큰 변화는 YARN에서는 jobtracker의 기능을 cluster의 resource를 관리하는 resource manager, MapRedcue 같은 application job들이 돌아가도록 도와주는 application master로 분리하였다.

또한 slot은 container 개념을 바뀌었다.

또한 tasktracker를 클러스터 안에서 machine 위에 container를 시작하고 지켜보는 nodemanager 로 바꾸었다.

How Hadoop Runs a MapReduce Job

MapReduce version 2

 

step1 : submit() method를 통해서 job을 생성한다.

step2 : resource manager에게 새로운 application ID, MapReduce Job ID을 요구한다.

step3 : job 실행을 위해 jar file, configuration file, input split data 같은 resource들을 HDFS에서 copy 해온다. 

step4 : submitApplication()을 통해 job을 resourcemanager에게 제출한다.

step5a : resource manager안에 scheduler가 container를 할당해주고, Container에 있는 Applicaiton master를 실행할 수있는 NodeManager를 찾음.

step5b : nodemanager 가 application master를 찾아서 실행시킨다.

step6 : application master가 job을 실행시킨다. 

step7 : HDFS로 부터 input split data를 불러온다.

 

- job이 만약에 작을경우, applicaiton master는 같은 JVM안에서 돌리도록 지시할 것이다. 다른데 부여안하고

 

step8 : resource가 부족할경우, application master는 resource manager로부터 reduce, map task를 부여하기 위한 container를 request 할 것이다.

 

- map task을 위한 request는 reduce 보다 우선순위에 든다. (당연한...)

- reduce task는 cluster 어디서나 해도되지만, input을 받는 map task는 data locality를 지키기 위해 data와 가까운 곳으

로 배치한다. 

 

step9a-b : application master는 nodemanager contact 하여 container를 시작한다.

 

Progress and State Updates

Progress and State Updates

status :

  • job, task는 각자 status를 가진다.
  • status는 job이나 task의 state, 상태를 의미한다. (run, completed, failed)
  • map, reduce의 prorgress를 의미한다.
  • job counter's value

map, reduce task가 run중일 때, child process는 부모 application master와 communicate 중이다.

이는 R.M web UI가 running 하는 것을 보여준다. UI는 application master와 연결되어있다.

client는 applicatin master로 부터 최신의 status를 받는다. 또한 getStatus()를 써서 status info를 가지고 있는 JobStatus instance를 얻는다.

 

Map Task는 input data를 비율대로 나눈다.

Reduce Task는 copy, sort, reduce phase로 나눈다. 이는 뒤에서도 자세히 설명할 예정이다.

 

 

Job Completion

application master가 job이 complete 되었다는 알림을 받으면 job의 status를 success로 돌린다.

그리고 application master, task container들은 그들의 working state를 치운다.

 


Failures and Job Scheduling

 

Task Failure

  • task가 실패하면 : task JVM은 error 메세지를 parent application master에게 보낸다. -> application master는 시도했던 task를 표시해두고 그 container를 비워둔다 (오류 났던 container라 비워두는 듯...) 
  • task가 죽으면 : application master는 progress update를 하지 못했다고 판단 -> JVM이 이 task를 죽여버린다.
  • task가 실패했단 사실을 A.M가 안 경우 : task를 rescheduling 하는데, 이전에 실패했던 nodemanager 에게 스케쥴링 하지 말라고 메세지를 보낸다. 그리고 4번 이상 fail한다면 (default로 줄 수 있다) 다시 시도하지 않는다.

 

Application Master Failure

  • Application Master가 Resource Mananger 에게 heartbeat를 주기적으로 보낸다. application master가 fail 한 경우, 우, resource manager는 fail을 감지할 수 있고, 새로운 container 안에서 master의 instance가 시작된다. 결국 다른 container로 application master 객체를 하나 더 만든다고 생각하면 된다.
  • hidden process에서 application master 가 fail할 경우, client는 timeout 이 된 것을 볼 수 있고, status가 update가 되면 새로운 application master의 주소를 resource manager로 부터 받는다.

 

Node Manager Failure

  • Resource Manager는 10분동안 heartbeat를 받지 못한다면, heartbeat를 보내지 않은 node manager를 알 수 있다.
  • fail한 nodemanager 위에서 돌아가는 모든 task와 application master를 모두 recover 할 수 있다.
  • application fail 수가 크다면 그 nodemanager는 아웃!!!

 

Resource Manager Failure

  • resource manager 가 fail 할 경우, job이나 task container가 시작되지 않는다.
  • default configuration에서 fail이 일어난다. 이는  모든 job이 멈추고 복구되지 않는다.
  • 좋은 결과를 얻기 위해서는 pair of resource manager를 run하는 것이 필수적이다. active, standby
  • standby는 active 가 죽엇을 때, 이를 커버하기 위해 나온다.

 


 

Shuffle and Sort

Shuffle and Sort

MapReduce는 모든 reducer에게 sorted 된 input 값을 준다.

그 이유는 sort한 후, map output을 보내서 reducer의 input으로써 보내는데 이를 shuffle이라 한다. 

shuffle은 Mapper가 split 한 data를 key, value로 나눈 후, 이를 같은 key 값으로 묶어주는 역할을 한다.

 

위에 그래프를 설명하자면, input_data를 HDFS 저장할 때, split한다.

분산 저장한 후, split 되어있는 data를 Mapper에 넣는다. 

Map Task 에서는 메모리 안에 있는 buffer에 split 한 data를 넣는데 만약 threshold size를 넘길 경우, disk에 넘긴다.

partition(파티션은 data의 경우대로 나뉜다. 이는 default 값으로 정할 수 있다.) 별로 sort되어서 disk에 spill 된다.

그 후 partition 별로 merge 를 한다.

그 후 Map Task 별로 끝나는 시간이 다르기 때문에 reducer로 그 때그 때 data를 보낸다. 이를 Copy Phase라 한다.

그 사이 combiner 가 존재할수도 있다.

partition에서 sort 했던거를 유지하면서, Map output을 merge, 이를 Sort Phase라 한다.

그 후 Reduce가 key 값대로 value를 더한다. Reducer는 HDFS에 저장한다.

 

 

Reduce에게 map output이 오는 것을 어떤 machine이 알려줄까???

 

Map Task가 끝난 후, A.M은 hearbeat를 통해 알고 있다. A.M은 Map과 Map output, host 사이에서도 알고있다.

또한 reduce는 주기적으로 A.M과 communicate하고 있고, map output을 다 검색하기 전까지 A.M에게 ask한다.

 

 


Task Execution

  • Speculative execution  of task:
  • Hadoop은 task가 slow될 때를 감지하려고 하고, 동등한 backup task를 시작하려한다.
  • Speculative task는 job의 모든 task가 launch할 때, 시작된다. 또한 이는 돌아가는 중간이나, 다른 task가 많은 진행이 될 때, fail한다.

댓글