Data Lake House

AWS 활용 delta Lake 최적화 비교(2)

pogun 2024. 10. 17. 01:55

: SparkSession 생성 및 AWS S3와 연동

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# spark session 생성시 aws와 연동하기
def s3_connect_spark(KEY_ID, KEY_ID):
    # 설정
    conf = (
        SparkConf()
        # Spark애플리케이션의 이름을 설정
        .setAppName("MY_APP") 
        
        .set("spark.hadoop.fs.s3a.access.key", KEY_ID)
        
        .set("spark.hadoop.fs.s3a.secret.key", ACCESS_KEY)
        
        # Delta Lake와 AWS SDK를 Spark에 추가
        .set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,
        org.apache.hadoop:hadoop-aws:3.3.1")
        
        # Delta Lake 보존 기간 확인 비활성화
        .set("spark.databricks.delta.retentionDurationCheck.enabled", "false") 
        
        # Deltalake로 Apache Spark 설정
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        
        # Deltalake를 Apache Spark 카탈로그로 설정
        .set("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        
        # 칼럼 이름에서 공백 및 특수문자 인식
        .set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")
        
        # Parquet 파일을 읽을 때 사용되는 배치 크기 설정
        .set('spark.sql.parquet.columnarReaderBatchSize',100)
        
        # 각 Spark worker의 memory 크기, worker가 사용할 수 있는 메모리 양
        .set("spark.executor.memory", "8g")
        
        # Spark Driver의 크기, Driver에 할당된 메모리 양을 지정
        .set("spark.driver.memory", "2g") 
    )

    # spark 생성
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    return spark
spark = s3_connect_spark(...)

 

 

: S3에서 CSV 파일 읽어온 후 Spark DataFrame 변환

# s3에서 spark로 csv 읽기
spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# AWS s3의 버전 4 서명 활성화

s3_data_path = "s3a://.../data_extended.csv"
# s3a:// -> s3 파일 시스템을 사용한다는 의미

df = spark.read.format("csv").option("header",True).option("inferSchema", "true").csv(s3_data_path)
# header: 첫 번째 행이 헤더인지 여부 지정, inferSchema: 스키마를 자동으로 추론 지정

- 마지막 코드가 읽어들인 CSV 파일을 Spark DataFrame으로 변환하는 과정

- 해당 과정은 PySpark를 사용하여 데이터를 처리하고 분석하기 위한 필수적인 단계

 

 

: S3에 CSV 파일을 Delta 테이블로 저장

save_path = "s3a://.../delta-test"

df.write.format("delta").save(save_path)

- s3a:// : s3 버킷에 접근하기 위한 프로토콜

- df.write : Spark DataFrame을 Delta 형식으로 s3에 저장

 

: S3에 delta lake 형식으로 저장된 데이터를 PySpark에서 읽어들여 DataFrame으로 변환

from delta.tables import *
from pyspark.sql.functions import *

s3table_path = "s3a://.../delta-test"

# delta Table load
DTable = DeltaTable.forPath(spark, s3table_path)
print(DTable)

# S3에 delta lake 형식으로 저장된 데이터를 PySpark에서 읽어들여 DataFrame으로 변환
test = DTable.toDF()
print(type(test))

test.show()

- delta.tables : Delta Lake API에서 제공하는 Delta Table 관련 기능을 사용하기 위한 모듈

- pyspark.sql.functions : PySpark에서 제공하는 다양한 sql 함수들을 사용하기 위한 모듈

- DTable.toDF() : Delta 테이블을 PySpark DataFrame으로 변환

- PySpark에서 데이터를 분석하고 처리하기 위한 단계

 

: 특정 버전을 읽어와 DataFrame 생성

# verison 0 -> Raw Data
df0 = spark.read.format("delta").option("versionAsOf", 0).load(s3table_path)

- versionAsof : Delta Lake 테이블의 특정 버전 읽어오기 옵션

# version 1 -> optimize.compaction()
(
    delta.DeltaTable.forPath(spark, s3table_path)
    .optimize() # 테이블 최적화
    .executeCompaction() # 파일을 병합
)

df1 = spark.read.format("delta").option("versionAsOf", 1).load(s3table_path)

- optimize() : Delta 테이블에 최적화를 적용하는 메서드

- executeCompaction() : 최적화된 테이블에 압축 및 병합을 실행하는 메서드

# version 2 -> Z-Ordering()
(
    delta.DeltaTable.forPath(spark, s3table_path)
    .optimize()
    .executeZOrderBy("Batch_ID") # 해당 열을 기준으로 Z-Ordering(효율적으로 정렬하는 작업)
)

- executeZOrderBy() : 지정된 열을 기준으로 Z-Ordering 실행하는 메서드

 

 

- 버전2는 버전1의 이후에 발생하는 변경 사항을 반영

- df0에 버전0 실행 후 읽어옴 -> df1에 버전0 변경사항을 유지하면서 버전1 실행 후 읽어옴

 

::버전별로 실험 결과는 github에 있음::

'Data Lake House' 카테고리의 다른 글

AWS 활용 delta Lake 최적화 비교(1)  (0) 2024.10.16
Minio 활용 spark session 생성  (2) 2024.10.15
PySpark Test  (1) 2024.10.14
Data Lakehouse 개념  (0) 2024.08.19
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축  (0) 2024.07.09