ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (3)
    Dev/Spark 2020. 10. 25. 18:51

    *해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

    * 내용이 너무 길어 나눠 게시합니다.

     

    (1) povia.tistory.com/38

     

    [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (1)

    *해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다. 자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요. * 내용이 너무 길어 나눠 게시합니다. 표현식 생성 방

    povia.tistory.com

    (2) povia.tistory.com/41

     

    [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (2)

    *해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다. 자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요. * 내용이 너무 길어 나눠 게시합니다. (1) povia.tistory.com

    povia.tistory.com

     

    6.9 복합 데이터 타입

    struct, array, map 등

     

    6.9.1 Struct

    Struct: Dataframe 내부의 Dataframe

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    df.selectExpr("(Description, InvoiceNo) as complex""*").show(2)
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |             complex|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |[RABBIT NIGHT LIG...|   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
    // |[DOUGHNUT LIP GLO...|   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 2 rows
     
    df.selectExpr("struct(Description, InvoiceNo) as complex""*").show(2)
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |             complex|InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |[RABBIT NIGHT LIG...|   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
    // |[DOUGHNUT LIP GLO...|   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
    // +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 2 rows
     
    import org.apache.spark.sql.functions.struct
    // import org.apache.spark.sql.functions.struct
     
    val complexDF = df.select(struct("Description""InvoiceNo").alias("complex"))
    // complexDF: org.apache.spark.sql.DataFrame = [complex: struct<Description: string, InvoiceNo: string>]
     
    complexDF.createOrReplaceTempView("complexDF")
    cs

    Select(조회) 방법: "."을 사용, getField("컬럼명") Method 사용

    Struct컬럼.*으로 조회 시 Struct 컬럼 내부의 모든 내용을 Column으로 펼친 상태로 반환함

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    complexDF.select("complex.Description").show(2)
    // +-------------------+
    // |        Description|
    // +-------------------+
    // | RABBIT NIGHT LIGHT|
    // |DOUGHNUT LIP GLOSS |
    // +-------------------+
    // only showing top 2 rows
     
    complexDF.select(col("complex").getField("Description")).show(2)
    // +-------------------+
    // |complex.Description|
    // +-------------------+
    // | RABBIT NIGHT LIGHT|
    // |DOUGHNUT LIP GLOSS |
    // +-------------------+
    // only showing top 2 rows
     
    complexDF.select("complex.*").show(2)
    // +-------------------+---------+
    // |        Description|InvoiceNo|
    // +-------------------+---------+
    // | RABBIT NIGHT LIGHT|   580538|
    // |DOUGHNUT LIP GLOSS |   580538|
    // +-------------------+---------+
    // only showing top 2 rows
    cs

     

    6.9.2 Array

    예) 하나의 Document가 있고 그 Document에 카테고리가 따로 존재하는 경우 사용

    먼저 Array 컬럼 생성하는 것부터 진행

    split(tgt, delimiter): 하나의 문자열(tgt)을 여러 문자의 배열로 변환, 변환 시에는 delimiter 사용, Column에도 적용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    import org.apache.spark.sql.functions.split
    // import org.apache.spark.sql.functions.split
     
    df.select(split(col("Description"), " ")).show(2)
    // +---------------------+
    // |split(Description,  )|
    // +---------------------+
    // | [RABBIT, NIGHT, L...|
    // | [DOUGHNUT, LIP, G...|
    // +---------------------+
    // only showing top 2 rows
     
    df.select(split(col("Description"), " ").alias("array_col")).selectExpr("array_col[0]").show(2)
    // +------------+
    // |array_col[0]|
    // +------------+
    // |      RABBIT|
    // |    DOUGHNUT|
    // +------------+
    // only showing top 2 rows
    cs

    size(arr): arr 배열의 길이를 구하는 함수, Array 또는 Map Column에서만 사용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    df.select(size(split(col("Description"), " "))).show(2)
    // +---------------------------+
    // |size(split(Description,  ))|
    // +---------------------------+
    // |                          3|
    // |                          4|
    // +---------------------------+
    // only showing top 2 rows
     
    df.select(size(col("Description"))).show(2)
    // org.apache.spark.sql.AnalysisException: cannot resolve 'size(`Description`)' due to data type mismatch: argument 1 requires (array or map) type, however, '`Description`' is of string type.;;
    // 'Project [size(Description#12) AS size(Description)#879]
    // +- Relation[InvoiceNo#10,StockCode#11,Description#12,Quantity#13,InvoiceDate#14,UnitPrice#15,CustomerID#16,Country#17] csv
     
    cs

    array_contains(arr, tgt): arr 배열에 tgt이 존재하는지 확인

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import org.apache.spark.sql.functions.array_contains
    // import org.apache.spark.sql.functions.array_contains
     
    df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
    // +--------------------------------------------+
    // |array_contains(split(Description,  ), WHITE)|
    // +--------------------------------------------+
    // |                                        true|
    // |                                        true|
    // +--------------------------------------------+
    // only showing top 2 rows
     
    cs

    explode(arr): arr의 내부 데이터를 모두 분리해 하나의 row에 매핑함(나머지 컬럼은 계속 중복처리함)

    Explode 수행 결과(Databricks 출처)

    (이미지 출처: www.slideshare.net/databricks/an-introduction-to-higher-order-functions-in-spark-sql-with-herman-van-hovell)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import org.apache.spark.sql.functions.{split, explode}
    // import org.apache.spark.sql.functions.{split, explode}
     
    df.withColumn("splitted", split(col("Description"), " "))
        .withColumn("exploded", explode(col("splitted")))
        .select("Description""InvoiceNo""exploded").show(6)
    // +--------------------+---------+--------+
    // |         Description|InvoiceNo|exploded|
    // +--------------------+---------+--------+
    // |WHITE HANGING HEA...|   536365|   WHITE|
    // |WHITE HANGING HEA...|   536365| HANGING|
    // |WHITE HANGING HEA...|   536365|   HEART|
    // |WHITE HANGING HEA...|   536365| T-LIGHT|
    // |WHITE HANGING HEA...|   536365|  HOLDER|
    // | WHITE METAL LANTERN|   536365|   WHITE|
    // +--------------------+---------+--------+
    // only showing top 6 rows
    cs

     

    6.9.3 Map

    Key-Value의 데이터 타입.

    map(key, value): 두 컬럼(key, value)을 key 컬럼(key)과 value 컬럼(value)로 합치는 함수

    selectExpr에서 Key를 통해 조회 가능(없으면 Null)

    explode를 통해 Key/Value 컬럼으로 변환할 수 있음

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    import org.apache.spark.sql.functions.map
    // import org.apache.spark.sql.functions.map
     
    df.select(map(col("Description"), col("InvoiceNo"))
            .alias("complex_map")).show(2)
    // +--------------------+
    // |         complex_map|
    // +--------------------+
    // |[WHITE HANGING HE...|
    // |[WHITE METAL LANT...|
    // +--------------------+
    // only showing top 2 rows
     
    df.select(map(col("InvoiceNo"),col("Description"))
            .alias("complex_map")).show(2)
    // +--------------------+
    // |         complex_map|
    // +--------------------+
    // |[536365 -> WHITE ...|
    // |[536365 -> WHITE ...|
    // +--------------------+
    // only showing top 2 rows
     
    df.select(map(col("Description"), col("InvoiceNo"))
            .alias("complex_map"))
        .selectExpr("complex_map['WHITE METAL LANTERN']")
        .show(2)
    // +--------------------------------+
    // |complex_map[WHITE METAL LANTERN]|
    // +--------------------------------+
    // |                            null|
    // |                          536365|
    // +--------------------------------+
    // only showing top 2 rows
     
    df.select(map(col("Description"), col("InvoiceNo"))
            .alias("complex_map"))
        .selectExpr("explode(complex_map)").show(2)
    // +--------------------+------+
    // |                 key| value|
    // +--------------------+------+
    // |WHITE HANGING HEA...|536365|
    // | WHITE METAL LANTERN|536365|
    // +--------------------+------+
    // only showing top 2 rows
    cs

    6.10 JSON

    Spark에서 JSON Column을 다루는 방법

    JSON Column 생성&조회

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    val jsonDF = spark.range(1).selectExpr("""
    '{"myJSONKey" : {"myJSONValue" : [1,2,3]}}' as jsonString""")
    // jsonDF: org.apache.spark.sql.DataFrame = [jsonString: string]
     
    import org.apache.spark.sql.functions.{get_json_object, json_tuple}
    // import org.apache.spark.sql.functions.{get_json_object, json_tuple}
     
    jsonDF.select(get_json_object(col("jsonString"),
            "$.myJSONKey.myJSONValue[1]") as "column",
            json_tuple(col("jsonString"), "myJSONKey")).show(2)
    // +------+--------------------+
    // |column|                  c0|
    // +------+--------------------+
    // |     2|{"myJSONValue":[1...|
    // +------+--------------------+
    cs

    to_json("Struct Column"): Struct Column을 json 문자열로 변환

    from_json("json Column", Schema): Schema Type에 맞춰 JSON Column을 변환

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    import org.apache.spark.sql.functions.to_json
    // import org.apache.spark.sql.functions.to_json
     
    df.selectExpr("(InvoiceNo, Description) as myStruct")
        .select(to_json(col("myStruct"))).show(2)
    // +-----------------------+
    // |structstojson(myStruct)|
    // +-----------------------+
    // |   {"InvoiceNo":"536...|
    // |   {"InvoiceNo":"536...|
    // +-----------------------+
    // only showing top 2 rows
     
    import org.apache.spark.sql.functions.from_json
    // import org.apache.spark.sql.functions.from_json
     
    import org.apache.spark.sql.types._
    // import org.apache.spark.sql.types._
     
    val parseSchema = new StructType(Array(
        new StructField("InvoiceNo", StringType, true),
        new StructField("Description", StringType, true)))
    // parseSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(Description,StringType,true))
     
    df.selectExpr("(InvoiceNo, Description) as myStruct")
        .select(to_json(col("myStruct")).alias("newJSON"))
        .select(from_json(col("newJSON"),parseSchema),col("newJSON"))
        .show(2)
    // +----------------------+--------------------+
    // |jsontostructs(newJSON)|             newJSON|
    // +----------------------+--------------------+
    // |  [536365, WHITE HA...|{"InvoiceNo":"536...|
    // |  [536365, WHITE ME...|{"InvoiceNo":"536...|
    // +----------------------+--------------------+
    // only showing top 2 rows
    cs

    6.11 UDF(User Defined Function)

    사용자가 원하는 형태로 트렌스포메이션을 만들 수 있게 해주는 함수

    하나 이상의 Column을 입력으로 받아 변환

    *주의: UDF를 사용할 경우 Spark의 함수를 사용하는 것이 아닌 java, scala, python 등의 method, function들을 사용하는 것이기 때문에 성능 저하가 발생할 수 있다.

    만들어진 UDF는 모든 Worker Node로 전달되서 사용 가능한 상태가 됨(Register)

    UDF 동작 원리(출처: adatis)

    (이미지 출처: adatis.co.uk/databricks-udf-performance-comparisons/)

    SparkSession.udf.register("함수명", function): 개발한 UDF를 SparkSession에 등록해 Dataframe은 물론 SQL, 타 프로그래밍 언어에서 사용 가능하게 등록하는 역할

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    val udfExampleDF = spark.range(5).toDF("num")
    // udfExampleDF: org.apache.spark.sql.DataFrame = [num: bigint]
     
    def power3(number:Double):Double = number*number*number
    // power3: (number: Double)Double
     
    power3(2.0)
    // res75: Double = 8.0
     
    import org.apache.spark.sql.functions.udf
    // import org.apache.spark.sql.functions.udf
     
    val power3udf = udf(power3(_:Double):Double)
    // power3udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))
     
    udfExampleDF.select(power3udf(col("num"))).show()
    // +--------+
    // |UDF(num)|
    // +--------+
    // |     0.0|
    // |     1.0|
    // |     8.0|
    // |    27.0|
    // |    64.0|
    // +--------+
     
    spark.udf.register("power3", power3(_:Double):Double)
    // res77: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))
     
    udfExampleDF.selectExpr("power3(num)").show(2)
    // +-------------------------------+
    // |UDF:power3(cast(num as double))|
    // +-------------------------------+
    // |                            0.0|
    // |                            1.0|
    // +-------------------------------+
    // only showing top 2 rows
    cs

     

    6.12 Hive UDF

    SparkSession.builder().enableHiveSupport() 명시 후 사용

    라이브러리 의존성 명시해야 함

    댓글

Designed by Tistory.