ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스파크 완벽 가이드] Chapter 7 - 집계 연산 (1)
    Dev/Spark 2020. 10. 18. 02:07

    * 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

    * 내용이 너무 길어 나눠 게시합니다.

     

    집계 연산에 대해 설명하는 장

    집계: 무언가를 함께 모으는 행위

    집계 함수: 키/그룹 지정 후 하나 이상의 컬럼을 변환하는 집계 함수, 여러 입력 값 -> 그룹별 결과 생성

    Ex) 특정 그룹의 평균값 구하기 등의 수치형 데이터 요약

     

    7.1 집계 함수

    org.apache.spark.sql.functions 패키지에서 찾을 수 있음

     

    사용 데이터

    github.com/databricks/Spark-The-Definitive-Guide

     

    databricks/Spark-The-Definitive-Guide

    Spark: The Definitive Guide's Code Repository. Contribute to databricks/Spark-The-Definitive-Guide development by creating an account on GitHub.

    github.com

    데이터 불러오기

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    val df = spark.read.format("csv")
            .option("header""true")
            .option("inferSchema""true")
            .load("/git/Spark-The-Definitive-Guide/data/retail-data/all/*")
            .coalesce(5)
    // df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.cache()
    // res0: df.type = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.createOrReplaceTempView("dfTable")
    cs

     

    7.1.1 count

    특정 컬럼, 혹은 전체를 조건으로 Row의 개수를 검색하는 Method

    Transformation으로 작동(Action 아님)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    df.count()
    // res2: Long = 541909                                                             
     
    import org.apache.spark.sql.functions.count
    // import org.apache.spark.sql.functions.count
     
    df.select(count("StockCode")).show()
    /*
    *+----------------+
    *|count(StockCode)|
    *+----------------+
    *|          541909|
    *+----------------+
    */
     
    df.select(count("*")).show()
    /*
    *+--------+
    *|count(1)|
    *+--------+
    *|  541909|
    *+--------+
    */
     
    df.count()
    // res4: Long = 541909
    cs

    select 내의 count와 dataframe에서의 count의 차이 : Transformation이냐 Action이냐

    (Transformation의 경우 Action이 수행되기 전까지는 수행되지 않는 Lazy Execution 형식임)

     

    7.1.2 countDistinct

    중복된 값을 제거하고, 고유 값이 몇개나 나왔는지를 확인하는 Method

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.countDistinct
    // import org.apache.spark.sql.functions.countDistinct
     
    df.select(countDistinct("StockCode")).show()
    // +-------------------------+                                                     
    // |count(DISTINCT StockCode)|
    // +-------------------------+
    // |                     4070|
    // +-------------------------+
    cs

    사용 예) 특정 항목이 중복되어 나오는 경우가 많을 때 distinct count를 통해 몇가지 항목이 존재하는 지 확인

     

    7.1.3 approx_count_distinct

    정확한 고유 개수를 구하지 않고 근사치로 고유 개수를 찾음(Full scan을 하지 않기 때문에 대규모일 때 속도 측면에서 유리)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.approx_count_distinct
    // import org.apache.spark.sql.functions.approx_count_distinct
     
    df.select(approx_count_distinct("StockCode",0.1)).show()
    // +--------------------------------+
    // |approx_count_distinct(StockCode)|
    // +--------------------------------+
    // |                            3364|
    // +--------------------------------+
    cs

    Method의 인자: 찾을 컬럼명, 최대 추정 오류율(Maximum Estimation Error)

     

    7.1.4 first & last

    DataFrame의 첫, 마지막 값을 찾는 데 사용, 값이 아닌 Row를 기반으로 동작

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.{first, last}
    // import org.apache.spark.sql.functions.{first, last}
     
    df.select(first("StockCode"), last("StockCode")).show()
    // +-----------------------+----------------------+
    // |first(StockCode, false)|last(StockCode, false)|
    // +-----------------------+----------------------+
    // |                 85123A|                 22138|
    // +-----------------------+----------------------+
    cs

     

    7.1.5 min & max

    대상 컬럼의 최솟값, 최댓값을 추출

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.{min, max}
    // import org.apache.spark.sql.functions.{min, max}
     
    df.select(min("Quantity"), max("Quantity")).show()
    // +-------------+-------------+
    // |min(Quantity)|max(Quantity)|
    // +-------------+-------------+
    // |       -80995|        80995|
    // +-------------+-------------+
    cs

     

    7.1.6 sum

    대상 컬럼의 전체 합 추출

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.sum
    // import org.apache.spark.sql.functions.sum
     
    df.select(sum("Quantity")).show()
    // +-------------+
    // |sum(Quantity)|
    // +-------------+
    // |      5176450|
    // +-------------+
    cs

     

    7.1.7 sumDistinct

    대상 컬럼의 고유값의 전체 합 추출

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.sumDistinct
    // import org.apache.spark.sql.functions.sumDistinct
     
    df.select(sumDistinct("Quantity")).show()
    // +----------------------+                                                        
    // |sum(DISTINCT Quantity)|
    // +----------------------+
    // |                 29310|
    // +----------------------+
    cs

     

    7.1.8 avg

    평균값을 구하는 Method

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import org.apache.spark.sql.functions.{sum, count, avg, expr}
    // import org.apache.spark.sql.functions.{sum, count, avg, expr}
     
    df.select(
            count("Quantity").alias("total_transactions"),
            sum("Quantity").alias("total_purchases"),
            avg("Quantity").alias("avg_purchases"),
            expr("mean(Quantity)").alias("mean_purchases")
        )
        .selectExpr(
            "total_purchases/total_transactions"
            "avg_purchases"
            "mean_purchases"
        )
        .show()
     
    // +--------------------------------------+----------------+----------------+      
    // |(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
    // +--------------------------------------+----------------+----------------+
    // |                      9.55224954743324|9.55224954743324|9.55224954743324|
    // +--------------------------------------+----------------+----------------+
    cs

     

    7.1.9 분산 & 표준편차

    분산: 각 값들의 평균과의 차이를 제곱한 결과의 평균

    표준편차: 분산의 제곱근

    스파크에서는 Method를 사용해 분산/표준편차 계산 가능

    * 스파크에서는 표본표준편차 외에도 모표준편차 방식도 지원하기 때문에 주의 필요,

       variance, stddev 함수의 경우 기본적으로 표본표준분산과 표본표준편차 공식 사용

       모표준분산/모표준편차 사용하려면 var_pop이나 stddev_pop 사용

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import org.apache.spark.sql.functions.{var_pop, stddev_pop, var_samp, stddev_samp}
    // import org.apache.spark.sql.functions.{var_pop, stddev_pop, var_samp, stddev_samp}
     
    df.select(
            var_pop("Quantity"), 
            var_samp("Quantity"), 
            stddev_pop("Quantity"), 
            stddev_samp("Quantity")
        )
        .show()
    // +------------------+------------------+--------------------+---------------------+
    // | var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
    // +------------------+------------------+--------------------+---------------------+
    // |47559.303646609056|47559.391409298754|  218.08095663447796|   218.08115785023418|
    // +------------------+------------------+--------------------+---------------------+
    cs

     

    7.1.10 비대칭도 &  첨도

    데이터의 변곡점을 측정하는 방법

    비대칭도: 데이터 평균의 비대칭 정도 측정

    첨도: 데이터 끝 부분 측정

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import org.apache.spark.sql.functions.{skewness, kurtosis}
    // import org.apache.spark.sql.functions.{skewness, kurtosis}
     
    df.select(skewness("Quantity"), kurtosis("Quantity")).show()
    // +-------------------+------------------+
    // | skewness(Quantity)|kurtosis(Quantity)|
    // +-------------------+------------------+
    // |-0.2640755761052562|119768.05495536952|
    // +-------------------+------------------+
    cs

     

    7.1.11 공분산 & 상관관계

    두 컬럼값 사이의 영향도 비교

    cov: 공분산, 표본/모집단(전체) 선택 가능

    corr: 상관관계(-1~1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
    // import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
     
    df.select(
            corr("InvoiceNo""Quantity").alias("상관관계"),
            covar_samp("InvoiceNo""Quantity").alias("표본공분산"),
            covar_pop("InvoiceNo""Quantity").alias("모공분산")
        )
        .show()
     
    // +--------------------+------------------+------------------+
    // |            상관관계  |        표본공분산  |          모공분산  | 
    // +--------------------+------------------+------------------+
    // |4.912186085635685E-4|1052.7280543902734|1052.7260778741693|
    // +--------------------+------------------+------------------+
    cs

    7.1.12 복합 데이터 타입의 집계

    특정 컬럼의 값을 리스트/셋으로 전환

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.spark.sql.functions.{collect_set, collect_list}
    // import org.apache.spark.sql.functions.{collect_set, collect_list}
     
    df.agg(collect_set("Country"), collect_list("Country")).show()
    // +--------------------+---------------------+                                    
    // |collect_set(Country)|collect_list(Country)|
    // +--------------------+---------------------+
    // |[Portugal, Italy,...| [United Kingdom, ...|
    // +--------------------+---------------------+
     
    cs

     

    * 7.2부터는 다음 포스트로 게시됩니다.

    댓글

Designed by Tistory.