-
[스파크 완벽 가이드] Chapter 16 - 스파크 애플리케이션 개발하기Dev/Spark 2020. 11. 28. 23:56
* 해당 포스트는 "스파크 완벽 가이드" 책을 읽고 난 이후의 정리 내용입니다.
자세한 내용은 "스파크 완벽 가이드" 책을 통해 확인해주세요.
1. 스파크 애플리케이션 작성하기
Spark 어플리케이션: Spark 클러스터와 사용자 코드
예제: 클러스터 모드-로컬 모드, 사전에 정의된 어플리케이션을 사용자 코드로 사용
Spark 가 지원하는 언어: Scala, Python, Java 등
➕R에서 Spark 애플리케이션을 수행하는 방법
community.rstudio.com/t/how-to-run-r-files-using-spark-submit-from-cdsw-terminal/23678
1.1 스칼라(+SBT)
일반 스칼라 애플리케이션을 개발하는 방법과 크게 다르지 않음
빌드 도구: sbt, maven 등 사용 가능
sbt에 포함되어야 할 내용들
- 프로젝트 메타데이터(패키지명, 패키지 버전 정도 등)
- 라이브러리 의존성을 관리하는 장소
- 라이브러리에 포함된 의존성 정보
sbt 홈페이지에서 관련 정보 찾기 가능
www.scala-sbt.org/1.x/docs/Basic-Def.html
책의 템플릿
github.com/databricks/Spark-The-Definitive-Guide/blob/master/project-templates/scala/build.sbt
build.sbt(예시)
프로젝트의 디렉토리(패키지) 구조
소스 코드
실행
1.2 Python
클러스터에서 스크립트를 실행하기만 하면 된다.
코드 재사용을 위해 여러 python 파일들을 화나의 egg, zip으로 압축함
spark-submit의 -py-files 인수로 .py, .zip, .egg 등을 추가 지정 -> 애플리케이션과 함께 배포 가능
코드 실행: main 클래스 역할을 하는 python 파일 작성 필요
main.py
실행
1.3 JAVA(+maven)
스칼라에서의 작성 방법과 매우 유사
pom.xml
SimpleExample.java
실행
🆓Scala(혹은 Java)에서 Spark 의존성을 추가하는 다른 방법
Gradle
docs.gradle.org/current/userguide/scala_plugin.html
medium.com/@yanggao1119/using-gradle-to-create-a-simple-java-spark-application-f2a2b6460e8c
medium.com/@faizanahemad/apache-spark-setup-with-gradle-scala-and-intellij-2eeb9f30c02a
2. 스파크 애플리케이션 테스트
Spark 어플리케이션 테스트 - 애플리케이션을 작성할 때 몇 가지 핵심 원칙과 구성 전략을 고려해야 함
2.1 전략적 원칙
데이터 파이프라인과 어플리케이션에 대한 테스트 코드 개발은 실제 어플리케이션 개발만큼이나 중요
테스트 코드는 미래에 발생할 수 있는 데이터, 로직 그리고 결과 변화에 유연하게 대처 가능하게 함
입력 데이터에 대한 유연성
데이터 파이프라인은 다양한 유형의 입력 데이터에 유연하게 대처 가능해야 함
비즈니스 요구사항이 변하면 데이터도 변하므로 Spark 어플리케이션과 파이프라인은 입력 데이터 중 일부가 변하더라도 유연하게 대처할 수 있어야 함 or 오류 상황을 적절하고 유연하게 제어할 수 있어야 함
따라서 입력 데이터로 인해 발생할 수 있는 다양한 예외 상황을 테스트하는 코드를 작성 필요
비즈니스 로직 변경에 대한 유연성
입력 데이터뿐만 아니라 파이프라인 내부의 비즈니스 로직이 바뀔 수도 있습니다. 예상했던 원형 데이터의 형태가 실제 원형 데이터와 같은지 확인하고 싶을겁니다. 이는 원하는 결과를 얻을 수 있도록 실제 유사한 데이터를 사용해 비즈니스 로직을 꼼꼼하게 테스트해야합니다.
이 유형의 데스트에선 Spark 가 가진 기능을 테스트하는 Spark 단위 테스트를 작성하지 않도록 조심해야 합니다. 대신 비즈니스 로직을 테스트해 복잡한 비즈니스 파이프라인이 의도한 대로 동작하는지 반드시 확인해야 합니다.
결과의 유연성과 원자성
결과 데이터가 스키마에 맞는 적절한 형태로 반환되는 지 확인
단순히 데이터를 특정 경로에 저장해 놓고 전혀 사용하지 않는 경우는 없음 -> 대부분의 Spark 파이프라인에 데이터의 상태, 즉 데이터가 얼마나 자주 갱신되는지 데이터가 완벽한지, 마지막 순간에 데이터가 변경되지는 않았는지 등을 이해할 수 있도록 만들어야 함
2.2 테스트 코드 작성 시 고려사항
적절한 단위 테스트 작성 후 입력 데이터나 구조가 변경되어도 비즈니스 로직이 정상적으로 동작하는지 확인해야 함
단위 테스트 - 스키마가 변경되는 상황에 쉽게 대응 가능.
단위 테스트의 구성 방법 - 비즈니스 도메인과 도메인 경험에 따라 다양할 수 있으므로 개발자 역량에 달려있다
(정해져 있지 않음)
SparkSession 관리하기
Spark 로컬 모드 & JUnit 이나 ScalaTest 같은 단위 테스트용 프레임워크 - 비교적 쉽게 Spark 코드를 테스트 가능
단위 테스트 하네스의 일부로 로컬 모드의 SparkSession 을 만들어 사용
이 테스트 방식이 잘 동작하려면 Spark 코드에 의존성 주입 방식으로 SparkSession 을 관리하도록 만들어야 함
즉, SparkSesion 을 한 번만 초기화하고 런타임 환경에서 함수와 클래스에 전달하는 방식을 사용하면 테스트 중에 SparkSession 을 쉽게 교체할 수 있음
테스트 코드용 스파크 API 선정하기
API 유형에 상관없이 각 함수의 입력과 출력 타입을 문서로 만들고 테스트해야 함
타입 안정성 API 사용: 함수가 가지는 최소한의 규약을 지켜야 하므로 다른 코드에서 재사용하기 쉬움
DataFrame 이나 SQL 을 사용 시 혼란을 없애기 위해 각 함수의 입력 타입과 출력 타입을 문서로 만들고 테스트해야 함
저수준 RDD API 는 정적 데이터 타입을 사용, 파티셔닝 같은 저수준 API 의 기능이 필요한 경우에만 사용
Dataset API 를 사용 - 성능을 최적화할 수 있음, 업데이트 등으로 더 많은 성능 최적화 방식을 제공할 가능성이 높음
2.3 단위 테스트 프레임워크에 연결하기
단위 테스트: 각 언어의 표준 프레임워크를 사용(JUnit, ScalaTest 등)
테스트 하네스: 각 테스트마다 SparkSession 을 생성 & 제거 설정(각 프레임 워크는 SparkSession 생성과 제거를 수행할 수 있는 메커니즘을 제공)
2.4 데이터소스 연결하기
테스트 코드에서는 운영 환경의 데이터소스에 접속하지 말아야 함(개발기 - 운영기 따로 사용하는 것이 좋음)
Ex) 비즈니스 로직을 가진 함수가 데이터소스에 직접 접근하지 않고 DataFrame 이나 Dataset 을 넘겨받아 작업 수행
이렇게 생성된 함수를 재사용하는 코드는 데이터소스의 종류에 상관없이 같은 방식으로 동작함
Spark 의 구조적 API 를 사용 시 - 이름이 지정된 테이블 사용(더미 데이터셋에 이름 붙여 테이블로 등록 후 사용)
3. 개발 프로세스
기존 개발 프로세스와 유사
로컬 머신에서 실행: spark-shell 과 Spark 가 지원하는 다른 언어용 쉘을 사용해 어플리케이션 개발에 활용
대부분의 쉘은 대화형 어플리케이션을 개발할 때 사용
클러스터: spark-submit 명령과 같이 Spark 클러스터에 운영용 애플리케이션을 실행할 때 사용
이 모드로 실행할 수 있는 쉘에는 PySpark, Spark SQL, SparkR 등이 존재
어플리케이션을 개발하고 실행할 패키지나 스크립트를 만들고 나면 spark-submit 명령으로 클러스터에 제출 가능
4. 애플리케이션 시작하기
spark-submit 명령으로 실행하는 것이 일반적인 방법
spark.apache.org/docs/latest/submitting-applications.html
run.sh
Spark-submit 도움말
파라메터 설명 --master MASTER_URL spark://host:port, mesos://host:port, yarn 또는 local 지정 --deploy-mode DEPLOY_MODE 드라이버 프로그램을 로컬에서 실행(client), 워커 머신 중 하나에서 실행(cluster) 중 선택
Default: client--class CLASS_NAME 사용자 어플리케이션의 메인 클래스를 지정(JAVA/Scala) --name NAME 어플리케이션의 이름 지정 --jars JARS 드라이버와 익스큐터의 클래스패스에 포함될 로컬 JAR 파일을 콤마로 구분된 목록으로 추가 --packages 드라이버와 익스큐터의 클래스패스에 포함될 메이븐 의존성 정보를 콤마로 구분된 목록으로 지정
로컬 저장소 검색 후 해당 의존성 라이브러리가 없는 경우 메이븐 중앙 저장소나 --repositories에 명시된 원격 저장소 검사. groupId:artifactId:version 포맷으로 표기--exclude-packages --packages 에 명시된 의존성 라이브러리 중에서 충돌 방지를 위해 제외해야 하는 목록을 콤마로 구분된 목록으로 지정 --repositories --packages 에 지정된 의존성 라이브러리를 검색할 때 사용할 원격 메이븐 저장소를 콤마로 구분된 목록으로 지정 --py-files PY_FILES Python 어플리케이션 실행 시 PYTONPATH 에 추가할 .zip, .egg, .py 파일을 지정 --files FILES 각 익스큐터의 작업 디렉토리에 위치할 파일을 콤마로 구분된 목록으로 지정 --conf PROP=VALUE 임의의 Spark 환경 설정 속성값을 지정 --properties-file FILE 부가적인 속성 정보를 읽어 들일 파일의 경로 지정. 지정하지 않으면 conf/spark-defaults.conf 파일을 참조 --driver-memory MEM 드라이버에서 사용할 메모리를 지정. Default: 1024 MB --driver-java-options 드라이버에서 지정할 부가적인 Java 옵션 정보를 지정 --driver-library-path 드라이버에 지정할 부가적인 라이브러리 경로를 지정 --driver-class-path 드라이버에 지정할 부가적인 클래스패스를 지정. --jars 에 지정한 JAR 파일은 자동으로 클래스패스에 추가됨. --executor-memory MEM 각 익스큐터에서 사용할 메모리를 지정. Default: 1 GB --proxy-user NAME 어플리케이션을 제출할 때 위장용으로 사용할 사용자 이름을 지정. 이 인수는 --principal / --keytab 인수와 함께 사용할 수 없음 --help, -h 도움말 출력 --verbose, -v 추가적인 디버그 메세지를 함께 출력 --version 사용중인 Spark 버전을 출력 배포 환경별 설정
클러스터 매니저 지원 모드 설정 설명 스탠드얼론 Cluster --driver-cores NUM 드라이버에서 사용할 코어 수를 지정. Default: 1 스탠드얼론/메소스 Clstuer --supervise 이 옵션을 지정 -> 드라이버 실패 시 재시작 스탠드얼론/메소스 Cluster kill SUBMISSION_ID 지정한 드라이버를 강제 종료 스탠드얼론/메소스 Cluster --status SUBMISSION_ID 지정한 드라이버의 상태를 조회 스탠드얼론/메소스 공통 --total-executor-cores NUM 전체 익스큐터가 사용할 총 코어 수 지정 스탠드얼론/YARN 공통 --executor-cores NUM1 익스큐터별로 사용할 코어 수를 지정.
YARN 모드- Default: 1.
스탠드 얼론 모드- 사용 가능한 전체 워커의 코어 수를 기본값으로 사용YARN 공통 --driver-cores NUM 드라이버에서 사용할 코어 수 지정.
클러스터 모드에서만 사용 가능, Default: 1YARN 공통 queue QUEUE_NAME 잡을 제출할 YARN 의 큐 이름 지정. Default: "default" YARN 공통 --num-executors NUM 실행할 익스큐터 수를 지정. Default: 2. 동적 할당 옵션이 활성화 되어 있으면 초기 익스큐터 수는 NUM 에 지정된 수 이상으로 생성됨 YARN 공통 --archives ARCHIVES 각 익스큐터의 작업 디렉토리에 압축을 해제할 아카이브의 목록을 콤마로 구분하여 지정 YARN 공통 --principal PRINCIPAL 보안이 활성화된 HDFS 를 사용하는 경우 KDC 에 로그인할 때 사용할 보안 주체 지정 YARN 공통 --keytab KEYTAB 보안 주체에 대한 keytab 파일이 저장된 전체 경로 지정. keytab 파일은 주기적으로 로그인 티켓과 위임 토큰을 갱신해 보안통신 분산 캐시를 통해 어플리케이션 마스터를 실행하는 노드로 복사 4.1 애플리케이션 시작 예제
Jar 사용 예
python
5. 애플리케이션 환경 설정
Spark 는 다양한 환경 설정 제공.
각 설정들
- 애플리케이션 속성
- 런타임 환경
- 셔플 동작 방식
- 스파크 UI
- 압축 & 직렬화
- 메모리 관리
- 처리 방식
- 네트워크 설정
- 스케쥴링
- 동적 할당
- 보안
- 암호화
- 스파크 SQL
- 스파크 스트리밍
- SparkR
Spark 시스템 설정 방법
- Spark 속성: 대부분의 어플리케이션 파라미터를 제어, SparkConf 객체를 사용해 설정 가능
- Java 시스템 속성
- 하드코딩된 환경 설정 파일
Spark 의 /conf 디렉토리: 사용 가능한 여러 종류의 템플릿 파일 존재
어플리케이션을 개발 시 템플릿의 설정값을 하드코딩할 수 있으며
템플릿에 속성 값을 지정해 런타임에 사용할 수 있음
➕속성 적용의 우선순위
1) App 내의 소스 코드에 하드코딩된 속성
2) Spark-submit 시의 속성 설정
3) spark-defaults.conf 파일
4) Spark Conf 내의 속성
5.1 SparkConf
소스 코드 내에서 config 설정
실행 시 구성
시간 주기 형태의 속성값의 포맷
- 25ms (milliseconds)
- 5s (seconds)
- 10m or 10min (minutes)
- 3h (hours)
- 5d (days)
- 1y (years)
5.2 애플리케이션 속성
spark-submit 옵션이나 Spark 어플리케이션 개발 시 설정 가능
애플리케이션 속성: 기본 어플리케이션 메타데이터와 일부 실행 특성을 정의
현재 지원하는 어플리케이션 속성 목록
속성명 기본값 의미 spark.app.name (none) 사용자 어플리케이션의 이름을 지정
이 이름은 Spark UI 와 로그 데이터에서 확인 가능spark.driver.cores 1 드라이버 프로세스에서 사용할 코어 수 지정
🛑클러스터 모드에서만 사용 가능spark.driver.maxResultSize 1g 스파크 액션에 대한 직렬화된 결과의 최대 크기
최솟값은 1M, 0 으로 설정하는 경우 무제한
총 결과 크기가 이 제한을 넘어가는 경우 - 잡 종료
너무 큰 값으로 지정 시 드라이버에서 OutOfMemoryError 가 발생할 수 있음spark.driver.memory 1g SparkContext 가 초기화되는 드라이버 프로세스에서 사용할 총 메모리의 크기 지정
client 모드 - 해당 시점에 이미 드라이버 JVM 이 실행되고 있기 때문에 어플리케이션 구현 시 SparkConf 에 직접 설정해야 함
명령행의 --driver-memory 옵션이나 기본 속성 파일에 지정 가능spark.executor.memory 1g 각 익스큐터 프로세스에 사용할 메모리의 크기 지정
spark.extraListeners (none) SparkListener 를 상속받아 구현한 클래스를 콤마로 구분된 목록으로 지정
SparkContext 초기화 시점에 이 클래스의 인스턴스가 생성되어 Spark 의 Listener Bus 에 등록됨spark.logConf FALSE SparkContext 가 시작될 때 SparkConf 에 포함된 정보를 INFO 로그로 출력
spark.master (none) 연결할 클러스터 매니저 지정
spark-submit의 --master 속성으로 사용 가능한 master URL과 동일spark.submit.deployMode (none) Spark 드라이버 프로그램의 배포 모드 지정
client, cluster 사용 가능spark.log.callerContext (none) YARN/HDFS 환경에 실행할 때 YARN RM 로그나 HDFS 감사 로그에 기록할 애플리케이션 정보를 지정
이 설정값의 길이는 Hadoop 설정인 hadoop.caller.context.max.size 의 설정값에 따라 달라짐spark.driver.supervise FALSE 값이 true 인 경우 종료 상태가 0 이 아니면 자동으로 드라이버 재시작
Spark 스탠드얼론 모드나 메소스 클러스터 모드에서만 사용 가능5.3 런타임 속성
런타임 환경 설정 - 드라이버와 익스큐터를 위한 추가 클래스패스, 로그 관련 속성 등 정의 가능
Documentation
spark.apache.org/docs/latest/configuration.html#runtime-environment
5.4 실행 속성
실행 속성과 관련된 설정값: 실제 처리를 더욱 세밀하게 제어 가능(자주 사용하게 됨)
자주 사용하게 되는 속성: spark.executor.cores(각 익스큐터의 코어 수), spark.files.maxPartitionBytes(파일 읽기 시 파티션의 최대 크기) 등
Documentation
spark.apache.org/docs/latest/configuration.html#runtime-environment
5.5 메모리 관리 설정
사용자 어플리케이션을 최적화할 때 메모리 옵션을 수동으로 관리해야하는 경우에 사용
대다수 설정은 메모리 자동 관리 기능 추가로 인해 2.x 버전에서 제거된 개념, 혹은 세밀한 제어를 위한 설정
Documentation에서 확인 가능
spark.apache.org/docs/latest/configuration.html#memory-management
5.6 셔플 동작방식 설정
과도한 네트워크 부하 - Spark Job에서 셔플이 큰 병목 구간이 될 수 있음
셔플 동작 방식을 제어하기 위한 고급 설정 - Documentation에서 확인
spark.apache.org/docs/latest/configuration.html#shuffle-behavior
5.7 환경변수
$SPARK_HOME의 conf/spark-env.sh 파일에 읽은 환경변수로 특정 Spark 설정을 구성 가능
스탠드얼론, 메소스 모드 - 이 파일로 머신에 특화된 정보를 제공 가능
spark-env.sh: 로컬 어플리케이션이나 제출용 스크립트를 실행할 때 같이 적용됨
Spark를 설치할 경우 자동 생성되는 것이 아님. But, conf/spark-env.sh.template 파일 존재(템플릿 파일)
spark-env.sh에 설정 가능한 변수들
- JAVA_HOME
- Java 가 설치된 경로(기본 PATH에 자바 경로가 포함되지 않는 경우)
- PYSPARK_PYTHON
- PySpark 의 드라이버와 워커 모두에 사용할 Python 실행 명령 지정(default: python2.7)
- spark.pyspark.python 속성: PYSPARK_PYTHON 보다 높은 우선순위를 가짐
- PYSPARK_DRIVER_PYTHON
- 드라이버에서 PySpark 를 사용하기 위해 실행 가능한 Python 바이너리 지정
- Default: PYSPARK_PYTHON
- spark.pyspark.driver.python 속성: PYSPARK_DRIVER_PYTHON보다 높은 우선순위를 가짐
- SPARKR_DRIVER_R
- SparkR 쉘에서 사용할 R 바이너리 실행 명령 지정
- Default: R
- spark.r.shell.command 속성: SPARKR_DRIVER_R보다 높은 우선순위를 가짐
- SPARK_LOCAL_IP
- 머신의 IP 주소 지정
- SPARK_PUBLIC_DNS
- Spark 프로그램이 다른 머신에 알려줄 호스트명
각 머신이 사용할 코어 수, 최대 메모리 크기 같은 Spark 스탠드얼론 클러스터 설정과 관련된 옵션 또한 설정 가능
spark-env.sh 파일은 쉘 스크립트이므로 프로그래밍 방식으로 일부 값을 설정 가능
Ex) 특정 네트워크 인터페이스의 IP 를 찾아 SPARK_LOCAL_IP 변수의 값을 설정할 수 있음
5.8 잡 스케쥴링
Spark 어플리케이션에서 별도의 스레드를 이용해 여러 잡을 동시에 실행 가능
잡: 해당 액션을 수행하기 위해 실행되어야 할 모든 태스크와 Spark 액션
Spark 스케줄러: 스레드 안정성을 충분히 보장, 여러 요청을 동시에 처리할 수 있는 어플리케이션을 만들 수 있음
Spark 의 스케줄러 작동 방식: FIFO, 큐의 전단에 있는 잡이 클러스터의 전체 자원을 사용하지 않으면 이후 잡을 바로 실행할 수 있음(하지만 큐의 전단에 있는 잡이 너무 크면 이후 잡은 매우 늦게 실행됨)
여러 Spark 잡이 자원을 공평하게 나눠 쓰도록 구성 가능
Spark 는 모든 잡이 클러스터 자원을 거의 동일하게 사용할 수 있도록 라운드-로빈(Round-Robin) 방식으로 여러 Spark 잡의 태스크를 할당
장시간 수행되는 Spark 잡이 처리되는 중에 짧게 끝난 Spark 잡이 제출된 경우 즉시 장시간 수행하는 Spark 잡의 자원을 할당받아 처리 -> 장시간 수행되는 Spark 잡의 종료를 기다리지 않고 빠르게 응답 가능.
사용자가 많은 환경에 적합
페어 스케줄러(Fair Scheduler) SparkContext 를 설정할 때 spark.scheduler.mode 속성을 FAIR 로 지정 후 사용
페어 스케줄러는 여러 개의 잡을 풀로 그룹화하는 방식도 지원, 개별 풀에 다른 스케줄링 옵션이나 가중치를 설정 가능
더 중요한 Spark 잡을 할당할 수 있도록 우선순위가 높은 풀을 만들 수 있음.
각 사용자의 Spark 잡을 그룹화하는 것도 가능 - 동시에 실행하는 잡수를 고려하지 않고 모든 사용자가 같은 양의 자원을 사용하도록 설정
Spark 의 페어 스케줄러는 Hadoop 의 페어 스케줄러 모델을 본떠서 만들어짐
사용자가 명시적으로 풀을 지정하지 않으면 Spark 는 새로운 잡을 default 풀에 할당
잡을 제출하는 스레드에서 SparkContext 의 로컬 속성인 spark.scheduler.pool 속성을 설정해 풀을 지정
지정 방법
sc.setLocalProperty("spark.scheduler.pool", "pool1")
이 스레드에서 제출하는 모든 잡은 이 풀을 사용
이 설정은 사용자를 대신해 스레드가 여러 잡을 쉽게 실행할 수 있도록 스레드별로 지정 가능
스레드에 연결된 풀을 초기화하고 싶을 경우 spark.scheduler.pool 속성의 값을 null 로 지정
spark.apache.org/docs/latest/job-scheduling.html
'Dev > Spark' 카테고리의 다른 글
[스파크 완벽 가이드] Chapter 21 - 구조적 스트리밍의 기초 (0) 2020.12.20 [스파크 완벽 가이드] Chapter 20 - 스트림 처리의 기초 (0) 2020.12.12 [스파크 완벽 가이드] Chapter 12 - RDD (0) 2020.11.15 [스파크 완벽 가이드] Chapter 11 - Dataset (0) 2020.11.15 [스파크 완벽 가이드] 목차 (0) 2020.10.27