"""Unified Gradio demo built on Pixeltable: document Q&A, call analysis, and video search."""

# Standard library
import getpass
import os
import tempfile
from datetime import datetime

# Third-party
import gradio as gr
import numpy as np
import PIL.Image
import requests

# Pixeltable
import pixeltable as pxt
from pixeltable.functions import openai
from pixeltable.functions.audio import get_metadata
from pixeltable.functions.huggingface import sentence_transformer, clip_image, clip_text
from pixeltable.functions.video import extract_audio
from pixeltable.iterators import DocumentSplitter, FrameIterator, StringSplitter

# Configuration
PIXELTABLE_MEDIA_DIR = os.path.expanduser("~/.pixeltable/media")
MAX_TOKENS_DEFAULT = 300
TEMPERATURE_DEFAULT = 0.7
CHUNK_SIZE_DEFAULT = 300


# Initialize API keys
def init_api_keys():
    """Prompt for the OpenAI API key when it is not already in the environment."""
    if 'OPENAI_API_KEY' not in os.environ:
        os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API key:')


# Common Utilities
def initialize_pixeltable(dir_name='unified_app'):
    """(Re)create the Pixeltable directory, discarding any previous state.

    NOTE: `force=True` wipes everything under `dir_name`, so each feature of
    the app starts from a clean slate when it (re)processes its inputs.
    """
    pxt.drop_dir(dir_name, force=True)
    pxt.create_dir(dir_name)


@pxt.udf
def create_prompt(top_k_list: list[dict], question: str) -> str:
    """Create a standardized prompt format.

    Joins the retrieved passages (reversed so the most relevant one sits
    closest to the question) and appends the user's question.
    """
    concat_top_k = '\n\n'.join(elt['text'] for elt in reversed(top_k_list))
    return f'''
PASSAGES:

{concat_top_k}

QUESTION:

{question}'''


# Document Processing
class DocumentProcessor:
    """Document ingestion for retrieval-augmented Q&A over uploaded PDFs."""

    @staticmethod
    def process_documents(pdf_files, chunk_limit, chunk_separator):
        """Process uploaded documents for chatbot functionality.

        Args:
            pdf_files: uploaded file objects; non-PDFs are silently skipped.
            chunk_limit: size bound per chunk (tokens or chars).
            chunk_separator: DocumentSplitter separator strategy.

        Returns:
            A status message for the UI.
        """
        initialize_pixeltable()
        docs = pxt.create_table(
            'unified_app.documents',
            {'document': pxt.Document}
        )
        # Only .pdf uploads are ingested; anything else is ignored.
        docs.insert(
            {'document': file.name}
            for file in pdf_files
            if file.name.endswith('.pdf')
        )
        chunks = pxt.create_view(
            'unified_app.chunks',
            docs,
            iterator=DocumentSplitter.create(
                document=docs.document,
                separators=chunk_separator,
                # `limit` is only meaningful for size-bounded separators.
                limit=chunk_limit if chunk_separator in ["token_limit", "char_limit"] else None
            )
        )
        # Semantic index over chunk text for similarity search at question time.
        chunks.add_embedding_index(
            'text',
            string_embed=sentence_transformer.using(model_id='intfloat/e5-large-v2')
        )
        return "Documents processed successfully. You can start asking questions."
@staticmethod def get_document_answer(question): """Get answer from processed documents""" try: chunks = pxt.get_table('unified_app.chunks') sim = chunks.text.similarity(question) relevant_chunks = chunks.order_by(sim, asc=False).limit(5).select(chunks.text).collect() context = "\n\n".join(chunk['text'] for chunk in relevant_chunks) temp_table = pxt.create_table( 'unified_app.temp_response', { 'question': pxt.String, 'context': pxt.String } ) temp_table.insert([{'question': question, 'context': context}]) temp_table.add_computed_column(response=openai.chat_completions( messages=[ { 'role': 'system', 'content': 'Answer the question based only on the provided context. If the context doesn\'t contain enough information, say so.' }, { 'role': 'user', 'content': f"Context:\n{context}\n\nQuestion: {question}" } ], model='gpt-4o-mini-2024-07-18' )) answer = temp_table.select( answer=temp_table.response.choices[0].message.content ).tail(1)['answer'][0] pxt.drop_table('unified_app.temp_response', force=True) return answer except Exception as e: return f"Error: {str(e)}" # Call Analysis class CallAnalyzer: @staticmethod def process_call(video_file): """Process and analyze call recordings""" try: initialize_pixeltable() calls = pxt.create_table( 'unified_app.calls', {"video": pxt.Video} ) calls.add_computed_column(audio=extract_audio(calls.video, format='mp3')) calls.add_computed_column(transcription=openai.transcriptions(audio=calls.audio, model='whisper-1')) calls.add_computed_column(text=calls.transcription.text) sentences = pxt.create_view( 'unified_app.sentences', calls, iterator=StringSplitter.create(text=calls.text, separators='sentence') ) sentences.add_embedding_index('text', string_embed=sentence_transformer.using(model_id='intfloat/e5-large-v2')) @pxt.udf def generate_insights(text: str) -> list[dict]: return [ {'role': 'system', 'content': 'Analyze this call transcript and provide key insights:'}, {'role': 'user', 'content': text} ] 
calls.add_computed_column(insights_prompt=generate_insights(calls.text)) calls.add_computed_column(insights=openai.chat_completions( messages=calls.insights_prompt, model='gpt-4o-mini-2024-07-18' ).choices[0].message.content) calls.insert([{"video": video_file}]) result = calls.select(calls.text, calls.audio, calls.insights).tail(1) return result['text'][0], result['audio'][0], result['insights'][0] except Exception as e: return f"Error processing call: {str(e)}", None, None # Video Search class VideoSearcher: @staticmethod def process_video(video_file): """Process video for searching""" try: initialize_pixeltable() videos = pxt.create_table('unified_app.videos', {'video': pxt.Video}) frames = pxt.create_view( 'unified_app.frames', videos, iterator=FrameIterator.create(video=videos.video, fps=1) ) # Embedding Functions frames.add_embedding_index('frame', string_embed=clip_text.using(model_id='openai/clip-vit-base-patch32'), image_embed=clip_image.using(model_id='openai/clip-vit-base-patch32') ) videos.insert([{'video': video_file.name}]) return "Video processed and indexed for search." except Exception as e: return f"Error processing video: {str(e)}" @staticmethod def search_video(search_type, text_query=None, image_query=None): """Search processed video frames""" try: frames = pxt.get_table('unified_app.frames') if search_type == "Text" and text_query: sim = frames.frame.similarity(text_query) elif search_type == "Image" and image_query is not None: sim = frames.frame.similarity(image_query) else: return [] results = frames.order_by(sim, asc=False).limit(5).select(frames.frame).collect() return [row['frame'] for row in results] except Exception as e: print(f"Search error: {str(e)}") return [] # Gradio Interface def create_interface(): with gr.Blocks(theme=gr.themes.Base()) as demo: # Header gr.HTML( """
Pixeltable is a declarative interface for working with text, images, embeddings, and video, enabling you to store, transform, index, and iterate on data.
Open Source AI Data infrastructure.
© 2024 Pixeltable | Apache License 2.0