import os import subprocess import importlib.util import gradio as gr import logging #from moviepy.editor import VideoFileClip import torch #import spaces ACCESS_KEY = os.getenv("ACCESS_KEY") ''' torch.use_deterministic_algorithms(True) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False torch.backends.cuda.matmul.allow_tf32 = False torch.backends.cudnn.allow_tf32 = False def truncate_video(video_file): clip = VideoFileClip(video_file) truncated_clip = clip.subclip(0, min(15, clip.duration)) truncated_video_file = "temp_truncated_video.mp4" truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac") return truncated_video_file ''' def clone_repo(): repo_url = "https://github.com/NeeravSood/AllMark-MVP.git" repo_path = "./repository" github_pat = os.getenv("GITHUB_PAT") if not github_pat: raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.") authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git" if os.path.exists(repo_path): print("Repository already cloned.") else: try: subprocess.run( ["git", "clone", authenticated_repo_url, repo_path], check=True, text=True, capture_output=True ) print("Repository cloned successfully.") except subprocess.CalledProcessError as e: print("Output:", e.stdout) print("Error:", e.stderr) raise RuntimeError(f"Failed to clone repository: {e.stderr}") def import_backend_script(script_name): """Dynamically import the backend script.""" try: script_path = os.path.join("./repository", script_name) if not os.path.exists(script_path): raise FileNotFoundError(f"Script {script_name} not found in the repository.") spec = importlib.util.spec_from_file_location("backend_module", script_path) backend_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(backend_module) return backend_module except Exception as e: logging.error(f"Error importing backend script: {str(e)}") raise RuntimeError(f"Failed to import backend script: {str(e)}") # Run repository setup and model import clone_repo() backend = import_backend_script("app.py") analyzer = backend.DeepfakeAnalyzer() #@spaces.GPU(duration=3000) def analyze_video(video_file): if ACCESS_KEY is None: logging.error("Access key not set in environment variables.") return {"error": "Server misconfiguration. Access key not set."} try: #truncated_video = truncate_video(video_file) # Truncate the video for faster analysis #results = analyzer.analyze_media(truncated_video) results = analyzer.analyze_media(video_file) # Extract frame count and video probability frame_count = len(results['video_analysis']['frame_results']) video_probability = results['video_analysis']['probability'] combined_assessment = results.get('combined_assessment', "Inconclusive") # Reapply combined_results logic for front-end clarity if frame_count < 300: assessment = "Inconclusive due to lack of sufficient frames" elif frame_count < 500: threshold = 0.4 assessment = "Deepfake" if video_probability >= threshold else "Genuine" else: threshold = 0.5 assessment = "Deepfake" if video_probability >= threshold else "Genuine" # Construct output message message = ( f"According to our analysis, the video you uploaded appears to be {assessment}. " f"{frame_count} frames were analyzed in total." ) # Return the final output output = { "message": message, "details": { "video_probability": video_probability, "combined_assessment": assessment # Final assessment based on reapplied logic } } return output except Exception as e: logging.error(f"Error during analysis: {e}") return {"error": "An error occurred during video analysis. Please check your input and try again."} interface = gr.Interface( fn=analyze_video, inputs=gr.Video(label="Upload Video"), outputs="json", title="AllMark - Deepfake and AI Video Analyzer", description="Upload a MP4 video for analysis. Multiple model iterations are being tested, so results are slow and may vary for the same video. Our Models 2 and 3 shall be available on AWS." ) if __name__ == "__main__": interface.launch()