audio-splitter / audio_sep_splitter.py
deepsync's picture
Update audio_sep_splitter.py
ed208bd verified
#!/usr/bin/python3
# Copyright (c) 2021 LALAL.AI
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import cgi
import json
import os
import sys
import time
from argparse import ArgumentParser
from urllib.parse import quote, unquote, urlencode
from urllib.request import urlopen, Request
CURRENT_DIR_PATH = os.path.dirname(os.path.realpath(__file__))
URL_API = "https://www.lalal.ai/api/"
def update_percent(pct):
pct = str(pct)
sys.stdout.write("\b" * len(pct))
sys.stdout.write(" " * len(pct))
sys.stdout.write("\b" * len(pct))
sys.stdout.write(pct)
sys.stdout.flush()
def make_content_disposition(filename, disposition="attachment"):
try:
filename.encode("ascii")
file_expr = f'filename="{filename}"'
except UnicodeEncodeError:
quoted = quote(filename)
file_expr = f"filename*=utf-8''{quoted}"
return f"{disposition}; {file_expr}"
def upload_file(file_path, license):
url_for_upload = URL_API + "upload/"
_, filename = os.path.split(file_path)
headers = {
"Content-Disposition": make_content_disposition(filename),
"Authorization": f"license {license}",
}
with open(file_path, "rb") as f:
request = Request(url_for_upload, f, headers)
with urlopen(request) as response:
upload_result = json.load(response)
if upload_result["status"] == "success":
return upload_result["id"]
else:
raise RuntimeError(upload_result["error"])
def split_file(file_id, license, stem, filter_type, splitter):
url_for_split = URL_API + "split/"
headers = {
"Authorization": f"license {license}",
}
query_args = {
"id": file_id,
"stem": stem,
"filter": filter_type,
"splitter": splitter,
}
encoded_args = urlencode(query_args).encode("utf-8")
request = Request(url_for_split, encoded_args, headers=headers)
with urlopen(request) as response:
split_result = json.load(response)
if split_result["status"] == "error":
raise RuntimeError(split_result["error"])
def check_file(file_id):
url_for_check = URL_API + "check/?"
query_args = {"id": file_id}
encoded_args = urlencode(query_args)
is_queueup = False
while True:
with urlopen(url_for_check + encoded_args) as response:
check_result = json.load(response)
if check_result["status"] == "error":
raise RuntimeError(check_result["error"])
task_state = check_result["task"]["state"]
if task_state == "error":
raise RuntimeError(check_result["task"]["error"])
if task_state == "progress":
progress = int(check_result["task"]["progress"])
if progress == 0 and not is_queueup:
print("Queue up...")
is_queueup = True
elif progress > 0:
update_percent(f"Progress: {progress}%")
if task_state == "success":
update_percent("Progress: 100%\n")
stem_track_url = check_result["split"]["stem_track"]
back_track_url = check_result["split"]["back_track"]
return stem_track_url, back_track_url
time.sleep(15)
def get_filename_from_content_disposition(header):
_, params = cgi.parse_header(header)
filename = params.get("filename")
if filename:
return filename
filename = params.get("filename*")
if filename:
encoding, quoted = filename.split("''")
unquoted = unquote(quoted, encoding)
return unquoted
raise ValueError("Invalid header Content-Disposition")
def download_file(url_for_download, output_path):
with urlopen(url_for_download) as response:
filename = get_filename_from_content_disposition(
response.headers["Content-Disposition"]
)
file_path = os.path.join(output_path, filename)
with open(file_path, "wb") as f:
while True:
chunk = response.read(8196)
if not chunk:
break
f.write(chunk)
return file_path
def batch_process_for_file(input_path, output_path, stem, filter_type, splitter):
license = os.environ.get("LALALAI_LICENCE")
try:
print(f'Uploading the file "{input_path}"...')
file_id = upload_file(file_path=input_path, license=license)
print(
f'The file "{input_path}" has been successfully uploaded (file id: {file_id})'
)
print(f'Processing the file "{input_path}"...')
split_file(file_id, license, stem, filter_type, splitter)
stem_track_url, back_track_url = check_file(file_id)
print(f'Downloading the stem track file "{stem_track_url}"...')
stem_file = download_file(stem_track_url, output_path)
print(f'The stem track file has been downloaded to "{stem_file}"')
print(f'Downloading the back track file "{back_track_url}"...')
back_track_file = download_file(back_track_url, output_path)
print(f'The back track file has been downloaded to "{back_track_file}"')
print(f'The file "{input_path}" has been successfully split')
return stem_file
except Exception as err:
print(f'Cannot process the file "{input_path}": {err}')
def batch_process(input_path, output_path, stem, filter_type, splitter):
if type(input_path) is list:
downloaded_files = []
for path in input_path:
if os.path.isfile(path):
downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter)
downloaded_files.append(downloaded_file)
return downloaded_files
if os.path.isfile(input_path):
downloaded_file = batch_process_for_file(input_path, output_path, stem, filter_type, splitter)
return downloaded_file
else:
downloaded_files = []
for path in os.listdir(input_path):
path = os.path.join(input_path, path)
if os.path.isfile(path):
downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter)
downloaded_files.append(downloaded_file)
return downloaded_files
def main():
parser = ArgumentParser(description="Lalalai splitter")
parser.add_argument(
"--input", type=str, required=True, help="Input directory or a file"
)
parser.add_argument(
"--output", type=str, default=CURRENT_DIR_PATH, help="Output directory"
)
parser.add_argument(
"--stem",
type=str,
default="vocals",
choices=[
"vocals",
"drum",
"bass",
"piano",
"electric_guitar",
"acoustic_guitar",
"synthesizer",
"voice",
"strings",
"wind",
],
help='Stem option. Stems "voice", "strings", "wind" are not supported by Cassiopeia',
)
parser.add_argument(
"--filter",
type=int,
default=1,
choices=[0, 1, 2],
help="0 (mild), 1 (normal), 2 (aggressive)",
)
parser.add_argument(
"--splitter",
type=str,
default="phoenix",
choices=["phoenix", "cassiopeia"],
help="The type of neural network used to split audio",
)
args = parser.parse_args()
os.makedirs(args.output, exist_ok=True)
batch_process(args.input, args.output, args.stem, args.filter, args.splitter)
if __name__ == "__main__":
try:
main()
except Exception as err:
print(err)