Spaces:
Running
Running
import json | |
import os | |
import sys | |
import zlib | |
from typing import Callable, TextIO | |
system_encoding = sys.getdefaultencoding() | |
if system_encoding != "utf-8": | |
def make_safe(string): | |
# replaces any character not representable using the system default encoding with an '?', | |
# avoiding UnicodeEncodeError (https://github.com/openai/whisper/discussions/729). | |
return string.encode(system_encoding, errors="replace").decode(system_encoding) | |
else: | |
def make_safe(string): | |
# utf-8 can encode any Unicode code point, so no need to do the round-trip encoding | |
return string | |
def exact_div(x, y): | |
assert x % y == 0 | |
return x // y | |
def str2bool(string): | |
str2val = {"True": True, "False": False} | |
if string in str2val: | |
return str2val[string] | |
else: | |
raise ValueError(f"Expected one of {set(str2val.keys())}, got {string}") | |
def optional_int(string): | |
return None if string == "None" else int(string) | |
def optional_float(string): | |
return None if string == "None" else float(string) | |
def compression_ratio(text) -> float: | |
text_bytes = text.encode("utf-8") | |
return len(text_bytes) / len(zlib.compress(text_bytes)) | |
def format_timestamp(seconds: float, always_include_hours: bool = False, decimal_marker: str = '.'): | |
assert seconds >= 0, "non-negative timestamp expected" | |
milliseconds = round(seconds * 1000.0) | |
hours = milliseconds // 3_600_000 | |
milliseconds -= hours * 3_600_000 | |
minutes = milliseconds // 60_000 | |
milliseconds -= minutes * 60_000 | |
seconds = milliseconds // 1_000 | |
milliseconds -= seconds * 1_000 | |
hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else "" | |
return f"{hours_marker}{minutes:02d}:{seconds:02d}{decimal_marker}{milliseconds:03d}" | |
class ResultWriter: | |
extension: str | |
def __init__(self, output_dir: str): | |
self.output_dir = output_dir | |
def __call__(self, result: dict, audio_path: str): | |
audio_basename = os.path.basename(audio_path) | |
output_path = os.path.join(self.output_dir, audio_basename + "." + self.extension) | |
with open(output_path, "w", encoding="utf-8") as f: | |
self.write_result(result, file=f) | |
def write_result(self, result: dict, file: TextIO): | |
raise NotImplementedError | |
class WriteTXT(ResultWriter): | |
extension: str = "txt" | |
def write_result(self, result: dict, file: TextIO): | |
for segment in result["segments"]: | |
print(segment['text'].strip(), file=file, flush=True) | |
class WriteVTT(ResultWriter): | |
extension: str = "vtt" | |
def write_result(self, result: dict, file: TextIO): | |
print("WEBVTT\n", file=file) | |
for segment in result["segments"]: | |
print( | |
f"{format_timestamp(segment['start'])} --> {format_timestamp(segment['end'])}\n" | |
f"{segment['text'].strip().replace('-->', '->')}\n", | |
file=file, | |
flush=True, | |
) | |
class WriteSRT(ResultWriter): | |
extension: str = "srt" | |
def write_result(self, result: dict, file: TextIO): | |
for i, segment in enumerate(result["segments"], start=1): | |
# write srt lines | |
print( | |
f"{i}\n" | |
f"{format_timestamp(segment['start'], always_include_hours=True, decimal_marker=',')} --> " | |
f"{format_timestamp(segment['end'], always_include_hours=True, decimal_marker=',')}\n" | |
f"{segment['text'].strip().replace('-->', '->')}\n", | |
file=file, | |
flush=True, | |
) | |
class WriteTSV(ResultWriter): | |
""" | |
Write a transcript to a file in TSV (tab-separated values) format containing lines like: | |
<start time in integer milliseconds>\t<end time in integer milliseconds>\t<transcript text> | |
Using integer milliseconds as start and end times means there's no chance of interference from | |
an environment setting a language encoding that causes the decimal in a floating point number | |
to appear as a comma; also is faster and more efficient to parse & store, e.g., in C++. | |
""" | |
extension: str = "tsv" | |
def write_result(self, result: dict, file: TextIO): | |
print("start", "end", "text", sep="\t", file=file) | |
for segment in result["segments"]: | |
print(round(1000 * segment['start']), file=file, end="\t") | |
print(round(1000 * segment['end']), file=file, end="\t") | |
print(segment['text'].strip().replace("\t", " "), file=file, flush=True) | |
class WriteJSON(ResultWriter): | |
extension: str = "json" | |
def write_result(self, result: dict, file: TextIO): | |
json.dump(result, file) | |
def get_writer(output_format: str, output_dir: str) -> Callable[[dict, TextIO], None]: | |
writers = { | |
"txt": WriteTXT, | |
"vtt": WriteVTT, | |
"srt": WriteSRT, | |
"tsv": WriteTSV, | |
"json": WriteJSON, | |
} | |
if output_format == "all": | |
all_writers = [writer(output_dir) for writer in writers.values()] | |
def write_all(result: dict, file: TextIO): | |
for writer in all_writers: | |
writer(result, file) | |
return write_all | |
return writers[output_format](output_dir) | |