Spaces:
Runtime error
Runtime error
File size: 5,028 Bytes
bfdf8df |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
import os
import re
import time
import torch
import scrapetube
from pytube import YouTube
from faster_whisper import WhisperModel
from tqdm import tqdm
# Available models:
# tiny.en, tiny, base.en, base, small.en, small, medium.en, medium
# large-v1, large-v2, large-v3, large
MODEL_NAME = "large-v3"
AUDIO_SAVE_PATH = 'datasets/huggingface_audio/'
TRANSCRIPTS_SAVE_PATH = 'datasets/huggingface_audio_transcribed/'
# CUDA device used for transcription. The default (5) is the value that used
# to be hard-coded; set WHISPER_DEVICE_INDEX to run on a host whose GPUs are
# numbered differently (e.g. a single-GPU machine needs 0).
CUDA_DEVICE_INDEX = int(os.environ.get("WHISPER_DEVICE_INDEX", "5"))

# The model is loaded once at import time and shared by transcript_from_audio.
if torch.cuda.is_available():
    # requires: conda install -c anaconda cudnn
    print(f"Using {MODEL_NAME} on GPU and float16")
    model = WhisperModel(
        MODEL_NAME,
        device="cuda",
        compute_type="float16",
        device_index=[CUDA_DEVICE_INDEX],
    )
else:
    print(f"Using {MODEL_NAME} on CPU and int8")
    model = WhisperModel(MODEL_NAME, device="cpu", compute_type="int8")
def replace_unallowed_chars(filename: str) -> str:
    """Return *filename* with file-name-unsafe characters turned into ``_``.

    A space, ``/``, ``\\``, ``:``, ``*``, ``?``, ``"``, ``<``, ``>`` and ``|``
    are each replaced by one underscore; every other character is kept.
    """
    unsafe = ' /\\:*?"<>|'
    # One translation table handles all replacements in a single pass.
    table = str.maketrans(unsafe, '_' * len(unsafe))
    return filename.translate(table)
def get_videos_urls(channel_url: str) -> list[str]:
    """Return a watch URL for every video scrapetube yields for *channel_url*.

    Each entry returned by ``scrapetube.get_channel`` is indexed by its
    ``'videoId'`` key to build the URL.
    """
    watch_urls = []
    for entry in scrapetube.get_channel(channel_url=channel_url):
        watch_urls.append(f"https://www.youtube.com/watch?v={entry['videoId']}")
    return watch_urls
def get_audio_from_video(video_url: str, save_path: str) -> tuple[str, str, str, int]:
    """Download the audio track of a YouTube video unless it is already saved.

    Args:
        video_url: Watch URL of the video.
        save_path: Directory in which the audio file is stored.

    Returns:
        ``(video_url, audio_path, title, length)``. ``audio_path`` is the
        sanitized ``<title>.mp3`` path inside *save_path* and is the same
        whether the file was just downloaded or was already present, so the
        caller can open it directly. ``length`` is ``yt.length``.

    Raises:
        RuntimeError: If the video offers no audio-only stream.
    """
    yt = YouTube(video_url)
    title = yt.title
    # Build the target path once so both branches return an identical,
    # directly usable path (the cached branch used to return a bare name
    # without the directory and with only spaces replaced).
    new_filename = os.path.join(save_path, replace_unallowed_chars(title) + '.mp3')

    if os.path.isfile(new_filename):
        print(f'Audio already exists for: {title}')
        return (video_url, new_filename, title, yt.length)

    print(f'Downloading audio for: {title}')
    stream = yt.streams.filter(only_audio=True).first()
    if stream is None:
        raise RuntimeError(f'No audio-only stream available for: {video_url}')
    out_file = stream.download(output_path=save_path)
    print(f'Saving audio to: {new_filename}')
    # os.replace overwrites a stale target on every platform, whereas
    # os.rename fails on Windows when the destination already exists.
    os.replace(out_file, new_filename)
    print(f'Video length: {yt.length} seconds')
    return (video_url, new_filename, title, yt.length)
def check_if_file_exists(filename: str, save_path: str) -> bool:
    """Report whether any entry in *save_path* contains the sanitized title.

    Args:
        filename: Raw (unsanitized) title to look for.
        save_path: Directory whose entries are searched.

    Returns:
        True if the sanitized title is a substring of at least one entry name
        in *save_path*; False otherwise, including when *save_path* does not
        exist.
    """
    # Sanitize exactly as the writers do; replacing only spaces meant titles
    # containing characters such as '?' or ':' could never match a saved file.
    title = replace_unallowed_chars(filename)
    try:
        entries = os.listdir(save_path)
    except FileNotFoundError:
        return False
    # Substring match is kept because transcript files carry a
    # "_<start>_<end>.txt" suffix after the title.
    return any(title in entry for entry in entries)
