Spaces:
Running
Running
import os | |
import re | |
import shutil | |
import requests | |
import gradio as gr | |
from bs4 import BeautifulSoup | |
from pydub import AudioSegment | |
from datetime import datetime, timedelta | |
# Scratch directory for downloaded audio. It is the default target of
# create_dir() and clean_dir(), so it is wiped and recreated on each infer() call.
TMP_DIR = "./__pycache__"
def get_prev_day(date_str: str) -> str:
    """Return the calendar day before ``date_str``.

    Args:
        date_str: A date written as ``YYYY/MM/DD``.

    Returns:
        The previous day, formatted with the same ``%Y/%m/%d`` pattern
        (month and day zero-padded).

    Raises:
        ValueError: If ``date_str`` does not match ``%Y/%m/%d``.
    """
    date_format = "%Y/%m/%d"
    previous_day = datetime.strptime(date_str, date_format) - timedelta(days=1)
    return previous_day.strftime(date_format)
def remove_end_seconds(input_file: str, output_file: str, seconds: float):
    """Trim ``seconds`` from the end of an audio file and export it as MP3.

    Args:
        input_file: Path of the audio file to read.
        output_file: Path the trimmed MP3 is written to (may equal
            ``input_file``; the audio is loaded before it is exported).
        seconds: Amount of audio to drop from the end. Values <= 0 leave the
            clip untouched; values longer than the clip leave nothing.
    """
    audio = AudioSegment.from_file(input_file)
    total_ms = len(audio)  # pydub reports and slices durations in milliseconds
    # Slice by the amount to KEEP instead of a negative stop index:
    # ``audio[:-0]`` is ``audio[:0]`` (an empty clip), and a trim longer than
    # the clip would otherwise turn into a negative frame position.
    keep_ms = min(max(total_ms - seconds * 1000, 0), total_ms)
    audio[:keep_ms].export(output_file, format="mp3")
def get_first_integer(input_string: str) -> str:
    """Return the first run of digits in ``input_string`` as a canonical integer.

    Leading zeros are dropped because the match goes through ``int``.

    Args:
        input_string: Text to scan.

    Returns:
        The first integer found, as a string, or ``""`` when the text
        contains no digits.
    """
    match = re.search(r"\d+", input_string)
    if match is None:
        return ""
    return str(int(match.group()))
def create_dir(dirpath=TMP_DIR):
    """Create ``dirpath`` (and any missing parents) if it does not exist yet.

    Args:
        dirpath: Directory to create; defaults to the module scratch directory.
    """
    # exist_ok avoids the race between a separate existence check and the
    # creation when two requests arrive at the same time.
    os.makedirs(dirpath, exist_ok=True)
def clean_dir(dirpath=TMP_DIR):
    """Recursively delete ``dirpath``; do nothing if it does not exist.

    Args:
        dirpath: Directory to remove; defaults to the module scratch directory.
    """
    try:
        shutil.rmtree(dirpath)
    except FileNotFoundError:
        # Already gone (never created, or removed by a concurrent call).
        pass
def download_mp3(
    url: str,
    local_filename: str,
    max_lookback_days: int = 30,
    timeout: float = 30.0,
) -> bool:
    """Download an MP3 to ``local_filename`` and trim its last 3.1 seconds.

    When the server answers 404, the date segment of the URL (the path between
    ``/audio/`` and the file name) is moved back one day and the request is
    retried, at most ``max_lookback_days`` times.

    Args:
        url: Audio URL of the form ``.../audio/YYYY/MM/DD/<name>.mp3``.
        local_filename: Destination path for the downloaded file.
        max_lookback_days: Upper bound on how many earlier days are tried
            after a 404. Bounds what used to be unbounded recursion.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        ``True`` if the file was downloaded and trimmed, ``False`` on any
        HTTP error, exhausted look-back, or exception.
    """
    try:
        for _ in range(max(max_lookback_days, 0) + 1):
            response = requests.get(url, timeout=timeout)
            if response.status_code == 200:
                with open(local_filename, "wb") as f:
                    f.write(response.content)
                print(f"Successfully downloaded: {local_filename}")
                remove_end_seconds(local_filename, local_filename, 3.1)
                return True

            if response.status_code != 404:
                print(f"Error: {response.status_code}, {response.text}")
                return False

            # 404: assume the file is stored under an earlier date and step
            # the date segment of the URL back by one day.
            bad_date = "/".join(url.split("/audio/")[-1].split("/")[:-1])
            fixed_date = get_prev_day(bad_date)
            url = url.replace(bad_date, fixed_date)

        print(f"Error: 404, no audio found within {max_lookback_days} earlier days")
        return False
    except Exception as e:
        # Boundary handler: network failures, unparsable dates and audio
        # processing errors are all reported to the caller as a failed download.
        print(f"Error: {e}")
        return False
