$ nano docker-compose.yml
Docker-compose.yml 파일 생성(작성 끝나면 ‘ctrl + x’ -> yes -> enter)
version: '3'
services:
s3:
image: minio/minio
ports:
- 9000:9000
- 9001:9001
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: changeme
volumes:
- ./storage/minio:/data
command: server /data --console-address ":9001"
- 서비스 명 s3로 설정, docker에서 사용할 image를 설정(minio/minio), 사용될 포트 9000, 9001 환경변수로 user 및 password 지정, volume을 지정하여 재시작을 해도 기존에 저장했던 스토리지 정보가 남을 수 있게 설정
- Mino 접속 후 CSV 데이터 파일 업로드
VS Code에서 실행
from minio import Minio
from glob import glob
import os
BUCKET_NAME = "deltalaketest"
client = Minio(
"localhost:9000",
access_key = "admin", secret_key = "changeme", secure = False
)
import os
os.environ['JAVA_HOME'] = '/.../java-8-openjdk-amd64/jre'
client
- os.environ['KEY']처럼 환경 변수의 값을 읽고, os.environ['KEY'] = 'VALUE'처럼 환경 변수를 설정
- 해당 코드에서는 JAVA_HOME 환경 변수 설정 및 url 지정
buckets = client.list_buckets()
for bucket in buckets:
print(bucket.name)
- 연결된 Minio에 존재하는 buckets 확인
client.bucket_exists('deltalaketest')
- 해당 버킷 존재 유무 확인
if not client.bucket_exists(BUCKET_NAME):
client.make_bucket(BUCKET_NAME)
- 버킷이 존재하는지 여부 확인 및 없을 시 새로 생성
SparkSession 연결
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# docker-compose.yml 파일 설정한 아이디와 비밀번호
minio_access_key = "admin"
minio_secret_key = "changeme"
# spark session 생성시 aws와 연동하기
def s3_connect_spark(minio_access_key, minio_secret_key):
# 설정
conf = (
SparkConf() # 객체를 사용하여 Spark설정을 정의
# Spark애플리케이션의 이름을 설정
.setAppName("MY_APP")
# Minio ID
.set("spark.hadoop.fs.s3a.access.key", minio_access_key)
# minio PassWord
.set("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
# minio 엔드포인트 설정
.set("spark.hadoop.fs.s3a.endpoint", "http://...")
# Delta Lake와 AWS SDK를 Spark에 추가
.set("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1,
org.apache.hadoop:hadoop-aws:3.3.1")
# Delta Lake 보존 기간 확인 비활성화
.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# Deltalake를 Apache Spark에 확장
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
# Deltalake를 Apache Spark 카탈로그로 설정
.set("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
# 칼럼 이름에서 공백 및 특수문자 인식
.set("spark.databricks.delta.properties.defaults.columnMapping.mode","name")
# Parquet 파일을 읽을 때 사용되는 배치 크기 설정
.set('spark.sql.parquet.columnarReaderBatchSize',100)
# 각 Spark worker의 memory 크기, worker가 사용할 수 있는 메모리 양
.set("spark.executor.memory", "8g")
# Spark Driver의 크기, Driver에 할당된 메모리 양을 지정
.set("spark.driver.memory", "2g")
)
# spark 생성
spark = SparkSession.builder.config(conf=conf).getOrCreate()
return spark
spark = s3_connect_spark(minio_access_key, minio_secret_key)
spark
'Data Lake House' 카테고리의 다른 글
AWS 활용 delta Lake 최적화 비교(2) (0) | 2024.10.17 |
---|---|
AWS 활용 delta Lake 최적화 비교(1) (0) | 2024.10.16 |
PySpark Test (1) | 2024.10.14 |
Data Lakehouse 개념 (0) | 2024.08.19 |
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축 (0) | 2024.07.09 |