- AWS 콘솔에서 ACCESS_KEY 와 SECRET_ACCESS_KEY 발급
- REGION도 알아야함!
- KEY, REGION 변수화
: S3 클라이언트, 리소스, 세션 생성 및 연결
import boto3
# s3 Client 연결 함수
def s3_client_connection():
try: # s3 Client 생성
s3 = boto3.client(
service_name = "s3",
region_name = ...,
aws_access_key_id = ...,
aws_secret_access_key = ...
)
except Exception as e:
print(e)
else:
print("s3 bucket connected!")
return s3
# s3 Resource 연결 함수
...
...
# s3 Session 연결 함수
...
...
- s3_client_connection : AWS S3 서비스와 상호작용할 수 있는 클라이언트 생성(boto3.client())
: S3 버킷 생성
from glob import glob
import os
import botocore
BUCKET_NAME = "delta-lake-s3"
REGION = "..."
# 버킷이 존재하지 않으면 생성
try:
client.head_bucket(Bucket=BUCKET_NAME)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
client.create_bucket(Bucket=BUCKET_NAME, CreateBucketConfiguration={'LocationConstraint': REGION})
# 생성된 버킷 목록 출력
buckets = client.list_buckets()
for bucket in buckets['Buckets']:
print(bucket['Name'])
- client.head_bucket() : 해당 s3 버킷이 존재하는지 확인
- client.create_bucket() : 없을 시 새 버킷 생성
- client.list_buckets() : 모든 s3 버킷 목록 반환
: S3 서비스에 연결하여 사용 가능한 모든 버킷 확인
session = s3_session_connection()
# 생성된 세션을 사용하여 s3 리소스를 생성
session_s3 = session.resource('s3')
# s3 리소스 객체에서 사용 가능한 모든 버킷을 반복적으로 가져옴
for bucket in session_s3.buckets.all():
print(bucket.name)
: AWS S3계정에 있는 모든 버킷 목록 확인
# client 연동 후 계정에 있는 모든 버킷 목록을 보여줌
client = s3_client_connection()
response = client.list_buckets() # bucket 목록
response
: AWS S3 버킷에 업로드 및 버킷에서 파일 다운로드
# Upload File
# local_path : 업로드 하려는 파일의 Local 파일 경로
# bucket_name : Upload하고자 할 bucket 이름
# key : 버킷 안에서 저장하고자 하는 경로
def upload_file(local_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION):
try:
s3 = s3_resource_connection(KEY_ID, ACCESS_KEY, REGION)
s3.meta.client.upload_file(local_path, bucket_name, key)
except Exception as e:
print(e)
else:
print("complete Save File to S3")
# DownLoad File
# local_path : 다운로드할 파일 Local 저장 경로
# bucket_name : 다운로드하고자 하는 파일이 저장된 bucket 이름
# key : 다운로드 하고자 하는 파일이 저장된 버킷안의 경로
def download_file(local_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION):
try:
s3 = s3_resource_connection(KEY_ID, ACCESS_KEY, REGION)
bucket = s3.Bucket(bucket_name)
objects = list(bucket.objects.filter(Prefix=key))
if objects and objects[0].key == key:
bucket.download_file(objects[0].key, local_path)
except Exception as e:
print(e)
else:
print("complete Save File to S3")
- s3_resource_connection() : AWS 인증 정보를 사용해 s3에 연결하는 역할
- bucket.objects.filter(prefix=key) : 지정된 key로 시작하는 파일 목록 필터링 후 가져옴
# Parameter
bucket_name = "delta-lake-s3"
key = "버킷이름/data_extended.csv"
# Upload Parameter
Upload_path = "./data_extended.csv"
# Download Parameter
Download_path = "C:/.../바탕 화면"
import boto3
bucket = s3_resource.Bucket(bucket_name)
bucket.upload_file(Upload_path, key)
# Download Cell
donwload_file(Download_path, bucket_name, key, KEY_ID, ACCESS_KEY, REGION)
: S3에 있는 파일 읽어오기
import pandas as pd
import io
# 해당 s3 디렉토리 확인하기
s3_client = s3_client_connection(KEY_ID, ACCESS_KEY, REGION)
# 확인할 디렉토리 경로 Parameter
HO = "delta-data/"
HO_info = s3_client.list_objects(Bucket = bucket_name, Prefix = HO, Delimiter = '/')
for content in HO_info['Contents']: # contents는 객체의 리스트
print(content['Key']) # key는 객체의 키(파일 경로)
- s3_client.list_object() : 지정된 s3 버킷 내에서 주어진 조건에 맞는 객채들의 리스트를 가져옴
- prefix=HO : prefix는 s3에서 특정 경로를 필터링할 때 사용됨
- Delimiter : 디렉토리 경계 구분 용도
# csv파일 읽어온 후 Pandas DataFrame으로 변환하고 출력
key = "버킷이름/data_extended.csv"
obj = s3_client.get_object(Bucket = bucket_name, Key = key)
test_df = pd.read_csv(io.BytesIO(obj["Body"].read()))
test_df # s3객체의 내용을 바이트 스트림으로 읽음
- s3_client.get_object() : s3 버킷에서 지정된 객체를 가져오는 역할
* 메모리 용량 충분히 확보해야함.(동시에 실행 시 메모리 사용량 급증으로 실행 안됐었음.)*
'Data Lake House' 카테고리의 다른 글
AWS 활용 delta Lake 최적화 비교(2) (0) | 2024.10.17 |
---|---|
Minio 활용 spark session 생성 (2) | 2024.10.15 |
PySpark Test (1) | 2024.10.14 |
Data Lakehouse 개념 (0) | 2024.08.19 |
가상 환경 이해 및 Ubuntu 환경 Pyspark 가상환경 구축 (0) | 2024.07.09 |