-
[Spark Source code 훑어보기] 소스 코드 ImportDev/Spark 2021. 1. 25. 02:02
* 평어체로 작성되었습니다.
Spark?
Java, Scala, Python, R에서 사용할 수 있는 고 수준의 API를 제공하는 대규모 데이터 처리를 위한 통합 분석 엔진이다.
Spark SQL, MLlib, GraphX, Structured Streaming 등의 다양한 고 수준의 툴들을 제공한다.
소스 코드는 어디에 있을까?
Spark의 소스 코드 깃허브 주소는 다음과 같다.
https://github.com/apache/spark
들어가면 다음과 같은 화면을 볼 수 있다.
소스 코드를 다운로드하는 방법은 두 가지로 나뉜다.
1) 웹 페이지에서 Code -> Download ZIP 버튼 클릭
2) Terminal에서 Clone
다운로드가 완료되면 이제 이 프로젝트를 IDE에서 확인해보자
다운로드가 완료되면 IDE에서 Spark 프로젝트를 읽는다. (Intellij 커뮤니티 버전을 사용했다.)
Spark 프로젝트는 Maven, Sbt 빌드 툴을 사용할 수 있도록 설정되어 있다. 일단 Maven을 선택해서 읽었다.
Maven pom.xml에 적힌 의존성들을 추가하며 프로젝트를 읽고 있다.
모든 과정이 끝나면 다음 화면과 같이 프로젝트가 추가된다.
먼저 pom.xml을 들여다 보자.
아파치 스파크 프로젝트는 여러 모듈들로 나누어 구성되어 있다.
각 모듈들은 실제 프로젝트에서 다음과 같이 의존성에서 추가되어 사용된다.
예로 하나 찾아보자.
스파크 프로그램에서 Session을 설정할 때 다음의 object를 확인하면 내부 로직을 확인할 수 있다.
sql/core/src/scala/org/apache/spark/scala/sql/SparkSession
해당 오브젝트는 상단의 클래스와 같이 작성되어 동반자 객체라 불린다(요건 나중에...).
실제 세션을 불러오거나, 생성하는 getOrCreate 함수는 다음과 같이 구성되어 있다.
더보기해당 함수는 SparkSession이 이미 존재한다면 기존의 세션을 반환하고,
함수가 존재하지 않으면 SparkSession을 생성하는 역할을 하는 함수이다.
def getOrCreate(): SparkSession = synchronized { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } if (!sparkConf.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) { assertOnDriver() } // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { applyModifiableSettings(session) return session } // Global synchronization so we will only set the default session once. SparkSession.synchronized { // If the current thread does not have an active session, get it from the global session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { applyModifiableSettings(session) return session } // No active nor global default session. Create a new one. val sparkContext = userSuppliedContext.getOrElse { // set a random app name if not given. if (!sparkConf.contains("spark.app.name")) { sparkConf.setAppName(java.util.UUID.randomUUID().toString) } SparkContext.getOrCreate(sparkConf) // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions. } applyExtensions( sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty), extensions) session = new SparkSession(sparkContext, None, None, extensions, options.toMap) setDefaultSession(session) setActiveSessionInternal(session) registerContextListener(sparkContext) } return session }
이런 식으로 파고들다 보면 스파크 세션의 생성, Dataset과 Dataframe의 사용을 통한 쿼리 생성, 출력의 전 영역의 내부 로직을 다 뜯어볼 수 있지 않을까 싶다.
'Dev > Spark' 카테고리의 다른 글
[스파크 완벽 가이드] Chapter 21 - 구조적 스트리밍의 기초 (0) 2020.12.20 [스파크 완벽 가이드] Chapter 20 - 스트림 처리의 기초 (0) 2020.12.12 [스파크 완벽 가이드] Chapter 16 - 스파크 애플리케이션 개발하기 (0) 2020.11.28 [스파크 완벽 가이드] Chapter 12 - RDD (0) 2020.11.15 [스파크 완벽 가이드] Chapter 11 - Dataset (0) 2020.11.15