Spaces:
Sleeping
Sleeping
from request import RequestFile as req | |
from response import ResponseFile as res | |
from response import ResponseDefault as res1 | |
import function.dropbox as sf_dropbox | |
import os | |
import shutil | |
import re | |
from repository import UserRepository | |
from function import support_function as sf | |
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'} | |
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
def check_email(email): | |
if(re.fullmatch(regex, email)): | |
return True | |
else: | |
return False | |
def allowed_file(filename): | |
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
def listNameFiles(request: req.RequestGetNameFile ): | |
try: | |
user_id = request.user_id | |
email = sf.check_email_service(user_id) | |
if isinstance(email,res1.ReponseError): | |
return email | |
list_files = sf_dropbox.list_files(email) | |
return res.ResponseGetNameFile( | |
status= 200, | |
data = res.DataGetNameFile(files=list_files) | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message="Server Error") | |
) | |
def deleteFile(request: req.RequestDeleteFile): | |
try: | |
user_id = request.user_id | |
name_file = request.name_file | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res1.ReponseError): | |
return email | |
if name_file is None or name_file == "": | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="Name file is empty") | |
) | |
sf_dropbox.delete_file(email,name_file) | |
return res.ResponseDeleteFile( | |
status=200, | |
data =res.Message(message=f"delete {name_file} success") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message=f"delete {name_file} error") | |
) | |
def download_folder(request:req.RequestDownLoadFolder): | |
try: | |
user_id = request.user_id | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res1.ReponseError): | |
return email | |
sf_dropbox.download_folder(email) | |
return res.ResponseDownloadFolder( | |
status=200, | |
data =res.Message(message=f"Downloaded folder {email} success") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message=f"Server error") | |
) | |
def download_file(request:req.RequestDownLoadFile): | |
try: | |
user_id = request.user_id | |
name_file = request.name_file | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res1.ReponseError): | |
return email | |
if name_file is None or name_file == "": | |
return res.ReponseError( | |
status=400, | |
data =res.Message(message="name_file is empty") | |
) | |
sf_dropbox.search_and_download_file(name_file,email) | |
return res.ResponseDownloadFile( | |
status=200, | |
data =res.Message(message=f"Downloaded file '{name_file}' by email: '{email}' success") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message=f"Server error") | |
) | |
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'} | |
def allowed_file1(filename): | |
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
def upload_files(request: req.RequestUploadFile): | |
try: | |
user_id = request.user_id | |
files = request.files | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res1.ReponseError): | |
return email | |
for file in files: | |
if not allowed_file(file.filename): | |
return res.ReponseError( | |
status=415, | |
data =res.Message(message=f"File type not allow") | |
) | |
temp_dir = f"/code/temp/{email}" | |
os.makedirs(temp_dir, exist_ok=True) | |
file_path = os.path.join(temp_dir, file.filename) | |
with open(file_path, "wb") as buffer: | |
shutil.copyfileobj(file.file, buffer) | |
cloud_path = f"/{email}/{file.filename}" | |
sf_dropbox.upload_file(file_path, cloud_path) | |
return res.ResponseUploadedFile( | |
status=200, | |
data =res.Message(message=f"Load file success") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data =res.Message(message=f"Load file error") | |
) | |
def deleteAllFile(request: req.RequestDeleteAllFile): | |
try: | |
user_id = request.user_id | |
email = sf.check_email_service(user_id) | |
if isinstance(email, res.ReponseError): | |
return email | |
sf_dropbox.delete_all_files_in_folder(email) | |
return res.ResponseDeleteAllFile( | |
status=200, | |
data=res.Message(message=f"Delete all file success") | |
) | |
except: | |
return res.ReponseError( | |
status=500, | |
data=res.Message(message=f"Delete all file error") | |
) |