ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스파크 완벽 가이드] Chapter 7 - 집계 연산 (2)
    Dev/Spark 2020. 10. 18. 02:39

    * 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

    * 내용이 너무 길어 나눠 게시합니다.

     

    povia.tistory.com/39

     

    [스파크 완벽 가이드] Chapter 7 - 집계 연산 (1)

    *해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다. 자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요. * 내용이 너무 길어 나눠 게시합니다. 집계 연산에 대�

    povia.tistory.com

    7.2 그룹화(Grouping)

    Grouping할 대상 컬럼(들)을 설정한 후 대상 컬럼(들), 혹은 타 컬럼들을 가공하기 위해 사용

    컬럼(들)의 그룹화(RelationalGroupedDataset으로 반환)->집계 연산 수행(DataFrame으로 반환) 의 두 단계로 나뉨

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    df.groupBy("InvoiceNo""CustomerId").count().show()
     
    // +---------+----------+-----+                                                    
    // |InvoiceNo|CustomerId|count|
    // +---------+----------+-----+
    // |   536846|     14573|   76|
    // |   537026|     12395|   12|
    // |   537883|     14437|    5|
    // |   538068|     17978|   12|
    // |   538279|     14952|    7|
    // |   538800|     16458|   10|
    // |   538942|     17346|   12|
    // |  C539947|     13854|    1|
    // |   540096|     13253|   16|
    // |   540530|     14755|   27|
    // |   541225|     14099|   19|
    // |   541978|     13551|    4|
    // |   542093|     17677|   16|
    // |   536596|      null|    6|
    // |   537252|      null|    1|
    // |   538041|      null|    1|
    // |   543188|     12567|   63|
    // |   543590|     17377|   19|
    // |  C543757|     13115|    1|
    // |  C544318|     12989|    1|
    // +---------+----------+-----+
    // only showing top 20 rows
    cs

     

    7.2.1 표현식 이용

    count: Method(Action), Function(Transformation) 사용 가능, 하지만 Function으로 사용하는 것을 추천

                  select에서 사용하는 것보다 agg method에서 사용하는 것이 좋음

    agg Method: 여러 집계 처리 한번에 지정 가능, 집계에 표현식 사용 가능, Transformation이 완료된 컬럼에 alias 사용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import org.apache.spark.sql.functions.count
    // import org.apache.spark.sql.functions.count
     
    df.groupBy("InvoiceNo")
        .agg(
            count("Quantity").alias("quan"),
            expr("count(Quantity)")
        ).show()
    // +---------+----+---------------+                                                
    // |InvoiceNo|quan|count(Quantity)|
    // +---------+----+---------------+
    // |   536596|   6|              6|
    // |   536938|  14|             14|
    // |   537252|   1|              1|
    // |   537691|  20|             20|
    // |   538041|   1|              1|
    // |   538184|  26|             26|
    // |   538517|  53|             53|
    // |   538879|  19|             19|
    // |   539275|   6|              6|
    // |   539630|  12|             12|
    // |   540499|  24|             24|
    // |   540540|  22|             22|
    // |  C540850|   1|              1|
    // |   540976|  48|             48|
    // |   541432|   4|              4|
    // |   541518| 101|            101|
    // |   541783|  35|             35|
    // |   542026|   9|              9|
    // |   542375|   6|              6|
    // |  C542604|   8|              8|
    // +---------+----+---------------+
    // only showing top 20 rows
    cs

    7.2.2 맵 사용

    컬럼->키, 수행할 집계 함수의 문자열->값

    1
    2
    3
    4
    5
    6
    7
    8
    df.groupBy("InvoiceNo").agg("Quantity"->"avg""Quantity"->"stddev_pop").show()
     
    // +---------+------------------+--------------------+                             
    // |InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
    // +---------+------------------+--------------------+
    // |   536596|               1.5|  1.1180339887498947|
    // |   536938|33.142857142857146|  20.698023172885524|
    // ...
    cs

    7.3 윈도우 함수

    group-by: 전체 Row를 대상으로 그룹화함

    window: 특정 순서(Ex)시간)에 따라 Row를 "분류"함

     

    준비 데이터 생성 과정

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /*
    * Column을 Date 형태로 변환 시작
    */
    import org.apache.spark.sql.functions.{col, to_date}
    // import org.apache.spark.sql.functions.{col, to_date}
    val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
    // dfWithDate: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]
    dfWithDate.show()
    // +---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
    // |InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
    // +---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
    // |   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
    // |   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
    // ...
    /*
    * 변환 끝
    */
    dfWithDate.createOrReplaceTempView("dfWithDate")
    // Temp View 만들기
    cs

    partitionBy(columns): 파티셔닝 스키마의 개념과 관련 X, 그룹을 어떻게 나눌 지 결정하는 것.

    1
    2
    3
    4
    5
    6
    7
    8
    import org.apache.spark.sql.expressions.Window
    // import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.col
    // import org.apache.spark.sql.functions.col
    val windowSpec = Window.partitionBy("CustomerId""date")
                .orderBy(col("Quantity").desc)
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    // windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@fce6355
    cs

    추가 자료: thebook.io/006908/part02/ch05/01/03/02-02/

     

    스파크를 다루는 기술: 5.1.3.2 윈도 함수 - 2

     

    thebook.io

     

    시간대별 최대 구매 개수 구하기

    1
    2
    3
    4
    5
    6
    import org.apache.spark.sql.functions.max
    // import org.apache.spark.sql.functions.max
    val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
    // maxPurchaseQuantity: org.apache.spark.sql.Column
    //  = max(Quantity) OVER (PARTITION BY CustomerId, 
    //  date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    cs

    최대 구매 수량을 가진 날짜 구하기

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.spark.sql.functions.{dense_rank, rank}
    // import org.apache.spark.sql.functions.{dense_rank, rank}
    val purchaseDenseRank = dense_rank().over(windowSpec)
    // purchaseDenseRank: org.apache.spark.sql.Column
    // = DENSE_RANK() OVER (PARTITION BY CustomerId, 
    // date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    val purchaseRank = rank().over(windowSpec)
    // purchaseRank: org.apache.spark.sql.Column
    // = RANK() OVER (PARTITION BY CustomerId, 
    // date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    cs

    결과 출력

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import org.apache.spark.sql.functions.col
    // import org.apache.spark.sql.functions.col
     
    // Print
    dfWithDate.where("CustomerId IS NOT NULL")
        .orderBy("CustomerId")
        .select(
            col("CustomerId"), 
            col("date"), 
            col("Quantity"), 
            purchaseRank.alias("quantityRank"), 
            purchaseDenseRank.alias("quantityDenseRank"), 
            maxPurchaseQuantity.alias("maxPurchaseQuantity")
        ).show()
    // +----------+----------+--------+------------+-----------------+-------------------+
    // |CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
    // +----------+----------+--------+------------+-----------------+-------------------+
    // |     12346|2011-01-18|   74215|           1|                1|              74215|
    // |     12346|2011-01-18|  -74215|           2|                2|              74215|
    // ...
    cs

     

    7.4 그룹화 셋

    여러 그룹에 걸쳐 집계할 수 있는 방법 필요-> 그룹화 셋(GROUPING SET) 사용

    여러 집계를 결합하는 저수준 기능, group-by 구문에서 원하는 형태로 집계 생성 가능

    재고 코드와 고객 별 총 수량 구하기

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    val dfNoNull = dfWithDate.drop()
    // dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]
    dfNoNull.createOrReplaceTempView("dfNoNull")
     
    spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull" 
    + " GROUP BY customerId, stockCode order by customerId DESC, stockCode desc").show()
    // +----------+---------+-------------+
    // |CustomerId|stockCode|sum(Quantity)|
    // +----------+---------+-------------+
    // |     18287|    85173|           48|
    // |     18287|   85040A|           48|
    // |     18287|   85039B|          120|
    // |     18287|   85039A|           96|
    // |     18287|    84920|            4|
    // |     18287|    84584|            6|
    // ...
    // +----------+---------+-------------+
    // only showing top 20 rows
     
    spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull"
    + " GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))" 
    + " ORDER BY CustomerId DESC, stockCode DESC").show()
    // +----------+---------+-------------+
    // |customerId|stockCode|sum(Quantity)|
    // +----------+---------+-------------+
    // |     18287|    85173|           48|
    // |     18287|   85040A|           48|
    // |     18287|   85039B|          120|
    // |     18287|   85039A|           96|
    // |     18287|    84920|            4|
    // |     18287|    84584|            6|
    // ...
    // +----------+---------+-------------+
    // only showing top 20 rows
    cs

    SQL에서만 지원, DataFrame에서 사용 시 롤업과 큐브 사용해야 함

    고객(customerId), 재고수준(stockCode)에 관계 없이 총 수량의 합산 결과 추가 -> GROUPING SETS 구문에 집계 방식 지정

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
     spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull"
     + " GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode), ())"
     + " ORDER BY sum(Quantity) DESC, CustomerId DESC, stockCode DESC").show()
    // +----------+---------+-------------+
    // |customerId|stockCode|sum(Quantity)|
    // +----------+---------+-------------+
    // |      null|     null|      5176450|
    // |     13256|    84826|        12540|
    // |     17949|    22197|        11692|
    // |     16333|    84077|        10080|
    // |     16422|    17003|        10077|
    // |      null|    22355|         9178|
    // ...
    // +----------+---------+-------------+
    // only showing top 20 rows
    cs

     

    7.4.1 롤업

    rollup(columns): 그룹화할 컬럼을 설정, rollup 수행

    .agg(expr): 집계 설정

    기존 컬럼 뿐만 아니라 새로 생성한 컬럼을 통해서도 사용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    val rolledUpDF = dfNoNull
        .rollup("Date""Country")
        .agg(sum("Quantity"))
        .selectExpr("Date"
            "Country"
            "'sum(Quantity)' as total_quantity")
        .orderBy("Date")
        .show()
     
    // +----------+--------------+--------------+
    // |      Date|       Country|total_quantity|
    // +----------+--------------+--------------+
    // |      null|          null| sum(Quantity)|
    // |2010-12-01|   Netherlands| sum(Quantity)|
    // |2010-12-01|United Kingdom| sum(Quantity)|
    // ...
    // 
    // rolledUpDF: Unit = ()
     
    val rolledUpDF = dfNoNull
        .rollup("Date""Country")
        .agg(sum("Quantity"))
        .select(
            "Date"
            "Country"
            "sum(Quantity)")
        .orderBy("Date")
        .show()
    // +----------+--------------+-------------+                                       
    // |      Date|       Country|sum(Quantity)|
    // +----------+--------------+-------------+
    // |      null|          null|      5176450|
    // |2010-12-01|     Australia|          107|
    // |2010-12-01|        France|          449|
    // ...
    // 
    // rolledUpDF: Unit = ()
    cs

    rollup된 두 개의 컬럼값이 모두 null인 row: 두 컬럼에 속한 전체 레코드의 합계를 나타냄

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    rolledUpDF.where("Country IS NULL").show()
    // +----------+-------+-------------+                                              
    // |      Date|Country|sum(Quantity)|
    // +----------+-------+-------------+
    // |      null|   null|      5176450|
    // |2010-12-01|   null|        26814|
    // |2010-12-02|   null|        21023|
    // ...
     
     
    rolledUpDF.where("Date IS NULL").show()
    // +----+-------+-------------+                                                    
    // |Date|Country|sum(Quantity)|
    // +----+-------+-------------+
    // |null|   null|      5176450|
    // +----+-------+-------------+
    cs

     

    7.4.2 큐브

    롤업을 고차원적으로 사용, 모든 차원에 대해 동일한 작업 수행(Ex) 전체 기간에 대해 날짜와 국가별 결과 추출)

    세부적 예시

    - 전체 날짜와 모든 국가에 대한 합계

    - 모든 국가의 날짜별 합계

    - 날짜별 국가의 합계

    - 전체 날짜의 국가별 합계

    등 

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    dfNoNull.cube("Date""Country")
        .agg(sum(col("Quantity")))
        .select("Date""Country""sum(Quantity)")
        .orderBy("Date").show()
    // +----+--------------------+-------------+
    // |Date|             Country|sum(Quantity)|
    // +----+--------------------+-------------+
    // |null|               Japan|        25218|
    // |null|           Australia|        83653|
    // |null|            Portugal|        16180|
    // |null|United Arab Emirates|          982|
    // |null|             Germany|       117448|
    // |null|         Unspecified|         3300|
    // |null|                 RSA|          352|
    // |null|           Singapore|         5234|
    // |null|              Cyprus|         6317|
    // |null|                null|      5176450|
    // ...
    // +----+--------------------+-------------+
    // only showing top 20 rows
     
     
    dfNoNull.cube("Date""Country")
        .agg(sum(col("Quantity")))
        .select("Date""Country""sum(Quantity)")
        .orderBy("Date").where("DATE IS NOT NULL").show()
    //+----------+--------------+-------------+
    //|      Date|       Country|sum(Quantity)|
    //+----------+--------------+-------------+
    //|2010-12-01|United Kingdom|        23949|
    //|2010-12-01|        France|          449|
    //|2010-12-01|       Germany|          117|
    //|2010-12-01|        Norway|         1852|
    //|2010-12-01|     Australia|          107|
    //|2010-12-01|          EIRE|          243|
    //|2010-12-01|          null|        26814|
    //|2010-12-01|   Netherlands|           97|
    //...
    //+----------+--------------+-------------+
    //only showing top 20 rows
     
    cs

     

    7.4.3 그룹화 메타데이터

    큐브, 롤업 사용 시 집계 수준에 따라 필터링 목적으로 집계 수준 조회 경우 발생 -> grouping_id 사용

    grouping_id: 결과 데이터셋의 집계 수준을 명시하는 컬럼 제공

    그룹화 수준

    3: 가장 높은 계층의 집계 결과에서 나타남, customerId나 stockCode에 관계없이 총 수량 제공

    2: 개별 재고 코드의 모든 집계 결과에서 나타남. customerId에 관계 없이 재고 코드별 총 수량 제공

    1: 구매한 물품에 관계없이 customerId를 기반으로 총 수량 제공

    0: customerId와 stockCode 별 조합에 따라 총 수량 제공

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import org.apache.spark.sql.functions.{grouping_id, sum, expr}
    // import org.apache.spark.sql.functions.{grouping_id, sum, expr}
     
    dfNoNull.cube("customerId""stockCode").agg(grouping_id(), sum("Quantity")).orderBy(col("grouping_id()").desc).show()
    /*
    +----------+---------+-------------+-------------+                              
    |customerId|stockCode|grouping_id()|sum(Quantity)|
    +----------+---------+-------------+-------------+
    |      null|     null|            3|      5176450|
    |      null|    22867|            2|         6292|
    |      null|   85135B|            2|           55|
    |      null|   72800B|            2|          144|
    |      null|    22277|            2|          417|
    |      null|    20685|            2|         4027|
    |      null|    21507|            2|         3308|
    |      null|    21164|            2|         1762|
    |      null|    84950|            2|         3621|
    |      null|   90013B|            2|           36|
    |      null|    21331|            2|           12|
    |      null|    22763|            2|          202|
    |      null|    21531|            2|         1507|
    |      null|   85018D|            2|           12|
    |      null|    21291|            2|          943|
    |      null|    72131|            2|          137|
    |      null|    20704|            2|          212|
    |      null|   90197B|            2|           35|
    |      null|    23614|            2|           26|
    |      null|    23485|            2|          244|
    +----------+---------+-------------+-------------+
    only showing top 20 rows
    */
     
    cs

    7.4.4 피벗

    Row -> Column 변환, 집계 함수 적용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
    // pivoted: org.apache.spark.sql.DataFrame = [date: date, Australia_sum(Quantity): bigint ... 113 more fields]
     
    pivoted.printSchema()
    /*
    root
    ...
     |-- Switzerland_sum(Quantity): long (nullable = true)
     |-- Switzerland_sum(UnitPrice): double (nullable = true)
     |-- Switzerland_sum(CustomerID): long (nullable = true)
     |-- USA_sum(Quantity): long (nullable = true)
     |-- USA_sum(UnitPrice): double (nullable = true)
     |-- USA_sum(CustomerID): long (nullable = true)
    ...
    */
     
    pivoted.where("date > '2011-12-05'").select("date""USA_sum(Quantity)").show()
    /*
    +----------+-----------------+                                                  
    |      date|USA_sum(Quantity)|
    +----------+-----------------+
    |2011-12-06|             null|
    |2011-12-09|             null|
    |2011-12-08|             -196|
    |2011-12-07|             null|
    +----------+-----------------+
    */
    cs

    7.5 UDAF

    사용자가 정의하는 집계함수, UserDefinedAggregateFunction 상속 받음

    init -> update -> merge -> eval의 단계를 거침

    재정의해야할 요소들

    inputSchema: 진입 파라메터들

    bufferSchema: 중간 결과들

    dataType: 반환 데이터 타입

    deterministic: 동일한 입력값에 대한 동일한 결과 반환 여부

    initialize: inputSchema 초기화

    update: 각 노드들이 내부 버퍼를 업데이트 하는 로직

    merge: 노드 간 버퍼 병합 로직

    evaluate: 최종 반환 결과값 생성 로직

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    import org.apache.spark.sql.expressions.MutableAggregationBuffer
    import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
     
    class BoolAnd extends UserDefinedAggregateFunction{
        def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", BooleanType) :: Nil)
        def bufferSchema: StructType = StructType(StructField("result", BooleanType) :: Nil)
        def dataType: DataType = BooleanType
        def deterministic: Boolean = true
        def initialize(buffer: MutableAggregationBuffer): Unit = {
            buffer(0= true
        }
        def update(buffer: MutableAggregationBuffer, input:Row): Unit = {
            buffer(0= buffer.getAs[Boolean](0&& input.getAs[Boolean](0)
        }
        def merge(buffer1: MutableAggregationBuffer, buffer2:Row): Unit = {
            buffer1(0= buffer1.getAs[Boolean](0&& buffer2.getAs[Boolean](0)
        }
        def evaluate(buffer: Row): Any = {
            buffer(0)
        }
    }
    // Defined class BoolAnd
     
    val ba = new BoolAnd
    // ba: BoolAnd = BoolAnd@6fbf27ef
     
    spark.udf.register("booland", ba)
    // res15: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = BoolAnd@6fbf27ef
     
    import org.apache.spark.sql.functions._
    // import org.apache.spark.sql.functions._
     
    spark
        .range(1)
        .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
        .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f""t")
        .select(ba(col("t")), expr("booland(f)"))
        .show()
    /*
    +----------+----------+
    |booland(t)|booland(f)|
    +----------+----------+
    |      true|     false|
    +----------+----------+
    */
    cs

    * 스칼라, 자바에서만 사용 가능

    댓글

Designed by Tistory.