def transcript_from_audio(audio_path: str) -> list:
    """Transcribe *audio_path* with the module-level ``model``.

    Args:
        audio_path: Path of the audio file passed to ``model.transcribe``.

    Returns:
        The segments produced by ``model.transcribe(..., beam_size=10)``,
        collected into a list so callers can take ``len()`` of the result.
    """
    # The second element of the returned pair is not needed here.
    segments, _info = model.transcribe(audio_path, beam_size=10)
    return list(segments)
def process_text(text: str) -> str:
    """Return *text* stripped, with each whitespace run collapsed to one space."""
    # Raw string: '\s' in a plain literal is an invalid escape sequence that
    # newer Python versions warn about.
    return re.sub(r'\s+', ' ', text.strip())
def merge_transcripts_segements(
    segments: list[str],
    file_title: str,
    num_segments_to_merge: int = 5,
) -> dict[str, list[str]]:
    """Merge consecutive segments into groups and join their text.

    Args:
        segments: Sequence of objects whose ``start``, ``end`` and ``text``
            attributes are read (despite the ``list[str]`` annotation).
        file_title: Not used by this function.
        num_segments_to_merge: Number of consecutive segments per group; the
            final group may be shorter.

    Returns:
        A dict keyed by ``"<start>_<end>"`` (two decimals each, taken from the
        first and last segment of the group) whose values are the group's
        texts joined and passed through ``process_text``.
    """
    merged = {}
    pieces = []
    group_start = None
    for index, seg in enumerate(segments):
        # A new group begins every num_segments_to_merge segments.
        if index % num_segments_to_merge == 0:
            group_start = seg.start
        pieces.append(seg.text)
        # Flush when the group is full or the input is exhausted.
        if (index + 1) % num_segments_to_merge == 0 or index == len(segments) - 1:
            label = f'{group_start:.2f}_{seg.end:.2f}'
            merged[label] = process_text(' '.join(pieces))
            pieces = []
    return merged
def main():
    """Download the channel's audio and write merged transcripts to disk.

    Steps: create the output directories, list the video URLs of the
    HuggingFace channel, download the audio of each video, then transcribe
    every audio file that has no transcript yet and write one text file per
    merged segment group. Failures for a single video are printed and skipped
    so the remaining videos are still processed.
    """
    # exist_ok avoids a separate existence check before creating.
    os.makedirs(AUDIO_SAVE_PATH, exist_ok=True)
    os.makedirs(TRANSCRIPTS_SAVE_PATH, exist_ok=True)

    print('Getting videos urls')
    videos_urls = get_videos_urls('https://www.youtube.com/@HuggingFace')

    print('Downloading audio files')
    audio_data = []
    for video_url in tqdm(videos_urls):
        try:
            audio_data.append(
                get_audio_from_video(video_url, save_path=AUDIO_SAVE_PATH)
            )
        except Exception as e:
            # Deliberate best effort: one failing video must not stop the run.
            print(f'Error downloading video: {video_url}')
            print(e)

    print('Transcribing audio files')
    for video_url, filename, title, _audio_length in tqdm(audio_data):
        if check_if_file_exists(title, TRANSCRIPTS_SAVE_PATH):
            print(f'Transcript already exists for: {title}')
            continue
        try:
            print(f'Transcribing: {title}')
            start_time = time.time()
            segments = transcript_from_audio(filename)
            print(f'Transcription took: {time.time() - start_time:.1f} seconds')
            merged_segments = merge_transcripts_segements(
                segments,
                title,
                num_segments_to_merge=10
            )
            # save transcripts to separate files
            title = replace_unallowed_chars(title)
            for segment, text in merged_segments.items():
                # The key is "<start>_<end>"; the start becomes the URL's
                # time offset.
                video_url_with_time = f'{video_url}&t={float(segment.split("_")[0]):.0f}'
                # Explicit UTF-8 so non-ASCII transcript text is written the
                # same way regardless of the platform's default encoding.
                with open(
                    f'{TRANSCRIPTS_SAVE_PATH}{title}_{segment}.txt',
                    'w',
                    encoding='utf-8',
                ) as f:
                    f.write(f'source: {video_url_with_time}\n\n' + text)
        except Exception as e:
            # Deliberate best effort: report and move on to the next file.
            print(f'Error transcribing: {title}')
            print(e)
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
|