pszemraj's picture
✨ add and update utils
2d980d5
raw
history blame
13 kB
"""
utils.py - Utility functions for the project.
"""
import logging
import os
import re
import subprocess
from collections import defaultdict, deque
from datetime import datetime, timedelta
from itertools import combinations, islice
from pathlib import Path
from typing import List, Optional
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
level=logging.INFO,
)
import torch
from natsort import natsorted
from rapidfuzz import fuzz
# English stopwords (articles, pronouns, auxiliaries, prepositions, contractions).
# The space-separated literal is split into a set so membership tests are cheap;
# extract_keywords() drops any token found here before building its graph.
# NOTE(review): extract_keywords() tokenizes with r"\b\w{3,}\b", which never yields
# tokens containing an apostrophe, so the contraction entries (e.g. "aren't") look
# unreachable from that call site - confirm whether that is intended.
STOPWORDS = set(
    "a about above after again against all am an and any are aren't as at be because been before being below between both but by can't cannot could couldn't did didn't do does doesn't doing don't down during each few for from further had hadn't has hasn't have haven't having he he'd he'll he's her here here's hers herself him himself his how how's i i'd i'll i'm i've if in into is isn't it it's its itself let's me more most mustn't my myself no nor not of off on once only or other ought our ours ourselves out over own same shan't she she'd she'll she's should shouldn't so some such than that that's the their theirs them themselves then there there's these they they'd they'll they're they've this those through to too under until up very was wasn't we we'd we'll we're we've were weren't what what's when when's where where's which while who who's whom why why's with won't would wouldn't you you'd you'll you're you've your yours yourself yourselves".split()
)
def remove_stagnant_files(
    freq: str = "hourly",
    search_path: str = ".",
    substring="DocSumm",
    remove_suffix=".txt",
):
    """
    remove_stagnant_files - Delete files in a directory that have gone unmodified for too long.

    Only regular files directly inside ``search_path`` (no recursion) are considered. A file
    is deleted when its suffix equals ``remove_suffix``, its name contains ``substring``, and
    its modification time is older than the window selected by ``freq``.

    :param str freq: age window, one of "hourly", "daily" or "weekly", defaults to "hourly"
    :param str search_path: directory to scan, defaults to "."
    :param str substring: text that must appear in the file name, defaults to "DocSumm"
    :param str remove_suffix: required file suffix, defaults to ".txt"
    :raises ValueError: if freq is not one of "hourly", "daily", or "weekly"
    """
    max_age_by_freq = {
        "hourly": timedelta(hours=1),
        "daily": timedelta(days=1),
        "weekly": timedelta(weeks=1),
    }

    now = datetime.now()
    root = Path(search_path)

    # Validate the frequency before touching the filesystem.
    if freq not in max_age_by_freq:
        raise ValueError(
            "Invalid frequency. Supported values are 'hourly', 'daily', and 'weekly'."
        )
    cutoff_ts = (now - max_age_by_freq[freq]).timestamp()

    # First pass: every regular file carrying the requested suffix.
    suffix_matches = []
    for entry in root.iterdir():
        if entry.is_file() and entry.suffix == remove_suffix:
            suffix_matches.append(entry)
    logging.info(f"Found {len(suffix_matches)} files.")

    # Second pass: keep only the ones that match the name filter and are stale.
    stale = []
    for entry in suffix_matches:
        if not entry.is_file():
            continue
        if substring not in entry.name:
            continue
        if not entry.stat().st_mtime < cutoff_ts:
            continue
        stale.append(entry)
        logging.debug(f"File {entry} last modified at {entry.stat().st_mtime}")

    logging.info(f"Removing {len(stale)} files.")
    for doomed in stale:
        doomed.unlink()
    logging.debug(f"Removed files: {stale}")
