Update processors/input_processor.py
processors/input_processor.py  CHANGED  +21 -55
@@ -7,11 +7,7 @@ from langchain_community.document_loaders.generic import GenericLoader
 from langchain_community.document_loaders.parsers.audio import OpenAIWhisperParser
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from youtube_transcript_api import YouTubeTranscriptApi
-from transformers import pipeline
 import re
-from pytube import YouTube
-import os
-import tempfile
 
 class ContentProcessor:
     def __init__(self):
@@ -19,12 +15,6 @@ class ContentProcessor:
             chunk_size=1000,
             chunk_overlap=200
         )
-        # Initialize the transcriber once during startup
-        self.transcriber = pipeline(
-            "automatic-speech-recognition",
-            model="openai/whisper-small",
-            device="cpu"  # or "cuda" if GPU is available
-        )
 
     def process_pdf(self, file_path):
         loader = PyPDFLoader(file_path)
@@ -37,57 +27,17 @@ class ContentProcessor:
         return pages
 
     def process_youtube(self, video_url):
+        # Extract video ID from URL
         video_id = self._extract_video_id(video_url)
         if not video_id:
             raise ValueError("Invalid YouTube URL")
 
         try:
-            # First attempt: Get transcript using youtube_transcript_api
-            return self._get_transcript_via_api(video_id)
-        except Exception as e:
-            # Second attempt: Download audio and transcribe
-            try:
-                return self._transcribe_audio(video_url)
-            except Exception as audio_error:
-                raise Exception(f"Failed to process video. No subtitles available and audio transcription failed: {str(audio_error)}")
-
-    def _extract_video_id(self, url):
-        # Extract video ID from YouTube URL
-        patterns = [
-            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
-            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})',
-        ]
-        for pattern in patterns:
-            match = re.search(pattern, url)
-            if match:
-                return match.group(1)
-        return None
-
-    def _get_transcript_via_api(self, video_id):
-        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
-        full_transcript = " ".join([entry['text'] for entry in transcript_list])
-
-        from langchain.schema import Document
-        doc = Document(
-            page_content=full_transcript,
-            metadata={"source": f"https://www.youtube.com/watch?v={video_id}"}
-        )
-
-        return self.text_splitter.split_documents([doc])
-
-    def _transcribe_audio(self, video_url):
-        # Download audio using pytube
-        yt = YouTube(video_url)
-        audio_stream = yt.streams.filter(only_audio=True).first()
-
-        # Create temporary directory for audio file
-        with tempfile.TemporaryDirectory() as temp_dir:
-            audio_file = os.path.join(temp_dir, "audio.mp4")
-            audio_stream.download(output_path=temp_dir, filename="audio.mp4")
+            # Get transcript directly using youtube_transcript_api
+            transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
 
-            # Transcribe the downloaded audio
-            result = self.transcriber(audio_file)
-            full_transcript = result['text']
+            # Combine all transcript pieces
+            full_transcript = " ".join([entry['text'] for entry in transcript_list])
 
             # Create a document-like structure
             from langchain.schema import Document
@@ -98,6 +48,22 @@ class ContentProcessor:
 
             # Split the document
             return self.text_splitter.split_documents([doc])
+
+        except Exception as e:
+            raise Exception(f"Error getting transcript: {str(e)}")
+
+    def _extract_video_id(self, url):
+        # Handle different YouTube URL formats
+        patterns = [
+            r'(?:youtube\.com\/watch\?v=|youtu.be\/|youtube.com\/embed\/)([^&\n?]*)',
+            r'(?:youtube\.com\/shorts\/)([^&\n?]*)'
+        ]
+
+        for pattern in patterns:
+            match = re.search(pattern, url)
+            if match:
+                return match.group(1)
+        return None
 
     def process_audio(self, audio_file):
         loader = GenericLoader(
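For reference, a minimal sketch of how the updated process_youtube path can be exercised. It assumes ContentProcessor is importable from processors/input_processor.py as laid out in this repo, and the video URL below is a placeholder; after this change, videos without retrievable subtitles raise the "Error getting transcript" exception instead of falling back to audio transcription.

# Usage sketch (assumption: processors.input_processor is importable from the repo root).
from processors.input_processor import ContentProcessor

processor = ContentProcessor()

try:
    # Placeholder URL; any watch/shorts/embed URL handled by _extract_video_id works.
    chunks = processor.process_youtube("https://www.youtube.com/watch?v=VIDEO_ID_HERE")
    # process_youtube returns the transcript split into LangChain Document chunks.
    print(f"{len(chunks)} chunks; first chunk starts with: {chunks[0].page_content[:200]}")
except Exception as e:
    # Raised when no transcript is available (no Whisper/pytube fallback anymore).
    print(f"Transcript fetch failed: {e}")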