import os
from typing import Optional

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError

from config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME, S3_OBJ_BASE_URL


class S3Handler:
    """Small wrapper around a boto3 S3 client for per-user file transfers.

    Every object key is namespaced as ``<user_id>/<s3_path>`` inside
    ``BUCKET_NAME``. Transfer failures reported by S3 are printed and
    signalled to the caller by a ``None`` return value rather than raised.
    """

    def __init__(self):
        """Create the S3 client from the credentials in ``config``.

        If client creation raises ``ClientError`` the error is printed and
        ``self.s3`` is left as ``None``; the transfer methods then report the
        missing client instead of failing with ``AttributeError``.
        """
        # Define the attribute up front so it exists on every code path.
        self.s3 = None
        try:
            self.s3 = boto3.client(
                's3',
                aws_access_key_id=AWS_ACCESS_KEY_ID,
                aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            )
        except ClientError as e:
            print(f"Failed to connect to S3: {e}")

    @staticmethod
    def _user_key(user_id, s3_path) -> str:
        """Return the bucket key ``<user_id>/<s3_path>``."""
        # f-string instead of ``+`` so non-string ids (e.g. ints) also work.
        return f"{user_id}/{s3_path}"

    @staticmethod
    def _object_url(s3_user_path: str) -> str:
        """Return the URL of a key by prefixing it with ``S3_OBJ_BASE_URL``."""
        return f"{S3_OBJ_BASE_URL}/{s3_user_path}"

    def upload_file(self, user_id, s3_path, file_path, **kwargs) -> Optional[str]:
        """Upload a local file to the bucket under the user's prefix.

        Args:
            user_id: Identifier used as the key prefix.
            s3_path: Key of the object relative to the user's prefix.
            file_path: Path of the local file to upload.
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            The object's URL (``S3_OBJ_BASE_URL/<user_id>/<s3_path>``) on
            success, or ``None`` if the client is unavailable or S3 reported
            a failure. Errors other than ``ClientError`` and
            ``S3UploadFailedError`` propagate to the caller.
        """
        if self.s3 is None:
            print("Failed to upload file to S3: client is not initialised")
            return None

        s3_user_path = self._user_key(user_id, s3_path)
        try:
            # Filename: local path to upload
            # Bucket:   destination bucket
            # Key:      destination object key
            self.s3.upload_file(
                Filename=file_path,
                Bucket=BUCKET_NAME,
                Key=s3_user_path,
                # ExtraArgs={'ACL': 'public-read'}
            )
        except (ClientError, S3UploadFailedError) as e:
            # boto3's managed upload wraps client errors in
            # S3UploadFailedError, so ClientError alone would miss them.
            print(f"Failed to upload file to S3: {e}")
            return None

        # Report the key that was actually written, including the user prefix.
        print(f"File uploaded to {BUCKET_NAME} as {s3_user_path}")
        return self._object_url(s3_user_path)

    def download_file(self, user_id, s3_path, file_path, **kwargs) -> Optional[str]:
        """Download an object from the user's prefix to a local file.

        Args:
            user_id: Identifier used as the key prefix.
            s3_path: Key of the object relative to the user's prefix.
            file_path: Local path the object is written to.
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            The object's URL (``S3_OBJ_BASE_URL/<user_id>/<s3_path>``) on
            success, or ``None`` if the client is unavailable or S3 reported
            a ``ClientError``. Other errors propagate to the caller.
        """
        if self.s3 is None:
            print("Failed to download file from S3: client is not initialised")
            return None

        s3_user_path = self._user_key(user_id, s3_path)
        try:
            # Filename: local path to download to
            # Bucket:   source bucket
            # Key:      source object key
            self.s3.download_file(
                Filename=file_path,
                Bucket=BUCKET_NAME,
                Key=s3_user_path,
            )
        except ClientError as e:
            print(f"Failed to download file from S3: {e}")
            return None

        print(f"File downloaded from {BUCKET_NAME} as {file_path}")
        return self._object_url(s3_user_path)


# Example usage (keyword arguments avoid mixing up the parameter order):
# s3_path = "sample_video_srt_2.mp4"
# file_path = "Output/video_1.mp4"
# s3 = S3Handler()
# s3.upload_file(user_id="uzair", s3_path=s3_path, file_path=file_path)