Data Lake House

AWS 활용 delta Lake 최적화 비교(1)

pogun 2024. 10. 16. 13:51

- AWS 콘솔에서 ACCESS_KEY 와 SECRET_ACCESS_KEY 발급

- REGION도 알아야함!

- KEY, REGION 변수화

 

: S3 클라이언트, 리소스, 세션 생성 및 연결

import boto3

# s3 Client 연결 함수
def s3_client_connection():
    try: # s3 Client 생성
        s3 = boto3.client(
            service_name = "s3",
            region_name = ...,
            aws_access_key_id = ...,
            aws_secret_access_key = ...
        )
    except Exception as e:
        print(e)
    else:
        print("s3 bucket connected!")
        return s3
    
# s3 Resource 연결 함수
...
...

# s3 Session 연결 함수
...
...

- s3_client_connection : AWS S3 서비스와 상호작용할 수 있는 클라이언트 생성(boto3.client())

 

: S3 버킷 생성

from glob import glob
import os
import botocore

BUCKET_NAME = "delta-lake-s3"
REGION = "..."

# 버킷이 존재하지 않으면 생성
try:
    client.head_bucket(Bucket=BUCKET_NAME)
except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == '404':
        client.create_bucket(Bucket=BUCKET_NAME, CreateBucketConfiguration={'LocationConstraint': REGION})

# 생성된 버킷 목록 출력
buckets = client.list_buckets()
for bucket in buckets['Buckets']:
    print(bucket['Name'])

- client.head_bucket() : 해당 s3 버킷이 존재하는지 확인

- client.create_bucket() : 없을 시 새 버킷 생성

- client.list_buckets() : 모든 s3 버킷 목록 반환

 

: S3 서비스에 연결하여 사용 가능한 모든 버킷 확인

session = s3_session_connection()

# 생성된 세션을 사용하여 s3 리소스를 생성
session_s3 = session.resource('s3')

# s3 리소스 객체에서 사용 가능한 모든 버킷을 반복적으로 가져옴
for bucket in session_s3.buckets.all():
    print(bucket.name)

 

 

: AWS S3계정에 있는 모든 버킷 목록 확인

# client 연동 후 계정에 있는 모든 버킷 목록을 보여줌
client = s3_client_connection()
response = client.list_buckets() # bucket 목록
response

 

 

: AWS S3 버킷에 업로드 및 버킷에서 파일 다운로드

# Upload File
# local_path : 업로드 하려는 파일의 Local 파일 경로
# bucket_name : Upload하고자 할 bucket 이름
# key : 버킷 안에서 저장하고자 하는 경로
def upload_file(local_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION):
    try:
        s3 = s3_resource_connection(KEY_ID, ACCESS_KEY, REGION)
        s3.meta.client.upload_file(local_path, bucket_name, key)
    except Exception as e:
        print(e)
    else:
        print("complete Save File to S3")
        
# DownLoad File
# local_path : 다운로드할 파일 Local 저장 경로
# bucket_name : 다운로드하고자 하는 파일이 저장된 bucket 이름
# key : 다운로드 하고자 하는 파일이 저장된 버킷안의 경로
def download_file(local_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION):
    try:
        s3 = s3_resource_connection(KEY_ID, ACCESS_KEY, REGION)
        bucket = s3.Bucket(bucket_name)
        objects = list(bucket.objects.filter(Prefix=key))

        if objects and objects[0].key == key:
            bucket.download_file(objects[0].key, local_path)
    except Exception as e:
        print(e)
    else:
        print("complete Save File to S3")

- s3_resource_connection() : AWS 인증 정보를 사용해 s3에 연결하는 역할

- bucket.objects.filter(prefix=key) : 지정된 key로 시작하는 파일 목록 필터링 후 가져옴

# Parameter
bucket_name = "delta-lake-s3"
key = "버킷이름/data_extended.csv"

# Upload Parameter
Upload_path = "./data_extended.csv"

# Download Parameter
Download_path = "C:/.../바탕 화면"
import boto3

bucket = s3_resource.Bucket(bucket_name)
bucket.upload_file(Upload_path, key)
# Download Cell
donwload_file(Download_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION)

 

 

: S3에 있는 파일 읽어오기

import pandas as pd
import io

# 해당 s3 디렉토리 확인하기
s3_client = s3_client_connection(KEY_ID, ACCESS_KEY, REGION)
# 확인할 디렉토리 경로 Parameter
HO = "delta-data/"

HO_info = s3_client.list_objects(Bucket = bucket_name, Prefix = HO, Delimiter = '/')

for content in HO_info['Contents']: # contents는 객체의 리스트
    print(content['Key']) # key는 객체의 키(파일 경로)

- s3_client.list_object() : 지정된 s3 버킷 내에서 주어진 조건에 맞는 객채들의 리스트를 가져옴

- prefix=HO : prefix는 s3에서 특정 경로를 필터링할 때 사용됨

- Delimiter : 디렉토리 경계 구분 용도

# csv파일 읽어온 후 Pandas DataFrame으로 변환하고 출력
key = "버킷이름/data_extended.csv"
obj = s3_client.get_object(Bucket = bucket_name, Key = key)
test_df = pd.read_csv(io.BytesIO(obj["Body"].read()))
test_df # s3객체의 내용을 바이트 스트림으로 읽음

- s3_client.get_object() : s3 버킷에서 지정된 객체를 가져오는 역할

* 메모리 용량 충분히 확보해야함.(동시에 실행 시 메모리 사용량 급증으로 실행 안됐었음.)*

'Data Lake House' 카테고리의 다른 글

AWS 활용 delta Lake 최적화 비교(2)  (0) 2024.10.17
Minio 활용 spark session 생성  (2) 2024.10.15
PySpark Test  (1) 2024.10.14
Data Lakehouse 개념  (0) 2024.08.19
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축  (0) 2024.07.09