def compare_model_size(model_name: str, threshold: int = 500) -> Optional[bool]:
    """
    compare_model_size - compare the parameter count embedded in a model name to a threshold

    The size is taken from the last "<number><unit>" token in the name, where the unit is
    one of k (thousand), M (million), B or G (billion), matched case-insensitively.
    Numbers that carry no unit (e.g. the "5" in "t5" or a trailing "v2") are ignored, and a
    unit letter that is immediately followed by another letter (e.g. "8bit") is not treated
    as a size. Decimal sizes such as "1.3b" are supported.

    :param str model_name: the model name to inspect
    :param int threshold: the threshold to compare against in millions, defaults to 500
    :return: True if the model size is greater than the threshold, False if it is not,
        None if no size with a recognised unit is found in the name
    """
    # The unit is mandatory so that bare version/sequence numbers cannot mask the real size.
    pattern = r"(\d+(?:\.\d+)?)([MGKB])(?![A-Za-z])"
    matches = re.findall(pattern, model_name, flags=re.IGNORECASE)
    if not matches:
        return None

    # By convention the size tends to be the last unit-bearing number in the name.
    count_text, unit = matches[-1]
    millions = float(count_text)
    unit = unit.lower()

    # Normalise to millions of parameters.
    if unit in ("g", "b"):
        millions *= 1000
    elif unit == "k":
        millions /= 1000
    # unit == "m" is already expressed in millions

    return millions > threshold
def validate_pytorch2(torch_version: str = None):
    """
    validate_pytorch2 - check whether a version string denotes a 2.x release

    :param str torch_version: version string to check; when None, torch.__version__ is used
    :return: True if the string starts with "2.<digits>", False otherwise
    """
    if torch_version is None:
        torch_version = torch.__version__
    # re.match anchors at the start, so suffixes such as "+cu118" are allowed.
    is_v2 = re.match(r"^2\.\d+(\.\d+)*", torch_version) is not None
    return is_v2
def get_timestamp(detailed=False) -> str:
    """
    get_timestamp - format the current local time as a compact string

    Args:
        detailed (bool): when True, include minutes, seconds and microseconds
            ("%b%d%Y_%H%M%S%f"); otherwise stop at the hour ("%b%d%Y_%H")

    Returns:
        str, the timestamp
    """
    if detailed:
        layout = "%b%d%Y_%H%M%S%f"
    else:
        layout = "%b%d%Y_%H"
    return datetime.now().strftime(layout)
def truncate_word_count(text, max_words=1024):
    """
    truncate_word_count - a helper function for the gradio module

    Words are whitespace-delimited tokens. Leading/trailing whitespace does not count
    towards the total, so text is only truncated when it really holds more than
    ``max_words`` words.

    Parameters
    ----------
    text : str, required, the text to be processed
    max_words : int, optional, the maximum number of words, default=1024

    Returns
    -------
    dict, with keys "was_truncated" (bool) and "truncated_text" (str). When truncated,
    the text is the first ``max_words`` words joined by single spaces; otherwise it is
    the input text unchanged.
    """
    # str.split() with no separator splits on runs of whitespace and, unlike
    # re.split(r"\s+", ...), never produces empty strings at the edges.
    words = text.split()
    was_truncated = len(words) > max_words

    return {
        "was_truncated": was_truncated,
        "truncated_text": " ".join(words[:max_words]) if was_truncated else text,
    }
def load_examples(src, filetypes=(".txt", ".pdf")):
    """
    load_examples - a helper function for the gradio module to load examples

    Creates ``src`` if needed, downloads a sample PDF into it with ``wget``, then reads
    every file directly inside ``src`` whose suffix is listed in ``filetypes``, in
    natural-sort order.

    Args:
        src (str or Path): directory holding the example files
        filetypes (sequence of str): file suffixes to load, defaults to (".txt", ".pdf")

    Returns:
        list of list, one row per example file: the file's text followed by the fixed
        values "base", 2, 1024, 0.7, 3.5, 3 (presumably defaults for the gradio inputs -
        verify against the caller)
    """
    src = Path(src)
    src.mkdir(exist_ok=True)

    pdf_url = (
        "https://www.dropbox.com/s/y92xy7o5qb88yij/all_you_need_is_attention.pdf?dl=1"
    )
    pdf_path = src / "all_you_need_is_attention.pdf"
    # Best-effort download: a failure is reported but does not abort loading.
    result = subprocess.run(["wget", pdf_url, "-O", str(pdf_path)])
    if result.returncode != 0:
        logging.warning(f"wget exited with code {result.returncode} for {pdf_url}")

    examples = natsorted(f for f in src.iterdir() if f.suffix in filetypes)

    # load the examples into a list
    # NOTE(review): PDF files are opened in text mode here, exactly as before; binary
    # PDF content may fail to decode - confirm whether PDFs should be parsed instead.
    text_examples = []
    for example in examples:
        with open(example, "r") as f:
            text = f.read()
        text_examples.append([text, "base", 2, 1024, 0.7, 3.5, 3])

    return text_examples
