ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스파크 완벽 가이드] Chapter 21 - 구조적 스트리밍의 기초
    Dev/Spark 2020. 12. 20. 02:33

    * 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.

      자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.

     

    구조적 스트리밍의 기초

    구조적 스트리밍: Spark SQL 기반 스트림 처리 프레임워크(Dataframe, Dataset, SQL 사용)

    스트리밍 연산(배치 연산과 동일하게 표현)

    • 사용자: 스트림 처리용 코드, 목적지 정의
    • 구조적 스트리밍 엔진: 신규 데이터에 대한 증분 & 연속형 쿼리 실행
    • 구조적 스트리밍 엔진: 카탈리스트 엔진(코드 생성, 쿼리 최적화 등의 기능 지원) 사용해 연산에 대한 논리적 명령 처리

    여러 부가 기능 제공: 정확히 한 번 처리(exactly once 처리), checkpoint, WAL(write-ahead log) -> 내고장성 제공

    핵심 아이디어: "스트림 데이터는 데이터가 계속해서 추가되는 테이블"

    스트리밍 잡: 주기적으로 신규 입력 데이터를 확인 & 처리, 상태 저장소에 일부 상태를 갱신해 결과 변경

    핵심: 배치 처리나 스트림 처리와 관련된 쿼리 구문을 변경하지 않아도 된다. 배치나 스트리밍 중 하나를 쿼리 실행 유형으로 지정하면 된다.

    내부적으로 사용자의 쿼리를 어떻게 증분 처리할지 자동으로 파악(내고장성 보장, 신규 데이터가 유입될 때 마다 효율적으로 처리 결과 갱신)

    Basic Concepts of Structured Streaming

    DataFrame도 스트리밍 방식으로 동작 -> 기존 Spark API를 사용하듯이 코드를 작성하면 된다.

    🛑이벤트 시간 처리, 무작위 순서로 유입된 데이터 처리 등의 스트림 처리에 필요한 개념들을 고려해야 한다.

    스파크의 나머지 기능과 구조적 스트리밍을 통합해 연속형 애플리케이션 구축 가능

    databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html

     

    Continuous Applications: Evolving Streaming in Apache Spark 2.0

    Apache Spark 2.0 lays the foundation for Continuous Applications, a simplified and unified way to write end-to-end streaming applications that reacts to data in real-time.

    databricks.com

    연속형 애플리케이션: 스트리밍 Job, 배치 Job, 스트리밍+오프라인 데이터의 조인, 대화형 비정형 쿼리의 실행 조합해 데이터에 실시간 반응 -> 통합 빅데이터 처리

    Ex) Spark SQL 활용해 대화형으로 조회하는 테이블 지속적 갱신, MLlib 사용해 학습한 머신러닝 모델 적용 결과 제공, 스파크 데이터소스의 오프라인 데이터 + 스트림 Join

     

    핵심 개념

    Structured Streaming: 단순하게 설계되어 있음(구조적 API 사용해봤으면 바로 사용 가능)

     

    1) 트랜스포메이션과 액션

    구조적 API의 스트림 버전이기 때문에 마찬가지로 Transformation과 Action을 가지고 있음.

    구조적 API에 비해 제약 사항 몇가지 있음.

     

    2) 입력 소스

    입력 받는 소스 데이터의 정보

    4가지의 입력 소스 제공(이 중 2가지는 테스트용)

    • Apache Kafka(0.10~)
    • HDFS나 S3 등 분산 파일 시스템의 파일(스파크는 디렉토리의 신규 파일을 계속 읽음)
    • 테스트용 소켓 소스
    • 테스트용 증분형 입력 소스(Rate Source)

     

    3) 싱크

    스트림의 결과를 저장할 목적지 명시

    • Apache Kafka(0.10~)
    • 파일 포맷
    • 출력 레코드에 임의 연산 수행 foreach 싱크
    • 테스트용 콘솔 싱크
    • 디버깅용 메모리 싱크

     

    4) 출력 모드

    데이터를 출력하는 방법 정의

    • 신규 정보만 추가하려는 경우 - append(싱크에 신규 레코드만 추가)
    • 바뀐 정보로 기존 로우를 갱신(Ex: 특정 웹페이지의 클릭 수 갱신) - update(변경 대상 레코드 자체를 갱신)
    • 매번 전체 결과 덮어씀(Ex: 모든 페이지의 전체 클릭 수 매번 파일로 기록) - complete(전체 출력 내용 재작성)

    특정 쿼리와 싱크는 일부 출력 모드만 지원

    Ex) 스트림에 map 연산만 수행하는 Job 존재 - 신규 데이터 유입 시 출력 데이터의 크기가 무한정 커지기 때문에 전체 데이터를 신규 파일로 저장하는 complete 모드는 적합하지 않음

    Ex) 한정된 수의 키를 사용 집계 시 시간에 따라 일부 키 값을 갱신해야 함 - append 모드보다 complete, update 모드가 더 적합

     

    추가: 쿼리에 따라 적용할 수 있는 모드의 제한이 존재함

    spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

     

    Structured Streaming Programming Guide - Spark 3.0.1 Documentation

    Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

    spark.apache.org

     

    5) 트리거

    데이터 출력 시점 정의(언제 신규 데이터를 확인하고 결과를 갱신할지 정의)

    구조적 스트리밍: 기본적으로 마지막 입력 데이터 처리한 직후 신규 입력 데이터를 조회해 최단 시간 내에 새로운 처리 결과 만들어냄, 이런 동작 방식 때문에 파일 싱크를 사용하는 경우 작은 크기의 파일이 여러 개 생길 수 있음

    처리 시간(고정된 주기로만 신규 데이터 탐색) 기반의 트리거 지원

     

    6) 이벤트 시간 처리

    무작위로 도착한 레코드 내부에 기록된 타임스탬프를 기준으로 처리

     

    이벤트 시간 데이터

    이벤트 시간(Event-Time): 데이터에 기록된 시간 필드 의미

    스파크: 데이터가 유입된 시간 X - 데이터 생성 시간 기준 처리

    데이터가 늦게 업로드되거나 네트워크 지연으로 데이터의 순서가 뒤섞인 채 시스템에 들어와도 처리 가능

    구조적 스트리밍: 이벤트 시간은 테이블에 있는 하나의 컬럼으로만 고려

    표준 SQL 연산자를 이용해 그룹화, 집계, 윈도우 처리 등을 할 수 있음

    구조적 스트리밍이 내부적으로 이벤트 시간 필드 인식 시 쿼리 실행 최적화, 타임 윈도우에서 상태 정보의 제거 시점 등을 결정하는 등의 작업 수행 가능

    이벤트 시간에서 지연되는 데이터 - 워터마크를 통해 제어

     

    워터마크

    이벤트 시간 처리에서의 시간 제한 설정

    이럴 땐 어떻게 해야할까?

    Ex: 모바일 장비의 로그를 처리하는 애플리케이션 - 업로드 지연 현상으로 인해 30분 전 데이터까지 처리해야 하는 경우 존재

    워터마크의 개념

    워터마크: 과거 데이터의 보관 주기를 제한할 때 사용

    특정 이벤트 시간의 윈도우 결과를 출력하는 시점을 제어(Ex: 워터마크 시간을 초과할 때까지 대기)할 때도 사용됨

     

    구조적 스트리밍 활용

    예제: 인간 행동 인지를 위한 이기종 데이터셋 사용

    github.com/FVBros/Spark-The-Definitive-Guide/tree/master/data

     

    FVBros/Spark-The-Definitive-Guide

    한빛미디어에서 출간한 스파크 완벽 가이드 1판의 소스코드 저장소. Contribute to FVBros/Spark-The-Definitive-Guide development by creating an account on GitHub.

    github.com

    정적 방식으로 데이터셋 읽기

    val static = spark.read.json("D:/Data/Spark-The-Definitive-Guide-master/data/activity-data/")
    // static: org.apache.spark.sql.DataFrame = 
    // [Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]
    
    val dataSchema = static.schema
    /* dataSchema: org.apache.spark.sql.types.StructType = 
     * StructType(StructField(Arrival_Time,LongType,true), 
     * StructField(Creation_Time,LongType,true), 
     * StructField(Device,StringType,true), 
     * StructField(Index,LongType,true), 
     * StructField(Model,StringType,true), 
     * StructField(User,StringType,true), 
     * StructField(gt,StringType,true), 
     * StructField(x,DoubleType,true), 
     * StructField(y,DoubleType,true), 
     * StructField(z,DoubleType,true))
     */

    데이터 현황

    static.show(5)
    /* +-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
     * | Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|           y|           z|
     * +-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
     * |1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand| 3.356934E-4|-5.645752E-4|-0.018814087|
     * |1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand|-0.005722046| 0.029083252| 0.005569458|
     * |1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|   0.0078125|-0.017654419| 0.010025024|
     * |1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand|-3.814697E-4|   0.0184021|-0.013656616|
     * |1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand|-3.814697E-4|-0.031799316| -0.00831604|
     * +-------------+-------------------+--------+-----+------+----+-----+------------+------------+------------+
     * only showing top 5 rows
     */

    스트리밍 입력 데이터 설정

    구조적 API와 유사한 방식으로 생성/제어

    구조적 스트리밍에서 스키마 추론 기능을 사용하고 싶은 경우: 명시적 설정 필요

    spark.sql.streaming.schemaInference 설정을 true로 설정

    📛운영 환경에서는 스키마 추론 방식을 사용하지 않는 것이 좋음

    val streaming = spark
      .readStream
      .schema(dataSchema)
      .option("maxFilesPerTrigger", 1)
      .json("D:/Data/Spark-The-Definitive-Guide-master/data/activity-data/")
    /* streaming: org.apache.spark.sql.DataFrame = 
     * [Arrival_Time: bigint, Creation_Time: bigint ... 8 more fields]
     */

    스트리밍 DataFrame의 생성/실행: Lazy Evaluation 방식으로 동작

    Action 호출 전에 Transformation 지정해서 데이터셋 변경

    val activityCounts = streaming
      .groupBy("gt")
      .count()
    // activityCounts: org.apache.spark.sql.DataFrame = [gt: string, count: bigint]

    Spark SQL의 기본 셔플 파티션 수는 200이기 때문에 로컬 모드로 수행하기 위해 이를 변경 후 쿼리 시작

    // 파티션 수 설정
    spark.conf.set("spark.sql.shuffle.partition", 5)
    
    // 스트림 쿼리 시작(Action: start())
    val activityQuery = activityCounts
      .writeStream
      .queryName("activity_counts")
      .format("memory")
      .outputMode("complete")
      .start()
    /* activityQuery: org.apache.spark.sql.streaming.StreamingQuery = 
     * org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4b7c4e0e

    코드 예제 실행을 위해선 다음 코드도 추가해야 함

    awaitTermination(): 쿼리 실행 중 드라이버 프로세스가 종료되는 상황 방지

    activityQuery.awaitTermination();

    memory에 저장한 스트림 처리 결과 확인

    for(i <- 1 to 5){
      spark.sql("SELECT * FROM activity_counts").show()
      Thread.sleep(1000)
    }

    실행 시점에 따라 데이터 변화

    /* 
     * +----------+------+  +----------+------+  +----------+------+  +----------+------+  +----------+------+
     * |        gt| count|  |        gt| count|  |        gt| count|  |        gt| count|  |        gt| count|
     * +----------+------+  +----------+------+  +----------+------+  +----------+------+  +----------+------+
     * |  stairsup|209082|  |  stairsup|219543|  |  stairsup|230004|  |  stairsup|230004|  |  stairsup|240465|
     * |       sit|246159|  |       sit|258467|  |       sit|270775|  |       sit|270775|  |       sit|283083|
     * |     stand|227709|  |     stand|239093|  |     stand|250477|  |     stand|250477|  |     stand|261861|
     * |      walk|265119|  |      walk|278375|  |      walk|291631|  |      walk|291631|  |      walk|304887|
     * |      bike|215949|  |      bike|226746|  |      bike|237543|  |      bike|237543|  |      bike|248340|
     * |stairsdown|187264|  |stairsdown|196623|  |stairsdown|205983|  |stairsdown|205983|  |stairsdown|215342|
     * |      null|208954|  |      null|219400|  |      null|229845|  |      null|229845|  |      null|240291|
     * +----------+------+  +----------+------+  +----------+------+  +----------+------+  +----------+------+
     */ 

     

    스트림 트랜스포메이션

    스트리밍 트랜스포메이션: 정적 DataFrame의 트랜스포메이션을 대부분 포함함

    (모든 유형의 선택&필터, 트랜스포메이션, DataFrame의 모든 함수, 개별 컬럼 처리 등)

    But, 스트리밍 데이터에 맞지 않는 트랜스포메이션에 대한 제약 존재

    Ex: Apache Spark 2.2 - 사용자가 집계하지 않은 스트림 정렬 불가능, 상태 기반 처리 사용하지 않을 시 계층적 집계 불가능

    공식 문서에서 업데이트 내역 확인

    spark.apache.org/docs/latest/structured-streaming-programming-guide.html

     

    Structured Streaming Programming Guide - Spark 3.0.1 Documentation

    Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

    spark.apache.org

    1) 선택과 필터링

    키를 변경하지 않기 때문에 append 출력 모드 사용

    val simpleTransform = streaming.withColumn("stairs", expr("gt like '%stairs%'"))
      .where("stairs")
      .where("gt is not null")
      .select("gt", "model", "arrival_time", "creation_time")
      .writeStream
      .queryName("simple_transform")
      .format("console")
      .start()
    simpleTransform.awaitTermination()

     

    2) 집계

    구조적 API처럼 임의의 집계 연산 지정 가능

    Ex: 스마트폰 모델과 사람의 행동 그리고 가속도 센서를 의미하는 x,y,z 평균에 대해 큐브 집계 사용 가능

    val deviceModelStats = streaming
      .cube("gt", "model")
      .avg()
      .drop("avg(Arrival_time)")
      .drop("avg(Creation_Time)")
      .drop("avg(Index)")
      .writeStream
      .queryName("device_counts")
      .format("console")
      .outputMode("complete")
      .start()
    
    deviceModelStats.awaitTermination()

    결과

    /*
     * -------------------------------------------
     * Batch: 1
     * -------------------------------------------
     * +----------+------+--------------------+--------------------+--------------------+
     * |        gt| model|              avg(x)|              avg(y)|              avg(z)|
     * +----------+------+--------------------+--------------------+--------------------+
     * |      null|nexus4|-0.00601179783148...|-7.22352747272202...|0.003847526633724...|
     * |      null|nexus4|0.001116290402170147|-0.00657669929797...|-0.00918707169429...|
     * |      null|  null|0.001116290402170147|-0.00657669929797...|-0.00918707169429...|
     * |      bike|nexus4|0.020961064903431637|-0.00915222693840...|-0.08538726699652664|
     * |     stand|  null|-3.19865608911239...|4.070066943871052E-4|1.027885098598942...|
     * |       sit|nexus4|-4.84874225480320...|3.237740430277438...|-4.65213713798297E-5|
     * |     stand|nexus4|-3.19865608911239...|4.070066943871052E-4|1.027885098598942...|
     * |stairsdown|  null|0.025383337512333622| -0.0362580763332372| 0.12712694345857248|
     * |  stairsup|  null|-0.02442588571617797|-0.01150252324103...|-0.09947070802664394|
     * |       sit|  null|-4.84874225480320...|3.237740430277438...|-4.65213713798297E-5|
     * |  stairsup|nexus4|-0.02442588571617797|-0.01150252324103...|-0.09947070802664394|
     * |      walk|  null|-0.00371092404649...|0.003353258755646...|0.001107451361787...|
     * |stairsdown|nexus4|0.025383337512333622| -0.0362580763332372| 0.12712694345857248|
     * |      bike|  null|0.020961064903431637|-0.00915222693840...|-0.08538726699652664|
     * |      walk|nexus4|-0.00371092404649...|0.003353258755646...|0.001107451361787...|
     * |      null|  null|-0.00601179783148...|-7.22352747272202...|0.003847526633724...|
     * +----------+------+--------------------+--------------------+--------------------+
     */ 

     

    3) 조인

    스트리밍 DataFrame과 정적 DataFrame의 조인, Stream to Stream Join 지원 등 많은 기능이 있으니 Documentation에서 확인하는 것이 제일 좋음

    spark.apache.org/docs/latest/structured-streaming-programming-guide.html#join-operations

     

    Structured Streaming Programming Guide - Spark 3.0.1 Documentation

    Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

    spark.apache.org

    Stream to static Join의 예

    val historicalAgg = static.groupBy("gt", "model").avg()
    val deviceModelStats = streaming
      .drop("Arrival_Time", "Creation_Time", "Index")
      .cube("gt", "model").avg()
      .join(historicalAgg, Seq("gt", "model"))
      .writeStream
      .queryName("device_counts")
      .format("console")
      .outputMode("complete")
      .start()
    deviceModelStats.awaitTermination()

     

    입력과 출력

    구조적 스트리밍: Apache Kafka, 파일, 테스트 및 디버깅용 소스/싱크 지원

    소스/싱크는 섞어서 사용하는 것 가능

    공식 문서를 주기적으로 확인하는 것이 좋음

    1) 데이터를 읽고 쓰는 장소(소스와 싱크)

    파일 소스와 싱크

    주로 쓰이는 파일 형식: parquet, text, json과 csv 등

    스트리밍에서 파일 소스/싱크와 정적 파일 소스를 사용할 때의 유일한 차이점: 트리거 시 읽을 파일 수 결정

    (maxFilesPerTrigger 옵션)

    모든 파일은 설정된 입력 디렉터리에 원자적으로 추가되어야 함

     

    카프카 소스와 싱크

    Apache Kafka

    • 데이터 스트림을 위한 Pub-Sub 방식의 분산형 시스템
    • 메시지 큐 방식처럼 레코드의 스트림 발행&구독하는 방식으로 사용
    • 발행된 메시지: 내고장성을 보장하는 저장소에 일정 기간 저장됨(주기 설정 가능)
    • 분산형 버퍼로 생각할 수 있음
    • 레코드의 스트림: Topic이라 불리는 카테고리에 저장됨
    • 각 레코드: 키, 값, Timestamp 로 구성됨
    • Topic: 순서를 바꿀 수 없는 레코드로 구성, 레코드의 위치는 Offset이라 부름
    • 데이터를 읽는 동작: 구독, 쓰는 동작: 발행

    🗨 Producer, Broker, Consumer의 개념과 Consumer Group까지 알면 더더욱 좋다!

     

    카프카에 저장된 스트림을 배치 & 스트리밍 방식으로 읽어 Dataframe 생성 가능

     

    2) 카프카 소스에서 메시지 읽기

    읽기 위해 가장 먼저 해야할 일 - 옵션 중 하나 선택

    • assign: 토픽 뿐만 아니라 읽으려는 파티션까지 세밀하게 지정
    • subscribe: 토픽 목록 지정
    • subscribePattern: 토픽 패턴 지정

    메시지를 읽을 경우 이 옵션 중 하나만 지정해야 함

    추가적으로 설정해야 하는 옵션

    • startingOffsets & endingOffsets

    쿼리를 시작할 때 읽을 지점

    가장 작은 오프셋부터 읽는 earliest, 가장 큰 오프셋부터 읽는 latest 중 하나 지정하거나 TopicPartition에 대한 시작 오프셋을 명시한 JSON 문자열을 사용해 지정(JSON에 오프셋을 -2로 지정 시 earliest, -1로 지정 시 latest)

    {"topicA": {"0":23, "1":-1}, "topicB":{"0":-2}}

    스트리밍 쿼리가 시작될 때만 적용, 다시 시작한 쿼리에서는 쿼리가 남긴 오프셋 사용

    쿼리 실행 중 새롭게 발견한 파티션은 earliest 방식으로 읽음

     

    • failOnDataLoss

    데이터 유실(토픽 삭제, 오프셋이 범위 벗어남)이 일어날 때 쿼리 중단 여부 지정. 기본값: true

     

    • maxOffsetsPerTrigger

    특정 트리거 시점에 읽을 오프셋의 전체 개수

     

    카프카 소스 설정 방법

    // 하나의 토픽 구독
    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    // 여러 토픽 구독  
    val ds2 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1,topic2")
      .load()
    // 패턴에 따른 토픽 구독
    val ds3 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribePattern", "topic.*")
      .load()

    카프카 소스의 각 Row

    • 키: binary
    • 값: binary
    • 토픽: string
    • 패턴: int
    • 오프셋: long
    • 타임스탬프: long

    카프카의 각 메시지는 다양한 방식으로 직렬화될 수 있음.

    스파크 함수 & 사용자 정의 함수를 사용해 메시지를 분석용 구조적 포맷으로 변경 가능

    주로 사용되는 포맷: JSON, Avro

     

    3) 카프카 싱크에 메시지 쓰기

    소스를 통해 읽는 것과 차이가 크지 않음

    kafka.bootstrap.servers 속성 명시 필수 + 옵션으로 토픽 명세, 컬럼 지정 등 수행해야 함

    checkpoint도 필수로 지정해야 함

    // selectExpr에서 읽은 topic 값 그대로 발행 시 사용됨
    ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream.format("kafka")
      .option("checkpointLocation", "/to/HDFS-compatible/dir")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .start()
    
    // 옵션을 통해 발행할 토픽 지정
    ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream.format("kafka")
      .option("checkpointLocation", "/to/HDFS-compatible/dir")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .start()  

     

    foreach 싱크

    Dataset API의 foreachPartitions와 유사

    각 파티션에서 임의의 연산 병렬 수행(3.0.1 버전 이후에서는 Scala, Java 외에도 Python 사용 가능)

    spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

     

    Structured Streaming Programming Guide - Spark 3.0.1 Documentation

    Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s

    spark.apache.org

    ForeachWriter 인터페이스 구현 필수(open, process, close) - 트리거 후 출력을 생성할 때마다 호출됨

    인터페이스 구현 시 명심해야 할 내용

    • writer는 UDF나 Dataset 맵 함수처럼 반드시 Serializable 인터페이스 구현해야 함
    • open, process, close는 각 익스큐터에서 실행됨
    • writer는 연결을 맺거나 트랜잭션을 시작하는 등의 모든 initial 작업을 반드시 open에서 수행해야 함

    임의의 사용자 코드를 실행하는 것이기 때문에 반드시 내고장성을 고려해 사용해야 함

    정확히 한 번 처리할 수 있도록 구조적 스트리밍 API에서 제공하는 파라메터

    1. Open Method: 두 개의 파라메터 사용
      두 파라메터는 처리하려는 로우를 식별하는 고유값
      version: 트리거 별로 단순하게 증가하는 Sequence 값
      partitionId: 태스크의 출력 파티션 ID
      Open Method는 해당 로우 처리 여부를 반환해야 함
      Ex: 이미 version 정보와 partitionId를 저장소에 기록했을 경우 로우를 처리하지 않도록 false 반환
    2. Process Method: Open Method가 true 반환 시 데이터의 레코드마다 호출됨, 데이터를 처리하거나 저장하는 용도로만 사용됨
    3. Close Method: Open Method가 호출되고 장애가 발생하지 않는 한 호출. 스트림 처리 도중 오류가 발생하면 이 method에서 그 오류를 받게 됨. Close Method에서는 열려 있는 자원을 해제해야 함

    ForeachWriter를 사용해 자체 싱크를 효과적으로 구현 가능

     

    import org.apache.spark.sql.ForeachWriter
    
    datasetOfString.writeStream.foreach(new ForeachWriter[String] {
      def open(partitionId: Long, version: Long): Boolean = {
        // 데이터베이스 연결 생성
      }
      def process(record: String) = {
        // 데이터베이스에 문자열 저장
      }
      def close(errorOrNull: Throwable): Unit = {
        // 데이터베이스 연결 종료
      }
    })

     

    테스트용 소스와 싱크

    스트리밍 쿼리가 제대로 만들어졌나 확인할 때 사용할 테스트용 소스/싱크

    운영 환경에서 사용 X(종단 간 내고장성 지원하지 않기 때문)

     

    • 소켓 소스

    TCP 소켓을 통해 스트림 데이터 전송

    데이터를 읽기 위한 호스트와 포트 지정해야 함

    스파크: 해당 주소에서 데이터를 읽기 위해 새로운 TCP 연결 생성

    // Scala
    val socketDF = spark.readStream.format("socket")
      .option("host", "localhost").option("port", 9999).load()
      
    // Terminal
    nc -lk 9999

     

    • 콘솔 싱크

    처리 결과를 콘솔에서 바로 확인하기 위함.

    디버깅에 유용, But 내고장성 지원 안함

    append와 complete 모드만 지원

    activityCounts.writeStream.format("console").outputMode("complete").start()

     

    • 메모리 싱크

    스트리밍 시스템 테스트에 사용됨

    드라이버에 데이터를 모음 -> 대화형 쿼리가 가능한 메모리 테이블에 저장

    내고장성 지원 안함(운영 환경 사용 X)

    append, complete 모드만 지원

    activityCounts.writeStream.format("memory").queryName("my_device_table")

    운영 환경에서 출력 데이터를 테이블에 반드시 저장해야 할 때 file sink(hdfs, s3)에 parquet 형식 등으로 지정해서 일정한 위치에 저장하면 바로 사용 가능

     

    4) 데이터 출력 방법(출력 모드)

    결과 Dataset을 구성하는 방식, 정적 DataFrame에서의 저장 모드와 같은 개념

     

    append 모드

    Default로 설정되는 방식

    새로운 Row 결과 테이블에 추가 -> 사용자가 명시한 트리거에 맞춰 싱크로 출력

    내고장성을 보장하는 싱크를 사용한다는 가정 하에 모든 로우 한 번만 출력

     

    complete 모드

    결과 테이블의 전체 상태 싱크로 출력

    모든 데이터가 계속해서 변경될 수 있는 일부 상태 기반 데이터를 다룰 때 유용

    사용 중인 싱크가 저수준 업데이트 지원하지 않을 시 유용

     

    update 모드

    이전 출력 결과에서 변경된 로우만 싱크로 출력

    싱크는 반드시 저수준 업데이트 지원해야 함

     

    적절한 출력 모드 선택

    쿼리 유형별로 사용할 수 있는 출력 모드 정해져 있음

    Ex: map 연산만 수행하는 쿼리 - complete 모드 사용 불가

    (잡이 시작된 이후에 생성된 모든 레코드 저장, 전체 결과를 테이블에 다시 써야 하기 때문)

    선택한 출력 모드를 사용하지 못하는 경우 스트림을 시작할 때 예외 발생

    3.0.1에서의 사용 가능 출력 모드

    쿼리 타입 쿼리 타입(상세) 사용 가능 출력 모드 주의사항

    집계 쿼리

    워터마크와 이벤트 시간을 이용하는 집계 Append, Update, Complete

    Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details.

    Update mode uses watermark to drop old aggregation state.

    Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.

    Other aggregations Complete, Update

    Since no watermark is defined (only defined in other category), old aggregation state is not dropped.

    Append mode is not supported as aggregates can update thus violating the semantics of this mode.

    mapGroupsWithState 쿼리 Update  

    flatMapGroupsWithState 쿼리

    Append 운영 모드 Append

    flatMapGroupsWithState 이후 집계 허용

    Update 운영 모드 Update

    flatMapGroupsWithState 이후 집계 허용 X

    Queries with joins Append

    Update and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.

    Other queries Append, Update

    결과 테이블에 집계하지 않은 전체 결과를 유지하는 것은 불가능하므로 complete 모드 사용할 수 없음

     

    5) 데이터 출력 시점

    데이터를 싱크로 출력할 때의 시점 제어: 트리거 설정

    구조적 스트리밍: 직전 트리거가 처리를 마칠 경우 즉시 데이터 출력

    많은 수정 발생할 경우 데이터 출력 시 트리거 사용

    싱크에 큰 부하 발생 현상 방지, 출력 파일의 크기 제어

    주기형 트리거, 일회성 트리거 제공

    처리 시간 기반 트리거

    처리 주기를 문자열(스칼라의 Duration이나 자바의 TimeUnit 사용 가능)로 단순 지정

    import org.apache.spark.sql.streaming.Trigger
    
    activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
      .format("console").outputMode("complete").start()

    ProcessingTime Trigger: 특정 주기만큼 여러 번 대기

    Ex: 1분 주기의 트리거 사용 시 12:00, 12:01, 12:02에 트리거 동작

    If) 이전 처리를 끝내지 못해 트리거 시간 놓치는 경우 -> 이전 처리 종료 후 즉시 트리거 동작 X, 다음 트리거 시점까지 대기

     

    일회성 트리거

    스트리밍 잡을 일회성으로 실행 트리거 설정 가능

    장점

    개발 중 트리거에서 한 번에 처리할 수 있는 수준의 데이터로 애플리케이션 테스트 가능

    운영 환경에서 자주 수행되지 않는 잡(Ex: 요약 테이블에 신규 데이터 반영 처리)을 수동으로 실행 시 사용 가능

    일회성으로 실행해도 처리된 모든 입력 파일, 연산 과정의 상태 정보 확인 가능

    databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

     

    Running Streaming Jobs Once a Day For 10x Cost Savings

    This is the sixth post in a multi-part series about how you can perform complex streaming analytics using Apache Spark. The new “Run Once” trigger feature added to Structured Streaming in Spark 2.2 affords the benefits of the Catalyst Optimizer increme

    databricks.com

    import org.apache.spark.sql.streaming.Trigger
    
    val activityCounts = streaming.groupBy("gt").count()
    spark.conf.set("spark.sql.shuffle.partition", 5)
    val onceTest = activityCounts
      .writeStream
      .trigger(Trigger.Once())
      .format("console")
      .outputMode("complete")
      .start()
    
    onceTest.awaitTermination()

     

    스트리밍 Dataset API

    Dataset 사용 이유: 타입 안정성을 제공하면서 Dataframe과 같은 연산 수행

    스트리밍 Dataframe -> Dataset 변경 가능

    Dataset의 요소: Scala Case Class, Java Bean Class 등

    case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
    // defined class Flight
    
    val dataSchema = spark
      .read
      .parquet("D:/Data/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet/")
    // dataSchema: org.apache.spark.sql.DataFrame = 
    // [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
    
    val dataSchema = spark
      .read
      .parquet("D:/Data/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet/")
      .schema
    // dataSchema: org.apache.spark.sql.types.StructType = 
    // StructType(StructField(DEST_COUNTRY_NAME,StringType,true), 
    // StructField(ORIGIN_COUNTRY_NAME,StringType,true), 
    // StructField(count,LongType,true))
    
    val flightsDF = spark
      .readStream
      .schema(dataSchema)
      .parquet("D:/Data/Spark-The-Definitive-Guide-master/data/flight-data/parquet/2010-summary.parquet/")
    // flightsDF: org.apache.spark.sql.DataFrame = 
    // [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
    
    val flights = flightsDF.as[Flight]
    // flights: org.apache.spark.sql.Dataset[Flight] = 
    // [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
    
    
    def originIsDestination(flight_row: Flight): Boolean = {
      return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
    }
    // originIsDestination: (flight_row: Flight)Boolean
    
    flights
      .filter(flight_row => originIsDestination(flight_row))
      .groupByKey(x=>x.DEST_COUNTRY_NAME)
      .count()
      .writeStream
      .queryName("device_counts")
      .format("memory")
      .outputMode("complete")
      .start()
    // res2: org.apache.spark.sql.streaming.StreamingQuery = 
    // org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3bc079f1

    댓글

Designed by Tistory.