Spaces:
Running
Running
""" | |
utils.py - Utility functions for the project. | |
""" | |
import logging | |
import re | |
import subprocess | |
from collections import defaultdict, deque | |
from datetime import datetime | |
from itertools import combinations, islice | |
from pathlib import Path | |
from typing import List | |
logging.basicConfig( | |
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", | |
level=logging.INFO, | |
) | |
import torch | |
from natsort import natsorted | |
from rapidfuzz import fuzz | |
# Define stopwords | |
STOPWORDS = set( | |
"a about above after again against all am an and any are aren't as at be because been before being below between both but by can't cannot could couldn't did didn't do does doesn't doing don't down during each few for from further had hadn't has hasn't have haven't having he he'd he'll he's her here here's hers herself him himself his how how's i i'd i'll i'm i've if in into is isn't it it's its itself let's me more most mustn't my myself no nor not of off on once only or other ought our ours ourselves out over own same shan't she she'd she'll she's should shouldn't so some such than that that's the their theirs them themselves then there there's these they they'd they'll they're they've this those through to too under until up very was wasn't we we'd we'll we're we've were weren't what what's when when's where where's which while who who's whom why why's with won't would wouldn't you you'd you'll you're you've your yours yourself yourselves".split() | |
) | |
def validate_pytorch2(torch_version: str = None): | |
torch_version = torch.__version__ if torch_version is None else torch_version | |
pattern = r"^2\.\d+(\.\d+)*" | |
return True if re.match(pattern, torch_version) else False | |
def get_timestamp(detailed=False) -> str: | |
""" | |
get_timestamp - get a timestamp for the current time | |
Returns: | |
str, the timestamp | |
""" | |
return ( | |
datetime.now().strftime("%b%d%Y_%H%M%S%f") | |
if detailed | |
else datetime.now().strftime("%b%d%Y_%H") | |
) | |
def truncate_word_count(text, max_words=512): | |
""" | |
truncate_word_count - a helper function for the gradio module | |
Parameters | |
---------- | |
text : str, required, the text to be processed | |
max_words : int, optional, the maximum number of words, default=512 | |
Returns | |
------- | |
dict, the text and whether it was truncated | |
""" | |
# split on whitespace with regex | |
words = re.split(r"\s+", text) | |
processed = {} | |
if len(words) > max_words: | |
processed["was_truncated"] = True | |
processed["truncated_text"] = " ".join(words[:max_words]) | |
else: | |
processed["was_truncated"] = False | |
processed["truncated_text"] = text | |
return processed | |
def load_examples(src, filetypes=[".txt", ".pdf"]): | |
""" | |
load_examples - a helper function for the gradio module to load examples | |
Returns: | |
list of str, the examples | |
""" | |
src = Path(src) | |
src.mkdir(exist_ok=True) | |
pdf_url = ( | |
"https://www.dropbox.com/s/y92xy7o5qb88yij/all_you_need_is_attention.pdf?dl=1" | |
) | |
subprocess.run(["wget", pdf_url, "-O", src / "all_you_need_is_attention.pdf"]) | |
examples = [f for f in src.iterdir() if f.suffix in filetypes] | |
examples = natsorted(examples) | |
# load the examples into a list | |
text_examples = [] | |
for example in examples: | |
with open(example, "r") as f: | |
text = f.read() | |
text_examples.append([text, "base", 2, 1024, 0.7, 3.5, 3]) | |
return text_examples | |
def load_example_filenames(example_path: str or Path): | |
""" | |
load_example_filenames - a helper function for the gradio module to load examples | |
Returns: | |
dict, the examples (filename:full path) | |
""" | |
example_path = Path(example_path) | |
# load the examples into a list | |
examples = {f.name: f for f in example_path.glob("*.txt")} | |
return examples | |
def textlist2html(text_batches): | |
# Step 1: Generate each summary batch as a string of HTML | |
formatted_batches = [ | |
f""" | |
<div style=" | |
margin-bottom: 20px; | |
font-size: 18px; | |
line-height: 1.5em; | |
color: #333; | |
"> | |
<h2 style="font-size: 22px; color: #555;">Batch {i}:</h2> | |
<p style="white-space: pre-line;">{s}</p> | |
</div> | |
""" | |
for i, s in enumerate(text_batches, start=1) | |
] | |
# Step 2: Join all the summary batches together into one string | |
joined_batches = "".join(formatted_batches) | |
# Step 3: Wrap the summary string in a larger div with background color, border, and padding | |
text_html_block = f""" | |
<div style=" | |
background-color: #f9f9f9; | |
border: 1px solid #ddd; | |
border-radius: 5px; | |
padding: 20px; | |
"> | |
{joined_batches} | |
</div> | |
""" | |
return text_html_block | |
def extract_keywords( | |
text: str, num_keywords: int = 3, window_size: int = 5, kw_max_len: int = 20 | |
) -> List[str]: | |
""" | |
Extracts keywords from a text using a simplified TextRank algorithm. | |
Args: | |
text: The text to extract keywords from. | |
num_keywords: The number of keywords to extract. Default: 3 | |
window_size: The number of words considered for co-occurrence. Default: 5 | |
kw_max_len: The maximum length of a keyword (truncate longer keywords to max). Default: 20 | |
Returns: | |
A list of strings, where each string is a keyword extracted from the input text. | |
""" | |
logger = logging.getLogger(__name__) | |
# Remove stopwords and tokenize the text into words | |
words = [ | |
word | |
for word in re.findall(r"\b\w{3,}\b", text.lower()) | |
if word not in STOPWORDS | |
] | |
# Create a graph of word co-occurrences within a moving window of words | |
cooccur = defaultdict(lambda: defaultdict(int)) | |
deque_words = deque(maxlen=window_size) | |
for word in words: | |
for w1, w2 in combinations(deque_words, 2): | |
cooccur[w1][w2] += 1 | |
cooccur[w2][w1] += 1 | |
deque_words.append(word) | |
# Assign scores to words using a simplified TextRank algorithm | |
scores = defaultdict(float) | |
for _ in range(10): | |
new_scores = defaultdict(float) | |
for word, co_words in cooccur.items(): | |
new_scores[word] = 0.15 + 0.85 * sum( | |
cooccur[word][other] / sum(cooccur[other].values()) * scores[other] | |
for other in co_words | |
) | |
scores = new_scores | |
# Sort the words by score and return the top num_keywords keywords | |
keywords = sorted(scores, key=scores.get, reverse=True)[:num_keywords] | |
logger.debug(f"All keywords: {keywords}") | |
# Use fuzzy matching to remove similar keywords | |
final_keywords = [] | |
for keyword in keywords: | |
if not any(fuzz.ratio(keyword, other) > 70 for other in final_keywords): | |
final_keywords.append(keyword[:kw_max_len]) | |
logger.debug(f"Keywords (max len. {kw_max_len}):\t{final_keywords}") | |
return final_keywords | |
def saves_summary( | |
summarize_output, outpath: str or Path = None, add_signature=True, **kwargs | |
): | |
""" | |
saves_summary - save the summary generated from summarize_via_tokenbatches() to a text file | |
summarize_output: output from summarize_via_tokenbatches() | |
outpath: path to the output file | |
add_signature: whether to add a signature to the output file | |
kwargs: additional keyword arguments to include in the output file | |
""" | |
logger = logging.getLogger(__name__) | |
sum_text = [f"{s['summary'][0]}\n" for s in summarize_output] | |
sum_scores = [f"\n - {round(s['summary_score'],4)}" for s in summarize_output] | |
scores_text = "\n".join(sum_scores) | |
full_summary = "\n".join(sum_text) | |
keywords = "_".join(extract_keywords(full_summary, kw_max_len=4)) | |
logger.debug(f"kw:\t{keywords}") | |
outpath = ( | |
Path.cwd() / f"DocSumm_{keywords}_{get_timestamp()}.txt" | |
if outpath is None | |
else Path(outpath) | |
) | |
logger.info(f"Saving summary to:\t{outpath.name}") | |
with open( | |
outpath, | |
"w", | |
encoding="utf-8", | |
) as fo: | |
fo.writelines(full_summary) | |
fo.write("\n\n") | |
if add_signature: | |
fo.write("\n\n---\n\n") | |
fo.write("Generated with the Document Summarization space :)\n\n") | |
fo.write("https://hf.co/spaces/pszemraj/document-summarization\n\n") | |
with open( | |
outpath, | |
"a", | |
encoding="utf-8", | |
) as fo: | |
fo.write("\n") | |
fo.write(f"## Section Scores:\n\n") | |
fo.writelines(scores_text) | |
fo.write("\n\n") | |
fo.write(f"Date: {get_timestamp()}\n\n") | |
if kwargs: | |
fo.write("---\n\n") | |
fo.write("## Parameters:\n\n") | |
for key, value in kwargs.items(): | |
fo.write(f"{key}: {value}\n") | |
return outpath | |