  • [Spark: The Definitive Guide] Chapter 6 - Working with Different Data Types (2)
    Dev/Spark 2020. 10. 18. 14:36

    * This post is a summary written after reading "Spark: The Definitive Guide".

      Please refer to the book for the full details.

    * The content is too long for a single post, so it is split into parts.

     

    (1) povia.tistory.com/38

     


    6.4 Working with Numeric Data Types

    count: the most basic and most frequently used operation

    pow: raises a value to a power

    import org.apache.spark.sql.functions.{expr, pow}
    // import org.apache.spark.sql.functions.{expr, pow}
     
    val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
    // fabricatedQuantity: org.apache.spark.sql.Column = (POWER((Quantity * UnitPrice), 2.0) + 5)
     
    df.select(expr("CustomerId"), 
            fabricatedQuantity.alias("realQuantity"), 
            expr("Quantity"))
        .show(2)
    // +----------+------------------+--------+
    // |CustomerId|      realQuantity|Quantity|
    // +----------+------------------+--------+
    // |   17850.0|239.08999999999997|       6|
    // |   17850.0|          418.7156|       6|
    // +----------+------------------+--------+
    // only showing top 2 rows
     
    df.selectExpr("CustomerId", 
                "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity", 
                "Quantity")
        .show(2)
    // +----------+------------------+--------+
    // |CustomerId|      realQuantity|Quantity|
    // +----------+------------------+--------+
    // |   17850.0|239.08999999999997|       6|
    // |   17850.0|          418.7156|       6|
    // +----------+------------------+--------+
    // only showing top 2 rows

    Rounding: round (rounds up at the halfway point); bround (rounds toward the even neighbor at the halfway point, so 2.5 becomes 2.0)

    import org.apache.spark.sql.functions.{round, bround}
    // import org.apache.spark.sql.functions.{round, bround}
     
    df.select(round(col("UnitPrice"), 1).alias("rounded"), 
            col("UnitPrice"))
        .show(5)
    // +-------+---------+
    // |rounded|UnitPrice|
    // +-------+---------+
    // |    2.6|     2.55|
    // |    3.4|     3.39|
    // |    2.8|     2.75|
    // |    3.4|     3.39|
    // |    3.4|     3.39|
    // +-------+---------+
    // only showing top 5 rows
     
    import org.apache.spark.sql.functions.{round, bround, lit}
    // import org.apache.spark.sql.functions.{round, bround, lit}
     
    df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
    // +-------------+--------------+
    // |round(2.5, 0)|bround(2.5, 0)|
    // +-------------+--------------+
    // |          3.0|           2.0|
    // |          3.0|           2.0|
    // +-------------+--------------+
    // only showing top 2 rows

    Computing the correlation between two columns: corr

    import org.apache.spark.sql.functions.{corr}
    // import org.apache.spark.sql.functions.corr
     
    df.stat.corr("Quantity", "UnitPrice")
    // res18: Double = -0.04112314436835551
     
    df.select(corr("Quantity", "UnitPrice")).show()
    // +-------------------------+
    // |corr(Quantity, UnitPrice)|
    // +-------------------------+
    // |     -0.04112314436835551|
    // +-------------------------+
     
    df.select(corr("Quantity", "UnitPrice")
                .alias("상관관계 계산"))
        .show()
    // +--------------------+
    // |       상관관계 계산   |
    // +--------------------+
    // |-0.04112314436835551|
    // +--------------------+

    Combined count, mean, stddev, min, and max: describe

    (use it only for checking data in the console)

    When exact figures are needed: apply the individual functions to the columns directly (see the sketch after the example below)

    df.describe().show()
    // +-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
    // |summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
    // +-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
    // |  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
    // |   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
    // | stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
    // |    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
    // |    max|          C536548|              POST|ZINC WILLIE WINKI...|               600|            607.49|           18229.0|United Kingdom|
    // +-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
     
    import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}
    // import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}
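    As a minimal sketch of applying the functions directly (assuming the same df as above), the imported aggregation functions can be selected like any other column expression:

    df.select(count("UnitPrice"), mean("UnitPrice"), stddev_pop("UnitPrice"),
            min("UnitPrice"), max("UnitPrice")).show()
    // returns a single row with the exact count, mean, population standard deviation,
    // minimum, and maximum of UnitPrice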

     

    StatFunctions (accessed through df.stat) provides a variety of statistical methods (e.g. approxQuantile, crosstab, freqItems); monotonically_increasing_id, shown alongside them below, comes from org.apache.spark.sql.functions

    val colName = "UnitPrice"
    // colName: String = UnitPrice
     
    val quantileProbs = Array(0.5)
    // quantileProbs: Array[Double] = Array(0.5)
     
    val relError = 0.05
    // relError: Double = 0.05
     
    df.stat.approxQuantile("UnitPrice", quantileProbs, relError)
    // res8: Array[Double] = Array(2.51)
     
    df.stat.crosstab("StockCode", "Quantity").show(2)
    // 20/10/21 19:09:41 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
    // +------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    // |StockCode_Quantity| -1|-10|-12| -2|-24| -3| -4| -5| -6| -7|  1| 10|100| 11| 12|120|128| 13| 14|144| 15| 16| 17| 18| 19|192|  2| 20|200| 21|216| 22| 23| 24| 25|252| 27| 28|288|  3| 30| 32| 33| 34| 36|384|  4| 40|432| 47| 48|480|  5| 50| 56|  6| 60|600| 64|  7| 70| 72|  8| 80|  9| 96|
    // +------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    // |             22578|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    // |             21327|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    // +------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    // only showing top 2 rows
     
    df.stat.freqItems(Seq("StockCode", "Quantity")).show()
    // +--------------------+--------------------+
    // | StockCode_freqItems|  Quantity_freqItems|
    // +--------------------+--------------------+
    // |[90214E, 20728, 2...|[200, 128, 23, 32...|
    // +--------------------+--------------------+
     
    import org.apache.spark.sql.functions.monotonically_increasing_id
    // import org.apache.spark.sql.functions.monotonically_increasing_id
     
    df.select(monotonically_increasing_id()).show(2)
    // +-----------------------------+
    // |monotonically_increasing_id()|
    // +-----------------------------+
    // |                            0|
    // |                            1|
    // +-----------------------------+
    // only showing top 2 rows
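    As a small additional sketch (assuming the same df), approxQuantile also accepts several probabilities at once, e.g. to get approximate quartiles:

    df.stat.approxQuantile("UnitPrice", Array(0.25, 0.5, 0.75), 0.05)
    // returns an Array[Double] with the approximate 25th, 50th, and 75th percentiles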

     

    6.5 String Data Types

    initcap: capitalizes the first character of every word in the string (the remaining characters become lowercase)

    lower: converts all characters to lowercase

    upper: converts all characters to uppercase

    ltrim: removes all leading whitespace

    rtrim: removes all trailing whitespace

    trim: removes whitespace on both sides

    lpad: pads the left side with a given character (if the given length is shorter than the target string, the string is truncated to that length)

    rpad: pads the right side with a given character (same truncation behavior; a short sketch of the truncation case follows the example below)

    import org.apache.spark.sql.functions.{initcap}
    // import org.apache.spark.sql.functions.initcap
     
    df.select(initcap(col("Description"))).show(2, false)
    // +----------------------------------+
    // |initcap(Description)              |
    // +----------------------------------+
    // |White Hanging Heart T-light Holder|
    // |White Metal Lantern               |
    // +----------------------------------+
    // only showing top 2 rows
     
    import org.apache.spark.sql.functions.{lower, upper}
    // import org.apache.spark.sql.functions.{lower, upper}
     
    df.select(col("Description"), 
            lower(col("Description")), 
            upper(lower(col("Description"))))
        .show(2)
    // +--------------------+--------------------+-------------------------+
    // |         Description|  lower(Description)|upper(lower(Description))|
    // +--------------------+--------------------+-------------------------+
    // |WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
    // | WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
    // +--------------------+--------------------+-------------------------+
    // only showing top 2 rows
     
    import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}
    // import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}
     
    df.select(ltrim(lit("     HELLO     ")).as("ltrim"),
            rtrim(lit("     HELLO     ")).as("rtrim"),
            trim(lit("     HELLO     ")).as("trim"),
            lpad(lit("HELLO"), 7, "#").as("lp"),
            rpad(lit("HELLO"), 10, "#").as("rp")).show(2)
    // +----------+----------+-----+-------+----------+
    // |     ltrim|     rtrim| trim|     lp|        rp|
    // +----------+----------+-----+-------+----------+
    // |HELLO     |     HELLO|HELLO|##HELLO|HELLO#####|
    // |HELLO     |     HELLO|HELLO|##HELLO|HELLO#####|
    // +----------+----------+-----+-------+----------+
    // only showing top 2 rows
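    A minimal sketch of the truncation behavior noted above for lpad/rpad (a pad length shorter than the string), using the same imports as the block above:

    df.select(lpad(lit("HELLO"), 3, "#").as("lp3"),
            rpad(lit("HELLO"), 3, "#").as("rp3")).show(1)
    // both columns contain "HEL": the string is cut down to the requested length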

     

    6.5.1 Regular Expressions and Character Replacement

    Used to check whether strings exist, match, or contain a pattern

    Uses Java regular expression syntax

    regexp_extract: extracts the value that matches a regular expression; regexp_extract(str, regexp[, idx])

    import org.apache.spark.sql.functions.regexp_extract
     
    df.select(regexp_extract(lit("100-200"), "(\\d+)-(\\d+)", 1)).show(1)
    // +---------------------------------------+
    // |regexp_extract(100-200, (\d+)-(\d+), 1)|
    // +---------------------------------------+
    // |                                    100|
    // +---------------------------------------+
    // only showing top 1 row
     
    df.select(regexp_extract(lit("100-200"), "(\\d+)-(\\d+)", 2)).show(1)
    // +---------------------------------------+
    // |regexp_extract(100-200, (\d+)-(\d+), 2)|
    // +---------------------------------------+
    // |                                    200|
    // +---------------------------------------+
    // only showing top 1 row
     
    df.select(regexp_extract(lit("100-200"), "(\\d+)-(\\d+)", 0)).show(1)
    // +---------------------------------------+
    // |regexp_extract(100-200, (\d+)-(\d+), 0)|
    // +---------------------------------------+
    // |                                100-200|
    // +---------------------------------------+
    // only showing top 1 row

    regexp_replace: replaces the values that match a regular expression; regexp_replace(str, regexp, rep)

    import org.apache.spark.sql.functions.regexp_replace
    // import org.apache.spark.sql.functions.regexp_replace
     
    val simpleColors = Seq("black", "white", "red", "green", "blue")
    // simpleColors: Seq[String] = List(black, white, red, green, blue)
     
    val regexString = simpleColors.map(_.toUpperCase).mkString("|")
    // regexString: String = BLACK|WHITE|RED|GREEN|BLUE
     
    df.select(regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"), 
            col("Description")).show(2)
    // +--------------------+--------------------+
    // |         color_clean|         Description|
    // +--------------------+--------------------+
    // |COLOR HANGING HEA...|WHITE HANGING HEA...|
    // | COLOR METAL LANTERN| WHITE METAL LANTERN|
    // +--------------------+--------------------+
    // only showing top 2 rows

    translate: character-by-character replacement (with "LEET" -> "1337", L is replaced by 1, E by 3, and T by 7)

    import org.apache.spark.sql.functions.translate
    // import org.apache.spark.sql.functions.translate
     
    df.select(translate(col("Description"), "LEET", "1337"),
            col("Description")).show(2)
    // +----------------------------------+--------------------+
    // |translate(Description, LEET, 1337)|         Description|
    // +----------------------------------+--------------------+
    // |              WHI73 HANGING H3A...|WHITE HANGING HEA...|
    // |               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
    // +----------------------------------+--------------------+
    // only showing top 2 rows
     
    df.select(translate(col("Description"), "LEET", "1357"),
            col("Description")).show(2)
    // +----------------------------------+--------------------+
    // |translate(Description, LEET, 1357)|         Description|
    // +----------------------------------+--------------------+
    // |              WHI73 HANGING H3A...|WHITE HANGING HEA...|
    // |               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
    // +----------------------------------+--------------------+
    // only showing top 2 rows
     
    df.select(translate(col("Description"), "LEET", "1537"),
            col("Description")).show(2)
    // +----------------------------------+--------------------+
    // |translate(Description, LEET, 1537)|         Description|
    // +----------------------------------+--------------------+
    // |              WHI75 HANGING H5A...|WHITE HANGING HEA...|
    // |               WHI75 M57A1 1AN75RN| WHITE METAL LANTERN|
    // +----------------------------------+--------------------+
    // only showing top 2 rows
     
    df.select(translate(lit("LEET MEET"), "LEET", "1537"), 
            col("Description")).show(2)
    // +--------------------------------+--------------------+
    // |translate(LEET MEET, LEET, 1537)|         Description|
    // +--------------------------------+--------------------+
    // |                       1557 M557|WHITE HANGING HEA...|
    // |                       1557 M557| WHITE METAL LANTERN|
    // +--------------------------------+--------------------+
    // only showing top 2 rows

    * In the last "LEET MEET" case, the replacement string was 1537 yet the output is 1557: when a character appears more than once in the matching string, only its first mapping is used (E -> 5 here, so the later E -> 3 mapping is ignored)

     

    import org.apache.spark.sql.functions.regexp_extract
    // import org.apache.spark.sql.functions.regexp_extract
     
    val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
    // regexString: String = (BLACK|WHITE|RED|GREEN|BLUE)
     
    df.select(regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
            col("Description")).show(2)
    // +-----------+--------------------+
    // |color_clean|         Description|
    // +-----------+--------------------+
    // |      WHITE|WHITE HANGING HEA...|
    // |      WHITE| WHITE METAL LANTERN|
    // +-----------+--------------------+
    // only showing top 2 rows

    * An example of extracting the text that matches the regular expression

     

    contains: checks whether the given string is present and returns true/false

    (in Python and SQL, use instr instead; a sketch of the instr form follows the example below)

    val containsBlack = col("Description").contains("BLACK")
    // containsBlack: org.apache.spark.sql.Column = contains(Description, BLACK)
     
    val containsWhite = col("DESCRIPTION").contains("WHITE")
    // containsWhite: org.apache.spark.sql.Column = contains(DESCRIPTION, WHITE)
     
    df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
        .where("hasSimpleColor").select("Description").show(3, false)
    // +----------------------------------+
    // |Description                       |
    // +----------------------------------+
    // |WHITE HANGING HEART T-LIGHT HOLDER|
    // |WHITE METAL LANTERN               |
    // |RED WOOLLY HOTTIE WHITE HEART.    |
    // +----------------------------------+
    // only showing top 3 rows
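    As a minimal sketch of the instr variant mentioned above (assuming df has been registered as the temp view dfTable, as in the SQL examples later in this post):

    spark.sql("""
      SELECT Description FROM dfTable
      WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1
      """).show(3, false)
    // returns the same descriptions as the contains-based example above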

    How to add conditions to a where clause dynamically

    * In Scala, ":+" appends an element to the end of a List

    * "_*" tells the function that the variable (selectedColumns in this example) should be expanded as a varargs argument list (a small standalone sketch of both constructs follows the example below)

    val simpleColors = Seq("black", "white", "red", "green", "blue")
    // simpleColors: Seq[String] = List(black, white, red, green, blue)
     
    val selectedColumns = simpleColors.map(color=>{col("Description").contains(color.toUpperCase).alias(s"is_$color")}):+expr("*")
    // selectedColumns: Seq[org.apache.spark.sql.Column] = List(contains(Description, BLACK) AS `is_black`, contains(Description, WHITE) AS `is_white`, contains(Description, RED) AS `is_red`, contains(Description, GREEN) AS `is_green`, contains(Description, BLUE) AS `is_blue`, unresolvedstar())
     
    df.select(selectedColumns:_*).where(col("is_white").or(col("is_red"))).select("Description").show(3,false)
    // +----------------------------------+
    // |Description                       |
    // +----------------------------------+
    // |WHITE HANGING HEART T-LIGHT HOLDER|
    // |WHITE METAL LANTERN               |
    // |RED WOOLLY HOTTIE WHITE HEART.    |
    // +----------------------------------+
    // only showing top 3 rows
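    A minimal standalone sketch of the two Scala constructs used above (plain Scala, no Spark; the names base and total are only for illustration):

    val base = Seq(1, 2, 3)
    val appended = base :+ 4           // ":+" appends an element: List(1, 2, 3, 4)
     
    def total(xs: Int*): Int = xs.sum  // a varargs function
    total(appended: _*)                // "_*" expands the collection into varargs: 10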

    6.6 Date & Timestamp Data Types

    Spark relies on Java's SimpleDateFormat (for both Date and Timestamp)

    The inferSchema option tries to identify data types as well as it can

    Dates are most commonly stored as strings, so conversion is usually needed

    * TimestampType only supports second-level precision; to work with milliseconds/microseconds, convert to a Long type instead (see the sketch after the example below)

    (Related: how time units are handled in InfluxDB)

    Printing the current time

    current_date(): returns the current date

    current_timestamp(): returns the current date and time

    import org.apache.spark.sql.functions.{current_date, current_timestamp}
    // import org.apache.spark.sql.functions.{current_date, current_timestamp}
     
    val dateDF = spark.range(10).withColumn("today", current_date())
    .withColumn("now", current_timestamp())
    // dateDF: org.apache.spark.sql.DataFrame = [id: bigint, today: date ... 1 more field]
     
    dateDF.createOrReplaceTempView("dateTable")
     
    dateDF.show(2)
    // +---+----------+--------------------+
    // | id|     today|                 now|
    // +---+----------+--------------------+
    // |  0|2020-10-25|2020-10-25 14:38:...|
    // |  1|2020-10-25|2020-10-25 14:38:...|
    // +---+----------+--------------------+
    // only showing top 2 rows
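    As a minimal sketch of the Long conversion mentioned above (assuming the dateDF from the block above): casting a timestamp column to long yields epoch seconds, so sub-second values are usually kept in a separate Long column (e.g. epoch milliseconds) rather than in a TimestampType column:

    dateDF.select(col("now").cast("long").alias("epoch_seconds")).show(1)
    // one row containing the current Unix time in seconds (a bigint column)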

    Adding/subtracting days on a Date or Timestamp

    date_sub(): subtracts the given number of days from the target column

    date_add(): adds the given number of days to the target column

    import org.apache.spark.sql.functions.{date_add, date_sub}
    // import org.apache.spark.sql.functions.{date_add, date_sub}
     
    dateDF.select(date_sub(col("today"),5), date_add(col("today"), 5)).show(1)
    // +------------------+------------------+
    // |date_sub(today, 5)|date_add(today, 5)|
    // +------------------+------------------+
    // |        2020-10-20|        2020-10-30|
    // +------------------+------------------+
    // only showing top 1 row
     
    dateDF.select(date_sub(col("now"),5), date_add(col("now"), 5)).show(1)
    // +----------------+----------------+
    // |date_sub(now, 5)|date_add(now, 5)|
    // +----------------+----------------+
    // |      2020-10-20|      2020-10-30|
    // +----------------+----------------+
    // only showing top 1 row

    Computing the difference between two date columns

    datediff(): returns the difference in days

    months_between(): returns the difference in months

    import org.apache.spark.sql.functions.{datediff, months_between, to_date}
    // import org.apache.spark.sql.functions.{datediff, months_between, to_date}
     
    dateDF.withColumn("week_ago", date_sub(col("today"), 7))
        .select(datediff(col("week_ago"), col("today"))).show(1)
    // +-------------------------+
    // |datediff(week_ago, today)|
    // +-------------------------+
    // |                       -7|
    // +-------------------------+
    // only showing top 1 row
     
    dateDF.select(to_date(lit("2016-01-01")).alias("start"),
            to_date(lit("2017-05-22")).alias("end"))
        .select(months_between(col("start"), col("end"))).show(1)
    // +--------------------------------+
    // |months_between(start, end, true)|
    // +--------------------------------+
    // |                    -16.67741935|
    // +--------------------------------+
    // only showing top 1 row

    to_date(): converts a String to a Date

    If no format is specified, Spark may misinterpret the value during conversion, so set the format in the second parameter

    to_timestamp(): String -> Timestamp; a format always needs to be specified

    dateDF.select(to_date(lit("2016-20-12")), to_date(lit("2017-12-11"))).show(1)
    // +---------------------+---------------------+
    // |to_date('2016-20-12')|to_date('2017-12-11')|
    // +---------------------+---------------------+
    // |                 null|           2017-12-11|
    // +---------------------+---------------------+
    // only showing top 1 row
     
    import org.apache.spark.sql.functions.to_date
    // import org.apache.spark.sql.functions.to_date
     
    val dateFormat="yyyy-dd-MM"
    // dateFormat: String = yyyy-dd-MM
     
    val cleanDateDF = spark.range(1).select(
        to_date(lit("2017-12-11"), dateFormat).alias("date"),
        to_date(lit("2017-20-11"), dateFormat).alias("date2"))
    // cleanDateDF: org.apache.spark.sql.DataFrame = [date: date, date2: date]
     
    cleanDateDF.createOrReplaceTempView("dateTable2")
     
    cleanDateDF.show()
    // +----------+----------+
    // |      date|     date2|
    // +----------+----------+
    // |2017-11-12|2017-11-20|
    // +----------+----------+
     
    cleanDateDF.select(datediff(col("date"), col("date2"))).show()
    // +---------------------+
    // |datediff(date, date2)|
    // +---------------------+
    // |                   -8|
    // +---------------------+
     
    import org.apache.spark.sql.functions.to_timestamp
    // import org.apache.spark.sql.functions.to_timestamp
     
    cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
    // +----------------------------------+
    // |to_timestamp(`date`, 'yyyy-dd-MM')|
    // +----------------------------------+
    // |               2017-11-12 00:00:00|
    // +----------------------------------+

    Two dates can be compared through filter or where (the two are exactly the same thing; a where-based sketch follows the example below)

    cleanDateDF.filter(col("date2")>lit("2017-11-12")).show()
    // +----------+----------+
    // |      date|     date2|
    // +----------+----------+
    // |2017-11-12|2017-11-20|
    // +----------+----------+
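    The equivalent where form, as a minimal sketch (same cleanDateDF as above):

    cleanDateDF.where(col("date2") > lit("2017-11-12")).show()
    // produces exactly the same result as the filter version above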

     

    6.7 Working with null Values

    For representing missing values, null is what Spark is best optimized for

     

    6.7.1 coalesce

    Note that this is a different function from the coalesce(int) that appears alongside repartition (which changes the number of partitions)

    Returns, for each row, the first non-null value among the selected DataFrame columns

    If none of the columns are null, the value of the first column is returned

    Order: null check on the first column -> null check on the second column -> ...

    import org.apache.spark.sql.functions.coalesce
    // import org.apache.spark.sql.functions.coalesce
     
    df.select(coalesce(col("Description"), col("CustomerId"))).show(2)
    // +---------------------------------+
    // |coalesce(Description, CustomerId)|
    // +---------------------------------+
    // |               RABBIT NIGHT LIGHT|
    // |              DOUGHNUT LIP GLOSS |
    // +---------------------------------+
    // only showing top 2 rows

     

    6.7.2 ifnull, nullif, nvl, nvl2

    SQL functions (they can also be used from the DataFrame API via expr; see the sketch after the example below)

    ifnull(tgt, res): returns res if tgt is null

    nullif(src, tgt): returns null if src and tgt are equal, otherwise returns src

    nvl(tgt, res): returns res if tgt is null

    nvl2(tgt, res1, res2): returns res1 if tgt is not null, res2 if it is null

    sql("SELECT " + 
        "ifnull(null, 'return_value') as a," + 
        "nullif('value', 'value') as b," + 
        "nvl(null, 'return_value') as c," + 
        "nvl2('not_null', 'return_value', 'else_value') as d" + 
        " from dfTable limit 1").show()
    // +------------+----+------------+------------+
    // |           a|   b|           c|           d|
    // +------------+----+------------+------------+
    // |return_value|null|return_value|return_value|
    // +------------+----+------------+------------+
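    As a minimal sketch of using one of these SQL functions from the DataFrame API (assuming the same df and the expr import from earlier in the post):

    df.select(expr("ifnull(Description, 'No Description')").alias("desc_filled")).show(2)
    // rows with a null Description would show 'No Description' instead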

    6.7.3 drop

    Use DataFrame.na.drop()

    drop(), drop("any"): drops a row if even one of its columns is null

    drop("all"): drops a row only if all of its column values are null or NaN

    The target columns can be passed to drop as a sequence in the second parameter

    df.na.drop()
    // res26: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.na.drop("any")
    // res27: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.na.drop("all")
    // res28: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.na.drop("all", Seq("StockCode", "InvoiceNo"))
    // res30: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    df.show(2)
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
    // |   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 2 rows

     

    6.7.4 fill

    Replaces null values in one or more columns with a given value

    df.na.fill("All Null Values become this string").show(2)
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|        Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // |   580538|    23084| RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
    // |   580538|    23077|DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
    // +---------+---------+-------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 2 rows
     
    df.na.fill(5, Seq("StockCode", "InvoiceNo"))
    // res33: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
     
    val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
    // fillColValues: scala.collection.immutable.Map[String,Any] = Map(StockCode -> 5, Description -> No Value)
     
    df.na.fill(fillColValues)
    // res34: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]

     

    6.7.5 replace

    Replaces values in a column based on their current value; in the example below, empty strings in Description are replaced with "UNKNOWN" (so it is more general than only filling nulls; a multi-column sketch follows the example below)

    df.na.replace("Description", Map("" -> "UNKNOWN"))
    // res35: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
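    A minimal sketch of the same idea applied to several columns at once (a hypothetical variation on the example above; na.replace also accepts a sequence of column names):

    df.na.replace(Seq("Description", "Country"), Map("" -> "UNKNOWN"))
    // returns a new DataFrame where empty strings in either column become "UNKNOWN"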

     

    6.8 Ordering

    Use the asc_nulls_first, desc_nulls_first, asc_nulls_last, and desc_nulls_last functions to control where null values appear when sorting (desc_nulls_last is shown below, followed by a sketch of asc_nulls_first)

    import org.apache.spark.sql.functions.desc_nulls_last
     
    df.sort(desc_nulls_last("Description")).show(2)
    // +---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
    // |InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
    // +---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
    // |   546023|    85175|   wrongly sold sets|    -975|2011-03-08 17:29:00|      0.0|      null|United Kingdom|
    // |   546018|    85172|wrongly sold as sets|    -600|2011-03-08 17:23:00|      0.0|      null|United Kingdom|
    // +---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
    // only showing top 2 rows
     
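    A minimal sketch of one of the other variants listed above (assuming the same df); asc_nulls_first sorts ascending with null values placed first:

    import org.apache.spark.sql.functions.asc_nulls_first
     
    df.orderBy(asc_nulls_first("Description")).show(2)
    // the first rows returned are those with a null Description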

     

    * Sections from 6.9 onward will be published in the next post.
