Data Lake House

Minio 활용 spark session 생성

pogun 2024. 10. 15. 01:39
$ nano docker-compose.yml

Docker-compose.yml 파일 생성(작성 끝나면 ‘ctrl + x’ -> yes -> enter)

 

version: '3'
services:
  s3:
    image: minio/minio
    ports:
      - 9000:9000
      - 9001:9001
    environment:
      MINIO_ROOT_USER: admin
      MINIO_ROOT_PASSWORD: changeme
    volumes:
      - ./storage/minio:/data
    command: server /data --console-address ":9001"

- 서비스 명 s3로 설정, docker에서 사용할 image를 설정(minio/minio), 사용될 포트 9000, 9001 환경변수로 user 및 password 지정, volume을 지정하여 재시작을 해도 기존에 저장했던 스토리지 정보가 남을 수 있게 설정

 

- Mino 접속 후 CSV 데이터 파일 업로드


VS Code에서 실행

from minio import Minio
from glob import glob
import os

BUCKET_NAME = "deltalaketest"

client = Minio(
    "localhost:9000",
    access_key = "admin", secret_key = "changeme", secure = False
)
import os

os.environ['JAVA_HOME'] = '/.../java-8-openjdk-amd64/jre'
client

- os.environ['KEY']처럼 환경 변수의 값을 읽고, os.environ['KEY'] = 'VALUE'처럼 환경 변수를 설정

- 해당 코드에서는 JAVA_HOME 환경 변수 설정 및 url 지정

 

buckets = client.list_buckets()

for bucket in buckets:
    print(bucket.name)

- 연결된 Minio에 존재하는 buckets 확인

 

client.bucket_exists('deltalaketest')

- 해당 버킷 존재 유무 확인

 

if not client.bucket_exists(BUCKET_NAME):
    client.make_bucket(BUCKET_NAME)

- 버킷이 존재하는지 여부 확인 및 없을 시 새로 생성


SparkSession 연결

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# docker-compose.yml 파일 설정한 아이디와 비밀번호
minio_access_key = "admin"
minio_secret_key = "changeme"

# spark session 생성시 aws와 연동하기
def s3_connect_spark(minio_access_key, minio_secret_key):
    # 설정
    conf = (
        SparkConf() # 객체를 사용하여 Spark설정을 정의
        # Spark애플리케이션의 이름을 설정
        .setAppName("MY_APP") 
        
        # Minio ID
        .set("spark.hadoop.fs.s3a.access.key", minio_access_key) 
        
        # minio PassWord
        .set("spark.hadoop.fs.s3a.secret.key", minio_secret_key) 
        
        # minio 엔드포인트 설정
        .set("spark.hadoop.fs.s3a.endpoint", "http://...") 
        
        # Delta Lake와 AWS SDK를 Spark에 추가
        .set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1, 
        org.apache.hadoop:hadoop-aws:3.3.1")
        
        # Delta Lake 보존 기간 확인 비활성화
        .set("spark.databricks.delta.retentionDurationCheck.enabled", "false") 
        
        # Deltalake를 Apache Spark에 확장
        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        
        # Deltalake를 Apache Spark 카탈로그로 설정
        .set("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
        
        # 칼럼 이름에서 공백 및 특수문자 인식
        .set("spark.databricks.delta.properties.defaults.columnMapping.mode","name") 
        
        # Parquet 파일을 읽을 때 사용되는 배치 크기 설정
        .set('spark.sql.parquet.columnarReaderBatchSize',100) 
        
        # 각 Spark worker의 memory 크기, worker가 사용할 수 있는 메모리 양
        .set("spark.executor.memory", "8g") 
        
        # Spark Driver의 크기, Driver에 할당된 메모리 양을 지정
        .set("spark.driver.memory", "2g")
    )

    # spark 생성
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    return spark
spark = s3_connect_spark(minio_access_key, minio_secret_key)
spark