-
[스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (3)Dev/Spark 2020. 10. 25. 18:51
*해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.
자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.
* 내용이 너무 길어 나눠 게시합니다.
6.9 복합 데이터 타입
struct, array, map 등
6.9.1 Struct
Struct: Dataframe 내부의 Dataframe
12345678910111213141516171819202122232425df.selectExpr("(Description, InvoiceNo) as complex", "*").show(2)// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// | complex|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// |[RABBIT NIGHT LIG...| 580538| 23084| RABBIT NIGHT LIGHT| 48|2011-12-05 08:38:00| 1.79| 14075.0|United Kingdom|// |[DOUGHNUT LIP GLO...| 580538| 23077|DOUGHNUT LIP GLOSS | 20|2011-12-05 08:38:00| 1.25| 14075.0|United Kingdom|// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// only showing top 2 rowsdf.selectExpr("struct(Description, InvoiceNo) as complex", "*").show(2)// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// | complex|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// |[RABBIT NIGHT LIG...| 580538| 23084| RABBIT NIGHT LIGHT| 48|2011-12-05 08:38:00| 1.79| 14075.0|United Kingdom|// |[DOUGHNUT LIP GLO...| 580538| 23077|DOUGHNUT LIP GLOSS | 20|2011-12-05 08:38:00| 1.25| 14075.0|United Kingdom|// +--------------------+---------+---------+-------------------+--------+-------------------+---------+----------+--------------+// only showing top 2 rowsimport org.apache.spark.sql.functions.struct// import org.apache.spark.sql.functions.structval complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))// complexDF: org.apache.spark.sql.DataFrame = [complex: struct<Description: string, InvoiceNo: string>]complexDF.createOrReplaceTempView("complexDF")cs Select(조회) 방법: "."을 사용, getField("컬럼명") Method 사용
Struct컬럼.*으로 조회 시 Struct 컬럼 내부의 모든 내용을 Column으로 펼친 상태로 반환함
1234567891011121314151617181920212223242526complexDF.select("complex.Description").show(2)// +-------------------+// | Description|// +-------------------+// | RABBIT NIGHT LIGHT|// |DOUGHNUT LIP GLOSS |// +-------------------+// only showing top 2 rowscomplexDF.select(col("complex").getField("Description")).show(2)// +-------------------+// |complex.Description|// +-------------------+// | RABBIT NIGHT LIGHT|// |DOUGHNUT LIP GLOSS |// +-------------------+// only showing top 2 rowscomplexDF.select("complex.*").show(2)// +-------------------+---------+// | Description|InvoiceNo|// +-------------------+---------+// | RABBIT NIGHT LIGHT| 580538|// |DOUGHNUT LIP GLOSS | 580538|// +-------------------+---------+// only showing top 2 rowscs 6.9.2 Array
예) 하나의 Document가 있고 그 Document에 카테고리가 따로 존재하는 경우 사용
먼저 Array 컬럼 생성하는 것부터 진행
split(tgt, delimiter): 하나의 문자열(tgt)을 여러 문자의 배열로 변환, 변환 시에는 delimiter 사용, Column에도 적용 가능
1234567891011121314151617181920import org.apache.spark.sql.functions.split// import org.apache.spark.sql.functions.splitdf.select(split(col("Description"), " ")).show(2)// +---------------------+// |split(Description, )|// +---------------------+// | [RABBIT, NIGHT, L...|// | [DOUGHNUT, LIP, G...|// +---------------------+// only showing top 2 rowsdf.select(split(col("Description"), " ").alias("array_col")).selectExpr("array_col[0]").show(2)// +------------+// |array_col[0]|// +------------+// | RABBIT|// | DOUGHNUT|// +------------+// only showing top 2 rowscs size(arr): arr 배열의 길이를 구하는 함수, Array 또는 Map Column에서만 사용 가능
1234567891011121314df.select(size(split(col("Description"), " "))).show(2)// +---------------------------+// |size(split(Description, ))|// +---------------------------+// | 3|// | 4|// +---------------------------+// only showing top 2 rowsdf.select(size(col("Description"))).show(2)// org.apache.spark.sql.AnalysisException: cannot resolve 'size(`Description`)' due to data type mismatch: argument 1 requires (array or map) type, however, '`Description`' is of string type.;;// 'Project [size(Description#12) AS size(Description)#879]// +- Relation[InvoiceNo#10,StockCode#11,Description#12,Quantity#13,InvoiceDate#14,UnitPrice#15,CustomerID#16,Country#17] csvcs array_contains(arr, tgt): arr 배열에 tgt이 존재하는지 확인
123456789101112import org.apache.spark.sql.functions.array_contains// import org.apache.spark.sql.functions.array_containsdf.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)// +--------------------------------------------+// |array_contains(split(Description, ), WHITE)|// +--------------------------------------------+// | true|// | true|// +--------------------------------------------+// only showing top 2 rowscs explode(arr): arr의 내부 데이터를 모두 분리해 하나의 row에 매핑함(나머지 컬럼은 계속 중복처리함)
1234567891011121314151617import org.apache.spark.sql.functions.{split, explode}// import org.apache.spark.sql.functions.{split, explode}df.withColumn("splitted", split(col("Description"), " ")).withColumn("exploded", explode(col("splitted"))).select("Description", "InvoiceNo", "exploded").show(6)// +--------------------+---------+--------+// | Description|InvoiceNo|exploded|// +--------------------+---------+--------+// |WHITE HANGING HEA...| 536365| WHITE|// |WHITE HANGING HEA...| 536365| HANGING|// |WHITE HANGING HEA...| 536365| HEART|// |WHITE HANGING HEA...| 536365| T-LIGHT|// |WHITE HANGING HEA...| 536365| HOLDER|// | WHITE METAL LANTERN| 536365| WHITE|// +--------------------+---------+--------+// only showing top 6 rowscs 6.9.3 Map
Key-Value의 데이터 타입.
map(key, value): 두 컬럼(key, value)을 key 컬럼(key)과 value 컬럼(value)로 합치는 함수
selectExpr에서 Key를 통해 조회 가능(없으면 Null)
explode를 통해 Key/Value 컬럼으로 변환할 수 있음
123456789101112131415161718192021222324252627282930313233343536373839404142434445import org.apache.spark.sql.functions.map// import org.apache.spark.sql.functions.mapdf.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)// +--------------------+// | complex_map|// +--------------------+// |[WHITE HANGING HE...|// |[WHITE METAL LANT...|// +--------------------+// only showing top 2 rowsdf.select(map(col("InvoiceNo"),col("Description")).alias("complex_map")).show(2)// +--------------------+// | complex_map|// +--------------------+// |[536365 -> WHITE ...|// |[536365 -> WHITE ...|// +--------------------+// only showing top 2 rowsdf.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).selectExpr("complex_map['WHITE METAL LANTERN']").show(2)// +--------------------------------+// |complex_map[WHITE METAL LANTERN]|// +--------------------------------+// | null|// | 536365|// +--------------------------------+// only showing top 2 rowsdf.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).selectExpr("explode(complex_map)").show(2)// +--------------------+------+// | key| value|// +--------------------+------+// |WHITE HANGING HEA...|536365|// | WHITE METAL LANTERN|536365|// +--------------------+------+// only showing top 2 rowscs 6.10 JSON
Spark에서 JSON Column을 다루는 방법
JSON Column 생성&조회
123456789101112131415val jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1,2,3]}}' as jsonString""")// jsonDF: org.apache.spark.sql.DataFrame = [jsonString: string]import org.apache.spark.sql.functions.{get_json_object, json_tuple}// import org.apache.spark.sql.functions.{get_json_object, json_tuple}jsonDF.select(get_json_object(col("jsonString"),"$.myJSONKey.myJSONValue[1]") as "column",json_tuple(col("jsonString"), "myJSONKey")).show(2)// +------+--------------------+// |column| c0|// +------+--------------------+// | 2|{"myJSONValue":[1...|// +------+--------------------+cs to_json("Struct Column"): Struct Column을 json 문자열로 변환
from_json("json Column", Schema): Schema Type에 맞춰 JSON Column을 변환
1234567891011121314151617181920212223242526272829303132333435import org.apache.spark.sql.functions.to_json// import org.apache.spark.sql.functions.to_jsondf.selectExpr("(InvoiceNo, Description) as myStruct").select(to_json(col("myStruct"))).show(2)// +-----------------------+// |structstojson(myStruct)|// +-----------------------+// | {"InvoiceNo":"536...|// | {"InvoiceNo":"536...|// +-----------------------+// only showing top 2 rowsimport org.apache.spark.sql.functions.from_json// import org.apache.spark.sql.functions.from_jsonimport org.apache.spark.sql.types._// import org.apache.spark.sql.types._val parseSchema = new StructType(Array(new StructField("InvoiceNo", StringType, true),new StructField("Description", StringType, true)))// parseSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(Description,StringType,true))df.selectExpr("(InvoiceNo, Description) as myStruct").select(to_json(col("myStruct")).alias("newJSON")).select(from_json(col("newJSON"),parseSchema),col("newJSON")).show(2)// +----------------------+--------------------+// |jsontostructs(newJSON)| newJSON|// +----------------------+--------------------+// | [536365, WHITE HA...|{"InvoiceNo":"536...|// | [536365, WHITE ME...|{"InvoiceNo":"536...|// +----------------------+--------------------+// only showing top 2 rowscs 6.11 UDF(User Defined Function)
사용자가 원하는 형태로 트렌스포메이션을 만들 수 있게 해주는 함수
하나 이상의 Column을 입력으로 받아 변환
*주의: UDF를 사용할 경우 Spark의 함수를 사용하는 것이 아닌 java, scala, python 등의 method, function들을 사용하는 것이기 때문에 성능 저하가 발생할 수 있다.
만들어진 UDF는 모든 Worker Node로 전달되서 사용 가능한 상태가 됨(Register)
(이미지 출처: adatis.co.uk/databricks-udf-performance-comparisons/)
SparkSession.udf.register("함수명", function): 개발한 UDF를 SparkSession에 등록해 Dataframe은 물론 SQL, 타 프로그래밍 언어에서 사용 가능하게 등록하는 역할
12345678910111213141516171819202122232425262728293031323334353637val udfExampleDF = spark.range(5).toDF("num")// udfExampleDF: org.apache.spark.sql.DataFrame = [num: bigint]def power3(number:Double):Double = number*number*number// power3: (number: Double)Doublepower3(2.0)// res75: Double = 8.0import org.apache.spark.sql.functions.udf// import org.apache.spark.sql.functions.udfval power3udf = udf(power3(_:Double):Double)// power3udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))udfExampleDF.select(power3udf(col("num"))).show()// +--------+// |UDF(num)|// +--------+// | 0.0|// | 1.0|// | 8.0|// | 27.0|// | 64.0|// +--------+spark.udf.register("power3", power3(_:Double):Double)// res77: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))udfExampleDF.selectExpr("power3(num)").show(2)// +-------------------------------+// |UDF:power3(cast(num as double))|// +-------------------------------+// | 0.0|// | 1.0|// +-------------------------------+// only showing top 2 rowscs 6.12 Hive UDF
SparkSession.builder().enableHiveSupport() 명시 후 사용
라이브러리 의존성 명시해야 함
'Dev > Spark' 카테고리의 다른 글
[스파크 완벽 가이드] Chapter 11 - Dataset (0) 2020.11.15 [스파크 완벽 가이드] 목차 (0) 2020.10.27 [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (2) (0) 2020.10.18 [스파크 완벽 가이드] Chapter 7 - 집계 연산 (2) (0) 2020.10.18 [스파크 완벽 가이드] Chapter 7 - 집계 연산 (1) (0) 2020.10.18