moviepy_with_manga_test / 3mergeDialogToVideo.py
rogerxavier's picture
Update 3mergeDialogToVideo.py
6fb4f91 verified
raw
history blame
24.7 kB
# rogerxavier-ocr-with-fastapi.hf.space
import os
##这个模型目前只适合确定文本框顺序后再识别,因为如果后面的
##完整图片处理的反例 现在处理的图片是10\0.jpg
# [[[953, 743], [987, 743], [987, 867], [953, 867]], [[917, 745], [951, 745], [951, 867], [917, 867]], [[881, 741], [918, 742], [915, 898], [877, 897]], [[843, 743], [879, 743], [879, 809], [843, 809]], [[629, 1058], [669, 1058], [669, 1210], [629, 1210]], [[549, 1227], [583, 1227], [583, 1381], [549, 1381]], [[535, 115], [563, 115], [563, 145], [535, 145]], [[535, 147], [563, 147], [563, 213], [535, 213]], [[507, 443], [539, 443], [539, 579], [507, 579]], [[505, 115], [533, 115], [533, 197], [505, 197]], [[511, 1225], [547, 1225], [547, 1321], [511, 1321]], [[475, 117], [503, 117], [503, 265], [475, 265]], [[467, 421], [503, 421], [503, 575], [467, 575]], [[419, 235], [447, 235], [447, 337], [419, 337]], [[387, 236], [417, 237], [414, 339], [385, 338]], [[209, 796], [242, 797], [239, 921], [206, 920]], [[175, 173], [205, 173], [205, 225], [175, 225]], [[177, 231], [205, 231], [205, 285], [177, 285]], [[103, 1153], [129, 1153], [129, 1223], [103, 1223]], [[41, 100], [108, 101], [104, 549], [36, 548]]]
# ['就算是你', '没有圣剑', '也不可能有', '胜算', '就算如此', '我也不觉得', '做', ':做个', '·就不觉得', '老好人', '你可怕', '也要有个限度', '我很恐怖吗', '该说真是', '无药可救', '说的是呢', '这个', '但是', '为何?', '第二话让人怜爱']
import requests
import tempfile
import time
import re #正则对话剔除非中文,保留'\n'
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.editor import *
import cv2
import numpy as np
import io
import base64
import json
from io import BytesIO
import pandas as pd
from PIL import Image
import os
from mutagen.mp3 import MP3 #读取音频获取时长
azure_speech_key = os.getenv('azure_speech_key')
azure_service_region = os.getenv('azure_service_region')
my_openai_key = os.getenv('my_openai_key')
speech_synthesis_voice_name = "zh-CN-YunhaoNeural" ##云皓
print("azure key是",azure_speech_key)
print("azure_service_region是",azure_service_region)
print("my_openai_key",my_openai_key)
#通过去水印完整漫画图片->获取相应的对话框图片->获取对话框文字->返回对话框文字
def get_image_copywrite(image_path:"图片路径(包含后缀)",dialog_cut_path:"对话框切割路径")->"返回漫画关联对话框识别后得到的文案str(原文即可),也可能是none":
def extract_chinese(text:str)->str:
#剔除除了 '\n'外的非中文字符
chinese_pattern = re.compile("[\u4e00-\u9fa5]+") # 匹配中文字符的正则表达式
chinese_text = ""
for char in text:
if char == '\n' or re.match(chinese_pattern, char):
chinese_text += char
return chinese_text
dialog_texts = ''
associate_dialog_img = get_associate_dialog(image_path=image_path,dialog_cut_path=dialog_cut_path)
if len(associate_dialog_img)!=0:
#如果有对应的对话框
for dialog_img_path in associate_dialog_img:
cur_dialog_texts = get_sorted_dialog_text(dialog_img_path)#一个对话框的文字list
if cur_dialog_texts is not None:
for dialog_text in cur_dialog_texts:
# dialog_texts += dialog_text
dialog_texts += extract_chinese(dialog_text)
#因为已经在数组中加入了\n 换行,这里就不用加了
else:
print(dialog_img_path+"识别是空-可能是有问题")
return dialog_texts
return None#不规范图片不请求,直接返回none
#通过传入无水印漫画图片对话框路径,得到关联的对话框图片list
def get_associate_dialog(image_path:"图片路径(包含后缀)",dialog_cut_path:"对话框切割路径")->"返回漫画关联对话框list,也可能是空的list":
image_name = os.path.splitext(os.path.basename(image_path))[0]
image_name_format = '{:03d}'.format(int(image_name))
associated_dialogs = []
for root, _, files in os.walk(dialog_cut_path):
for file in files:
if file.startswith(image_name_format) and file.endswith('.jpg'):
associated_dialogs.append(os.path.join(root, file))
return associated_dialogs
def merge_sublists(lists):
merged = []
for sublist in lists:
found = False
for m in merged:
if any(elem in sublist for elem in m):
m.extend(elem for elem in sublist if elem not in m)
found = True
break
if not found:
merged.append(sublist)
return merged
# 任意两框进行中心高度差和中心宽度差比较,如果xy都相近,那么认为是同一个框的对话,加入一个对话数组里面,
# 最终将漫画块分成几个对话框数组,然后再对数组间进行从上到下,从右到左排序
# 定义一个函数来寻找相关的点并加入新的list
def find_associate_text(sorted_indices,centers,sorted_coordinates,boxInfo):
associate_text_list = []
related_groups = []
for i in range(len(sorted_indices) - 1):
for j in range(i+1 , len(sorted_indices)):
if (abs(centers[sorted_indices[i]][1] - centers[sorted_indices[j]][1]) < abs(
(sorted_coordinates[i][2][1] - sorted_coordinates[i][0][1])) / 3) \
and (abs(centers[sorted_indices[i]][0] - centers[sorted_indices[j]][0]) < abs(
(sorted_coordinates[i][2][0] - sorted_coordinates[i][0][0])) * 1.5):
# Check if the points i and j are already in the same related group
found = False
for group in related_groups:
if i in group or j in group:
group.add(i)
group.add(j)
found = True
break
if not found:
related_groups.append({i, j})
for group in related_groups:
text_group = []
for idx in group:
text_group.append(boxInfo['Text'][str(sorted_indices[idx])])#这里加入的是排序后的索引
associate_text_list.append(text_group)
return merge_sublists(associate_text_list),related_groups
#先对组内对话从右到左排序,处理反馈到related_groups (因为sorted_indices本身就是从右到左,从上到下排序后的)
# 这个记录的顺序改变,最后sorted_text = [boxInfo['Text'][str(i)] for i in sorted_indices]就可以得到正确的顺序
#要保证一个List中的组内有序和组间有序,通常应该先排序组内,然后再保持组间有序
def sort_associate_text_list(sorted_indices:list,related_groups:list,boxCoordinates,centers)->list:
sorted_groups = []
# 返回组内排序后的 sorted_groups
for group in related_groups:
group = list(group) # 将集合转换为列表
isVertical = False
isCross = False
# 前提是竖框->使用 lambda 函数按照中心点坐标的 x 值对 group 中的元素进行排序,使得x大的(靠右的)在前面
for idx in group:
if (boxCoordinates[sorted_indices[idx]][2][0] - boxCoordinates[sorted_indices[idx]][0][0]) > (
boxCoordinates[sorted_indices[idx]][2][1] - boxCoordinates[sorted_indices[idx]][0][1]):
# 这里是宽>高,说明是横框
isCross =True
pass # 你可以在这里添加你想要执行的代码
else:
# 这里宽<高,说明是竖框
isVertical = True
pass # 你可以在这里添加你想要执行的代码
if isVertical:
group.sort(key=lambda idx: centers[sorted_indices[idx]][0], reverse=True)
if isCross:
group.sort(key=lambda idx: centers[sorted_indices[idx]][1], reverse=False)
sorted_groups.append(group)
return sorted_groups
#再对组间对话先上后下,从右到左排序,同时将单独对话加入合适位置.返回排序后的related_groups
def sort_dialog_list(sorted_indices:list,related_groups:list,sorted_coordinates)->list:
sorted_groups = []
related_groups_copy = related_groups.copy()
sorted_indices_copy = sorted_indices.copy()
added = {}
# 返回组内排序后的 sorted_groups
# 任意两框进行加权高度差值比较,然后交换顺序,而不是只遍历一遍交换,如果y中心点差在1/3 文本框长度下认为相同,这时按照x从右往左顺序看
for i in range(len(sorted_indices) - 1):
if ((
sorted_coordinates[i][2][0] - sorted_coordinates[i][0][0]) < (
sorted_coordinates[i][2][1] - sorted_coordinates[i][0][1])):
# 竖框情况下(宽小于高),依次加入元素,加到在组中的那么后序按组顺序加,然后继续(再碰到不加)
pass #竖框不动,横框剔除,后序不在次循环中储粮
else:
sorted_indices_copy.remove(i)
# 横框情况下(宽大于高)#横框干脆不读了(从sorted_indices_copy中剔除),太影响了
for idx in sorted_indices_copy:
added[idx] = False
for group in related_groups_copy:
if idx in group:
sorted_groups.append(group)
related_groups_copy.remove(group)
added[idx] = True
break
if not added[idx]:
sorted_groups.append(idx)
# 创建一个新列表来存储不应该单独存在的元素,并且游离的元素也变[]包裹
filtered_data = []
data = sorted_groups
for item in data:
if isinstance(item, list):
# 如果元素是列表,则将其添加到新列表中
filtered_data.append(item)
else:
# 如果元素不是列表,则检查是否存在于其他子项数组中,如果不存在则添加到新列表中
is_in_sublist = False
for sublist in data:
if isinstance(sublist, list) and item in sublist:
is_in_sublist = True
break
if not is_in_sublist:
filtered_data.append([item])
return filtered_data
def get_sorted_dialog_text(image_path:"包含后缀的文件路径")->"返回排序后的text list(一列或者几列话,反正是一个框的内容,几句不清楚,一个框的list当一次文案就行) 或者失败请求返回none":
image_bytes = open(image_path, 'rb')
headers = {
'authority': 'rogerxavier-fastapi-t5-magi.hf.space',
'scheme': 'https',
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': 'spaces-jwt=eyJhbGciOiJFZERTQSJ9.eyJyZWFkIjp0cnVlLCJwZXJtaXNzaW9ucyI6eyJyZXBvLmNvbnRlbnQucmVhZCI6dHJ1ZX0sIm9uQmVoYWxmT2YiOnsia2luZCI6InVzZXIiLCJfaWQiOiI2NDJhNTNiNTE2ZDRkODI5M2M5YjdiNzgiLCJ1c2VyIjoicm9nZXJ4YXZpZXIifSwiaWF0IjoxNzE2Njg3MzU3LCJzdWIiOiIvc3BhY2VzL3JvZ2VyeGF2aWVyL29jcl93aXRoX2Zhc3RhcGkiLCJleHAiOjE3MTY3NzM3NTcsImlzcyI6Imh0dHBzOi8vaHVnZ2luZ2ZhY2UuY28ifQ._sGdEgC-ijbIhLmB6iNSBQ_xHNzb4Ydb9mD0L3ByRmJSbB9ccfGbRgtNmkV1JLLldHp_VEKUSQt9Mwq_q4aGAQ',
'Dnt': '1',
'Priority': 'u=1, i',
'Sec-Ch-Ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
}
files = {
"image": image_bytes,
}
try:
resp = requests.post("https://rogerxavier-ocr-with-fastapi.hf.space/getCoordinates", files=files,headers=headers)#还是有header才能跑
#先json转换,0为坐标list合集,1为 boxid和text合集
boxCoordinates , boxInfo = resp.json()[0],resp.json()[1] #分别是list和dict类型
print("ofa ocr识别漫画块成功返回")
print("boxCoordinates是",boxCoordinates)
print("boxInfo是",boxInfo)
# 计算文本框的中心点,以便按照从右往左,从上往下的顺序进行排序
centers = [((box[0][0] + box[2][0]) / 2, (box[0][1] + box[2][1]) / 2) for box in boxCoordinates]
# 根据每个元组的第一个元素进行降序排序,如果第一个元素相同时,则根据第二个元素进行升序排序。即先关注y后关注x(更重上下)
sorted_indices = sorted(range(len(centers)), key=lambda i: ( centers[i][1],-centers[i][0]))
# # 即先关注x后关注y(更注重从右到左)
# sorted_indices = sorted(range(len(centers)), key=lambda i: ( -centers[i][0],centers[i][1]))
# 获取排序后的文本框坐标和对应的文字
sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
# 调用函数并打印结果
associate_text_list,related_groups = find_associate_text(sorted_indices,centers,sorted_coordinates,boxInfo)
#print("相关list是",associate_text_list)
#print("related_groups是",related_groups)
#print("sorted_indices是",sorted_indices)
related_groups = sort_associate_text_list(sorted_indices,related_groups,boxCoordinates,centers)
#print("组内排序后的related_groups是",related_groups)
#[[3, 4], [7, 5, 6], [10, 9], [11, 12, 13], [15, 16, 14]]
related_groups_in_sorted_indices = []
for group in related_groups:
related_groups_in_sorted_indices_item = []
for idx in group:
related_groups_in_sorted_indices_item.append(sorted_indices[idx])# 这里加入的是排序后的索引
related_groups_in_sorted_indices.append(related_groups_in_sorted_indices_item)
#print("related_groups_in_sorted_indices是",related_groups_in_sorted_indices)
#related_groups_in_sorted_indices->[[7, 6], [3, 2, 4], [9, 10], [11, 13, 12], [15, 16, 14]]->
#期望结果[[0],[3, 2, 4],[1],[5],[7, 6],[8], [9, 10],[11, 13, 12], [15, 16, 14]]
related_groups = sort_dialog_list(sorted_indices,related_groups,sorted_coordinates)
#print("related_groups组件排序后是:",related_groups)
# 将子列表中的数字提取出来组成一个新的列表(纯数字),去除子项间的[],
# 如[[3, 4], [7, 5, 6], [10, 9], [11, 12, 13], [15, 16, 14]] ->[3, 4, 7, 5, 6, 10, 9, 11, 12, 13, 15, 16, 14]
flattened_list = [num for sublist in related_groups for num in sublist]
added_indices = set()
sorted_text = []
for i in flattened_list:
for sublist in related_groups:
if i in sublist:
if i == sublist[-1] and i not in added_indices:
sorted_text.append(boxInfo['Text'][str(sorted_indices[i])] + '\n')
added_indices.add(i)
elif i not in added_indices:
sorted_text.append(boxInfo['Text'][str(sorted_indices[i])])
added_indices.add(i)
#print("不完整的sorted_text是",sorted_text)
# 不用在最后一个项末尾添加"\n",从而隔开其他的漫画块对话(因为总会有最后一个子块,因而上述方式就可以加上了)
sorted_coordinates = [boxCoordinates[i] for i in sorted_indices]
print(sorted_coordinates)
print(sorted_text)
return sorted_text
except Exception as e:
print("ofa ocr图片请求出现问题")
print(e)
return None
#通过文字获取音频
def get_audio_data(text:str)-> "返回audio data io句柄, duration(也有可能包含无效字符导致生成音频400错误)":
# Creates an instance of a speech config with specified subscription key and service region.
speech_key = azure_speech_key
service_region = azure_service_region
voiceText = text
url = f"https://{service_region}.tts.speech.microsoft.com/cognitiveservices/v1"
headers = {
"Ocp-Apim-Subscription-Key": speech_key,
"Content-Type": "application/ssml+xml",
"X-Microsoft-OutputFormat": "audio-16khz-128kbitrate-mono-mp3",
"User-Agent": "curl"
}
ssml_text = '''
<speak version='1.0' xml:lang='zh-CN'>
<voice xml:lang='zh-CN' xml:gender='male' name='{voiceName}'>
{voiceText}
</voice>
</speak>
'''.format(voiceName=speech_synthesis_voice_name,voiceText = voiceText)
response = requests.post(url, headers=headers, data=ssml_text.encode('utf-8'))
if response.status_code == 200:
# 创建临时文件 -当前路径下面
try:
with tempfile.NamedTemporaryFile(dir='/mp3_out/',delete=False) as temp_file:
temp_file.write(response.content)
temp_file.close()
audio = MP3(temp_file.name)
# 获取音频时长(单位为秒)
audio_duration_seconds = audio.info.length #int即可
# 在这里完成您对文件的操作,比如返回文件名
file_name = temp_file.name
return file_name, audio_duration_seconds
except Exception as e:
print("可能遇到mp3 can not sync to MPEG frame错误,总之音频能获取到但是不能识别",e)
return None,None#这种也返回none告知错误不要管了
else:
print("Error: Failed to synthesize audio. Status code:", response.status_code)
return None,None
# 补零函数,将数字部分补齐为指定长度
def zero_pad(s, length):
return s.zfill(length)
def gpt_polish(text:str)->"通过gpt润色str文案并返回str新文案,或者gpt请求失败none":
# Set your OpenAI API key
api_key = my_openai_key
# Define the headers
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
# Chat Completions request data
data = {
'model': 'gpt-3.5-turbo', # Replace with your chosen model
'messages': [
{'role': 'system', 'content': "你是一个assistant,能够根据user发送的漫画中提取的对话文字,生成一个短视频中一帧的文案(1-2句话)"},
{'role': 'user', 'content': text}
]
}
try:
response = requests.post('https://api.yingwu.lol/v1/chat/completions', headers=headers, data=json.dumps(data))
print("gpt请求的结果是",response.text)
print("润色后文案是:"+response.json()['choices'][0]['message']['content'])
return response.json()['choices'][0]['message']['content']
except Exception as e:
print("gpt润色文案失败:")
print(e)
return None
if __name__ == '__main__':
# 获取存放去水印漫画图片的路径 ---放这里是因为获取对话文字时需要和原图关联
img_path = 'manga1'
# 获取切割后的文本框路径
dialog_img_path = 'manga12'
#获取漫画原图无水印的加入image_files,并排序
subdir_path = os.path.join(os.getcwd(), img_path)
# 对话图片经过加入list并补0确定顺序
image_files = []
for root, dirs, files in os.walk(subdir_path):
for file in files:
if file.endswith(".jpg") or file.endswith(".png"):
image_files.append(os.path.relpath(os.path.join(root, file)))
# 对对话框文件名中的数字部分进行补零操作-这样顺序会正常
image_files.sort(
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))
dialog_subdir_path = os.path.join(os.getcwd(), dialog_img_path)
# 对话图片经过加入list并补0确定顺序
dialog_image_files = []
for root, dirs, files in os.walk(dialog_subdir_path):
for file in files:
if file.endswith(".jpg") or file.endswith(".png"):
dialog_image_files.append(os.path.relpath(os.path.join(root, file)))
# 对对话框文件名中的数字部分进行补零操作-这样顺序会正常
dialog_image_files.sort(
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))
# 对话图片经过加入list并补0确定顺序
###音视频相关参数-------------------------------------------------------------------------------------
##这个是临时生成音频文件的全局变量--方便后续删除
filename = ''
# 视频分辨率和帧率
# 获取第一张图片的尺寸
image = Image.open(image_files[0])
# width, height = 1125, 1600 #
width, height = image.size #使用图片的size作为宽高
#读取第一个图片作为cover保存到cover/0.jpg
# 定义要保存的文件路径
save_path = os.path.join("cover", "0.jpg")
# 保存图片文件
image.save(save_path)
#读取第一个图片作为cover保存到cover/0.jpg
fps = 30
font_path = '1.ttf' # 设置字体以防默认字体无法同时处理中英文
# 创建视频编辑器
video_clips = []
###音视频相关参数-------------------------------------------------------------------------------------
#因为是根据原图无水印的进行遍历,所以处理前要进行筛选,只处理能找到相应对话框图片的原图
filtered_image_files = []
for image_path in image_files:
dialog_list = get_associate_dialog(image_path, dialog_img_path)
if dialog_list:
filtered_image_files.append(image_path)
image_files = filtered_image_files
for idx, image_file in enumerate(image_files):
print("现在处理的图片是"+image_file)
#后面是视音频生成部分-这里图片需要用到完整的去水印的而不是对话框用于识别的
img = cv2.imread(image_file)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) ##只支持英文路径
##获取当前图片对应的对话框识别文字(还需gpt处理后作为字幕文案)
cur_copywrite = get_image_copywrite(image_file,dialog_img_path) # image_file就是6.jpg了
#cur_copywrite = gpt_polish(cur_copywrite)#不用gpt,只用新版漫画块得到的100%识别原文即可
if cur_copywrite is not None:
##获取当前图片对应的临时音频文件名称和文案时长
# filename, duration = get_audio_data(cur_copywrite)
filename, duration = get_audio_data(cur_copywrite)#这里是一个原图的全部文案,不是一个漫画块的,不能在这里加\n断开不同漫画块的对话
if filename is not None:
print("存放临时mp3文件的路径是",filename)
#含字幕版
# clip = ImageClip(img).set_duration(duration).resize((width, height)) # 初始clip
# txt_clip = TextClip(cur_copywrite, fontsize=40, color='white', bg_color='black',
# font=font_path) ##文本clip后加入视频
# txt_clip = txt_clip.set_pos(('center', 'bottom')).set_duration(duration)
# # 创建音频剪辑
# audio_clip = AudioFileClip(filename)
# clip = clip.set_audio(audio_clip) # 将音频与视频片段关联
# clip = CompositeVideoClip([clip, txt_clip])
# video_clips.append(clip)
#含字幕版
#不含字幕版
clip = ImageClip(img).set_duration(duration).resize((width, height))
# 去掉添加字幕的部分(原文太长了,再加上音频都是一整个原图(即多个漫画块)的全部内容,也没法分割)
audio_clip = AudioFileClip(filename)
clip = clip.set_audio(audio_clip)
video_clips.append(clip)
#不含字幕版
else:
pass ##音频特殊字符或者其他原因无法生成跳过
video = concatenate_videoclips(video_clips)
# 保存视频
video.write_videofile('mp4_out/output_video.mp4', fps=fps,temp_audiofile="mp3_out/temp.mp3")
# # 在文件关闭后删除临时文件
print("删除临时mp3文件", filename)
os.remove(filename)