import json
import os
import re
import tempfile
from urllib.parse import urlparse, parse_qs

import googleapiclient.discovery
import gradio as gr
import openai
import torch
import whisper
from pydub import AudioSegment
from pytube import YouTube
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi

def extract_video_id(url):
    """Extracts the video ID from a YouTube URL."""
    try:
        parsed_url = urlparse(url)
        if "youtube.com" in parsed_url.netloc:
            query_params = parse_qs(parsed_url.query)
            return query_params.get('v', [None])[0]
        elif "youtu.be" in parsed_url.netloc:
            return parsed_url.path.strip("/")
        else:
            print("Invalid YouTube URL.")
            return None
    except Exception as e:
        print(f"Error parsing URL: {e}")
        return None

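# Quick sanity check for extract_video_id (the video ID below is a
# hypothetical placeholder, not one used by this Space):
#   extract_video_id("https://www.youtube.com/watch?v=abc123XYZ_0")  # -> "abc123XYZ_0"
#   extract_video_id("https://youtu.be/abc123XYZ_0")                 # -> "abc123XYZ_0"
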
def get_video_duration(video_id, api_key):
    """Fetches the video duration in minutes."""
    try:
        youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
        request = youtube.videos().list(part="contentDetails", id=video_id)
        response = request.execute()
        if response["items"]:
            duration = response["items"][0]["contentDetails"]["duration"]
            match = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?', duration)
            hours = int(match.group(1)) if match.group(1) else 0
            minutes = int(match.group(2)) if match.group(2) else 0
            seconds = int(match.group(3)) if match.group(3) else 0
            return hours * 60 + minutes + seconds / 60
        else:
            print("No video details found.")
            return None
    except Exception as e:
        print(f"Error fetching video duration: {e}")
        return None

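# The API returns durations in ISO 8601 form, which the regex above unpacks.
# Two worked examples:
#   "PT1H2M30S" -> hours=1, minutes=2, seconds=30 -> 1*60 + 2 + 30/60 = 62.5 minutes
#   "PT45S"     -> 0*60 + 0 + 45/60 = 0.75 minutes
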
def download_and_transcribe_with_whisper(youtube_url):
    """Downloads a video's audio with pytube and transcribes it locally with Whisper."""
    try:
        # Temporary directory for storing the downloaded audio
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_audio_file = os.path.join(temp_dir, "audio.mp4")  # pytube audio streams use an mp4 container
            # Download audio using pytube
            yt = YouTube(youtube_url)
            audio_stream = yt.streams.filter(only_audio=True).first()  # first available audio-only stream
            audio_stream.download(output_path=temp_dir, filename="audio.mp4")
            # Convert the downloaded audio (mp4) to wav for Whisper
            audio = AudioSegment.from_file(temp_audio_file)
            wav_file = os.path.join(temp_dir, "audio.wav")
            audio.export(wav_file, format="wav")
            # Run Whisper transcription ("turbo" loads the large-v3-turbo
            # checkpoint; smaller models such as "base" trade accuracy for speed)
            model = whisper.load_model("turbo")
            result = model.transcribe(wav_file)
            return result['text']
    except Exception as e:
        print(f"Error during transcription: {e}")
        return None

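# pytube breaks periodically when YouTube changes its player internals; a
# hedged fallback sketch for the download step using yt-dlp instead
# (assumes `pip install yt-dlp`; same temp_dir as above):
#   import yt_dlp
#   ydl_opts = {"format": "bestaudio/best",
#               "outtmpl": os.path.join(temp_dir, "audio.%(ext)s")}
#   with yt_dlp.YoutubeDL(ydl_opts) as ydl:
#       ydl.download([youtube_url])
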
def get_transcript_from_youtube_api(video_id, video_length):
    """Fetches a transcript via youtube_transcript_api if one is available."""
    try:
        # Fetch the transcripts available for this video
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        # Prefer manually created transcripts over auto-generated ones
        for transcript in transcript_list:
            if not transcript.is_generated:
                manual_transcript = transcript.fetch()
                # fetch() should return a list of segment dicts
                if isinstance(manual_transcript, list):
                    full_transcript = " ".join([segment['text'] for segment in manual_transcript])
                    return full_transcript  # Return the manual transcript immediately
                else:
                    print("Manual transcript is not iterable.")
                    return None
        # No manual transcript found; fall back on the auto-generated one,
        # but only for long videos (Whisper handles the short ones)
        if video_length > 15:
            print("Video is longer than 15 minutes; using the auto-generated transcript.")
            auto_transcript = transcript_list.find_generated_transcript(['en'])
            if auto_transcript:
                full_transcript = " ".join([segment['text'] for segment in auto_transcript.fetch()])
                return full_transcript
            else:
                print("No auto-generated transcript available.")
                return None
        else:
            print("Video is 15 minutes or shorter; deferring to Whisper for transcription.")
            return None  # The caller (get_transcript) falls back to Whisper
    except Exception as e:
        print(f"Error fetching transcript: {e}")
        return None

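# Note on segment shape: older releases of youtube_transcript_api return
# fetch() results as a plain list of dicts like
#   {"text": "...", "start": 1.23, "duration": 4.56},
# which is what the isinstance(..., list) check above expects; newer 1.x
# releases return a FetchedTranscript object instead, so pin the dependency
# accordingly or relax that check.
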
def get_transcript(youtube_url, api_key):
    """Gets a transcript from the YouTube API, falling back to Whisper if unavailable."""
    video_id = extract_video_id(youtube_url)  # handles both youtube.com and youtu.be URLs
    if not video_id:
        print("Could not extract a video ID from the URL.")
        return None
    video_length = get_video_duration(video_id, api_key)
    if video_length is not None:
        print(f"Video length: {video_length} minutes.")
        # Fetch the transcript via the YouTube transcript API
        transcript = get_transcript_from_youtube_api(video_id, video_length)
        # If YouTube has a transcript, use it
        if transcript:
            print("Transcript found.")
            return transcript
        else:
            # No transcript from the YouTube API; fall back to Whisper
            print("No transcript found on YouTube; using Whisper for transcription.")
            return download_and_transcribe_with_whisper(youtube_url)
    else:
        print("Error fetching video duration.")
        return None

def summarize_text_huggingface(text):
    """Summarizes text using a Hugging Face summarization model."""
    summarizer = pipeline("summarization", model="facebook/bart-large-cnn",
                          device=0 if torch.cuda.is_available() else -1)
    max_input_length = 1024
    chunk_overlap = 100
    text_chunks = [
        text[i:i + max_input_length]
        for i in range(0, len(text), max_input_length - chunk_overlap)
    ]
    summaries = [
        summarizer(chunk, max_length=100, min_length=50, do_sample=False)[0]['summary_text']
        for chunk in text_chunks
    ]
    return " ".join(summaries)

def generate_optimized_content(api_key, summarized_transcript):
    """Asks an OpenAI chat model for SEO keywords, title, description, and tags."""
    prompt = f"""
    Analyze the following summarized YouTube video transcript and:
    1. Extract the top 10 keywords.
    2. Generate an optimized title (less than 65 characters).
    3. Create an engaging description.
    4. Generate related tags for the video.

    Summarized Transcript:
    {summarized_transcript}

    Provide the results in the following JSON format:
    {{
        "keywords": ["keyword1", "keyword2", ..., "keyword10"],
        "title": "Generated Title",
        "description": "Generated Description",
        "tags": ["tag1", "tag2", ..., "tag10"]
    }}
    """
    try:
        # Chat completions via the openai>=1.0 client interface
        client = openai.OpenAI(api_key=api_key)
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are an SEO expert."},
                {"role": "user", "content": prompt}
            ]
        )
        # Extract and parse the model's JSON response
        response_content = response.choices[0].message.content
        content = json.loads(response_content)
        return content
    except Exception as e:
        print(f"Error generating content: {e}")
        return None

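# Minimal usage sketch (assumes a valid OPENAI_API_KEY in the environment;
# the summary string is a hypothetical example, not pipeline output):
#   seo = generate_optimized_content(os.getenv("OPENAI_API_KEY"),
#                                    "A ten-minute overview of sourdough baking basics.")
#   if seo:
#       print(seo["title"], seo["tags"])
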
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Gradio function wiring the full YouTube SEO pipeline together
def youtube_seo_pipeline(youtube_url):
    print("Starting the SEO pipeline...")  # Debugging line
    if not YOUTUBE_API_KEY or not OPENAI_API_KEY:
        return "API keys missing! Please check environment variables."

    print("Extracting video ID...")
    video_id = extract_video_id(youtube_url)
    if not video_id:
        return "Invalid YouTube URL."
    print(f"Video ID: {video_id}")

    print("Fetching transcript...")
    transcript = get_transcript(youtube_url, YOUTUBE_API_KEY)
    if not transcript:
        return "Failed to fetch transcript. Try another video."
    print(f"Transcript: {transcript[:200]}...")  # Show only the first 200 chars

    print("Summarizing transcript...")
    summarized_text = summarize_text_huggingface(transcript)
    print(f"Summarized Text: {summarized_text[:200]}...")  # Show only the first 200 chars

    print("Generating optimized content...")
    optimized_content = generate_optimized_content(OPENAI_API_KEY, summarized_text)
    if optimized_content:
        return json.dumps(optimized_content, indent=4)
    else:
        return "Failed to generate SEO content."

# Define the Gradio interface
iface = gr.Interface(
    fn=youtube_seo_pipeline,
    inputs="text",
    outputs="text",
    title="YouTube SEO Optimizer",
    description="Enter a YouTube video URL to fetch and optimize SEO content (title, description, tags, and keywords)."
)

# Launch the Gradio app
if __name__ == "__main__":
    iface.launch()