Spaces:
Sleeping
Sleeping
from huggingface_hub import HfApi | |
from huggingface_hub import CommitOperationAdd | |
from urllib.parse import urlparse | |
import os | |
import random | |
import requests | |
import shutil | |
import gradio as gr | |
# Disk usage of the root filesystem, sampled once when the module is loaded.
# The three byte counts are floored to whole GiB (2 ** 30 bytes) and rendered
# as a three-line summary that the UI displays at the top of the page.
hd_total, hd_used, hd_free = shutil.disk_usage("/")
hd_text = (
    f"Total {hd_total // (2 ** 30)} GiB\n"
    f"Used {hd_used // (2 ** 30)} GiB\n"
    f"Free {hd_free // (2 ** 30)} GiB"
)
def copy(url, repo, token, override, dest):
    """Download a remote file and commit it to a Hugging Face repo.

    The file is streamed into a scratch directory under the current working
    directory, uploaded to ``<user>/<repo>`` (the repo is created as private
    if it does not exist yet), and the scratch directory is removed again
    whether or not the copy succeeded.

    Args:
        url: Remote URL to download.
        repo: Repo name without the user namespace; the namespace is taken
            from the account the token belongs to.
        token: Hugging Face token with write access.
        override: When true, a file already present under the same name is
            deleted and replaced. When false, the copy is aborted if the
            file already exists.
        dest: Path to give the file inside the repo. When empty, the name is
            derived from the URL.

    Returns:
        A status string for the UI: ``'Done!'`` on success, otherwise a
        message describing the problem. Errors are reported through the
        return value rather than raised.
    """
    if not url:
        return "Oops, you forgot the remote URL!"
    if not repo:
        return "Oops, you forgot the repo name!"
    if not token:
        return "Oops, you forgot the token for write access!"

    # Only set once we have created the directory ourselves, so the cleanup
    # below can never remove a directory that belonged to someone else.
    localPath = None
    try:
        # Create download directory
        candidate = os.path.join(os.getcwd(), generateRandomHexString(16))
        os.mkdir(candidate)
        localPath = candidate

        # Determine the path the file will get inside the repo
        filename = dest if dest else getFilename(url)

        # The local file name is fixed rather than derived from `filename`:
        # `dest` may contain sub-directories (which do not exist locally) or
        # be absolute (which would escape the scratch directory).
        filePath = os.path.join(localPath, 'download')

        # Stream to disk so large files are not held in memory, and fail on
        # HTTP errors instead of uploading the server's error page.
        with requests.get(url, allow_redirects=True, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(filePath, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        print('File downloaded as: ' + filePath)

        # Hf login
        api = getHfApi(token)
        user = api.whoami()
        repoId = user['name'] + '/' + repo

        # Create the repo if not exist
        api.create_repo(
            repo_id=repoId,
            private=True,
            exist_ok=True
        )

        # An existing file is replaced only when override is set; otherwise
        # the copy is aborted, as promised by the UI checkbox.
        if fileExistsOnRepo(api, repoId, filename):
            if not override:
                return 'File ' + filename + ' already exists on repo, aborted! Tick the override option to replace it.'
            deleteFileOnRepo(api, repoId, filename)

        # Upload the file
        api.create_commit(
            repo_id=repoId,
            operations=[
                CommitOperationAdd(
                    path_in_repo=filename,
                    path_or_fileobj=filePath,
                )
            ],
            commit_message='Uploaded from ' + url
        )
        return 'Done!'
    except Exception as e:
        # The result is shown in a text box, so hand back text rather than
        # the exception object; fall back to repr() for empty messages.
        return str(e) or repr(e)
    finally:
        if localPath is not None:
            shutil.rmtree(localPath, ignore_errors=True)
def deleteFileOnRepo(api, repoId, filename):
    """Delete a single file from a repo.

    Args:
        api: Hub client exposing ``delete_file(repo_id=..., path_in_repo=...)``.
        repoId: Full repo id to delete from.
        filename: Path of the file inside the repo.

    Raises:
        Exception: If the deletion fails. The underlying error is printed
            and attached as ``__cause__``.
    """
    try:
        api.delete_file(
            repo_id=repoId,
            path_in_repo=filename,
        )
        print('File ' + filename + ' deleted from repo')
    except Exception as e:
        # Log before raising (a statement after `raise` never runs), and
        # catch Exception rather than BaseException so KeyboardInterrupt and
        # SystemExit are not turned into an ordinary error.
        print(repr(e))
        raise Exception('Failed to delete file ' + filename + ' on repo!') from e
def generateRandomHexString(size):
    """Return a string of ``size`` random lowercase hexadecimal characters.

    Characters are drawn with the ``random`` module, so the result is not
    suitable for anything security-sensitive.
    """
    hex_alphabet = '0123456789abcdef'
    drawn = random.choices(hex_alphabet, k=size)
    return ''.join(drawn)
def getFilename(url):
    """Derive a file name from the final (post-redirect) location of ``url``.

    A HEAD request is sent with redirects followed, and the last path
    segment of the resulting URL is used as the name.

    Args:
        url: Remote URL to inspect.

    Returns:
        The last path segment of the resolved URL, or a random 8-character
        hex name ending in ``.to_rename`` when that segment is empty or
        longer than 128 characters.
    """
    # A timeout keeps an unresponsive server from blocking the request
    # forever; without one, requests waits indefinitely.
    r = requests.head(url, allow_redirects=True, timeout=30)
    filename = os.path.basename(urlparse(r.url).path)
    if not filename or len(filename) > 128:
        return generateRandomHexString(8) + ".to_rename"
    return filename
def getHfApi(token):
    """Create a Hugging Face Hub client bound to ``token``.

    Args:
        token: Access token passed to ``HfApi``.

    Returns:
        The constructed ``HfApi`` instance.

    Raises:
        Exception: With message ``'Authorisation error'`` if the client
            cannot be constructed. The underlying error is printed and
            attached as ``__cause__``.
    """
    try:
        return HfApi(token=token)
    except Exception as e:
        # Log before raising (a statement after `raise` never runs), and
        # catch Exception rather than BaseException so KeyboardInterrupt and
        # SystemExit are not turned into an ordinary error.
        print(repr(e))
        raise Exception('Authorisation error') from e
def fileExistsOnRepo(api, repoId, filename):
    """Tell whether ``filename`` is among the files listed for ``repoId``.

    Args:
        api: Hub client exposing ``list_repo_files(repo_id=...)``.
        repoId: Full repo id to inspect.
        filename: Path inside the repo to look for (exact match).

    Returns:
        True if an entry of the listing equals ``filename``, else False.
    """
    listing = api.list_repo_files(repo_id=repoId)
    return any(entry == filename for entry in listing)
# UI definition: a single form whose fields are passed, in order, to copy();
# the string copy() returns is shown in the "Result" box.
with gr.Blocks() as app:
    gr.Markdown(hd_text)
    url = gr.Textbox(label="Remote URL", placeholder="Enter the full remote URL here...")
    repo = gr.Textbox(label="Repo name (model)")
    # The write token is a credential, so the field is masked instead of
    # echoing it in clear text on screen.
    token = gr.Textbox(label="Write token", type="password")
    override = gr.Checkbox(label="Override if same file name exists, otherwise abort")
    destinationName = gr.Textbox(label="File name (include path) to save on the repo. Leave blank if you want to use the name from the URL. (random string will be used if fail to retrieve valid name from the URL")
    submitBtn = gr.Button("Copy Now")
    # Created after the button so it is laid out below it.
    resultBox = gr.Textbox(label="Result", lines=4)
    submitBtn.click(
        copy,
        inputs=[
            url,
            repo,
            token,
            override,
            destinationName,
        ],
        outputs=resultBox,
    )

app.launch()