File size: 16,276 Bytes
87e5c9c
 
 
77d5469
87e5c9c
8312087
77d5469
2d980d5
4399cd3
8312087
9d26661
8312087
77d5469
 
 
 
62a2921
8312087
87e5c9c
c1cba4f
471b053
9d26661
77d5469
ca983bc
77d5469
8312087
 
9e8f29e
 
80098ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e8f29e
80098ed
62a2921
80098ed
62a2921
80098ed
 
9e8f29e
80098ed
62a2921
ca983bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80098ed
62a2921
 
 
2d980d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e0dde7
2d980d5
7e0dde7
2d980d5
 
 
 
 
 
 
 
 
 
 
 
 
 
7e0dde7
 
 
 
 
 
 
 
8312087
 
 
 
 
87e5c9c
079d1ca
77d5469
34de38e
 
7e0dde7
 
34de38e
77d5469
 
 
 
 
079d1ca
 
7e0dde7
87e5c9c
7e0dde7
 
 
 
87e5c9c
9e8f29e
87e5c9c
 
 
9e8f29e
87e5c9c
 
9e8f29e
87e5c9c
 
 
3ea8fe3
87e5c9c
 
7e0dde7
87e5c9c
 
 
925dd67
7452863
 
 
925dd67
3ea8fe3
87e5c9c
 
 
 
 
 
3ea8fe3
87e5c9c
 
 
 
 
 
 
 
 
 
 
 
 
 
34de38e
079d1ca
7e0dde7
 
895976b
 
c430753
 
 
 
 
 
 
 
 
 
 
 
 
895976b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c430753
 
7e0dde7
2d980d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9ed1f2
c9456c2
e9ed1f2
471b053
e9ed1f2
471b053
 
 
c9456c2
 
 
471b053
 
 
77d5469
e9ed1f2
 
 
 
77d5469
e9ed1f2
471b053
e9ed1f2
471b053
e9ed1f2
 
 
471b053
 
e9ed1f2
471b053
e9ed1f2
471b053
e9ed1f2
 
 
 
471b053
e9ed1f2
471b053
e9ed1f2
471b053
 
 
77d5469
471b053
 
 
 
c9456c2
 
471b053
 
 
03e9034
 
7e0dde7
34de38e
 
 
03e9034
 
 
 
34de38e
77d5469
32939cb
34de38e
 
e219aa1
34de38e
c9456c2
55b49e6
471b053
32939cb
471b053
 
 
77d5469
34de38e
 
 
03e9034
34de38e
 
03e9034
 
 
 
 
34de38e
 
 
32939cb
34de38e
e219aa1
4399cd3
34de38e
03e9034
 
 
 
e219aa1
03e9034
 
4399cd3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""
    utils.py - Utility functions for the project.