def load_example_filenames(example_path: str or Path):
    """
    load_example_filenames - map the .txt files in a directory to their paths

    Only files directly inside ``example_path`` that match "*.txt" are included.

    Returns:
        dict, the examples (filename:full path)
    """
    folder = Path(example_path)

    name_to_path = {}
    for txt_file in folder.glob("*.txt"):
        name_to_path[txt_file.name] = txt_file
    return name_to_path
def textlist2html(text_batches):
    """
    textlist2html - render a sequence of text batches as one styled HTML block

    Each batch becomes a <div> with a "Batch N:" heading (N starts at 1) and a
    pre-line paragraph holding the text; all batches are wrapped in an outer <div>.
    The text is inserted as-is (no HTML escaping).

    Args:
        text_batches: iterable of text items, one per batch

    Returns:
        str, the HTML block
    """
    # Per-batch markup, one entry per output line (the empty ends give the
    # leading and trailing newline).
    batch_template = "\n".join(
        (
            "",
            '<div style="',
            "margin-bottom: 20px;",
            "font-size: 18px;",
            "line-height: 1.5em;",
            "color: #333;",
            '">',
            '<h2 style="font-size: 22px; color: #555;">Batch {number}:</h2>',
            '<p style="white-space: pre-line;">{body}</p>',
            "</div>",
            "",
        )
    )

    # Outer container with background color, border, and padding.
    wrapper_template = "\n".join(
        (
            "",
            '<div style="',
            "background-color: #f9f9f9;",
            "border: 1px solid #ddd;",
            "border-radius: 5px;",
            "padding: 20px;",
            '">',
            "{content}",
            "</div>",
            "",
        )
    )

    rendered = []
    for number, body in enumerate(text_batches, start=1):
        rendered.append(batch_template.format(number=number, body=body))

    return wrapper_template.format(content="".join(rendered))
def extract_batches(html_string, pattern=None, flags=None) -> list:
    """
    Pull (title, content) batches back out of an HTML string.

    Args:
        html_string (str): The HTML string to search.
        pattern (str, optional): Regular expression with two capture groups (title, content).
            Defaults to a pattern matching an <h2> heading followed by a pre-line <p>.
        flags (int, optional): Flags passed to the regular expression. Defaults to re.DOTALL.

    Returns:
        list: One dict per match with whitespace-stripped 'title' and 'content' keys;
        an empty list if nothing matches or the pattern is not a valid regex.
    """
    regex = (
        r'<h2 style="font-size: 22px; color: #555;">(.*?)</h2>\s*<p style="white-space: pre-line;">(.*?)</p>'
        if pattern is None
        else pattern
    )
    regex_flags = re.DOTALL if flags is None else flags

    try:
        found = re.findall(regex, html_string, regex_flags)
    except re.error as e:
        # An invalid pattern is reported and treated as "no batches".
        logging.error(f"An error occurred while trying to extract batches: {e}")
        return []

    batches = []
    for title, content in found:
        batches.append({"title": title.strip(), "content": content.strip()})
    return batches
