-
[스파크 완벽 가이드] Chapter 20 - 스트림 처리의 기초Dev/Spark 2020. 12. 12. 21:33
* 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.
자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.
Apache Spark에서는 RDD와 유사한 연산을 실시간 처리로 수행할 수 있게 하는 DStream API와 Dataset, Dataframe을 사용해 실시간 처리를 수행하는 Structured Streaming으로 나뉨
1. 스트림 처리란
스트림 처리: 신규 데이터를 끊임없이 처리해 결과를 만들어내는 행위, 입력 데이터 무한, 시작과 끝이 정해져 있지 않음
입력 데이터: 스트림 처리 시스템에 도착한 일련의 이벤트(Ex: 신용카드 전표 정보, 웹 사이트 클릭, IoT 장비의 센서 데이터 등)
스트리밍 App: 이벤트 스트림이 도착하면 다양한 쿼리 연산을 수행, 다양한 버전의 결과 출력, Key-Value 저장소 등의 외부 sink 시스템에 최신 데이터 저장
배치 처리: 고정된 입력 데이터를 다룸(Ex: 모든 웹사이트의 방문 기록, 센서에서 읽어들인 과거의 모든 데이터 등 데이터 웨어하우스의 대규모 데이터셋), 결과를 한번만 만들어냄
스트리밍 처리와 배치 처리는 함께 사용하기도 함.
Ex) 스트리밍 App에서 입력 데이터를 배치 작업에서의 결과 데이터셋과 조인해야 하는 경우, 스트리밍 App의 결과가 배치 작업용 쿼리에 필요한 파일/테이블인 경우
따라서, 모든 비즈니스 로직은 스트리밍과 배치 연산에서 일관성 있게 동작해야 한다.
Ex) 청구 금액 계산에 사용하는 사용자 정의 코드가 스트리밍과 배치 방식에서 다른 결과를 만들어낼 경우 큰 문제 발생
(일관성이 없음)
따라 구조적 스트리밍은 처음부터 배치 어플리케이션+나머지 컴포넌트와 쉽게 연동할 수 있도록 설계됨 + 연속형 애플리케이션(Continuous Application) 개념을 스파크에 추가
Contious Application: 스트리밍, 배치, 대화형 작업으로 구성된 통합 애플리케이션
databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html
1.1 스트림 처리 사례
스트림 처리의 일반적인 사용 사례
통보와 알림
연속적인 이벤트에서 특정 이벤트나 이벤트의 패턴을 탐지했을 때 발생
대응을 위한 의사결정의 의미 X
알림: 담당자에게 이벤트를 알리는 용도
Ex) 물건 주문이 일어날 경우 창고에서 물건 찾은 후 배송 위해 -> 주문 처리 센터의 직원에게 알림
어떤 경우에도 통보는 빠르게 일어나야 함
실시간 리포트
직원을 위한 실시간 대시보드(시스템 모니터링, 통계 모니터링 등)
플랫폼 전체 사용량, 시스템 부하, 실행 시간, 신규 기능의 사용량 등을 관찰
증분형 ETL
스파크 배치 잡: 원시 데이터를 ETL 처리해 Parquet같은 구조적 포맷으로 변환 시 자주 사용됨
구조적 스트리밍: 신규 데이터를 수 초 내에 반영-> 다음 처리에서 데이터를 받는 시점이 빨라짐
내고장성 보장 및 데이터를 정확히 한 번(exactly-once) 처리 필요
-> 데이터 중복 저장 & 유실이 일어나면 안됨
실시간 제공용 데이터 갱신
타 애플리케이션의 서비스용 데이터를 만들기 위해 사용(데이터 가공 후 전달)
Ex) 구글 애널리틱스: 각 페이지의 방문자 수를 연속적으로 추적 -> 최신 방문자 수 갱신에 사용
(사용자는 웹 UI로 최신 방문자 수 조회 가능)
서비스 제공을 위해 스트리밍 시스템은 Key-Value 저장소, 다른 저장소 시스템에 대한 동기식 증분 업데이트,
데이터 변형 방지를 위한 트랜잭션 등을 지원해야 함
실시간 의사결정
신규 입력 분석 -> 비즈니스 로직에 따라 처리
Ex) 신용카드 고객의 최근 이력 기준 카드 트랜잭션이 부정행위에 속하는 지 여부 자동 판단
온라인 머신러닝
여러 사용자의 실시간 데이터 + 이력 데이터 -> 모델 학습
단일 고객의 고정 규칙 적용 X -> 모든 고객의 행위에 기반한 모델을 연속적으로 갱신 + 각 트랜잭션에 적용
전체 고객에 대한 집계, 고정형 데이터셋에 대한 조인, 머신러닝 라이브러리 활용, 빠른 응답 시간 제공 등 필요
1.2 스트림 처리의 장점
일반적인 배치 애플리케이션: 직관적 & 유지 보수와 개발 단순, 많은 처리량 가질 수 있음
스트림 처리: 대기 시간 짧음(분, 초, ms 단위로 빠르게 응답해야 할 경우 메모리 에 저장하는 스트리밍 시스템 필요), 자동으로 연산 결과의 증분 생성 -> 배치 잡보다 결과를 수정하는 데 더 효율적
Ex: 지난 24시간의 웹 트래픽 통계 계산 시 단순히 구현된 배치 잡 사용 -> 실행할 때마다 전체 데이터 계산 필요, 스트리밍 API: 지난 번 Window의 연산 결과 기억+현재 Window의 연산 결과 추가하여 간단하게 계산 가능
1.3 스트림 처리의 과제
입력 데이터가 지연되거나 재전송될 시, 시간이 뒤섞여 도착할 수 있음.
{value: 1, time: "2017-04-07T00:00:00"} {value: 2, time: "2017-04-07T01:00:00"} {value: 5, time: "2017-04-07T02:00:00"} {value: 10, time: "2017-04-07T01:30:00"} {value: 7, time: "2017-04-07T03:00:00"}
5라는 값을 받았을 때의 특정 액션 수행 로직 -> 가능
2->10->5 순서로 데이터가 들어왔을 때의 수행 로직 -> ?
10이 늦게 들어왔을 때를 대비하기 위해 2, 5 등의 값을 저장하는 등의 일부 상태를 유지해야 함.
규모가 커질수록, 정보가 다양해질 수록 상태 유지의 문제는 커짐
추가적 문제점
- 애플리케이션 타임스탬프(이벤트 시간) 기준으로 순서가 뒤섞인 데이터 처리
- 대규모의 상태 정보 유지하기
- 높은 데이터 처리량 보장하기
- 장애 상황에서도 정확히 한 번 처리
- 부하 불균형과 뒤처진 서버 다루기
- 이벤트에 빠르게 응답하기
- 다른 저장소 시스템의 외부 데이터와 조인
- 신규 이벤트 도착 시 출력 싱크의 갱신 방법 결정
- 출력 시스템에 데이터 저장 시 트랜잭션 보장
- 런타임에 비즈니스 로직 변경
2. 스트림 처리의 핵심 설계 개념
앞의 문제점을 해결하는 스트리밍 시스템 설계 필요
2.1 레코드 단위 처리와 선언형 API
Apache Storm: 각 이벤트를 레코드 단위로 애플리케이션에 전달 후 사용자 코드에 반응하도록 설계
레코드 단위 처리 API 사용 API: 애플리케이션 내부에서 여러 처리 파이프라인을 연결하는 기능만 제공
문제점: 상태 관리 등 -> 직접 상태를 추적해야 함, 메모리 확보를 위한 상태 제거 & 중복 처리 방지 등을 직접 구현해야 함
✅개발자, 혹은 담당자의 높은 숙련도 및 이해도 필요
선언형 API: 어떻게 데이터를 처리할 지를 지정하는 것이 아닌 무엇을 처리할 지 지정
DStream API: map, reduce, filter 등의 연산을 기반으로 하는 함수형 API 제공
2.2 이벤트 시간과 처리 시간
이벤트 시간 처리: 원천 시스템에서 각 레코드에 기록한 Timestamp 기반 데이터 처리
처리 시간 기준 처리: 스트리밍 App에 레코드가 도착한 시간 기반으로 처리
IoT 센서 데이터를 처리하는 경우 등에서 이벤트 시간 처리는 매우 중요함.
데이터가 뒤섞여 오거나 지연될 경우 -> 상태 추적 및 유지 필요
2.3 연속형 처리와 마이크로 배치 처리
연속형 처리 기반의 시스템을 구성하는 각 노드: 다른 노드에서 전송하는 메시지를 끊임없이 수신 -> 새로 갱신된 정보를 자신의 하위 노드로 전송
Ex: 각 노드는 레코드들을 map 처리한 후 reducer로 보냄. reducer는 신규 레코드를 받을 때마다 상태를 갱신
전체 입력량이 비교적 적을 경우 가장 빠르게 응답. But, 레코드 단위 부하가 매우 크기 때문에 최대 처리량이 적음.
Ex: 다음 처리 노드로 메시지 패킷을 보내기 위해 OS를 호출하는 연산 부하 발생
전체 시스템을 중지해야 애플리케이션을 변경할 수 있음.
마이크로 배치: 입력 데이터를 작은 배치로 모으기 위해 대기(500ms 등)
각 시간대별 배치는 일반적인 배치 Job의 형태로 병렬 처리되어 결과가 출력됨
최적화 기법을 적용해 더 높은 노드당 처리량을 만들 수 있음
더 적은 노드로 같은 양의 데이터 처리 가능, 부하 분산 가능
마이크로 배치를 모으기 위한 시간이 필요하기에 기본적인 지연 시간이 존재함
스파크-> 마이크로 배치 사용
선택: 지연 시간 요건과 총 운영 비용 고려
마이크로 배치 시스템: Application에 따라 100ms~second 단위의 응답 시간 안정적 제공 -> 운영 비용을 낮출 수 있음, 노드 장애 🔽
연속형 처리: 빠른 응답 시간 필요한 경우 사용
3. 스파크의 스트리밍 API
Spark는 두 가지 스트리밍 API를 제공
DStream API(Spark Streaming): 마이크로 배치 방식으로만 동작, 선언형 API(함수형 기반 API), 이벤트 시간 처리 지원 X
구조적 스트리밍 API(Spark Structured Streaming): 최적화 기술, 이벤트 시간, 연속형 처리넵 지원
3.1 DStream API
RDD를 활용한 Spark API와 장/단점을 공유
장점: 일반적으로 많이 사용됨(안정적), RDD 코드를 함께 사용해 정적 데이터와 조인 등의 기능 제공
단점: Java, Scala, Python 등의 객체와 함수에 매우 의존적 -> 스트림 처리 엔진의 최적화 기법 적용하기 힘듬
처리 시간을 기준으로 동작함-> 이벤트 시간을 기준으로 처리화고 싶은 경우 자체 구현 필요
마이크로 배치 형태로만 동작
3.2 구조적 스트리밍(Structured Streaming)
spark.apache.org/docs/latest/structured-streaming-programming-guide.html
스파크의 구조적 API를 기반으로 하는 고수준 스트리밍 API
구조적 처리를 할 수 있는 모든 환경에서 Scala, Java, Python, R, SQL 등을 통해 사용 가능
DStream처럼 고수준 연산 기반의 선언형 API 제공
최적화 기술 자동으로 수행
이벤트 시간 처리 지원(+Watermark)
연속형 처리 지원(실험적)
import org.apache.spark.sql.streaming.Trigger /* * 연속형 처리 방법 * .trigger 주목 */ spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic1") .trigger(Trigger.Continuous("1 second")) // only change in query .start()
spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing
Dataframe, SQL 활용 해 스트림 처리 가능
'Dev > Spark' 카테고리의 다른 글
[Spark Source code 훑어보기] 소스 코드 Import (0) 2021.01.25 [스파크 완벽 가이드] Chapter 21 - 구조적 스트리밍의 기초 (0) 2020.12.20 [스파크 완벽 가이드] Chapter 16 - 스파크 애플리케이션 개발하기 (0) 2020.11.28 [스파크 완벽 가이드] Chapter 12 - RDD (0) 2020.11.15 [스파크 완벽 가이드] Chapter 11 - Dataset (0) 2020.11.15