Data Lake House

PySpark Test

pogun 2024. 10. 14. 22:07
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

- config 설정(Delta Lake 라이브러리 추가)

- getOrCreate 기존 SparkSession이 있으면 그것을 반환, 그렇지 않으면 새롭게 생성

data = spark.range(0, 5)
data.write.format("delta").save("./delta-table")

- write 메서드로 DateFrame 저장

- format(“delta”)를 사용하여 저장 형식을 Delta Lake로 지정

- save 메서드로 경로 저장

 

df = spark.read.format("delta").load("./delta-table")
df.show()

- read 메서드를 통해 파일 형식이나 데이터 스키마 지정 ex) delta

- load 메서드를 통해 데이터를 읽어옴

- 0 ~ 4 숫자 랜덤 생성 확인, delta_log 파일에 json파일 추가 확인

 

data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("./delta-table")

- overwrite 모드를 사용해 덮어쓰기

- 5 ~ 9 숫자 랜덤 생성 확인, delta_log 파일에 json파일 추가 확인

 

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "./delta-table")

# id가 짝수인 레코드의 id 값을 100씩 증가 업데이트
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# id가 짝수인 값 삭제
deltaTable.delete(condition = expr("id % 2 == 0"))

# 새 데이터 삽입(병합)
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

- update, delete, marge 작성 코드 이해

 

strmdf = spark.readStream.format("rate").load()
stream = strmdf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", 
"delta-table/checkpoint").start("./delta-table")

selectExpr("value as id")는 value 컬럼 선택하고 id로 이름 변경

- 스트리밍 장애 복구 및 상태 관리를 위해 체크포인크 위치 지정

 

stream2 = spark.readStream.format("delta").load("./delta-table").writeStream.format("console").start()

- 스트럼 진행중인 파일을 읽고 콘솔화면에 출력

- 중간 씩 체크포인트 지정 확인

 

from delta.tables import *
from pyspark.sql.functions import *

"""
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()
  """

DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

NotExists 

  • 테이블이 존재하지 않을 때에만 생성, 이미 있는 경우 작업 수행 안함
  • 테이블의 이름을 지정
  • 테이블 경로나 추가 속성을 정의하지 않음

Replace

  • 테이블을 생성하거나 이미 존재하는 경우에 대체, 즉 이미 존재하더라도 새로운 정의로 대체
  • 테이블 이름을 지정하지 않는 대신 경로를 지정
  • 추가적인 속성을 정의 가능, 이러한 속성은 description과 같은 테이블 메타데이터를 지정할 때 사용
people = spark.read.format("delta").load("/tmp/delta/people10m")
people.show()

 

DeltaTable.create(spark) \
  .tableName("default.events") \
  .addColumn("eventId", "BIGINT") \
  .addColumn("data", "STRING") \
  .addColumn("eventType", "STRING") \
  .addColumn("eventTime", "TIMESTAMP") \
  .addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
  .partitionedBy("eventType", "eventDate") \
  .execute()

- eventType, eventDate 칼럼으로 분할 파티셔닝

- generatedAlwaysAs 옵션을 통해 Data 컬럼이 Time 컬럼에서 날짜 부분만 추출하여 자동으로 생성되도록 지정

- 날짜별로 데이터를 파티셔닝하는데 유용함

 

event = spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" AND eventTime <= "2020-10-01 12:00:00"')
event.show()

- 테이블 조회와 특정 조건을 달아서 where절 사용 실습

 

people.write.format("delta").mode("append").save("/tmp/delta/people10m")

- 기존 delta 테이블에 새 데이터를 일관되게 처리하기 위해 append 모드 사용

 

people.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")

- 테이블의 모든 데이터를 원자적으로 바꾸려면 overwrite 모드 사용 

 

from pyspark.sql.functions import col

# 기존 Delta 테이블 읽어옴
people = spark.read.format("delta").load("/tmp/delta/people10m")

# 새로운 데이터를 포함하는 DataFrame을 생성
people2 = spark.createDataFrame([
    (3, "Emily", "Johnson", "Female", "man", "1990-03-20", "555-12-3456", 70000),
    (4, "Michael", "Williams", "Male", "wuman", "1982-07-10", "888-99-0000", 80000)
], ["id", "firstName", "middleName", "lastName", "gender", "birthDate", "ssn", "salary"])

# 컬럼의 데이터 타입을 명시(타입이 안맞다고 오류 시 유용함)
people2 = people2.withColumn("id", col("id").cast("int"))
people2 = people2.withColumn("birthDate", col("birthDate").cast("TIMESTAMP"))
people2 = people2.withColumn("salary", col("salary").cast("int"))

# 기존 Delta 테이블과 새로운 데이터를 결합
people3 = people.union(people2)

# 결합된 데이터를 다시 Delta 테이블에 저장
people3.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
people3.show()