ABOUT ME

Today
Yesterday
Total
  • [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (1)
    Dev/Spark 2020. 10. 16. 00:21

    *해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

    * 내용이 너무 길어 나눠 게시합니다.

     

    표현식 생성 방법, 데이터 처리 방법에 대해 익힘

    Booleans
    Numbers
    Strings
    Date, Timestamp
    Null
    Complex Types(Array, Map 등)
    사용자 정의 함수(User Defined Function, UDF)


    6.1 API 찾기
    Spark -> 업데이트가 주기적으로 빨리 됨(현재 3.01)

    주로 봐야할 부분

    Dataset, Dataframe 메서드(DataFrameStatFunctions, DataFrameNaFunctions 등)

    Column

     

    🗨참고용 Docs 페이지들

    Spark Scala Docs

    spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html

     

    Spark 3.0.1 ScalaDoc - org.apache.spark

    package spark Type Members  case class Aggregator[K, V, C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C) extends Product with Serializable  class BarrierTaskContext extends TaskContext with Logging  class Barri

    spark.apache.org

    Spark Quick Guide

    spark.apache.org/docs/latest/quick-start.html

     

    Quick Start - Spark 3.0.1 Documentation

    Quick Start This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark’s interactive shell (in Python or Scala), then show how to write applications in Java, Scala, and Python. To follow along with this guid

    spark.apache.org

    Spark 3.0.1 ScalaDoc - org.apache.spark.sql.Dataset

    spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html

     

    Spark 3.0.1 ScalaDoc - org.apache.spark.sql.Dataset

    class Dataset[T] extends Serializable Instance Constructors  new Dataset(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T])  new Dataset(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) Value Members 

    spark.apache.org

    DataFrame 생성 방법

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    val df = spark.read.format("csv")
            .option("header""true")
            .option("inferSchema""true")
            .load("/Spark-The-Definitive-Guide/data/retail-data/by-day/2010-12-01.csv")
    // df: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.printSchema()
    // root
    //  |-- InvoiceNo: string (nullable = true)
    //  |-- StockCode: string (nullable = true)
    //  |-- Description: string (nullable = true)
    //  |-- Quantity: integer (nullable = true)
    //  |-- InvoiceDate: timestamp (nullable = true)
    //  |-- UnitPrice: double (nullable = true)
    //  |-- CustomerID: double (nullable = true)
    //  |-- Country: string (nullable = true)
     
    df.createOrReplaceTempView("dfTable")
    cs

    Option 설명

    header: csv 파일에 Header 부분이 존재하는지

    inferSchema: 파일을 읽을 때 데이터 타입을 자동으로 추론할 것인지

     

    6.2 스파크 데이터 타입으로 변환하기

    lit: 다른 언어의 데이터 타입 -> 스파크의 데이터 타입 변환

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import org.apache.spark.sql.functions.lit
    // import org.apache.spark.sql.functions.lit
     
    df.select(lit(5), lit("five"), lit(5.0))
    // res2: org.apache.spark.sql.DataFrame = [5: int, five: string ... 1 more field]
     
    df.select(lit(5), lit("five"), lit(5.0)).show()
    // +---+----+---+
    // |  5|five|5.0|
    // +---+----+---+
    // |  5|five|5.0|
    // |  5|five|5.0|
    // ...
    // only showing top 20 rows
    cs

     

    6.3 불리언 데이터 타입

    모든 필터링 작업의 기반-> 데이터 분석에 필수적

    and, or, true, false로 구성됨

    논리 구문의 기반이 됨 -> true, false로 데이터 로우를 필터링함

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import org.apache.spark.sql.functions.col
    // import org.apache.spark.sql.functions.col
     
    df.where(
            col("InvoiceNo").equalTo(536365))
        .select("InvoiceNo""Description")
        .show(5false)
    // +---------+-----------------------------------+
    // |InvoiceNo|Description                        |
    // +---------+-----------------------------------+
    // |536365   |WHITE HANGING HEART T-LIGHT HOLDER |
    // |536365   |WHITE METAL LANTERN                |
    // |536365   |CREAM CUPID HEARTS COAT HANGER     |
    // |536365   |KNITTED UNION FLAG HOT WATER BOTTLE|
    // |536365   |RED WOOLLY HOTTIE WHITE HEART.     |
    // +---------+-----------------------------------+
    // only showing top 5 rows
     
    df.where(
            col("InvoiceNo")===536365)
        .select("InvoiceNo""Description")
        .show(5,false)
    // +---------+-----------------------------------+
    // |InvoiceNo|Description                        |
    // +---------+-----------------------------------+
    // |536365   |WHITE HANGING HEART T-LIGHT HOLDER |
    // |536365   |WHITE METAL LANTERN                |
    // |536365   |CREAM CUPID HEARTS COAT HANGER     |
    // |536365   |KNITTED UNION FLAG HOT WATER BOTTLE|
    // |536365   |RED WOOLLY HOTTIE WHITE HEART.     |
    // +---------+-----------------------------------+
    // only showing top 5 rows
     
    cs

    * JavaScript나 Scala에서의 '==='의 의미

       JavaScript나 Scala나 모두 소스 코드 상에는 val, var 등으로 변수를 표현하므로 실제 데이터 타입을 명시하지 않아도 됨

       그렇기에 일반적으로 동등의 의미를 담는 '=='의 경우 내부적 데이터 타입에 상관 없이 실제 두 값이 같은지만 비교하게 됨

       실제 데이터 타입까지 동일한지를 판단하기 위해서는 '==='를 사용함

       Scala에서는 불일치를 나타낼 때에는 =!=를 사용

       (not, equalTo도 사용)

     

    다르게 표현하는 방법

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    df.where("InvoiceNo = 536365").show(5,false)
    // +---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
    // +---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
    // |536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2010-12-01 08:26:00|2.55     |17850.0   |United Kingdom|
    // |536365   |71053    |WHITE METAL LANTERN                |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
    // |536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |2010-12-01 08:26:00|2.75     |17850.0   |United Kingdom|
    // |536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
    // |536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |2010-12-01 08:26:00|3.39     |17850.0   |United Kingdom|
    // +---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 5 rows
     
     
    df.where("InvoiceNo <> 536365").show(5false)
    // +---------+---------+-----------------------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|Description                  |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
    // +---------+---------+-----------------------------+--------+-------------------+---------+----------+--------------+
    // |536366   |22633    |HAND WARMER UNION JACK       |6       |2010-12-01 08:28:00|1.85     |17850.0   |United Kingdom|
    // |536366   |22632    |HAND WARMER RED POLKA DOT    |6       |2010-12-01 08:28:00|1.85     |17850.0   |United Kingdom|
    // |536367   |84879    |ASSORTED COLOUR BIRD ORNAMENT|32      |2010-12-01 08:34:00|1.69     |13047.0   |United Kingdom|
    // |536367   |22745    |POPPY'S PLAYHOUSE BEDROOM    |6       |2010-12-01 08:34:00|2.1      |13047.0   |United Kingdom|
    // |536367   |22748    |POPPY'S PLAYHOUSE KITCHEN    |6       |2010-12-01 08:34:00|2.1      |13047.0   |United Kingdom|
    // +---------+---------+-----------------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 5 rows
    cs

    불리언은 SQL에서의 Where Cond와 같이 and, or 조건 사용 가능

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
     
    val priceFilter = col("UnitPrice")>600
    // priceFilter: org.apache.spark.sql.Column = (UnitPrice > 600)
     
    val descripFilter = col("Description").contains("POSTAGE")
    // descripFilter: org.apache.spark.sql.Column = contains(Description, POSTAGE)
     
    df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show()
    // +---------+---------+--------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +---------+---------+--------------+--------+-------------------+---------+----------+--------------+
    // |   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
    // |   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
    // +---------+---------+--------------+--------+-------------------+---------+----------+--------------+
    cs

    Boolean Expression은 필터링 조건(Where)에만 사용가능 한 것이 아니라, DataFrame을 필터링할 수도 있음

    (But, 유지보수를 위해서는 그냥 Where 조건에 사용하는게 좋지 않을까...?)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    val DOTCodeFilter = col("StockCode")==="DOT"
    // DOTCodeFilter: org.apache.spark.sql.Column = (StockCode = DOT)
     
    df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
        .where("isExpensive")
        .select("unitPrice""isExpensive")
        .show(5)
    // +---------+-----------+
    // |unitPrice|isExpensive|
    // +---------+-----------+
    // |   569.77|       true|
    // |   607.49|       true|
    // +---------+-----------+
    cs

    필터를 반드시 표현식으로 정의할 필요 X -> 컬럼명을 사용해 필터를 정의할 수 있음

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import org.apache.spark.sql.functions.{expr, not, col}
    // import org.apache.spark.sql.functions.{expr, not, col}
     
    df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
        .filter("isExpensive")
        .select("Description""UnitPrice")
        .show(5)
    // +--------------+---------+
    // |   Description|UnitPrice|
    // +--------------+---------+
    // |DOTCOM POSTAGE|   569.77|
    // |DOTCOM POSTAGE|   607.49|
    // +--------------+---------+
    cs

    * Filter나 Where나 결국 Spark에서는 동일한 Method입니다.

    stackoverflow.com/questions/33885979/difference-between-filter-and-where-in-scala-spark-sql

     

    Difference between filter and where in scala spark sql

    I've tried both but it works same example val items = List(1, 2, 3) using filter employees.filter($"emp_id".isin(items:_*)).show using where employees.where($"emp_id".isin(items:_*)).show

    stackoverflow.com

    Null 처리 방법 : eqNullSafe등의 함수 사용

    1
    2
    3
    4
    5
    6
    df.where(col("Description").eqNullSafe("hello")).show()
     
    // +---------+---------+-----------+--------+-----------+---------+----------+-------+
    // |InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
    // +---------+---------+-----------+--------+-----------+---------+----------+-------+
    // +---------+---------+-----------+--------+-----------+---------+----------+-------+
    cs

    Spark 2.3부터는 IS [NOT] DISTINCT FROM 추가됨

    issues.apache.org/jira/browse/SPARK-20463

     

    [SPARK-20463] Add support for IS [NOT] DISTINCT FROM to SPARK SQL - ASF JIRA

    Add support for the SQL standard distinct predicate to SPARK SQL. IS [NOT] DISTINCT FROM data = [(10, 20), (30, 30), (40, None), (None, None)] df = sc.parallelize(data).toDF(["c1", "c2"]) df.createTempView("df") spark.sql("select c1, c2 from df where c1 is

    issues.apache.org

    * 6.4 부터는 다음 포스트에서 게시합니다.

    댓글

Designed by Tistory.