|
import gradio as gr |
|
import json |
|
import re |
|
import sqlite3 |
|
import logging |
|
from collections import defaultdict |
|
from typing import Tuple, Dict, List |
|
|
|
|
|
from util import process_json_files |
|
from gematria import calculate_gematria |
|
from deep_translator import GoogleTranslator, exceptions |
|
from urllib.parse import quote_plus |
|
from tqdm import tqdm |
|
|
|
|
|
DATABASE_FILE = 'gematria.db' |
|
MAX_PHRASE_LENGTH_LIMIT = 20 |
|
BATCH_SIZE = 10000 |
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
conn: sqlite3.Connection = None |
|
translator: GoogleTranslator = None |
|
book_names: Dict[int, str] = {} |
|
gematria_cache: Dict[Tuple[int, int], List[Tuple[str, str, int, int, int, str]]] = {} |
|
translation_cache: Dict[str, str] = {} |
|
total_word_count: int = 0 |
|
|
|
|
|
def initialize_database() -> None: |
|
"""Initializes the SQLite database.""" |
|
global conn |
|
conn = sqlite3.connect(DATABASE_FILE) |
|
cursor = conn.cursor() |
|
|
|
cursor.execute(''' |
|
CREATE TABLE IF NOT EXISTS results ( |
|
gematria_sum INTEGER, |
|
words TEXT, |
|
translation TEXT, |
|
book TEXT, |
|
chapter INTEGER, |
|
verse INTEGER, |
|
phrase_length INTEGER, |
|
word_position TEXT, |
|
PRIMARY KEY (gematria_sum, words, book, chapter, verse, word_position) |
|
) |
|
''') |
|
|
|
cursor.execute(''' |
|
CREATE INDEX IF NOT EXISTS idx_results_gematria |
|
ON results (gematria_sum) |
|
''') |
|
|
|
cursor.execute(''' |
|
CREATE TABLE IF NOT EXISTS processed_books ( |
|
book TEXT PRIMARY KEY, |
|
max_phrase_length INTEGER |
|
) |
|
''') |
|
|
|
conn.commit() |
|
|
|
|
|
def initialize_translator() -> None: |
|
"""Initializes the Google Translator.""" |
|
global translator |
|
translator = GoogleTranslator(source='iw', target='en') |
|
logging.info("Translator initialized.") |
|
|
|
|
|
def process_book(book_id: int, max_phrase_length: int, cursor): |
|
"""Processes a single book and returns phrases to insert.""" |
|
global book_names, total_word_count |
|
book_data = process_json_files(book_id, book_id) |
|
phrases_to_insert = [] |
|
|
|
if book_id in book_data: |
|
book_data = book_data[book_id] |
|
if 'title' not in book_data or not isinstance(book_data['title'], str): |
|
logging.warning(f"Skipping book {book_id} due to missing 'title' field.") |
|
return phrases_to_insert |
|
|
|
title = book_data['title'] |
|
book_names[book_id] = title |
|
|
|
|
|
cursor.execute('''SELECT max_phrase_length FROM processed_books WHERE book = ?''', (title,)) |
|
result = cursor.fetchone() |
|
if result and result[0] >= max_phrase_length: |
|
logging.info(f"Skipping book {title}: Already processed with max_phrase_length {result[0]}") |
|
return phrases_to_insert |
|
|
|
if 'text' not in book_data or not isinstance(book_data['text'], list): |
|
logging.warning(f"Skipping book {book_id} due to missing 'text' field.") |
|
return phrases_to_insert |
|
|
|
chapters = book_data['text'] |
|
for chapter_id, chapter in enumerate(chapters): |
|
for verse_id, verse in enumerate(chapter): |
|
verse_text = flatten_text(verse) |
|
verse_text = re.sub(r'\[.*?\]', '', verse_text) |
|
verse_text = re.sub(r"[^\u05D0-\u05EA ]+", "", verse_text) |
|
verse_text = re.sub(r" +", " ", verse_text) |
|
words = verse_text.split() |
|
|
|
for length in range(1, max_phrase_length + 1): |
|
for start in range(len(words) - length + 1): |
|
phrase_candidate = " ".join(words[start:start + length]) |
|
gematria_sum = calculate_gematria(phrase_candidate.replace(" ", "")) |
|
|
|
word_position_range = f"{total_word_count + start + 1}-{total_word_count + start + length}" |
|
|
|
phrases_to_insert.append( |
|
(gematria_sum, phrase_candidate, None, title, chapter_id + 1, verse_id + 1, length, |
|
word_position_range)) |
|
|
|
total_word_count += len(words) |
|
|
|
return phrases_to_insert |
|
|
|
|
|
def populate_database(start_book: int, end_book: int, max_phrase_length: int = 1) -> None: |
|
"""Populates the database with phrases from the Tanach.""" |
|
global conn, book_names, total_word_count |
|
logging.info(f"Populating database with books from {start_book} to {end_book}...") |
|
|
|
with sqlite3.connect(DATABASE_FILE) as conn: |
|
cursor = conn.cursor() |
|
|
|
for book_id in tqdm(range(start_book, end_book + 1), desc="Processing Books"): |
|
phrases_to_insert = process_book(book_id, max_phrase_length, cursor) |
|
|
|
if phrases_to_insert: |
|
cursor.executemany(''' |
|
INSERT OR IGNORE INTO results (gematria_sum, words, translation, book, chapter, verse, phrase_length, word_position) |
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?) |
|
''', phrases_to_insert) |
|
|
|
|
|
cursor.execute(''' |
|
INSERT OR REPLACE INTO processed_books (book, max_phrase_length) |
|
VALUES (?, ?) |
|
''', (book_names[book_id], max_phrase_length)) |
|
|
|
conn.commit() |
|
|
|
total_word_count = 0 |
|
|
|
|
|
def get_translation(phrase: str) -> str: |
|
"""Retrieves or generates the English translation of a Hebrew phrase |
|
and caches it in the database. |
|
""" |
|
global conn, translator, translation_cache |
|
|
|
|
|
with sqlite3.connect(DATABASE_FILE) as conn: |
|
cursor = conn.cursor() |
|
cursor.execute("SELECT translation FROM results WHERE words = ? LIMIT 1", (phrase,)) |
|
result = cursor.fetchone() |
|
if result and result[0]: |
|
return result[0] |
|
|
|
|
|
translation = translate_and_store(phrase) |
|
translation_cache[phrase] = translation |
|
|
|
|
|
with sqlite3.connect(DATABASE_FILE) as conn: |
|
cursor = conn.cursor() |
|
cursor.execute("UPDATE results SET translation = ? WHERE words = ?", (translation, phrase)) |
|
conn.commit() |
|
|
|
return translation |
|
|
|
|
|
def translate_and_store(phrase: str) -> str: |
|
"""Translates a Hebrew phrase to English using Google Translate.""" |
|
global translator |
|
max_retries = 3 |
|
retries = 0 |
|
while retries < max_retries: |
|
try: |
|
translation = translator.translate(phrase) |
|
return translation |
|
except (exceptions.TranslationNotFound, exceptions.NotValidPayload, |
|
exceptions.ServerException, exceptions.RequestError) as e: |
|
retries += 1 |
|
logging.warning(f"Error translating phrase '{phrase}': {e}. Retrying... ({retries}/{max_retries})") |
|
logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.") |
|
return "[Translation Error]" |
|
|
|
|
|
def search_gematria_in_db(gematria_sum: int, max_words: int) -> List[Tuple[str, str, int, int, int, str]]: |
|
"""Searches the database for phrases with a given Gematria value.""" |
|
global conn |
|
with sqlite3.connect(DATABASE_FILE) as conn: |
|
cursor = conn.cursor() |
|
cursor.execute(''' |
|
SELECT words, book, chapter, verse, phrase_length, word_position |
|
FROM results |
|
WHERE gematria_sum = ? AND phrase_length <= ? |
|
''', (gematria_sum, max_words)) |
|
results = cursor.fetchall() |
|
return results |
|
|
|
|
|
def gematria_search_interface(phrases: str, max_words: int, show_translation: bool) -> str: |
|
"""The main function for the Gradio interface, handling multiple phrases.""" |
|
global conn, book_names, gematria_cache |
|
|
|
results = [] |
|
all_results = [] |
|
middle_words_results = [] |
|
all_names_average_position = 0 |
|
total_name_count = 0 |
|
|
|
phrases = phrases.strip().splitlines() |
|
if not phrases: |
|
return "Please enter at least one phrase." |
|
|
|
for phrase in phrases: |
|
if not phrase.strip(): |
|
continue |
|
|
|
numbers = re.findall(r'\d+', phrase) |
|
text_without_numbers = re.sub(r'\d+', '', phrase) |
|
phrase_gematria = calculate_gematria(text_without_numbers.replace(" ", "")) |
|
phrase_gematria += sum(int(number) for number in numbers) |
|
|
|
if (phrase_gematria, max_words) in gematria_cache: |
|
matching_phrases = gematria_cache[(phrase_gematria, max_words)] |
|
else: |
|
matching_phrases = search_gematria_in_db(phrase_gematria, max_words) |
|
gematria_cache[(phrase_gematria, max_words)] = matching_phrases |
|
|
|
if not matching_phrases: |
|
results.append(f"No matching phrases found for: {phrase}") |
|
continue |
|
|
|
sorted_phrases = sorted(matching_phrases, |
|
key=lambda x: (int(list(book_names.keys())[list(book_names.values()).index(x[1])]), x[2], |
|
x[3])) |
|
results_by_book = defaultdict(list) |
|
for words, book, chapter, verse, phrase_length, word_position in sorted_phrases: |
|
results_by_book[book].append((words, chapter, verse, phrase_length, word_position)) |
|
|
|
results.append(f"<h2>Results for: {phrase} (Gematria: {phrase_gematria})</h2>") |
|
results.append("<div class='results-container'>") |
|
for book, phrases in results_by_book.items(): |
|
for words, chapter, verse, phrase_length, word_position in phrases: |
|
translation = get_translation(words) if show_translation else "" |
|
link = f"https://www.biblegateway.com/passage/?search={quote_plus(book)}+{chapter}%3A{verse}&version=CJB" |
|
results.append(f""" |
|
<div class='result-item'> |
|
<p><b>Book:</b> {book}</p> |
|
<p><b>Chapter:</b> {chapter}, <b>Verse:</b> {verse}</p> |
|
<p class='hebrew-phrase'><b>Hebrew Phrase:</b> {words}</p> |
|
<p><b>Translation:</b> {translation}</p> |
|
<p><b>Phrase Length:</b> {phrase_length} words</p> |
|
<p><b>Phrase Gematria:</b> {phrase_gematria}</p> |
|
<p><b>Word Position in the Tanach:</b> {word_position}</p> |
|
<a href='{link}' target='_blank' class='bible-link'>[See on Bible Gateway]</a> |
|
</div> |
|
""") |
|
|
|
|
|
name_average_position = calculate_average_position_for_name(results_by_book) |
|
if name_average_position is not None: |
|
results.append(f"<p><b>Average Word Position for '{phrase}' across all books:</b> {name_average_position:.2f}</p>") |
|
all_names_average_position += name_average_position |
|
total_name_count += 1 |
|
|
|
results.append("</div>") |
|
all_results.append(results_by_book) |
|
|
|
|
|
if total_name_count > 0: |
|
all_names_average_position /= total_name_count |
|
results.append(f"<h2>Average Word Position Across All Names and Books: {all_names_average_position:.2f}</h2>") |
|
|
|
|
|
if len(all_results) >= 2: |
|
results.append("<h2>Middle Words (Common Books):</h2>") |
|
results.append("<div class='results-container'>") |
|
|
|
common_books = set.intersection(*[set(results.keys()) for results in all_results]) |
|
logging.debug(f"Common books: {common_books}") |
|
|
|
for book in common_books: |
|
logging.debug(f"Processing book: {book}") |
|
|
|
|
|
nearest_positions = find_nearest_positions([results[book] for results in all_results]) |
|
logging.debug(f"Nearest positions in {book}: {nearest_positions}") |
|
|
|
if nearest_positions: |
|
middle_word_position = sum(nearest_positions) / len(nearest_positions) |
|
logging.debug(f"Calculated middle word position in {book}: {middle_word_position}") |
|
|
|
start_position = int(middle_word_position) |
|
end_position = start_position + 1 if middle_word_position % 1 != 0 else start_position |
|
logging.debug(f"Middle word position range in {book}: {start_position}-{end_position}") |
|
|
|
middle_words_data = get_words_from_db(book, start_position, end_position) |
|
logging.debug(f"Middle words data fetched from database: {middle_words_data}") |
|
|
|
if middle_words_data: |
|
|
|
middle_words_results.extend([(book, data) for data in middle_words_data]) |
|
else: |
|
|
|
logging.debug(f"No middle words found for range {start_position}-{end_position}. " |
|
f"Fetching words independently.") |
|
middle_words_data_start = get_words_from_db(book, start_position, start_position) |
|
middle_words_data_end = get_words_from_db(book, end_position, end_position) |
|
|
|
if middle_words_data_start or middle_words_data_end: |
|
middle_words_results.extend([(book, data) for data in middle_words_data_start + middle_words_data_end]) |
|
|
|
|
|
middle_words_results.sort(key=lambda x: int(list(book_names.keys())[list(book_names.values()).index(x[0])])) |
|
|
|
for book, (words, chapter, verse, phrase_length, word_position) in middle_words_results: |
|
translation = get_translation(words) if show_translation else "" |
|
link = f"https://www.biblegateway.com/passage/?search={quote_plus(book)}+{chapter}%3A{verse}&version=CJB" |
|
results.append(f""" |
|
<div class='result-item'> |
|
<p><b>Book:</b> {book}</p> |
|
<p><b>Chapter:</b> {chapter}, <b>Verse:</b> {verse}</p> |
|
<p class='hebrew-phrase'><b>Hebrew Phrase:</b> {words}</p> |
|
<p><b>Translation:</b> {translation}</p> |
|
<p><b>Phrase Length:</b> {phrase_length} words</p> |
|
<p><b>Word Position in the Tanach:</b> {word_position}</p> |
|
<a href='{link}' target='_blank' class='bible-link'>[See on Bible Gateway]</a> |
|
</div> |
|
""") |
|
results.append("</div>") |
|
|
|
|
|
style = """ |
|
<style> |
|
.results-container { |
|
display: grid; |
|
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); |
|
gap: 20px; |
|
width: 100%; /* Make results container take full width */ |
|
} |
|
.result-item { |
|
border: 1px solid #ccc; |
|
padding: 15px; |
|
border-radius: 5px; |
|
box-shadow: 2px 2px 5px rgba(0, 0, 0, 0.1); |
|
} |
|
.hebrew-phrase { |
|
font-family: 'SBL Hebrew', 'Ezra SIL', serif; |
|
direction: rtl; |
|
} |
|
.bible-link { |
|
display: block; |
|
margin-top: 10px; |
|
color: #007bff; |
|
text-decoration: none; |
|
} |
|
</style> |
|
""" |
|
return style + "\n".join(results) |
|
|
|
def calculate_average_position_for_name(results_by_book: Dict[str, List[Tuple]]) -> float: |
|
"""Calculates the average word position for a single name across all books.""" |
|
positions = [] |
|
for book, phrases in results_by_book.items(): |
|
for _, _, _, _, word_position in phrases: |
|
start, end = map(int, word_position.split('-')) |
|
positions.append((start + end) / 2) |
|
return sum(positions) / len(positions) if positions else None |
|
|
|
def find_nearest_positions(results_lists: List[List]) -> List[int]: |
|
"""Finds the nearest word positions among multiple lists of results.""" |
|
nearest_positions = [] |
|
for i in range(len(results_lists)): |
|
positions_i = [(int(pos.split('-')[0]) + int(pos.split('-')[1])) / 2 |
|
for _, _, _, _, pos in results_lists[i]] |
|
logging.debug(f"Positions for phrase {i+1}: {positions_i}") |
|
|
|
|
|
average_position = sum(positions_i) / len(positions_i) if positions_i else None |
|
logging.debug(f"Average position for phrase {i+1}: {average_position}") |
|
|
|
if average_position is not None: |
|
nearest_positions.append(average_position) |
|
|
|
return nearest_positions |
|
|
|
|
|
def get_words_from_db(book: str, start_position: int, end_position: int) -> List[Tuple]: |
|
"""Fetches words from the database based on the book and exact word position range.""" |
|
global conn |
|
logging.debug(f"Fetching words from database for {book} at positions {start_position}-{end_position}") |
|
with sqlite3.connect(DATABASE_FILE) as conn: |
|
cursor = conn.cursor() |
|
cursor.execute(""" |
|
SELECT words, chapter, verse, phrase_length, word_position |
|
FROM results |
|
WHERE book = ? AND word_position = ? |
|
""", (book, f"{start_position}-{end_position}")) |
|
results = cursor.fetchall() |
|
logging.debug(f"Words fetched from database: {results}") |
|
return results |
|
|
|
|
|
def flatten_text(text: List) -> str: |
|
"""Flattens nested lists into a single list.""" |
|
if isinstance(text, list): |
|
return " ".join(flatten_text(item) if isinstance(item, list) else item for item in text) |
|
return text |
|
|
|
|
|
def run_app() -> None: |
|
"""Initializes and launches the Gradio app.""" |
|
global conn |
|
initialize_database() |
|
initialize_translator() |
|
|
|
logging.info("Starting database population...") |
|
for max_phrase_length in range(1, MAX_PHRASE_LENGTH_LIMIT + 1): |
|
populate_database(1, 39, max_phrase_length=max_phrase_length) |
|
logging.info("Database population complete.") |
|
|
|
with gr.Blocks() as iface: |
|
with gr.Row(): |
|
textbox = gr.Textbox(label="Enter word(s) or numbers (one phrase per line)", lines=5) |
|
slider = gr.Slider(label="Max Word Count in Result Phrases", minimum=1, |
|
maximum=MAX_PHRASE_LENGTH_LIMIT, step=1, |
|
value=1) |
|
checkbox = gr.Checkbox(label="Show Translation", value=True) |
|
with gr.Row(): |
|
clear_button = gr.Button("Clear") |
|
submit_button = gr.Button("Submit", variant="primary") |
|
|
|
html_output = gr.HTML(label="Results") |
|
|
|
submit_button.click(fn=gematria_search_interface, |
|
inputs=[textbox, slider, checkbox], |
|
outputs=html_output) |
|
clear_button.click(fn=lambda: "", inputs=None, outputs=html_output) |
|
|
|
iface.launch() |
|
|
|
|
|
if __name__ == "__main__": |
|
run_app() |