- Spark 어플리케이션: Spark 클러스터와 사용자 코드
- Spark가 지원하는 언어: Scala, Python, Java 등
-
일반 스칼라 애플리케이션을 개발하는 방법과 크게 다르지 않음
-
빌드 도구: sbt, maven 등 사용 가능
-
SBT : 스칼라에 사용하기 편한 기능을 많이 제공하는 빌드 툴
-
스칼라는 JVM 위에서 동작하는 언어이며 컴파일 되어야 하는 언어이기 때문에 빌드시 폴더 구조가 복잡하며 특히 많은 패키지의 의존성 및 프로젝트 형상유지를 위해서 sbt 라는 빌드 툴을 사용한다.
-
sbt 템플릿 link
-
디렉토리 구조
src/
main/
resources/
<JAR 파일에 포함할 파일들>
scala/
<메인 스칼라 소스 파일>
java/
<메인 자바 소스 파일>
test/
resources
<테스트 JAR에 포함할 파일들>
scala/
<테스트용 스칼라 소스 파일>
java/
<테스트용 자바 소스 파일>
- 소스 코드
object DataFrameExample extends Serializable {
def main(args: Array[String]) = {
val pathToDataFolder = args(0)
// 설정값 지정 -> SparkSession 시작
val spark = SparkSession.builder().appName("Spark Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.getOrCreate()
// udf 등록
spark.udf.register("myUDF", someUDF(_:String):String)
val df = spark.read.json(pathToDataFolder + "data.json")
val manipulated = df.groupBy(expr("myUDF(group)")).sum().collect()
.foreach(x => println(x))
}
}
- 실행
$SPARK_HOME/bin/spark-submit \
--class com.databricks.example.DataFrameExample \
--master local \
target/scala-2.11/example_2.11-0.1-SNAPSHOT.jar "hello"
-
클러스터에서 스크립트를 실행하기만 하면 된다.
-
코드 재사용을 위해 여러 python 파일들을 화나의 zip으로 압축함
-
spark-submit의 -py-files 인수로 .py, .zip, .egg 등을 추가 지정 -> 애플리케이션과 함께 배포 가능
-
코드 실행: main 클래스 역할을 하는 python 파일 작성 필요
-
소스 코드
from __future__ import print_function
if __name__ == '__main__':
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Word Count") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
print(spark.range(5000).where("id>500").selectExpr("sum(id)").collect())
+) 추가
from future import print_function
-
파이썬 3에서 쓰던 문법을 파이썬 2에서 쓸수 있게 해주는 문법이다.
-
파이썬 2와 3 어떤 버젼을 돌리던 모두 파이썬 3 문법인 print() 을 통해 콘솔에 출력이 가능하다.
-
실행
$SPARK_HOME/bin/spark-submit --master local pyspark_template/main.py
- 의존성 주입 : pox.xml 파일 작성. spark core/sql 등 필요한 버전 명시.
- 소스 코드
import org.apache.spark.sql.SparkSession;
public class SimpleExample{
public static void main(String[] args){
SparkSession spark = SparkSession.builder().getOrCreate();
spark.range(1,2000).count();
}
}
- 실행
$SPARK_HOME/bin/spark-submit \
--class com.databricks.example.SimpleExample \
--master local \
target/spark-example-0.1-SNAPSHOT.jar "hello"
- gradle도 지원하니, 편한 빌드 툴 사용하자.
- Spark 어플리케이션 테스트 : 애플리케이션을 작성할 때 몇 가지 핵심 원칙과 구성 전략을 고려해야 함
- 데이터 파이프라인 설계시 고려할 점과 같음.
1. 입력 데이터에 대한 유연성
비즈니스 요구사항이 변하면 데이터도 변하므로 Spark 어플리케이션과 파이프라인은 입력 데이터 중 일부가 변하더라도 유연하게 대처할 수 있어야 함.
따라서 입력 데이터로 인해 발생할 수 있는 다양한 예외 상황에 대한 테스트 코드 작성 필요
2. 비즈니스 로직 변경에 대한 유연성
입력 데이터뿐만 아니라 파이프라인 내부의 비즈니스 로직이 바뀔 수도 있다.
예상했던 데이터의 형태가 실제 데이터와 같은지 확인이 필요하다.
비즈니스 로직을 테스트해 복잡한 비즈니스 파이프라인이 의도한 대로 동작하는지 반드시 확인해야 합니다.
3. 결과의 유연성과 원자성
결과 데이터가 스키마에 맞는 적절한 형태로 반환되는지 확인
단순히 데이터를 특정 경로에 저장해 놓고 전혀 사용하지 않는 경우는 없음
-> 대부분의 Spark 파이프라인에 데이터의 상태, 즉 데이터가 얼마나 자주 갱신되는지 데이터가 완벽한지, 마지막 순간에 데이터가 변경되지는 않았는지 등을 이해할 수 있도록 만들어야 함
- 적절한 단위 테스트 작성 후 입력 데이터나 구조가 변경되어도 비즈니스 로직이 정상적으로 동작하는지 확인해야 함
- 단위 테스트 : 스키마가 변경되는 상황에 쉽게 대응 가능.
- 단위 테스트의 구성 방법 : 비즈니스 도메인과 도메인 경험에 따라 다양할 수 있으므로 개발자 역량에 달려있다
- Spark 로컬 모드 & JUnit 이나 ScalaTest 같은 단위 테스트용 프레임워크 : 비교적 쉽게 Spark 코드를 테스트 가능
- 단위 테스트 하네스의 일부로 로컬 모드의 SparkSession 을 만들어 사용
- 이 테스트 방식이 잘 동작하려면 Spark 코드에 의존성 주입 방식으로 SparkSession 을 관리하도록 만들어야 함
- 즉, SparkSesion 을 한 번만 초기화하고 런타임 환경에서 함수와 클래스에 전달하는 방식을 사용하면 테스트 중에 SparkSession 을 쉽게 교체할 수 있음
from pyspark.sql import SparkSession
class SparkSessionSingleton:
_instance = None
@staticmethod
def get_spark_session(app_name="YourAppName"):
if not SparkSessionSingleton._instance:
SparkSessionSingleton._instance = SparkSession.builder \
.appName(app_name) \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
return SparkSessionSingleton._instance
spark = SparkSessionSingleton.get_spark_session()
- SparkSessionSingleton 클래스를 사용하여 SparkSession을 초기화하고,
- 이 방식으로 SparkSession을 전달하면 동일한 인스턴스를 계속해서 사용할 수 있다.
- API 유형에 상관없이 각 함수의 입력과 출력 타입을 문서로 만들고 테스트해야 함
- type-safe API 사용: 함수가 가지는 최소한의 규약을 지켜야 하므로 다른 코드에서 재사용하기 쉬움
- DataFrame 이나 SQL 을 사용 시 혼란을 없애기 위해 각 함수의 입력 타입과 출력 타입을 문서로 만들고 테스트해야 함
- 저수준 RDD API 는 정적 데이터 타입을 사용, 파티셔닝 같은 저수준 API 의 기능이 필요한 경우에만 사용
- 단위 테스트: 각 언어의 표준 프레임워크를 사용(JUnit, ScalaTest 등)
- 테스트 코드에서는 운영 환경의 데이터소스에 접속하지 말아야 함(개발- 운영 따로 사용하는 것이 좋음)
- Ex) 비즈니스 로직을 가진 함수가 데이터소스에 직접 접근하지 않고 DataFrame 이나 Dataset 을 넘겨받아 작업 수행
- 이렇게 생성된 함수를 재사용하는 코드는 데이터소스의 종류에 상관없이 같은 방식으로 동작함
- Spark 의 구조적 API 사용 시 : 이름이 지정된 테이블 사용(더미 데이터셋에 이름 붙여 테이블로 등록 후 사용)
참고 : 테스트 예시
- 기존 개발 프로세스와 유사
- 로컬 머신에서 실행: spark-shell 과 Spark 가 지원하는 다른 언어용 쉘을 사용해 어플리케이션 개발에 활용
- 대부분의 쉘은 대화형 어플리케이션을 개발할 때 사용
- 클러스터: spark-submit 명령과 같이 Spark 클러스터에 운영용 애플리케이션을 실행할 때 사용
- 이 모드로 실행할 수 있는 쉘에는 PySpark, Spark SQL, SparkR 등이 존재
- 어플리케이션을 개발하고 실행할 패키지나 스크립트를 만들고 나면 spark-submit 명령으로 클러스터에 제출 가능
- spark-submit 명령으로 실행하는 것이 일반적인 방법link
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
spark-submit 각종 옵션 link
- Jar 사용 예
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
replace/with/path/to/examples.jar \
1000
view raw16_4_1_1_run_jar.sh hosted with ❤ by GitHub
- python 사용 예
$SPARK_HOME/bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
- Spark 시스템 설정 방법
- Spark 속성: 대부분의 어플리케이션 파라미터를 제어, SparkConf 객체를 사용해 설정 가능
- Java 시스템 속성
- 하드코딩된 환경 설정 파일
-
Spark 의 /conf 디렉토리: 사용 가능한 여러 종류의 템플릿 파일 존재
-
어플리케이션을 개발 시 템플릿의 설정값을 하드코딩할 수 있으며
-
템플릿에 속성 값을 지정해 런타임에 사용할 수 있음
-
속성 적용의 우선순위
- App 내의 소스 코드에 하드코딩된 속성
- Spark-submit 시의 속성 설정
- spark-defaults.conf 파일
- Spark Conf 내의 속성
- 2가지 방법.
- 소스 코드 내에서 config 설정
import org.apache.spark.SparkConf
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("DefinitiveGuide")
.set("some.conf", "to.some.value")
- 실행 시 구성 (런타임)
$SPARK_HOME/bin/spark-submit --name "DefinitiveGuide" --master local[4] ...
- $SPARK_HOME의 conf/spark-env.sh 파일에 읽은 환경변수로 특정 Spark 설정을 구성 가능
- spark-env.sh: 로컬 어플리케이션이나 제출용 스크립트를 실행할 때 같이 적용됨
- Spark를 설치할 경우 자동 생성되는 것이 아님. But, conf/spark-env.sh.template 파일 존재(템플릿 파일)
spark-env.sh에 설정 가능한 변수들
-
JAVA_HOME : Java 가 설치된 경로(기본 PATH에 자바 경로가 포함되지 않는 경우)
-
PYSPARK_PYTHON : PySpark 의 드라이버와 워커 모두에 사용할 Python 실행 명령 지정
- spark.pyspark.python 속성: PYSPARK_PYTHON 보다 높은 우선순위를 가짐
-
PYSPARK_DRIVER_PYTHON : 드라이버에서 PySpark 를 사용하기 위해 실행 가능한 Python 바이너리 지정
- Default: PYSPARK_PYTHON
- spark.pyspark.driver.python 속성: PYSPARK_DRIVER_PYTHON보다 높은 우선순위를 가짐
-
SPARKR_DRIVER_R : SparkR 쉘에서 사용할 R 바이너리 실행 명령 지정
-
SPARK_LOCAL_IP : 머신의 IP 주소 지정
-
SPARK_PUBLIC_DNS : Spark 프로그램이 다른 머신에 알려줄 호스트명
- 각 머신이 사용할 코어 수, 최대 메모리 크기 같은 Spark 스탠드얼론 클러스터 설정과 관련된 옵션 또한 설정 가능
-
spark-env.sh 파일은 쉘 스크립트이므로 프로그래밍 방식으로 일부 값을 설정 가능
-
Ex) 특정 네트워크 인터페이스의 IP 를 찾아 SPARK_LOCAL_IP 변수의 값을 설정할 수 있음
-
Spark 어플리케이션에서 별도의 스레드를 이용해 여러 잡을 동시에 실행 가능
-
잡: 해당 액션을 수행하기 위해 실행되어야 할 모든 태스크와 Spark 액션
-
Spark 스케줄러: 스레드 안정성을 충분히 보장, 여러 요청을 동시에 처리할 수 있는 어플리케이션을 만들 수 있음
-
Spark 의 스케줄러 작동 방식: FIFO, 큐의 전단에 있는 잡이 클러스터의 전체 자원을 사용하지 않으면 이후 잡을 바로 실행할 수 있음(하지만 큐의 전단에 있는 잡이 너무 크면 이후 잡은 매우 늦게 실행됨)
-
-
여러 Spark 잡이 자원을 공평하게 나눠 쓰도록 구성 가능
-
Spark 는 모든 잡이 클러스터 자원을 거의 동일하게 사용할 수 있도록 라운드-로빈(Round-Robin) 방식으로 여러 Spark 잡의 태스크를 할당
-
장시간 수행되는 Spark 잡이 처리되는 중에 짧게 끝난 Spark 잡이 제출된 경우 즉시 장시간 수행하는 Spark 잡의 자원을 할당받아 처리 -> 장시간 수행되는 Spark 잡의 종료를 기다리지 않고 빠르게 응답 가능.
-
사용자가 많은 환경에 적합
-
-
페어 스케줄러(Fair Scheduler) SparkContext 를 설정할 때 spark.scheduler.mode 속성을 FAIR 로 지정 후 사용
-
페어 스케줄러는 여러 개의 잡을 풀로 그룹화하는 방식도 지원, 개별 풀에 다른 스케줄링 옵션이나 가중치를 설정 가능
-
더 중요한 Spark 잡을 할당할 수 있도록 우선순위가 높은 풀을 만들 수 있음.
-
int list = [1,2,3 list.sum()