import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
- config 설정(Delta Lake 라이브러리 추가)
- getOrCreate 기존 SparkSession이 있으면 그것을 반환, 그렇지 않으면 새롭게 생성
data = spark.range(0, 5)
data.write.format("delta").save("./delta-table")
- write 메서드로 DateFrame 저장
- format(“delta”)를 사용하여 저장 형식을 Delta Lake로 지정
- save 메서드로 경로 저장
df = spark.read.format("delta").load("./delta-table")
df.show()
- read 메서드를 통해 파일 형식이나 데이터 스키마 지정 ex) delta
- load 메서드를 통해 데이터를 읽어옴
- 0 ~ 4 숫자 랜덤 생성 확인, delta_log 파일에 json파일 추가 확인
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("./delta-table")
- overwrite 모드를 사용해 덮어쓰기
- 5 ~ 9 숫자 랜덤 생성 확인, delta_log 파일에 json파일 추가 확인
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "./delta-table")
# id가 짝수인 레코드의 id 값을 100씩 증가 업데이트
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
# id가 짝수인 값 삭제
deltaTable.delete(condition = expr("id % 2 == 0"))
# 새 데이터 삽입(병합)
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
- update, delete, marge 작성 코드 이해
strmdf = spark.readStream.format("rate").load()
stream = strmdf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation",
"delta-table/checkpoint").start("./delta-table")
- selectExpr("value as id")는 value 컬럼 선택하고 id로 이름 변경
- 스트리밍 장애 복구 및 상태 관리를 위해 체크포인크 위치 지정
stream2 = spark.readStream.format("delta").load("./delta-table").writeStream.format("console").start()
- 스트럼 진행중인 파일을 읽고 콘솔화면에 출력
- 중간 씩 체크포인트 지정 확인
from delta.tables import *
from pyspark.sql.functions import *
"""
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
"""
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
NotExists
- 테이블이 존재하지 않을 때에만 생성, 이미 있는 경우 작업 수행 안함
- 테이블의 이름을 지정
- 테이블 경로나 추가 속성을 정의하지 않음
Replace
- 테이블을 생성하거나 이미 존재하는 경우에 대체, 즉 이미 존재하더라도 새로운 정의로 대체
- 테이블 이름을 지정하지 않는 대신 경로를 지정
- 추가적인 속성을 정의 가능, 이러한 속성은 description과 같은 테이블 메타데이터를 지정할 때 사용
people = spark.read.format("delta").load("/tmp/delta/people10m")
people.show()
DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
.partitionedBy("eventType", "eventDate") \
.execute()
- eventType, eventDate 칼럼으로 분할 파티셔닝
- generatedAlwaysAs 옵션을 통해 Data 컬럼이 Time 컬럼에서 날짜 부분만 추출하여 자동으로 생성되도록 지정
- 날짜별로 데이터를 파티셔닝하는데 유용함
event = spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" AND eventTime <= "2020-10-01 12:00:00"')
event.show()
- 테이블 조회와 특정 조건을 달아서 where절 사용 실습
people.write.format("delta").mode("append").save("/tmp/delta/people10m")
- 기존 delta 테이블에 새 데이터를 일관되게 처리하기 위해 append 모드 사용
people.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
- 테이블의 모든 데이터를 원자적으로 바꾸려면 overwrite 모드 사용
from pyspark.sql.functions import col
# 기존 Delta 테이블 읽어옴
people = spark.read.format("delta").load("/tmp/delta/people10m")
# 새로운 데이터를 포함하는 DataFrame을 생성
people2 = spark.createDataFrame([
(3, "Emily", "Johnson", "Female", "man", "1990-03-20", "555-12-3456", 70000),
(4, "Michael", "Williams", "Male", "wuman", "1982-07-10", "888-99-0000", 80000)
], ["id", "firstName", "middleName", "lastName", "gender", "birthDate", "ssn", "salary"])
# 컬럼의 데이터 타입을 명시(타입이 안맞다고 오류 시 유용함)
people2 = people2.withColumn("id", col("id").cast("int"))
people2 = people2.withColumn("birthDate", col("birthDate").cast("TIMESTAMP"))
people2 = people2.withColumn("salary", col("salary").cast("int"))
# 기존 Delta 테이블과 새로운 데이터를 결합
people3 = people.union(people2)
# 결합된 데이터를 다시 Delta 테이블에 저장
people3.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
people3.show()
'Data Lake House' 카테고리의 다른 글
AWS 활용 delta Lake 최적화 비교(2) (0) | 2024.10.17 |
---|---|
AWS 활용 delta Lake 최적화 비교(1) (0) | 2024.10.16 |
Minio 활용 spark session 생성 (2) | 2024.10.15 |
Data Lakehouse 개념 (0) | 2024.08.19 |
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축 (0) | 2024.07.09 |