ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스파크 완벽 가이드] Chapter 12 - RDD
    Dev/Spark 2020. 11. 15. 17:22

    * 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

     

    RDD

    저수준 API는 RDD, SparkContext, Accumulator, Broadcast variable로 이루어짐

    12.1 저수준 API란

    RDD: 분산 데이터 처리 API

    Accumulator, Broadcast Variable: 분산형 공유 변수

     

    12.1.1 저수준 API는 언제 사용할까

    • 고수준 API에서 제공하지 않는 기능이 필요한 경우.
    • RDD를 사용해 개발된 기존 코드를 유지해야 하는 경우
    • 사용자가 정의한 공유 변수를 다뤄야 하는 경우

    스파크의 모든 워크로드: 저수준 기능을 사용하는 기초적인 형태로 컴파일됨

    DataFrame 트랜스포메이션 -> 실제 다수의 RDD 트랜스포메이션으로 변환됨

     

     

    12.1.2 저수준 API는 어떻게 사용할까

    SparkContext 사용

    SparkSession를 통해 sparkContext에 접근할 수 있음

     

    12.2 RDD 개요

    사용자가 실행한 모든 DataFrame/Dataset 코드 -> RDD로 컴파일됨

    RDD: 불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음

    RDD의 레코드: Java/Scala/Python의 객체

    RDD를 사용할 경우 자유로운 제어는 가능하나 그만큼 수동으로 작업해야 하는 영역이 많아진다.

    12.2.1 RDD 유형

    RDD -> 다양한 하위 클래스가 존재, DataFrame API에서 최적화된 물리적 실행 계획을 만드는데 대부분 사용됨

    사용자: Generic RDD, Key-Value RDD를 생성할 수 있음

    각 RDD는 5가지 주요 속성으로 구분됨

    • 파티션의 목록
    • 각 조각을 연산하는 함수
    • 다른 RDD와의 의존성 목록
    • 부가적으로 키-값 RDD를 위한 Partitioner
    • 부가적으로 각 조각을 연산하기 위한 기본 위치 목록

    RDD도 DataFrame/Dataset과 마찬가지로 트랜스포메이션/액션의 Lazy Evaluation 형식으로 이루어져 있음.

    RDD에서는 Row X, Object O(Case Class로 구성된 Object들의 List, 혹은 Array)

     

    주의: Python에서 RDD를 다룰 경우 상당한 성능 저하가 발생할 수 있다.

    직렬화 과정을 거친 데이터 -> 파이썬 프로세스 -> 파이썬 return -> 직렬화 -> JVM에 반환

    12.2.2 RDD는 언제 사용할까

    물리적으로 분산된 데이터(자체적으로 구성한 데이터 파티셔닝)에 세부적인 제어가 필요할 때 RDD를 사용하는 것이 적합

    12.2.3 Dataset과 RDD의 케이스 클래스

    Dataset: 구조적 API가 제공하는 기능/최적화 기법을 사용할 수 있음

    RDD: 직접 함수/로직을 구현해야 함

    12.3 RDD 생성

    12.3.1 DataFrame, Dataset으로 RDD 생성

    Dataset[T] => RDD[T]의 형태로 변환됨

    DataFrame은 Row 타입의 Dataset이므로 RDD[Row] 형태로 변환된다.

    "rdd"를 통해 변환한다.

    반대의 경우는 "toDF()"를 사용한다.

     

    12.3.2 로컬 컬렉션으로 RDD 생성

    Collection -> RDD: parallelize 메서드를 사용해서 단일 노드 컬렉션 -> 병렬 컬렉션으로 전환, 파티션 수 지정 가능

     

    12.3.3 데이터소스로 RDD 생성

    textFile(PATH): 파일의 각 한줄 한줄을 Record로 읽음

    wholeTextFiles(PATH): 하나의 파일-> 1개의 Record

     

    12.4 RDD 다루기

    DataFrame을 다루는 방식과 매우 유사

    스파크 데이터 타입을 다루는 것이 아닌 자바/스칼라의 객체를 다룸

    '헬퍼' 메서드나 함수가 DataFrame에 비해 많이 부족함 -> 사용자가 직접 정의해야 함

     

    12.5 트랜스포메이션

    RDD -> 트랜스포메이션을 지정 -> 새 RDD 생성 가능

     

    12.5.1 Distinct

    중복 데이터 제거

     

    12.5.2 Filter

    조건 함수를 만족하는 Record만을 반환해 새 RDD를 만듦

     

    12.5.3 Map

    레코드를 주어진 입력을 원하는 값으로 반환하는 함수를 사용해 적용.

    매핑: Array, Collection의 각 요소들을 map 함수 내의 함수에 맞춰 매핑시킨다.

    flatMap: 매핑 결과를 단일 Row에서 여러 Row로 변환

     

    12.5.4 sortBy

    정렬

     

    12.5.5 randomSplit

    RDD를 임의로 분할, 가중치와 난수로 구성된 배열을 파라메터로 사용

     

     

    12.6 액션

    Lazy Evaluation으로 Transformation들을 적용하기 위해서는 Action을 사용해야 함

     

    12.6.1 reduce

    RDD를 하나의 값으로 만듦

    외부에서 정의한 함수를 사용해 내부에서 reduce를 수행하는 것이 가능

    읽어볼만한 자료: www.ridicorp.com/blog/2018/10/04/spark-rdd-groupby/

     

    Spark RDD에서 GROUP BY를 빠르게 하려면?

    분산 데이터 처리의 성능을 위해 Spark 의 소스코드를 파헤친 경험을 소개합니다.

    www.ridicorp.com

    12.6.2 count

    Row의 수를 계산

    countApprox(timeoutMilliseconds, confidence): count 함수의 근사치를 timeoutMilliseconds 내에 계산. 초과 시 불완전한 결과 반환. confidence: 신뢰도

    countByValue: RDD 값의 개수를 구함

    (주의: 값의 개수가 충분히 적을 때에만 사용해야함. 익스큐터들의 연산 결과가 모두 드라이버 메모리에 적재됨)

    countByValueApprox: countApprox와 마찬가지로 countByValue의 근사치를 구함. 파라메터도 동일

    12.6.3 first

    RDD의 첫 값을 반환

     

    12.6.4 max & min

    RDD의 최대값, 최소값을 반환

    12.6.5 take

    해당 RDD에서 값을 취한다는 의미의 함수, 파라메터는 취할 갯수

    takeOrdered(num: Int): num만큼 하위 갯수를 가져옴

    top(num: Int): num 만큼 상위 갯수를 가져옴

    takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): 고정 크기의 임의 표본 데이터를 얻을 때 사용, num = 임의 표본 수, seed: 난수 시드값(랜덤)

     

    thebook.io/006908/part01/ch04/03/02/04/

     

    스파크를 다루는 기술: 4.3.2.4 top과 takeOrdered로 정렬된 요소 가져오기

     

    thebook.io

    12.7 파일로 저장

    데이터 처리 결과 -> 텍스트 파일

     

    12.7.1 saveAsTextFile

    RDD를 파일로 저장

     

    12.7.2 시퀀스 파일

    바이너리 키-값 쌍으로 구성된 플랫 파일 생성(맵리듀스의 입출력 포맷)

     

    12.7.3 하둡 파일

    stackoverflow.com/questions/40069264/how-can-i-save-an-rdd-into-hdfs-and-later-read-it-back

     

    How can I save an RDD into HDFS and later read it back?

    I have an RDD whose elements are of type (Long, String). For some reason, I want to save the whole RDD into the HDFS, and later also read that RDD back in a Spark program. Is it possible to do that...

    stackoverflow.com

    ➕책에는 없지만 csv 파일로 저장하는 방법

    DataFrame으로 변환 후 repartition, write 사용해 저장

    12.8 캐싱

    RDD의 데이터를 메모리에 저장

    12.9 체크포인팅

    RDD의 데이터 + RDD의 변환 계보 등을 저장.

    유실 등이 일어나 연산을 다시 수행할 때 중간 결과 파티션을 참조해 계산을 이어나간다.

    thebook.io/006908/part01/ch04/04/03/

     

    스파크를 다루는 기술: 4.4.3 체크포인트로 RDD 계보 저장

     

    thebook.io

    12.10 RDD를 시스템 명령으로 전송

    pipe 메서드: 파이핑 요소로 생성된 RDD를 외부 프로세스로 전달

    pipe 사용 이후 결과를 collect()해서 사용할 수 있음

    pipe 수행 시 각 입력 파티션의 모든 요소는 개행 문자 단위로 분할, 이후 여러 줄의 입력 데이터로 변경되어 stdin에 전달됨

    결과 파티션은 프로세스의 stdout으로 생성됨, stdout의 각 줄은 출력 파티션의 하나의 요소가 된다.

     

    12.10.1 mapPartitions

    파티션 단위로 map 연산을 수행하는 것(Row 단위로 수행하는 것이 아니다)

    mapPartitionsWithIndex: 인덱스, 파티션 내부를 순회하는 Iterator를 통해 결과 RDD 생성

     

    12.10.2 foreachPartition

    각 파티션을 순회하는 함수. 결과를 반환하지는 않는다.

    따로 결과 등을 조회/저장하는 connector 등을 내부에 넣을 필요 있음

     

    12.10.3 glom

    모든 파티션을 배열로 변환하는 함수

    RDD의 크기가 크다면 절대로 하지 말자(파티션이 크거나, 파티션의 수가 많거나, 혹은 둘 다거나)

     

    댓글

Designed by Tistory.