Spaces:
Running
Running
#!/usr/bin/python3 | |
# Copyright (c) 2021 LALAL.AI | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import cgi | |
import json | |
import os | |
import sys | |
import time | |
from argparse import ArgumentParser | |
from urllib.parse import quote, unquote, urlencode | |
from urllib.request import urlopen, Request | |
CURRENT_DIR_PATH = os.path.dirname(os.path.realpath(__file__)) | |
URL_API = "https://www.lalal.ai/api/" | |
def update_percent(pct): | |
pct = str(pct) | |
sys.stdout.write("\b" * len(pct)) | |
sys.stdout.write(" " * len(pct)) | |
sys.stdout.write("\b" * len(pct)) | |
sys.stdout.write(pct) | |
sys.stdout.flush() | |
def make_content_disposition(filename, disposition="attachment"): | |
try: | |
filename.encode("ascii") | |
file_expr = f'filename="{filename}"' | |
except UnicodeEncodeError: | |
quoted = quote(filename) | |
file_expr = f"filename*=utf-8''{quoted}" | |
return f"{disposition}; {file_expr}" | |
def upload_file(file_path, license): | |
url_for_upload = URL_API + "upload/" | |
_, filename = os.path.split(file_path) | |
headers = { | |
"Content-Disposition": make_content_disposition(filename), | |
"Authorization": f"license {license}", | |
} | |
with open(file_path, "rb") as f: | |
request = Request(url_for_upload, f, headers) | |
with urlopen(request) as response: | |
upload_result = json.load(response) | |
if upload_result["status"] == "success": | |
return upload_result["id"] | |
else: | |
raise RuntimeError(upload_result["error"]) | |
def split_file(file_id, license, stem, filter_type, splitter): | |
url_for_split = URL_API + "split/" | |
headers = { | |
"Authorization": f"license {license}", | |
} | |
query_args = { | |
"id": file_id, | |
"stem": stem, | |
"filter": filter_type, | |
"splitter": splitter, | |
} | |
encoded_args = urlencode(query_args).encode("utf-8") | |
request = Request(url_for_split, encoded_args, headers=headers) | |
with urlopen(request) as response: | |
split_result = json.load(response) | |
if split_result["status"] == "error": | |
raise RuntimeError(split_result["error"]) | |
def check_file(file_id): | |
url_for_check = URL_API + "check/?" | |
query_args = {"id": file_id} | |
encoded_args = urlencode(query_args) | |
is_queueup = False | |
while True: | |
with urlopen(url_for_check + encoded_args) as response: | |
check_result = json.load(response) | |
if check_result["status"] == "error": | |
raise RuntimeError(check_result["error"]) | |
task_state = check_result["task"]["state"] | |
if task_state == "error": | |
raise RuntimeError(check_result["task"]["error"]) | |
if task_state == "progress": | |
progress = int(check_result["task"]["progress"]) | |
if progress == 0 and not is_queueup: | |
print("Queue up...") | |
is_queueup = True | |
elif progress > 0: | |
update_percent(f"Progress: {progress}%") | |
if task_state == "success": | |
update_percent("Progress: 100%\n") | |
stem_track_url = check_result["split"]["stem_track"] | |
back_track_url = check_result["split"]["back_track"] | |
return stem_track_url, back_track_url | |
time.sleep(15) | |
def get_filename_from_content_disposition(header): | |
_, params = cgi.parse_header(header) | |
filename = params.get("filename") | |
if filename: | |
return filename | |
filename = params.get("filename*") | |
if filename: | |
encoding, quoted = filename.split("''") | |
unquoted = unquote(quoted, encoding) | |
return unquoted | |
raise ValueError("Invalid header Content-Disposition") | |
def download_file(url_for_download, output_path): | |
with urlopen(url_for_download) as response: | |
filename = get_filename_from_content_disposition( | |
response.headers["Content-Disposition"] | |
) | |
file_path = os.path.join(output_path, filename) | |
with open(file_path, "wb") as f: | |
while True: | |
chunk = response.read(8196) | |
if not chunk: | |
break | |
f.write(chunk) | |
return file_path | |
def batch_process_for_file(input_path, output_path, stem, filter_type, splitter): | |
license = os.environ.get("LALALAI_LICENCE") | |
try: | |
print(f'Uploading the file "{input_path}"...') | |
file_id = upload_file(file_path=input_path, license=license) | |
print( | |
f'The file "{input_path}" has been successfully uploaded (file id: {file_id})' | |
) | |
print(f'Processing the file "{input_path}"...') | |
split_file(file_id, license, stem, filter_type, splitter) | |
stem_track_url, back_track_url = check_file(file_id) | |
print(f'Downloading the stem track file "{stem_track_url}"...') | |
stem_file = download_file(stem_track_url, output_path) | |
print(f'The stem track file has been downloaded to "{stem_file}"') | |
print(f'Downloading the back track file "{back_track_url}"...') | |
back_track_file = download_file(back_track_url, output_path) | |
print(f'The back track file has been downloaded to "{back_track_file}"') | |
print(f'The file "{input_path}" has been successfully split') | |
return stem_file | |
except Exception as err: | |
print(f'Cannot process the file "{input_path}": {err}') | |
def batch_process(input_path, output_path, stem, filter_type, splitter): | |
if type(input_path) is list: | |
downloaded_files = [] | |
for path in input_path: | |
if os.path.isfile(path): | |
downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter) | |
downloaded_files.append(downloaded_file) | |
return downloaded_files | |
if os.path.isfile(input_path): | |
downloaded_file = batch_process_for_file(input_path, output_path, stem, filter_type, splitter) | |
return downloaded_file | |
else: | |
downloaded_files = [] | |
for path in os.listdir(input_path): | |
path = os.path.join(input_path, path) | |
if os.path.isfile(path): | |
downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter) | |
downloaded_files.append(downloaded_file) | |
return downloaded_files | |
def main(): | |
parser = ArgumentParser(description="Lalalai splitter") | |
parser.add_argument( | |
"--input", type=str, required=True, help="Input directory or a file" | |
) | |
parser.add_argument( | |
"--output", type=str, default=CURRENT_DIR_PATH, help="Output directory" | |
) | |
parser.add_argument( | |
"--stem", | |
type=str, | |
default="vocals", | |
choices=[ | |
"vocals", | |
"drum", | |
"bass", | |
"piano", | |
"electric_guitar", | |
"acoustic_guitar", | |
"synthesizer", | |
"voice", | |
"strings", | |
"wind", | |
], | |
help='Stem option. Stems "voice", "strings", "wind" are not supported by Cassiopeia', | |
) | |
parser.add_argument( | |
"--filter", | |
type=int, | |
default=1, | |
choices=[0, 1, 2], | |
help="0 (mild), 1 (normal), 2 (aggressive)", | |
) | |
parser.add_argument( | |
"--splitter", | |
type=str, | |
default="phoenix", | |
choices=["phoenix", "cassiopeia"], | |
help="The type of neural network used to split audio", | |
) | |
args = parser.parse_args() | |
os.makedirs(args.output, exist_ok=True) | |
batch_process(args.input, args.output, args.stem, args.filter, args.splitter) | |
if __name__ == "__main__": | |
try: | |
main() | |
except Exception as err: | |
print(err) | |