-
[스파크 완벽 가이드] Chapter 7 - 집계 연산 (2)Dev/Spark 2020. 10. 18. 02:39
* 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.
자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.
* 내용이 너무 길어 나눠 게시합니다.
7.2 그룹화(Grouping)
Grouping할 대상 컬럼(들)을 설정한 후 대상 컬럼(들), 혹은 타 컬럼들을 가공하기 위해 사용
컬럼(들)의 그룹화(RelationalGroupedDataset으로 반환)->집계 연산 수행(DataFrame으로 반환) 의 두 단계로 나뉨
123456789101112131415161718192021222324252627df.groupBy("InvoiceNo", "CustomerId").count().show()// +---------+----------+-----+// |InvoiceNo|CustomerId|count|// +---------+----------+-----+// | 536846| 14573| 76|// | 537026| 12395| 12|// | 537883| 14437| 5|// | 538068| 17978| 12|// | 538279| 14952| 7|// | 538800| 16458| 10|// | 538942| 17346| 12|// | C539947| 13854| 1|// | 540096| 13253| 16|// | 540530| 14755| 27|// | 541225| 14099| 19|// | 541978| 13551| 4|// | 542093| 17677| 16|// | 536596| null| 6|// | 537252| null| 1|// | 538041| null| 1|// | 543188| 12567| 63|// | 543590| 17377| 19|// | C543757| 13115| 1|// | C544318| 12989| 1|// +---------+----------+-----+// only showing top 20 rowscs 7.2.1 표현식 이용
count: Method(Action), Function(Transformation) 사용 가능, 하지만 Function으로 사용하는 것을 추천
select에서 사용하는 것보다 agg method에서 사용하는 것이 좋음
agg Method: 여러 집계 처리 한번에 지정 가능, 집계에 표현식 사용 가능, Transformation이 완료된 컬럼에 alias 사용 가능
123456789101112131415161718192021222324252627282930313233import org.apache.spark.sql.functions.count// import org.apache.spark.sql.functions.countdf.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"),expr("count(Quantity)")).show()// +---------+----+---------------+// |InvoiceNo|quan|count(Quantity)|// +---------+----+---------------+// | 536596| 6| 6|// | 536938| 14| 14|// | 537252| 1| 1|// | 537691| 20| 20|// | 538041| 1| 1|// | 538184| 26| 26|// | 538517| 53| 53|// | 538879| 19| 19|// | 539275| 6| 6|// | 539630| 12| 12|// | 540499| 24| 24|// | 540540| 22| 22|// | C540850| 1| 1|// | 540976| 48| 48|// | 541432| 4| 4|// | 541518| 101| 101|// | 541783| 35| 35|// | 542026| 9| 9|// | 542375| 6| 6|// | C542604| 8| 8|// +---------+----+---------------+// only showing top 20 rowscs 7.2.2 맵 사용
컬럼->키, 수행할 집계 함수의 문자열->값
12345678df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()// +---------+------------------+--------------------+// |InvoiceNo| avg(Quantity)|stddev_pop(Quantity)|// +---------+------------------+--------------------+// | 536596| 1.5| 1.1180339887498947|// | 536938|33.142857142857146| 20.698023172885524|// ...cs 7.3 윈도우 함수
group-by: 전체 Row를 대상으로 그룹화함
window: 특정 순서(Ex)시간)에 따라 Row를 "분류"함
준비 데이터 생성 과정
12345678910111213141516171819/** Column을 Date 형태로 변환 시작*/import org.apache.spark.sql.functions.{col, to_date}// import org.apache.spark.sql.functions.{col, to_date}val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))// dfWithDate: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]dfWithDate.show()// +---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+// |InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country| date|// +---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+// | 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|United Kingdom|2010-12-01|// | 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom|2010-12-01|// .../** 변환 끝*/dfWithDate.createOrReplaceTempView("dfWithDate")// Temp View 만들기cs partitionBy(columns): 파티셔닝 스키마의 개념과 관련 X, 그룹을 어떻게 나눌 지 결정하는 것.
12345678import org.apache.spark.sql.expressions.Window// import org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions.col// import org.apache.spark.sql.functions.colval windowSpec = Window.partitionBy("CustomerId", "date").orderBy(col("Quantity").desc).rowsBetween(Window.unboundedPreceding, Window.currentRow)// windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@fce6355cs 추가 자료: thebook.io/006908/part02/ch05/01/03/02-02/
시간대별 최대 구매 개수 구하기
123456import org.apache.spark.sql.functions.max// import org.apache.spark.sql.functions.maxval maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)// maxPurchaseQuantity: org.apache.spark.sql.Column// = max(Quantity) OVER (PARTITION BY CustomerId,// date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)cs 최대 구매 수량을 가진 날짜 구하기
12345678910import org.apache.spark.sql.functions.{dense_rank, rank}// import org.apache.spark.sql.functions.{dense_rank, rank}val purchaseDenseRank = dense_rank().over(windowSpec)// purchaseDenseRank: org.apache.spark.sql.Column// = DENSE_RANK() OVER (PARTITION BY CustomerId,// date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)val purchaseRank = rank().over(windowSpec)// purchaseRank: org.apache.spark.sql.Column// = RANK() OVER (PARTITION BY CustomerId,// date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)cs 결과 출력
1234567891011121314151617181920import org.apache.spark.sql.functions.col// import org.apache.spark.sql.functions.col// PrintdfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId").select(col("CustomerId"),col("date"),col("Quantity"),purchaseRank.alias("quantityRank"),purchaseDenseRank.alias("quantityDenseRank"),maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()// +----------+----------+--------+------------+-----------------+-------------------+// |CustomerId| date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|// +----------+----------+--------+------------+-----------------+-------------------+// | 12346|2011-01-18| 74215| 1| 1| 74215|// | 12346|2011-01-18| -74215| 2| 2| 74215|// ...cs 7.4 그룹화 셋
여러 그룹에 걸쳐 집계할 수 있는 방법 필요-> 그룹화 셋(GROUPING SET) 사용
여러 집계를 결합하는 저수준 기능, group-by 구문에서 원하는 형태로 집계 생성 가능
재고 코드와 고객 별 총 수량 구하기
12345678910111213141516171819202122232425262728293031323334val dfNoNull = dfWithDate.drop()// dfNoNull: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 7 more fields]dfNoNull.createOrReplaceTempView("dfNoNull")spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull"+ " GROUP BY customerId, stockCode order by customerId DESC, stockCode desc").show()// +----------+---------+-------------+// |CustomerId|stockCode|sum(Quantity)|// +----------+---------+-------------+// | 18287| 85173| 48|// | 18287| 85040A| 48|// | 18287| 85039B| 120|// | 18287| 85039A| 96|// | 18287| 84920| 4|// | 18287| 84584| 6|// ...// +----------+---------+-------------+// only showing top 20 rowsspark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull"+ " GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))"+ " ORDER BY CustomerId DESC, stockCode DESC").show()// +----------+---------+-------------+// |customerId|stockCode|sum(Quantity)|// +----------+---------+-------------+// | 18287| 85173| 48|// | 18287| 85040A| 48|// | 18287| 85039B| 120|// | 18287| 85039A| 96|// | 18287| 84920| 4|// | 18287| 84584| 6|// ...// +----------+---------+-------------+// only showing top 20 rowscs SQL에서만 지원, DataFrame에서 사용 시 롤업과 큐브 사용해야 함
고객(customerId), 재고수준(stockCode)에 관계 없이 총 수량의 합산 결과 추가 -> GROUPING SETS 구문에 집계 방식 지정
123456789101112131415spark.sql("SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull"+ " GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode), ())"+ " ORDER BY sum(Quantity) DESC, CustomerId DESC, stockCode DESC").show()// +----------+---------+-------------+// |customerId|stockCode|sum(Quantity)|// +----------+---------+-------------+// | null| null| 5176450|// | 13256| 84826| 12540|// | 17949| 22197| 11692|// | 16333| 84077| 10080|// | 16422| 17003| 10077|// | null| 22355| 9178|// ...// +----------+---------+-------------+// only showing top 20 rowscs 7.4.1 롤업
rollup(columns): 그룹화할 컬럼을 설정, rollup 수행
.agg(expr): 집계 설정
기존 컬럼 뿐만 아니라 새로 생성한 컬럼을 통해서도 사용 가능
12345678910111213141516171819202122232425262728293031323334353637val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")).selectExpr("Date","Country","'sum(Quantity)' as total_quantity").orderBy("Date").show()// +----------+--------------+--------------+// | Date| Country|total_quantity|// +----------+--------------+--------------+// | null| null| sum(Quantity)|// |2010-12-01| Netherlands| sum(Quantity)|// |2010-12-01|United Kingdom| sum(Quantity)|// ...//// rolledUpDF: Unit = ()val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity")).select("Date","Country","sum(Quantity)").orderBy("Date").show()// +----------+--------------+-------------+// | Date| Country|sum(Quantity)|// +----------+--------------+-------------+// | null| null| 5176450|// |2010-12-01| Australia| 107|// |2010-12-01| France| 449|// ...//// rolledUpDF: Unit = ()cs rollup된 두 개의 컬럼값이 모두 null인 row: 두 컬럼에 속한 전체 레코드의 합계를 나타냄
12345678910111213141516rolledUpDF.where("Country IS NULL").show()// +----------+-------+-------------+// | Date|Country|sum(Quantity)|// +----------+-------+-------------+// | null| null| 5176450|// |2010-12-01| null| 26814|// |2010-12-02| null| 21023|// ...rolledUpDF.where("Date IS NULL").show()// +----+-------+-------------+// |Date|Country|sum(Quantity)|// +----+-------+-------------+// |null| null| 5176450|// +----+-------+-------------+cs 7.4.2 큐브
롤업을 고차원적으로 사용, 모든 차원에 대해 동일한 작업 수행(Ex) 전체 기간에 대해 날짜와 국가별 결과 추출)
세부적 예시
- 전체 날짜와 모든 국가에 대한 합계
- 모든 국가의 날짜별 합계
- 날짜별 국가의 합계
- 전체 날짜의 국가별 합계
등
1234567891011121314151617181920212223242526272829303132333435363738394041dfNoNull.cube("Date", "Country").agg(sum(col("Quantity"))).select("Date", "Country", "sum(Quantity)").orderBy("Date").show()// +----+--------------------+-------------+// |Date| Country|sum(Quantity)|// +----+--------------------+-------------+// |null| Japan| 25218|// |null| Australia| 83653|// |null| Portugal| 16180|// |null|United Arab Emirates| 982|// |null| Germany| 117448|// |null| Unspecified| 3300|// |null| RSA| 352|// |null| Singapore| 5234|// |null| Cyprus| 6317|// |null| null| 5176450|// ...// +----+--------------------+-------------+// only showing top 20 rowsdfNoNull.cube("Date", "Country").agg(sum(col("Quantity"))).select("Date", "Country", "sum(Quantity)").orderBy("Date").where("DATE IS NOT NULL").show()//+----------+--------------+-------------+//| Date| Country|sum(Quantity)|//+----------+--------------+-------------+//|2010-12-01|United Kingdom| 23949|//|2010-12-01| France| 449|//|2010-12-01| Germany| 117|//|2010-12-01| Norway| 1852|//|2010-12-01| Australia| 107|//|2010-12-01| EIRE| 243|//|2010-12-01| null| 26814|//|2010-12-01| Netherlands| 97|//...//+----------+--------------+-------------+//only showing top 20 rowscs 7.4.3 그룹화 메타데이터
큐브, 롤업 사용 시 집계 수준에 따라 필터링 목적으로 집계 수준 조회 경우 발생 -> grouping_id 사용
grouping_id: 결과 데이터셋의 집계 수준을 명시하는 컬럼 제공
그룹화 수준
3: 가장 높은 계층의 집계 결과에서 나타남, customerId나 stockCode에 관계없이 총 수량 제공
2: 개별 재고 코드의 모든 집계 결과에서 나타남. customerId에 관계 없이 재고 코드별 총 수량 제공
1: 구매한 물품에 관계없이 customerId를 기반으로 총 수량 제공
0: customerId와 stockCode 별 조합에 따라 총 수량 제공
1234567891011121314151617181920212223242526272829303132import org.apache.spark.sql.functions.{grouping_id, sum, expr}// import org.apache.spark.sql.functions.{grouping_id, sum, expr}dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity")).orderBy(col("grouping_id()").desc).show()/*+----------+---------+-------------+-------------+|customerId|stockCode|grouping_id()|sum(Quantity)|+----------+---------+-------------+-------------+| null| null| 3| 5176450|| null| 22867| 2| 6292|| null| 85135B| 2| 55|| null| 72800B| 2| 144|| null| 22277| 2| 417|| null| 20685| 2| 4027|| null| 21507| 2| 3308|| null| 21164| 2| 1762|| null| 84950| 2| 3621|| null| 90013B| 2| 36|| null| 21331| 2| 12|| null| 22763| 2| 202|| null| 21531| 2| 1507|| null| 85018D| 2| 12|| null| 21291| 2| 943|| null| 72131| 2| 137|| null| 20704| 2| 212|| null| 90197B| 2| 35|| null| 23614| 2| 26|| null| 23485| 2| 244|+----------+---------+-------------+-------------+only showing top 20 rows*/cs 7.4.4 피벗
Row -> Column 변환, 집계 함수 적용 가능
123456789101112131415161718192021222324252627val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()// pivoted: org.apache.spark.sql.DataFrame = [date: date, Australia_sum(Quantity): bigint ... 113 more fields]pivoted.printSchema()/*root...|-- Switzerland_sum(Quantity): long (nullable = true)|-- Switzerland_sum(UnitPrice): double (nullable = true)|-- Switzerland_sum(CustomerID): long (nullable = true)|-- USA_sum(Quantity): long (nullable = true)|-- USA_sum(UnitPrice): double (nullable = true)|-- USA_sum(CustomerID): long (nullable = true)...*/pivoted.where("date > '2011-12-05'").select("date", "USA_sum(Quantity)").show()/*+----------+-----------------+| date|USA_sum(Quantity)|+----------+-----------------+|2011-12-06| null||2011-12-09| null||2011-12-08| -196||2011-12-07| null|+----------+-----------------+*/cs 7.5 UDAF
사용자가 정의하는 집계함수, UserDefinedAggregateFunction 상속 받음
init -> update -> merge -> eval의 단계를 거침
재정의해야할 요소들
inputSchema: 진입 파라메터들
bufferSchema: 중간 결과들
dataType: 반환 데이터 타입
deterministic: 동일한 입력값에 대한 동일한 결과 반환 여부
initialize: inputSchema 초기화
update: 각 노드들이 내부 버퍼를 업데이트 하는 로직
merge: 노드 간 버퍼 병합 로직
evaluate: 최종 반환 결과값 생성 로직
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647import org.apache.spark.sql.expressions.MutableAggregationBufferimport org.apache.spark.sql.expressions.UserDefinedAggregateFunctionimport org.apache.spark.sql.Rowimport org.apache.spark.sql.types._class BoolAnd extends UserDefinedAggregateFunction{def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", BooleanType) :: Nil)def bufferSchema: StructType = StructType(StructField("result", BooleanType) :: Nil)def dataType: DataType = BooleanTypedef deterministic: Boolean = truedef initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = true}def update(buffer: MutableAggregationBuffer, input:Row): Unit = {buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)}def merge(buffer1: MutableAggregationBuffer, buffer2:Row): Unit = {buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)}def evaluate(buffer: Row): Any = {buffer(0)}}// Defined class BoolAndval ba = new BoolAnd// ba: BoolAnd = BoolAnd@6fbf27efspark.udf.register("booland", ba)// res15: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = BoolAnd@6fbf27efimport org.apache.spark.sql.functions._// import org.apache.spark.sql.functions._spark.range(1).selectExpr("explode(array(TRUE, TRUE, TRUE)) as t").selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t").select(ba(col("t")), expr("booland(f)")).show()/*+----------+----------+|booland(t)|booland(f)|+----------+----------+| true| false|+----------+----------+*/cs * 스칼라, 자바에서만 사용 가능
'Dev > Spark' 카테고리의 다른 글
[스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (3) (0) 2020.10.25 [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (2) (0) 2020.10.18 [스파크 완벽 가이드] Chapter 7 - 집계 연산 (1) (0) 2020.10.18 [스파크 완벽 가이드] Chapter 6 - 다양한 데이터 타입 다루기 (1) (0) 2020.10.16 [스파크 완벽 가이드] Chapter 2 - 스파크 간단히 살펴보기 (0) 2020.10.04