"""
import logging
import re
import subprocess
from collections import defaultdict, deque
from datetime import datetime, timedelta
from itertools import combinations
from pathlib import Path
from typing import List

logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    level=logging.INFO,
)

import torch
from natsort import natsorted
from nltk.tokenize import WhitespaceTokenizer, sent_tokenize, word_tokenize
from rapidfuzz import fuzz

STOPWORDS = set(
    "a about above after again all also am an and any are aren't as at back be because been before being below between both but by can't cannot could couldn't did didn't do does doesn't doing don't down during each few for from further had hadn't has hasn't have haven't having he'd he'll he's hence her here here's hers herself him himself his how how's however i'd i'll i'm i've if in into is isn't it's its itself just let's me more moreover most mustn't my myself new nor now of off on once only or other ought our ours ourselves out over own really same shan't she'd she'll she's should shouldn't so some such than that that's the their theirs them themselves then there there's therefore these they they'd they'll they're they've this those through thus to too under until up use used using very was wasn't we we'd we'll we're we've were weren't what what's when when's where where's which while who who's whom why why's with won't would wouldn't you'd you'll you're you've your yours yourself yourselves".split()
)


def contraction_aware_tokenize(text: str) -> List[str]:
    """contraction_aware_tokenize - merges words containing apostrophes as one token."""

    # Tokenize the text using the WhitespaceTokenizer
    tokenizer = WhitespaceTokenizer()
    tokens = tokenizer.tokenize(text)

    merged_tokens = []
    merged_token = ""

    for token in tokens:
        if re.search(r"\w+'\w+", token):
            # Token contains an apostrophe, merge with previous token
            merged_token += token
        else:
            # no apostrophe, add previous merged token (if any) and current
            if merged_token:
                merged_tokens.append(merged_token)
                merged_token = ""
            merged_tokens.append(token)

    # Add the last merged token (if any)
    if merged_token:
        merged_tokens.append(merged_token)

    return merged_tokens


def remove_stopwords(
    text: str, stopwords: List[str] = STOPWORDS, contraction_tokenize: bool = True
) -> str:
    """
    remove_stopwords - Remove stopwords from text.

    :param str text: input text
    :param List[str] stopwords: list of stopwords, defaults to STOPWORDS
    :param bool contraction_tokenize: use custom apostrophe tokenizer, defaults to True
    :return str: text with stopwords removed
    """
    lines = text.split("\n")
    filtered_lines = []

    def fix_commas(text: str) -> str:
        """fixes commas in text to have a space after them"""
        spaced_text = text.replace(",", ", ")
        return spaced_text.replace("  ", " ").strip()

    for line in lines:
        sentences = sent_tokenize(line)
        filtered_sentences = []

        for sentence in sentences:
            # Add space around punctuations for the regex to work correctly, only if they are followed by a letter
            sentence_with_spaces = re.sub(r"([.,!?])(\w)", r"\1 \2", sentence[:-1])

            words = (
                contraction_aware_tokenize(sentence_with_spaces)
                if contraction_tokenize
                else word_tokenize(sentence_with_spaces)
            )

            filtered_words = []
            for word in words:
                if word.lower() not in stopwords:
                    filtered_words.append(word)

            filtered_sentence = " ".join(filtered_words)
            # Restore original spaces around punctuation marks
            filtered_sentence = re.sub(r"([.,!?])\s*", r"\1", filtered_sentence)

            filtered_sentences.append(filtered_sentence + sentence[-1])

        filtered_line = " ".join(filtered_sentences)

        # Replace multiple consecutive whitespaces with a single space
        filtered_line = re.sub(r"\s+", " ", filtered_line)
        filtered_line = fix_commas(filtered_line.strip())

        filtered_lines.append(filtered_line)

    filtered_text = "\n".join(filtered_lines)

    return filtered_text


def remove_stagnant_files(
    freq: str = "hourly",
    search_path: str = ".",
    substring="DocSumm",
    remove_suffix=".txt",
):
    """
    remove_stagnant_files - Remove files that have not been modified in a certain amount of time.

    :param str freq: frequency of file removal, defaults to "hourly"
    :param str search_path: location to search for files, defaults to "."
    :param str substring: substring to search for in file names, defaults to "DocSumm"
    :param str remove_suffix: suffix of files to remove, defaults to ".txt"
    :raises ValueError: if freq is not one of "hourly", "daily", or "weekly"
    """
    current_time = datetime.now()
    search_path = Path(search_path)

    if freq == "hourly":
        time_threshold = current_time - timedelta(hours=1)
    elif freq == "daily":
        time_threshold = current_time - timedelta(days=1)
    elif freq == "weekly":
        time_threshold = current_time - timedelta(weeks=1)
    else:
        raise ValueError(
            "Invalid frequency. Supported values are 'hourly', 'daily', and 'weekly'."
        )

    files_to_remove = []
    potential_files = [
        f for f in search_path.iterdir() if f.is_file() and f.suffix == remove_suffix
    ]
    logging.info(f"Found {len(potential_files)} files.")
    for candidate in potential_files:
        if (
            candidate.is_file()
            and substring in candidate.name
            and candidate.stat().st_mtime < time_threshold.timestamp()
        ):
            files_to_remove.append(candidate)
        logging.debug(f"File {candidate} last modified at {candidate.stat().st_mtime}")
    logging.info(f"Removing {len(files_to_remove)} files.")
    for file_path in files_to_remove:
        file_path.unlink()
    logging.debug(f"Removed files: {files_to_remove}")


def compare_model_size(model_name: str, threshold: int = 500) -> bool:
    """
    compare_model_size - compare string representations of model size to a threshold

    :param str model_name: the model name to compare
    :param int threshold: the threshold to compare against in millions, defaults to 500
    :return: True if the model size is greater than the threshold, False or None otherwise
    """
    pattern = r"(\d+)(M|G|k|b)?"  # param regex

    matches = re.findall(pattern, model_name)
    if not matches:
        return None

    # Extract the parameter count and unit
    parameter_count, unit = matches[-1]
    parameter_count = int(parameter_count)

    # Convert to the standard form (M for million, G for billion, k for thousand)
    if unit == "G" or unit == "b":
        parameter_count *= 1000
    elif unit == "M":
        pass
    elif unit == "k":
        parameter_count /= 1000
    else:
        return None  # Unknown

    return parameter_count > threshold


def validate_pytorch2(torch_version: str = None) -> bool:
    """
    validate_pytorch2 - validate that the PyTorch version is 2.0 or greater

    :param str torch_version: the PyTorch version to validate, defaults to None
    :return: True if the PyTorch version is 2.0 or greater, False otherwise
    """

    torch_version = torch.__version__ if torch_version is None else torch_version

    pattern = r"^2\.\d+(\.\d+)*"

    return True if re.match(pattern, torch_version) else False


def get_timestamp(detailed=False) -> str:
    """
    get_timestamp - get a timestamp for the current time
    :param bool detailed: whether to include seconds and microseconds, defaults to False
    :return: str, the timestamp
    """
    return (
        datetime.now().strftime("%b%d%Y_%H%M%S%f")
        if detailed
        else datetime.now().strftime("%b%d%Y_%H")
    )


def truncate_word_count(text: str, max_words=1024) -> dict:
    """
    truncate_word_count - truncate a text to a maximum number of words
    :param str text: the text to truncate
    :param int max_words: the maximum number of words to keep, defaults to 1024
    :return: dict, the processed text
    """
    words = contraction_aware_tokenize(str(text))
    processed = {}
    if len(words) > max_words:
        processed["was_truncated"] = True
        processed["processed_text"] = " ".join(words[:max_words])
    else:
        processed["was_truncated"] = False
        processed["processed_text"] = text
    return processed


def load_examples(src, filetypes=[".txt", ".pdf"]):
    """
    load_examples - a helper function for the gradio module to load examples
    :param str src: the path to the examples
    """
    src = Path(src)
    src.mkdir(exist_ok=True)

    pdf_url = (
        "https://www.dropbox.com/s/y92xy7o5qb88yij/all_you_need_is_attention.pdf?dl=1"
    )
    subprocess.run(["wget", pdf_url, "-O", src / "all_you_need_is_attention.pdf"])
    examples = [f for f in src.iterdir() if f.suffix in filetypes]
    examples = natsorted(examples)
    # load the examples into a list
    text_examples = []
    for example in examples:
        with open(example, "r") as f:
            text = f.read()
            text_examples.append([text, "base", 2, 1024, 0.7, 3.5, 3])

    return text_examples


def load_example_filenames(example_path: str or Path):
    """
    load_example_filenames - a helper function for the gradio module to load examples
    Returns:
        dict, the examples (filename:full path)
    """
    example_path = Path(example_path)
    # load the examples into a list
    examples = {f.name: f for f in example_path.glob("*.txt")}
    return examples


def textlist2html(text_batches: List[str]) -> str:
    """textlist2html - convert a list of text summaries into a single HTML string"""
    # Step 1: Generate each summary batch as a string of HTML
    formatted_batches = [
        f"""
        <div style="
            margin-bottom: 20px;
            font-size: 18px;
            line-height: 1.5em;
            color: #333;
        ">
            <h2 style="font-size: 22px; color: #555;">Batch {i}:</h2>
            <p style="white-space: pre-line;">{s}</p>
        </div>
        """
        for i, s in enumerate(text_batches, start=1)
    ]

    # Step 2: Join all the summary batches together into one string
    joined_batches = "".join(formatted_batches)

    # Step 3: Wrap the summary string in a larger div with background color, border, and padding
    text_html_block = f"""
    <div style="
        border: 1px solid #ddd;
        border-radius: 5px;
        padding: 20px;
    ">
    {joined_batches}
    </div>
    """

    return text_html_block


def extract_batches(html_string: str, pattern=None, flags=None) -> list:
    """
    Extract batches of text from an HTML string.

    Args:
        html_string (str): The HTML string to extract batches from.
        pattern (str, optional): The regular expression pattern to use. Defaults to a pattern that matches batches in the format provided.
        flags (int, optional): The flags to use with the regular expression. Defaults to re.DOTALL.

    Returns:
        list: A list of dictionaries where each dictionary represents a batch and has 'title' and 'content' keys.
    """
    # Set default pattern if none provided
    if pattern is None:
        pattern = r'<h2 style="font-size: 22px; color: #555;">(.*?)</h2>\s*<p style="white-space: pre-line;">(.*?)</p>'

    # Set default flags if none provided
    if flags is None:
        flags = re.DOTALL

    try:
        # Find all matches in the string
        matches = re.findall(pattern, html_string, flags)

        # Convert matches to a list of dictionaries
        batches = [
            {"title": title.strip(), "content": content.strip()}
            for title, content in matches
        ]

        return batches
    except re.error as e:
        logging.error(f"An error occurred while trying to extract batches: {e}")
        return []


def extract_keywords(
    text: str, num_keywords: int = 3, window_size: int = 5, kw_max_len: int = 20
) -> List[str]:
    """
    Extracts keywords from a text using a simplified TextRank algorithm.

    Args:
        text: The text to extract keywords from.
        num_keywords: The number of keywords to extract. Default: 3
        window_size: The number of words considered for co-occurrence. Default: 5
        kw_max_len: The maximum length of a keyword (truncate longer keywords to max). Default: 20
    Returns:
        A list of strings, where each string is a keyword extracted from the input text.
    """
    logger = logging.getLogger(__name__)
    # Remove stopwords and tokenize the text into words
    words = [
        word
        for word in re.findall(r"\b\w{3,}\b", text.lower())
        if word not in STOPWORDS
    ]

    # Create a graph of word co-occurrences within a moving window of words
    cooccur = defaultdict(lambda: defaultdict(int))
    deque_words = deque(maxlen=window_size)
    for word in words:
        for w1, w2 in combinations(deque_words, 2):
            cooccur[w1][w2] += 1
            cooccur[w2][w1] += 1
        deque_words.append(word)

    # Assign scores to words using a simplified TextRank algorithm
    scores = defaultdict(float)
    for _ in range(10):
        new_scores = defaultdict(float)
        for word, co_words in cooccur.items():
            new_scores[word] = 0.15 + 0.85 * sum(
                cooccur[word][other] / sum(cooccur[other].values()) * scores[other]
                for other in co_words
            )
        scores = new_scores

    # Sort the words by score and return the top num_keywords keywords
    keywords = sorted(scores, key=scores.get, reverse=True)[:num_keywords]
    logger.debug(f"All keywords: {keywords}")
    # Use fuzzy matching to remove similar keywords
    final_keywords = []
    for keyword in keywords:
        if not any(fuzz.ratio(keyword, other) > 70 for other in final_keywords):
            final_keywords.append(keyword[:kw_max_len])
    logger.debug(f"Keywords (max len. {kw_max_len}):\t{final_keywords}")
    return final_keywords


def saves_summary(
    summarize_output, outpath: str or Path = None, add_signature=True, **kwargs
) -> Path:
    """
    saves_summary - save the summary generated from summarize_via_tokenbatches() to a text file

    summarize_output: output from summarize_via_tokenbatches()
    outpath: path to the output file
    add_signature: whether to add a signature to the output file
    kwargs: additional keyword arguments to include in the output file
    """
    logger = logging.getLogger(__name__)
    sum_text = [f"{s['summary'][0]}\n" for s in summarize_output]
    sum_scores = [f"\n - {round(s['summary_score'],4)}" for s in summarize_output]
    scores_text = "\n".join(sum_scores)
    full_summary = "\n".join(sum_text)

    keywords = "_".join(extract_keywords(full_summary, kw_max_len=4))
    logger.debug(f"kw:\t{keywords}")
    outpath = (
        Path.cwd() / f"DocSumm_{keywords}_{get_timestamp()}.txt"
        if outpath is None
        else Path(outpath)
    )
    logger.info(f"Saving summary to:\t{outpath.name}")
    with open(
        outpath,
        "w",
        encoding="utf-8",
    ) as fo:
        fo.writelines(full_summary)
        fo.write("\n\n")
        if add_signature:
            fo.write("\n\n---\n\n")
            fo.write("Generated with the Document Summarization space :)\n\n")
            fo.write("https://hf.co/spaces/pszemraj/document-summarization\n\n")
    with open(
        outpath,
        "a",
        encoding="utf-8",
    ) as fo:
        fo.write("\n")
        fo.write("## Section Scores:\n\n")
        fo.writelines(scores_text)
        fo.write("\n\n")
        fo.write(f"Date: {get_timestamp()}\n\n")
        if kwargs:
            fo.write("---\n\n")
            fo.write("## Parameters:\n\n")
            for key, value in kwargs.items():
                fo.write(f"{key}: {value}\n")
    return str(outpath.resolve())