# shopsmart/app.py
from flask import Flask, request, jsonify, send_from_directory
import torch
import shutil
import os
import sys
from src.utils.preprocess import CropAndExtract
from src.test_audio2coeff import Audio2Coeff
from src.facerender.animate import AnimateFromCoeff
from src.generate_batch import get_data
from src.generate_facerender_batch import get_facerender_data
import tempfile
from openai import OpenAI
from elevenlabs import set_api_key, generate, play, clone, Voice, VoiceSettings
from flask_cors import CORS, cross_origin
# from flask_swagger_ui import get_swaggerui_blueprint
import uuid
import time
from PIL import Image
import moviepy.editor as mp
import requests
import json
import pickle
import re
# from videoretalking import inference_function
# import base64
# import gfpgan_enhancer
# import threading
# import elevenlabs
# from argparse import Namespace
# from argparse import ArgumentParser
# from time import strftime
# from src.utils.init_path import init_path
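# Mirrors the CLI argument namespace that the SadTalker-style pipeline modules
# expect, so the pipeline can be driven directly from a Flask request.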
class AnimationConfig:
    def __init__(self, driven_audio_path, source_image_path, result_folder, pose_style, expression_scale, enhancer, still, preprocess, ref_pose_video_path, image_hardcoded):
self.driven_audio = driven_audio_path
self.source_image = source_image_path
self.ref_eyeblink = None
self.ref_pose = ref_pose_video_path
self.checkpoint_dir = './checkpoints'
self.result_dir = result_folder
self.pose_style = pose_style
self.batch_size = 8
self.expression_scale = expression_scale
self.input_yaw = None
self.input_pitch = None
self.input_roll = None
self.enhancer = enhancer
self.background_enhancer = None
self.cpu = False
self.face3dvis = False
self.still = still
self.preprocess = preprocess
self.verbose = False
self.old_version = False
self.net_recon = 'resnet50'
self.init_path = None
self.use_last_fc = False
self.bfm_folder = './checkpoints/BFM_Fitting/'
self.bfm_model = 'BFM_model_front.mat'
self.focal = 1015.
self.center = 112.
self.camera_d = 10.
self.z_near = 5.
self.z_far = 15.
self.device = 'cuda'
self.image_hardcoded = image_hardcoded
app = Flask(__name__)
CORS(app)
TEMP_DIR = None
start_time = None
VIDEO_DIRECTORY = None
args = None
unique_id = None
app.config['temp_response'] = None
app.config['generation_thread'] = None
app.config['text_prompt'] = None
app.config['final_video_path'] = None
app.config['final_video_duration'] = None
# Global paths
dir_path = os.path.dirname(os.path.realpath(__file__))
current_root_path = dir_path
path_of_lm_croper = os.path.join(current_root_path, 'checkpoints', 'shape_predictor_68_face_landmarks.dat')
path_of_net_recon_model = os.path.join(current_root_path, 'checkpoints', 'epoch_20.pth')
dir_of_BFM_fitting = os.path.join(current_root_path, 'checkpoints', 'BFM_Fitting')
wav2lip_checkpoint = os.path.join(current_root_path, 'checkpoints', 'wav2lip.pth')
audio2pose_checkpoint = os.path.join(current_root_path, 'checkpoints', 'auido2pose_00140-model.pth')
audio2pose_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2pose.yaml')
audio2exp_checkpoint = os.path.join(current_root_path, 'checkpoints', 'auido2exp_00300-model.pth')
audio2exp_yaml_path = os.path.join(current_root_path, 'src', 'config', 'auido2exp.yaml')
free_view_checkpoint = os.path.join(current_root_path, 'checkpoints', 'facevid2vid_00189-model.pth.tar')
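# The 'auido2*' spellings above match the checkpoint filenames as released
# upstream (SadTalker), so they are intentional here, not typos.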
# Function for running the actual task (using preprocessed data)
def process_chunk(audio_chunk, preprocessed_data, args):
print("Entered Process Chunk Function")
global audio2pose_checkpoint, audio2pose_yaml_path, audio2exp_checkpoint, audio2exp_yaml_path, wav2lip_checkpoint
global free_view_checkpoint
if args.preprocess == 'full':
mapping_checkpoint = os.path.join(current_root_path, 'checkpoints', 'mapping_00109-model.pth.tar')
facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender_still.yaml')
else:
mapping_checkpoint = os.path.join(current_root_path, 'checkpoints', 'mapping_00229-model.pth.tar')
facerender_yaml_path = os.path.join(current_root_path, 'src', 'config', 'facerender.yaml')
first_coeff_path = preprocessed_data["first_coeff_path"]
crop_pic_path = preprocessed_data["crop_pic_path"]
    crop_info_path = "/home/user/app/preprocess_data/crop_info.json"
    with open(crop_info_path, "r") as f:
        crop_info = json.load(f)
    print("Loaded existing preprocessed data")
    print("first_coeff_path", first_coeff_path)
    print("crop_pic_path", crop_pic_path)
    print("crop_info", crop_info)
torch.cuda.empty_cache()
batch = get_data(first_coeff_path, audio_chunk, args.device, ref_eyeblink_coeff_path=None, still=args.still)
audio_to_coeff = Audio2Coeff(audio2pose_checkpoint, audio2pose_yaml_path,
audio2exp_checkpoint, audio2exp_yaml_path,
wav2lip_checkpoint, args.device)
coeff_path = audio_to_coeff.generate(batch, args.result_dir, args.pose_style, ref_pose_coeff_path=None)
# Further processing with animate_from_coeff using the coeff_path
animate_from_coeff = AnimateFromCoeff(free_view_checkpoint, mapping_checkpoint,
facerender_yaml_path, args.device)
torch.cuda.empty_cache()
data = get_facerender_data(coeff_path, crop_pic_path, first_coeff_path, audio_chunk,
args.batch_size, args.input_yaw, args.input_pitch, args.input_roll,
expression_scale=args.expression_scale, still_mode=args.still, preprocess=args.preprocess)
torch.cuda.empty_cache()
print("Will Enter Animation")
result, base64_video, temp_file_path, _ = animate_from_coeff.generate(data, args.result_dir, args.source_image, crop_info,
enhancer=args.enhancer, background_enhancer=args.background_enhancer, preprocess=args.preprocess)
# video_clip = mp.VideoFileClip(temp_file_path)
# duration = video_clip.duration
app.config['temp_response'] = base64_video
app.config['final_video_path'] = temp_file_path
# app.config['final_video_duration'] = duration
torch.cuda.empty_cache()
return base64_video, temp_file_path
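# Per-request temp storage: each /run call creates its own TemporaryDirectory,
# and uploads are saved under uuid-prefixed names to avoid collisions.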
def create_temp_dir():
return tempfile.TemporaryDirectory()
def save_uploaded_file(file, filename, TEMP_DIR):
unique_filename = str(uuid.uuid4()) + "_" + filename
file_path = os.path.join(TEMP_DIR.name, unique_filename)
file.save(file_path)
return file_path
# Read the API key from the environment rather than hardcoding it; a key
# committed in source is a credential leak. OPENAI_API_KEY is the assumed
# environment variable name.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def openai_chat_avatar(text_prompt):
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Summarize the following paragraph into a complete and accurate single sentence with no more than 15 words. The summary should capture the gist of the paragraph and make sense."},
{"role": "user", "content": f"Please summarize the following paragraph into one sentence with 15 words or fewer, ensuring it makes sense and captures the gist: {text_prompt}"},
],
        max_tokens=len(text_prompt),  # rough token cap derived from the prompt's character count
)
return response
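# Query the RyzeAI streaming chat endpoint; the raw response text (a
# server-sent-event stream) is returned and mined by extract_content() below.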
def ryzedb_chat_avatar(question, app_id):
url = "https://inference.dev.ryzeai.ai/chat/stream"
# question = question + ". Summarize the answer in one line."
# print("question",question)
payload = json.dumps({
"input": {
"chat_history": [],
"app_id": app_id,
"question": question
},
"config": {}
})
headers = {
'Content-Type': 'application/json'
}
try:
# Send the POST request
response = requests.request("POST", url, headers=headers, data=payload)
# Check for successful request
response.raise_for_status()
# Return the response JSON
return response.text
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
return None
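# Delete everything inside temp_dir except exclude_dir, so cached preprocess
# data survives between requests.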
def custom_cleanup(temp_dir, exclude_dir):
# Iterate over the files and directories in TEMP_DIR
for filename in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, filename)
# Skip the directory we want to exclude
if file_path != exclude_dir:
try:
if os.path.isdir(file_path):
shutil.rmtree(file_path)
else:
os.remove(file_path)
print(f"Deleted: {file_path}")
except Exception as e:
print(f"Failed to delete {file_path}. Reason: {e}")
def generate_audio(voice_cloning, voice_gender, text_prompt):
print("generate_audio")
    if voice_cloning == 'no':
        if voice_gender == 'male':
            print('Entering audio creation using ElevenLabs')
            # Key from the environment, not hardcoded; ELEVENLABS_API_KEY is the assumed variable name.
            set_api_key(os.environ.get("ELEVENLABS_API_KEY"))
            audio = generate(text=text_prompt, voice="Daniel", model="eleven_monolingual_v1", stream=True, latency=4)
            with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                for chunk in audio:
                    temp_file.write(chunk)
            driven_audio_path = temp_file.name
            print('driven_audio_path', driven_audio_path)
            print('Audio file saved using ElevenLabs')
        else:
            voice = 'nova'
            print('Entering audio creation using OpenAI TTS')
            response = client.audio.speech.create(model="tts-1-hd",
                                                  voice=voice,
                                                  input=text_prompt)
            print('Audio created using OpenAI TTS')
            with tempfile.NamedTemporaryFile(suffix=".wav", prefix="text_to_speech_", dir=TEMP_DIR.name, delete=False) as temp_file:
                driven_audio_path = temp_file.name
            response.write_to_file(driven_audio_path)
            print('Audio file saved using OpenAI TTS')
elif voice_cloning == 'yes':
        set_api_key(os.environ.get("ELEVENLABS_API_KEY"))  # key from the environment, not hardcoded
# voice = clone(name = "User Cloned Voice",
# files = [user_voice_path] )
voice = Voice(voice_id="CEii8R8RxmB0zhAiloZg",name="Marc",settings=VoiceSettings(
stability=0.71, similarity_boost=0.5, style=0.0, use_speaker_boost=True),)
audio = generate(text = text_prompt, voice = voice, model = "eleven_monolingual_v1",stream=True, latency=4)
with tempfile.NamedTemporaryFile(suffix=".mp3", prefix="cloned_audio_",dir=TEMP_DIR.name, delete=False) as temp_file:
for chunk in audio:
temp_file.write(chunk)
driven_audio_path = temp_file.name
print('driven_audio_path',driven_audio_path)
# audio_duration = get_audio_duration(driven_audio_path)
# print('Total Audio Duration in seconds',audio_duration)
return driven_audio_path
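# Return cached face-crop/3DMM data for the hardcoded avatar image when
# available; otherwise extract it once and cache it for later requests.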
def run_preprocessing(args):
global path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting
first_frame_dir = os.path.join(args.result_dir, 'first_frame_dir')
os.makedirs(first_frame_dir, exist_ok=True)
fixed_temp_dir = "/home/user/app/preprocess_data/"
os.makedirs(fixed_temp_dir, exist_ok=True)
preprocessed_data_path = os.path.join(fixed_temp_dir, "preprocessed_data.pkl")
    if os.path.exists(preprocessed_data_path) and args.image_hardcoded == "yes":
        print("Loading preprocessed data...")
        with open(preprocessed_data_path, "rb") as f:
            preprocessed_data = pickle.load(f)
        print("Loaded existing preprocessed data from:", preprocessed_data_path)
        return preprocessed_data
    # No cache yet: run the crop/3DMM extraction once and persist it. This
    # assumes the SadTalker-style CropAndExtract API implied by the imports
    # and checkpoint paths above; adjust if src.utils.preprocess differs.
    preprocess_model = CropAndExtract(path_of_lm_croper, path_of_net_recon_model, dir_of_BFM_fitting, args.device)
    first_coeff_path, crop_pic_path, crop_info = preprocess_model.generate(args.source_image, first_frame_dir, args.preprocess, source_image_flag=True)
    preprocessed_data = {"first_coeff_path": first_coeff_path, "crop_pic_path": crop_pic_path}
    with open(preprocessed_data_path, "wb") as f:
        pickle.dump(preprocessed_data, f)
    with open(os.path.join(fixed_temp_dir, "crop_info.json"), "w") as f:
        json.dump(crop_info, f)  # read back by process_chunk()
    return preprocessed_data
# def remove_brackets(text):
# # Use regex to remove content in brackets at the end of the text
# cleaned_text = re.sub(r'\s*\[.*?\]\s*$', '', text)
# return cleaned_text.strip()
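# Pull the "content" field out of the streamed response with a regex instead
# of parsing the SSE framing as JSON.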
def extract_content(data):
pattern = r'"content":"((?:\\.|[^"\\])*)"'
match = re.search(pattern, data)
if match:
return match.group(1)
else:
return None
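# Main endpoint: resolves the text prompt (RyzeDB and/or OpenAI summary),
# synthesizes speech, and renders a talking-head video via process_chunk().
# Hypothetical example request (field names match the form parsing below;
# host/port assume the default Flask dev server):
#   curl -X POST http://localhost:5000/run \
#        -F 'text_prompt=What are your store hours?' \
#        -F 'app_id=<your-app-id>' -F 'voice_gender=male'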
@app.route("/run", methods=['POST'])
def generate_video():
global start_time, VIDEO_DIRECTORY
start_time = time.time()
global TEMP_DIR
TEMP_DIR = create_temp_dir()
print('request:',request.method)
try:
if request.method == 'POST':
# source_image = request.files['source_image']
image_path = '/home/user/app/images/shared image (3).png'
source_image = Image.open(image_path)
text_prompt = request.form['text_prompt']
print('Input text prompt: ',text_prompt)
text_prompt = text_prompt.strip()
if not text_prompt:
return jsonify({'error': 'Input text prompt cannot be blank'}), 400
voice_cloning = request.form.get('voice_cloning', 'no')
image_hardcoded = request.form.get('image_hardcoded', 'yes')
chat_model_used = request.form.get('chat_model_used', 'ryzedb')
target_language = request.form.get('target_language', 'original_text')
print('target_language',target_language)
pose_style = int(request.form.get('pose_style', 1))
expression_scale = float(request.form.get('expression_scale', 1))
enhancer = request.form.get('enhancer', None)
voice_gender = request.form.get('voice_gender', 'male')
            still_str = request.form.get('still', 'False')
            still = still_str.lower() == 'true'
            print('still', still)
preprocess = request.form.get('preprocess', 'crop')
print('preprocess selected: ',preprocess)
ref_pose_video = request.files.get('ref_pose', None)
            app_id = request.form.get('app_id', '')  # .get() so a missing field reaches the blank check instead of raising KeyError
            if not app_id:
                return jsonify({'error': 'App ID cannot be blank'}), 400
if chat_model_used == 'ryzedb':
start_time_ryze = time.time()
response = ryzedb_chat_avatar(text_prompt, app_id)
text_prompt = extract_content(response)
text_prompt = text_prompt.replace('\n', ' ').replace('\\n', ' ').strip()
if "No information available" in text_prompt:
text_prompt = re.sub(r'\\+', '', text_prompt)
response = openai_chat_avatar(text_prompt)
text_prompt = response.choices[0].message.content.strip()
app.config['text_prompt'] = text_prompt
print('Final output text prompt using ryzedb: ',text_prompt)
# events = response.split('\r\n\r\n')
# content = None
# for event in events:
# # Split each event block by "\r\n" to get the lines
# lines = event.split('\r\n')
# if len(lines) > 1 and lines[0] == 'event: data':
# # Extract the JSON part from the second line and parse it
# json_data = lines[1].replace('data: ', '')
# try:
# data = json.loads(json_data)
# text_prompt = data.get('content')
# app.config['text_prompt'] = text_prompt
# end_time_ryze = time.time()
# diff = end_time_ryze - start_time_ryze
# print('Final output text prompt using ryzedb: ',text_prompt)
# print('Time to get response from ryzedb: ',diff)
# break # Exit the loop once content is found
# except json.JSONDecodeError:
# continue
elif chat_model_used == 'self':
text_prompt = text_prompt.strip()
else:
print("No Ryze database found")
source_image_path = save_uploaded_file(source_image, 'source_image.png',TEMP_DIR)
print(source_image_path)
driven_audio_path = generate_audio(voice_cloning, voice_gender, text_prompt)
save_dir = tempfile.mkdtemp(dir=TEMP_DIR.name)
result_folder = os.path.join(save_dir, "results")
os.makedirs(result_folder, exist_ok=True)
ref_pose_video_path = None
if ref_pose_video:
with tempfile.NamedTemporaryFile(suffix=".mp4", prefix="ref_pose_",dir=TEMP_DIR.name, delete=False) as temp_file:
ref_pose_video_path = temp_file.name
ref_pose_video.save(ref_pose_video_path)
print('ref_pose_video_path',ref_pose_video_path)
except Exception as e:
app.logger.error(f"An error occurred: {e}")
return "An error occurred", 500
    args = AnimationConfig(driven_audio_path=driven_audio_path, source_image_path=source_image_path,
                           result_folder=result_folder, pose_style=pose_style,
                           expression_scale=expression_scale, enhancer=enhancer, still=still,
                           preprocess=preprocess, ref_pose_video_path=ref_pose_video_path,
                           image_hardcoded=image_hardcoded)
if torch.cuda.is_available() and not args.cpu:
args.device = "cuda"
else:
args.device = "cpu"
# generation_thread = threading.Thread(target=main, args=(args,))
# app.config['generation_thread'] = generation_thread
# generation_thread.start()
# response_data = {"message": "Video generation started",
# "process_id": generation_thread.ident}
try:
preprocessed_data = run_preprocessing(args)
base64_video, temp_file_path = process_chunk(driven_audio_path, preprocessed_data, args)
final_video_path = app.config['final_video_path']
print('final_video_path',final_video_path)
if temp_file_path and temp_file_path.endswith('.mp4'):
filename = os.path.basename(temp_file_path)
os.makedirs('videos', exist_ok=True)
VIDEO_DIRECTORY = os.path.abspath('videos')
print("VIDEO_DIRECTORY: ",VIDEO_DIRECTORY)
destination_path = os.path.join(VIDEO_DIRECTORY, filename)
shutil.copy(temp_file_path, destination_path)
video_url = f"/videos/{filename}"
if final_video_path and os.path.exists(final_video_path):
os.remove(final_video_path)
print("Deleted video file:", final_video_path)
preprocess_dir = os.path.join("/tmp", "preprocess_data")
custom_cleanup(TEMP_DIR.name, preprocess_dir)
print("Temporary files cleaned up, but preprocess_data is retained.")
end_time = time.time()
time_taken = end_time - start_time
print(f"Time taken for endpoint: {time_taken:.2f} seconds")
return jsonify({
"message": "Video processed and saved successfully.",
"video_url": video_url,
"text_prompt": text_prompt,
"time_taken": time_taken,
"status": "success"
})
else:
return jsonify({
"message": "Failed to process the video.",
"status": "error"
}), 500
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
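# Serve videos that /run copied into VIDEO_DIRECTORY.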
@app.route("/videos/<string:filename>", methods=['GET'])
def serve_video(filename):
global VIDEO_DIRECTORY
return send_from_directory(VIDEO_DIRECTORY, filename, as_attachment=False)
# @app.route("/status", methods=["GET"])
# def check_generation_status():
# global TEMP_DIR
# global start_time
# response = {"base64_video": "","text_prompt":"", "status": ""}
# process_id = request.args.get('process_id', None)
# # process_id is required to check the status for that specific process
# if process_id:
# generation_thread = app.config.get('generation_thread')
# if generation_thread and generation_thread.ident == int(process_id) and generation_thread.is_alive():
# return jsonify({"status": "in_progress"}), 200
# elif app.config.get('temp_response'):
# # app.config['temp_response']['status'] = 'completed'
# final_response = app.config['temp_response']
# response["base64_video"] = final_response
# response["text_prompt"] = app.config.get('text_prompt')
# response["duration"] = app.config.get('final_video_duration')
# response["status"] = "completed"
# final_video_path = app.config['final_video_path']
# print('final_video_path',final_video_path)
# if final_video_path and os.path.exists(final_video_path):
# os.remove(final_video_path)
# print("Deleted video file:", final_video_path)
# # TEMP_DIR.cleanup()
# preprocess_dir = os.path.join("/tmp", "preprocess_data")
# custom_cleanup(TEMP_DIR.name, preprocess_dir)
# print("Temporary files cleaned up, but preprocess_data is retained.")
# end_time = time.time()
# total_time = round(end_time - start_time, 2)
# print("Total time taken for execution:", total_time, " seconds")
# response["time_taken"] = total_time
# return jsonify(response)
# return jsonify({"error":"No process id provided"})
@app.route("/health", methods=["GET"])
def health_status():
response = {"online": "true"}
return jsonify(response)
if __name__ == '__main__':
app.run(debug=True)