def extract_keywords(
    text: str, num_keywords: int = 3, window_size: int = 5, kw_max_len: int = 20
) -> List[str]:
    """
    Pick keywords from a text with a simplified TextRank-style scoring.

    Args:
        text: The text to extract keywords from.
        num_keywords: How many top-scoring words to consider. Default: 3
        window_size: Size of the sliding window used for co-occurrence counts. Default: 5
        kw_max_len: Keywords longer than this are cut to this length. Default: 20

    Returns:
        A list of keyword strings. It may hold fewer than num_keywords entries, because
        near-duplicate words among the top candidates are dropped.
    """
    logger = logging.getLogger(__name__)

    # Lower-cased tokens of at least three word characters, minus stopwords.
    tokens = []
    for token in re.findall(r"\b\w{3,}\b", text.lower()):
        if token not in STOPWORDS:
            tokens.append(token)

    # Symmetric co-occurrence counts: before each token enters the window, every
    # pair currently inside the window is counted once more.
    graph = defaultdict(lambda: defaultdict(int))
    window = deque(maxlen=window_size)
    for token in tokens:
        for left, right in combinations(window, 2):
            graph[left][right] += 1
            graph[right][left] += 1
        window.append(token)

    # The total edge weight per node does not change during scoring.
    total_weight = {node: sum(edges.values()) for node, edges in graph.items()}

    # Ten fixed rounds of score propagation (damping 0.85, base 0.15).
    ranks = defaultdict(float)
    for _ in range(10):
        updated = defaultdict(float)
        for node, edges in graph.items():
            inflow = sum(
                weight / total_weight[neighbour] * ranks[neighbour]
                for neighbour, weight in edges.items()
            )
            updated[node] = 0.15 + 0.85 * inflow
        ranks = updated

    # Highest-scoring words first, capped at num_keywords.
    ranked_words = sorted(ranks, key=ranks.get, reverse=True)
    top_words = ranked_words[:num_keywords]
    logger.debug(f"All keywords: {top_words}")

    # Skip candidates that are too similar (ratio > 70) to one already kept.
    selected = []
    for candidate in top_words:
        too_similar = any(fuzz.ratio(candidate, kept) > 70 for kept in selected)
        if too_similar:
            continue
        selected.append(candidate[:kw_max_len])

    logger.debug(f"Keywords (max len. {kw_max_len}):\t{selected}")
    return selected
def saves_summary(
    summarize_output, outpath: str or Path = None, add_signature=True, **kwargs
):
    """
    saves_summary - write generated summaries, their scores and run parameters to a text file

    summarize_output: output from summarize_via_tokenbatches(); each item is read via
        item['summary'][0] and item['summary_score']
    outpath: path to the output file; when None, a "DocSumm_<keywords>_<timestamp>.txt"
        file in the current working directory is used
    add_signature: whether to add a signature to the output file
    kwargs: additional keyword arguments, written as "key: value" lines under "## Parameters:"

    returns the Path of the written file
    """
    logger = logging.getLogger(__name__)

    summary_chunks = []
    for section in summarize_output:
        summary_chunks.append(f"{section['summary'][0]}\n")
    score_chunks = []
    for section in summarize_output:
        score_chunks.append(f"\n - {round(section['summary_score'],4)}")

    full_summary = "\n".join(summary_chunks)
    scores_text = "\n".join(score_chunks)

    keywords = "_".join(extract_keywords(full_summary, kw_max_len=4))
    logger.debug(f"kw:\t{keywords}")

    if outpath is None:
        outpath = Path.cwd() / f"DocSumm_{keywords}_{get_timestamp()}.txt"
    else:
        outpath = Path(outpath)
    logger.info(f"Saving summary to:\t{outpath.name}")

    with open(outpath, "w", encoding="utf-8") as fo:
        # Summary body, optionally followed by the signature.
        fo.write(full_summary)
        fo.write("\n\n")
        if add_signature:
            fo.write("\n\n---\n\n")
            fo.write("Generated with the Document Summarization space :)\n\n")
            fo.write("https://hf.co/spaces/pszemraj/document-summarization\n\n")

        # Per-section scores and the generation date.
        fo.write("\n")
        fo.write("## Section Scores:\n\n")
        fo.write(scores_text)
        fo.write("\n\n")
        fo.write(f"Date: {get_timestamp()}\n\n")

        # Any extra parameters supplied by the caller.
        if kwargs:
            fo.write("---\n\n")
            fo.write("## Parameters:\n\n")
            for name, setting in kwargs.items():
                fo.write(f"{name}: {setting}\n")

    return outpath