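# AllMark MVP front end: a Gradio app that clones the private backend repository,
# loads its DeepfakeAnalyzer, and exposes a simple video-upload interface.
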
import os
import subprocess
import importlib.util
import gradio as gr
import logging
#from moviepy.editor import VideoFileClip
import torch
#import spaces

ACCESS_KEY = os.getenv("ACCESS_KEY")
# Disabled for now: deterministic torch settings and a helper that truncated
# uploads to the first 15 seconds for faster analysis.
'''
torch.use_deterministic_algorithms(True)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False

def truncate_video(video_file):
    clip = VideoFileClip(video_file)
    truncated_clip = clip.subclip(0, min(15, clip.duration))
    truncated_video_file = "temp_truncated_video.mp4"
    truncated_clip.write_videofile(truncated_video_file, codec="libx264", audio_codec="aac")
    return truncated_video_file
'''
def clone_repo():
    """Clone the private AllMark-MVP backend repository using a GitHub PAT."""
    repo_url = "https://github.com/NeeravSood/AllMark-MVP.git"
    repo_path = "./repository"
    github_pat = os.getenv("GITHUB_PAT")

    if not github_pat:
        raise RuntimeError("GitHub Personal Access Token (GITHUB_PAT) not found in environment variables.")

    authenticated_repo_url = f"https://{github_pat}@github.com/NeeravSood/AllMark-MVP.git"

    if os.path.exists(repo_path):
        print("Repository already cloned.")
    else:
        try:
            subprocess.run(
                ["git", "clone", authenticated_repo_url, repo_path],
                check=True,
                text=True,
                capture_output=True
            )
            print("Repository cloned successfully.")
        except subprocess.CalledProcessError as e:
            print("Output:", e.stdout)
            print("Error:", e.stderr)
            raise RuntimeError(f"Failed to clone repository: {e.stderr}")
def import_backend_script(script_name):
    """Dynamically import the backend script from the cloned repository."""
    try:
        script_path = os.path.join("./repository", script_name)
        if not os.path.exists(script_path):
            raise FileNotFoundError(f"Script {script_name} not found in the repository.")
        spec = importlib.util.spec_from_file_location("backend_module", script_path)
        backend_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(backend_module)
        return backend_module
    except Exception as e:
        logging.error(f"Error importing backend script: {str(e)}")
        raise RuntimeError(f"Failed to import backend script: {str(e)}")
# Run repository setup and model import
clone_repo()
backend = import_backend_script("app.py")
analyzer = backend.DeepfakeAnalyzer()
#@spaces.GPU(duration=3000)
def analyze_video(video_file):
    if ACCESS_KEY is None:
        logging.error("Access key not set in environment variables.")
        return {"error": "Server misconfiguration. Access key not set."}

    try:
        #truncated_video = truncate_video(video_file)  # Truncate the video for faster analysis
        #results = analyzer.analyze_media(truncated_video)
        results = analyzer.analyze_media(video_file)

        # Extract frame count and video probability
        frame_count = len(results['video_analysis']['frame_results'])
        video_probability = results['video_analysis']['probability']
        combined_assessment = results.get('combined_assessment', "Inconclusive")

        # Reapply the combined-results logic for front-end clarity
        if frame_count < 300:
            assessment = "Inconclusive due to lack of sufficient frames"
        elif frame_count < 500:
            threshold = 0.4
            assessment = "Deepfake" if video_probability >= threshold else "Genuine"
        else:
            threshold = 0.5
            assessment = "Deepfake" if video_probability >= threshold else "Genuine"

        # Construct the output message
        message = (
            f"According to our analysis, the video you uploaded appears to be {assessment}. "
            f"{frame_count} frames were analyzed in total."
        )

        # Return the final output
        output = {
            "message": message,
            "details": {
                "video_probability": video_probability,
                "combined_assessment": assessment  # Final assessment based on the reapplied logic
            }
        }
        return output
    except Exception as e:
        logging.error(f"Error during analysis: {e}")
        return {"error": "An error occurred during video analysis. Please check your input and try again."}
interface = gr.Interface(
    fn=analyze_video,
    inputs=gr.Video(label="Upload Video"),
    outputs="json",
    title="AllMark - Deepfake and AI Video Analyzer",
    description=(
        "Upload an MP4 video for analysis. Multiple model iterations are being tested, "
        "so results are slow and may vary for the same video. Our Models 2 and 3 will "
        "be available on AWS."
    )
)

if __name__ == "__main__":
    interface.launch()
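
# A minimal client-side sketch (not part of this app) showing how the deployed Space
# could be queried programmatically with gradio_client. The Space id below is a
# placeholder, and the exact file-handling call may differ by gradio_client version.
#
#   from gradio_client import Client, handle_file
#
#   client = Client("owner/allmark-mvp")   # hypothetical Space id, replace with the real one
#   result = client.predict(
#       handle_file("sample.mp4"),         # local path of the video to analyze
#       api_name="/predict",               # default endpoint name for a gr.Interface
#   )
#   print(result)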