Spaces:
Sleeping
Sleeping
File size: 5,134 Bytes
6b725f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
from request import RequestFile as req
from response import ResponseFile as res
from response import ResponseDefault as res1
import function.dropbox as sf_dropbox
import os
import shutil
import re
from repository import UserRepository
from function import support_function as sf
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'}
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
def check_email(email):
if(re.fullmatch(regex, email)):
return True
else:
return False
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def listNameFiles(request: req.RequestGetNameFile ):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email,res1.ReponseError):
return email
list_files = sf_dropbox.list_files(email)
return res.ResponseGetNameFile(
status= 200,
data = res.DataGetNameFile(files=list_files)
)
except:
return res.ReponseError(
status=500,
data =res.Message(message="Server Error")
)
def deleteFile(request: req.RequestDeleteFile):
try:
user_id = request.user_id
name_file = request.name_file
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
if name_file is None or name_file == "":
return res.ReponseError(
status=400,
data =res.Message(message="Name file is empty")
)
sf_dropbox.delete_file(email,name_file)
return res.ResponseDeleteFile(
status=200,
data =res.Message(message=f"delete {name_file} success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"delete {name_file} error")
)
def download_folder(request:req.RequestDownLoadFolder):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
sf_dropbox.download_folder(email)
return res.ResponseDownloadFolder(
status=200,
data =res.Message(message=f"Downloaded folder {email} success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Server error")
)
def download_file(request:req.RequestDownLoadFile):
try:
user_id = request.user_id
name_file = request.name_file
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
if name_file is None or name_file == "":
return res.ReponseError(
status=400,
data =res.Message(message="name_file is empty")
)
sf_dropbox.search_and_download_file(name_file,email)
return res.ResponseDownloadFile(
status=200,
data =res.Message(message=f"Downloaded file '{name_file}' by email: '{email}' success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Server error")
)
ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'}
def allowed_file1(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def upload_files(request: req.RequestUploadFile):
try:
user_id = request.user_id
files = request.files
email = sf.check_email_service(user_id)
if isinstance(email, res1.ReponseError):
return email
for file in files:
if not allowed_file(file.filename):
return res.ReponseError(
status=415,
data =res.Message(message=f"File type not allow")
)
temp_dir = f"/code/temp/{email}"
os.makedirs(temp_dir, exist_ok=True)
file_path = os.path.join(temp_dir, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
cloud_path = f"/{email}/{file.filename}"
sf_dropbox.upload_file(file_path, cloud_path)
return res.ResponseUploadedFile(
status=200,
data =res.Message(message=f"Load file success")
)
except:
return res.ReponseError(
status=500,
data =res.Message(message=f"Load file error")
)
def deleteAllFile(request: req.RequestDeleteAllFile):
try:
user_id = request.user_id
email = sf.check_email_service(user_id)
if isinstance(email, res.ReponseError):
return email
sf_dropbox.delete_all_files_in_folder(email)
return res.ResponseDeleteAllFile(
status=200,
data=res.Message(message=f"Delete all file success")
)
except:
return res.ReponseError(
status=500,
data=res.Message(message=f"Delete all file error")
) |