def get_sound_time(page_url, timeout: float = 30.0):
    """Scrape the audio date from a sound page.

    Fetches ``page_url`` and reads the text of the first
    ``<span class="audioTime">`` element.

    Args:
        page_url: URL of the page to fetch.
        timeout: Seconds to wait for the HTTP request, so a stalled server
            cannot block the caller indefinitely.

    Returns:
        The span text with ``-`` replaced by ``/``, or ``""`` when the span is
        missing or empty.
    """
    response = requests.get(page_url, timeout=timeout)
    soup = BeautifulSoup(response.text, "html.parser")
    audio_time_span = soup.find("span", class_="audioTime")
    if audio_time_span is None:
        return ""
    return audio_time_span.text.replace("-", "/")
def infer(page_url: str, date: str):
    """Resolve a sound page URL and publication date to a local MP3 path.

    The scratch directory is wiped first, then the sound id is taken from the
    last path segment of ``page_url`` and combined with the date to build the
    CDN audio URL, which is downloaded into the scratch directory.

    Args:
        page_url: Sound page URL; must contain ``https://www.lizhi.fm/``.
        date: Publication date as ``YYYY-MM-DD`` (``/`` separators and
            unpadded month/day are also accepted).

    Returns:
        Path of the downloaded MP3, or ``./fail.mp3`` when the input is
        invalid or the download fails.
    """
    clean_dir()
    domain = "https://www.lizhi.fm/"
    fail_voice = "./fail.mp3"

    if not page_url or domain not in page_url:
        return fail_voice

    sound_id = get_first_integer(page_url.split("/")[-1])
    if not sound_id.isdigit():
        return fail_voice

    # The date is supplied by the user (not scraped via get_sound_time).
    # Parse and re-format it so the URL always carries a zero-padded
    # YYYY/MM/DD path; an unpadded date would 404 and make the downloader's
    # previous-day fallback skip the intended day entirely.
    date_format = "%Y/%m/%d"
    try:
        parsed = datetime.strptime((date or "").strip().replace("-", "/"), date_format)
    except ValueError:
        return fail_voice
    voice_time = parsed.strftime(date_format)

    mp3_url = f"http://cdn5.lizhi.fm/audio/{voice_time}/{sound_id}_hd.mp3"
    outpath = f"{TMP_DIR}/{sound_id}.mp3"
    create_dir()
    if download_mp3(mp3_url, outpath):
        return outpath

    return fail_voice
if __name__ == "__main__":
    # Two text inputs (page URL, publication date) feed infer(); its return
    # value is a file path rendered by the audio component.
    gr.Interface(
        fn=infer,
        inputs=[
            gr.Textbox(
                label="Enter the sound page URL",
                placeholder="https://www.lizhi.fm/*/*",
                show_copy_button=True,
            ),
            gr.Textbox(
                label="Enter sound publication date in format",
                placeholder="YYYY-MM-DD",
                show_copy_button=True,
            ),
        ],
        outputs=gr.Audio(
            label="Download MP3",
            show_download_button=True,
        ),
        flagging_mode="never",
    ).launch()