: SparkSession 생성 및 AWS S3와 연동
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# spark session 생성시 aws와 연동하기
def s3_connect_spark(KEY_ID, KEY_ID):
# 설정
conf = (
SparkConf()
# Spark애플리케이션의 이름을 설정
.setAppName("MY_APP")
.set("spark.hadoop.fs.s3a.access.key", KEY_ID)
.set("spark.hadoop.fs.s3a.secret.key", ACCESS_KEY)
# Delta Lake와 AWS SDK를 Spark에 추가
.set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,
org.apache.hadoop:hadoop-aws:3.3.1")
# Delta Lake 보존 기간 확인 비활성화
.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# Deltalake로 Apache Spark 설정
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
# Deltalake를 Apache Spark 카탈로그로 설정
.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# 칼럼 이름에서 공백 및 특수문자 인식
.set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")
# Parquet 파일을 읽을 때 사용되는 배치 크기 설정
.set('spark.sql.parquet.columnarReaderBatchSize',100)
# 각 Spark worker의 memory 크기, worker가 사용할 수 있는 메모리 양
.set("spark.executor.memory", "8g")
# Spark Driver의 크기, Driver에 할당된 메모리 양을 지정
.set("spark.driver.memory", "2g")
)
# spark 생성
spark = SparkSession.builder.config(conf=conf).getOrCreate()
return spark
spark = s3_connect_spark(...)
: S3에서 CSV 파일 읽어온 후 Spark DataFrame 변환
# s3에서 spark로 csv 읽기
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# AWS s3의 버전 4 서명 활성화
s3_data_path = "s3a://.../data_extended.csv"
# s3a:// -> s3 파일 시스템을 사용한다는 의미
df = spark.read.format("csv").option("header",True).option("inferSchema", "true").csv(s3_data_path)
# header: 첫 번째 행이 헤더인지 여부 지정, inferSchema: 스키마를 자동으로 추론 지정
- 마지막 코드가 읽어들인 CSV 파일을 Spark DataFrame으로 변환하는 과정
- 해당 과정은 PySpark를 사용하여 데이터를 처리하고 분석하기 위한 필수적인 단계
: S3에 CSV 파일을 Delta 테이블로 저장
save_path = "s3a://.../delta-test"
df.write.format("delta").save(save_path)
- s3a:// : s3 버킷에 접근하기 위한 프로토콜
- df.write : Spark DataFrame을 Delta 형식으로 s3에 저장
: S3에 delta lake 형식으로 저장된 데이터를 PySpark에서 읽어들여 DataFrame으로 변환
from delta.tables import *
from pyspark.sql.functions import *
s3table_path = "s3a://.../delta-test"
# delta Table load
DTable = DeltaTable.forPath(spark, s3table_path)
print(DTable)
# S3에 delta lake 형식으로 저장된 데이터를 PySpark에서 읽어들여 DataFrame으로 변환
test = DTable.toDF()
print(type(test))
test.show()
- delta.tables : Delta Lake API에서 제공하는 Delta Table 관련 기능을 사용하기 위한 모듈
- pyspark.sql.functions : PySpark에서 제공하는 다양한 sql 함수들을 사용하기 위한 모듈
- DTable.toDF() : Delta 테이블을 PySpark DataFrame으로 변환
- PySpark에서 데이터를 분석하고 처리하기 위한 단계
: 특정 버전을 읽어와 DataFrame 생성
# verison 0 -> Raw Data
df0 = spark.read.format("delta").option("versionAsOf", 0).load(s3table_path)
- versionAsof : Delta Lake 테이블의 특정 버전 읽어오기 옵션
# version 1 -> optimize.compaction()
(
delta.DeltaTable.forPath(spark, s3table_path)
.optimize() # 테이블 최적화
.executeCompaction() # 파일을 병합
)
df1 = spark.read.format("delta").option("versionAsOf", 1).load(s3table_path)
- optimize() : Delta 테이블에 최적화를 적용하는 메서드
- executeCompaction() : 최적화된 테이블에 압축 및 병합을 실행하는 메서드
# version 2 -> Z-Ordering()
(
delta.DeltaTable.forPath(spark, s3table_path)
.optimize()
.executeZOrderBy("Batch_ID") # 해당 열을 기준으로 Z-Ordering(효율적으로 정렬하는 작업)
)
- executeZOrderBy() : 지정된 열을 기준으로 Z-Ordering 실행하는 메서드
- 버전2는 버전1의 이후에 발생하는 변경 사항을 반영
- df0에 버전0 실행 후 읽어옴 -> df1에 버전0 변경사항을 유지하면서 버전1 실행 후 읽어옴
::버전별로 실험 결과는 github에 있음::
'Data Lake House' 카테고리의 다른 글
AWS 활용 delta Lake 최적화 비교(1) (0) | 2024.10.16 |
---|---|
Minio 활용 spark session 생성 (2) | 2024.10.15 |
PySpark Test (1) | 2024.10.14 |
Data Lakehouse 개념 (0) | 2024.08.19 |
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축 (0) | 2024.07.09 |