# Spaces: Sleeping (page-status residue from the hosting site; not part of the program)
import gradio as gr | |
import json | |
import re | |
import sqlite3 | |
import logging | |
from collections import defaultdict | |
from util import process_json_files | |
from gematria import calculate_gematria | |
from deep_translator import GoogleTranslator, exceptions | |
from urllib.parse import quote_plus | |
# Shared module state. The connection and translator start unset and are
# assigned later by initialize_database() and initialize_translator().
book_names = {}  # book id -> book title, filled in by populate_database()
translator = None
conn = None

# Emit DEBUG-and-above records prefixed with a timestamp and the level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
def flatten_text(text):
    """Collapse an arbitrarily nested list of strings into one string.

    A non-list argument is returned unchanged. For a list, every item is
    flattened recursively (depth-first) and the pieces are joined with a
    single space.
    """
    if not isinstance(text, list):
        return text
    # Non-list items pass through flatten_text untouched, so mapping the
    # function over every item flattens exactly the nested lists.
    return " ".join(map(flatten_text, text))
def initialize_database():
    """Open ``gematria.db`` and create the schema if it does not exist yet.

    Side effects:
        Rebinds the module-level ``conn`` to a new connection on
        ``gematria.db`` (resolved against the current working directory),
        creates the tables and index below when missing, and commits.

    Schema:
        results: one row per stored phrase with its gematria sum, an optional
            translation, and the book/chapter/verse where it was found.
        processed_books: the ``max_phrase_length`` each book was last
            processed with.
        idx_results_gematria_sum: index on ``results.gematria_sum``.
    """
    global conn
    conn = sqlite3.connect('gematria.db')
    c = conn.cursor()
    # NOTE(review): ``words`` is UNIQUE on its own, so a phrase can be stored
    # for one location only and the composite primary key never gets to
    # distinguish locations. Left as-is because changing it would alter which
    # rows existing databases accept.
    c.execute('''
        CREATE TABLE IF NOT EXISTS results (
            gematria_sum INTEGER,
            words TEXT UNIQUE,
            translation TEXT,
            book INTEGER,
            chapter INTEGER,
            verse INTEGER,
            PRIMARY KEY (words, book, chapter, verse)
        )
    ''')
    c.execute('''
        CREATE TABLE IF NOT EXISTS processed_books (
            book INTEGER PRIMARY KEY,
            max_phrase_length INTEGER
        )
    ''')
    # Searches filter on gematria_sum only; without this index every lookup
    # scans the whole results table.
    c.execute('''
        CREATE INDEX IF NOT EXISTS idx_results_gematria_sum
        ON results (gematria_sum)
    ''')
    conn.commit()
    logging.info("Database initialized.")
def initialize_translator():
    """Create the translator used by the rest of the module.

    Rebinds the module-level ``translator`` to a ``GoogleTranslator``
    configured with source language code ``'iw'`` and target ``'en'``.
    """
    global translator
    google_translator = GoogleTranslator(source='iw', target='en')
    translator = google_translator
    logging.info("Translator initialized.")
def insert_phrase_to_db(gematria_sum, phrase_candidate, book, chapter, verse):
    """Store one phrase with its gematria value and location in ``results``.

    A successful insert is committed immediately. When the insert violates a
    table constraint (``sqlite3.IntegrityError``), the phrase is treated as
    already present: the event is logged at debug level and nothing is
    committed.
    """
    global conn
    row = (gematria_sum, phrase_candidate, book, chapter, verse)
    cursor = conn.cursor()
    try:
        cursor.execute('''
            INSERT INTO results (gematria_sum, words, book, chapter, verse)
            VALUES (?, ?, ?, ?, ?)
        ''', row)
    except sqlite3.IntegrityError:
        logging.debug(f"Phrase already exists: {phrase_candidate} (Gematria: {gematria_sum}) at {book}:{chapter}:{verse}")
        return
    conn.commit()
    logging.debug(f"Inserted phrase: {phrase_candidate} (Gematria: {gematria_sum}) at {book}:{chapter}:{verse}")
def populate_database(tanach_texts, max_phrase_length=3):
    """Populate ``results`` with every phrase of up to ``max_phrase_length`` words.

    Args:
        tanach_texts: iterable of ``(book_id, text)`` pairs. ``text`` is
            expected to be a mapping with an optional ``'title'`` and a
            ``'text'`` list of chapters; each chapter is a list of verses,
            and each verse is a string or a (nested) list of strings.
        max_phrase_length: largest number of consecutive words per phrase.

    Side effects:
        * Records each book's title in the module-level ``book_names``. This
          happens even for books that are skipped as already processed;
          otherwise a restart against a populated database would leave
          ``book_names`` empty and every search result labelled 'Unknown'.
        * Inserts phrases through ``insert_phrase_to_db``.
        * Records the processed ``max_phrase_length`` per book in
          ``processed_books`` and commits after each book.
    """
    global conn, book_names
    logging.info("Populating database...")
    c = conn.cursor()
    # Compiled once: matches everything except the Hebrew letters
    # U+05D0..U+05EA and the space character.
    non_hebrew = re.compile(r"[^\u05D0-\u05EA ]+")

    for book_id, text in tanach_texts:  # Unpack the tuple (book_id, text)
        # Register the title before the skip check below (see docstring).
        if isinstance(text, dict):
            book_names[book_id] = text.get('title', 'Unknown')

        c.execute('''SELECT max_phrase_length FROM processed_books WHERE book = ?''', (book_id,))
        result = c.fetchone()
        if result and result[0] is not None and result[0] >= max_phrase_length:
            logging.info(f"Skipping book {book_id}: Already processed with max_phrase_length {result[0]}")
            continue

        logging.info(f"Processing book {book_id} with max_phrase_length {max_phrase_length}")
        if 'text' not in text or not isinstance(text['text'], list):
            logging.warning(f"Skipping book {book_id} due to missing or invalid 'text' field.")
            continue

        title = text.get('title', 'Unknown')
        book_names[book_id] = title  # Store book name

        for chapter_id, chapter in enumerate(text['text']):
            if not isinstance(chapter, list):
                logging.warning(f"Skipping chapter {chapter_id} in book {title} due to invalid format.")
                continue
            for verse_id, verse in enumerate(chapter):
                verse_text = flatten_text(verse)
                if not isinstance(verse_text, str):
                    # e.g. a null verse in the source data; nothing to index.
                    logging.warning(
                        f"Skipping verse {verse_id} of chapter {chapter_id} in book {title}: not text.")
                    continue
                # split() already collapses runs of spaces, so no separate
                # whitespace-normalising pass is needed.
                words = non_hebrew.sub("", verse_text).split()
                for length in range(1, max_phrase_length + 1):
                    for start in range(len(words) - length + 1):
                        phrase_candidate = " ".join(words[start:start + length])
                        gematria_sum = calculate_gematria(phrase_candidate.replace(" ", ""))
                        insert_phrase_to_db(gematria_sum, phrase_candidate, book_id, chapter_id + 1, verse_id + 1)

        # ``book`` is the primary key, so OR REPLACE inserts a new row or
        # overwrites the previous max_phrase_length in one statement.
        c.execute(
            '''INSERT OR REPLACE INTO processed_books (book, max_phrase_length) VALUES (?, ?)''',
            (book_id, max_phrase_length),
        )
        conn.commit()
    logging.info("Database population complete.")
def get_translation(phrase):
    """Return the English translation of ``phrase``, caching it in ``results``.

    A translation already stored for ``phrase`` is returned directly.
    Otherwise the phrase is translated via ``translate_and_store`` and the
    result is written back to every ``results`` row whose ``words`` equals
    ``phrase``.

    The failure marker ``"[Translation Error]"`` is never cached, and a
    marker cached by an earlier run is treated as a miss. This way a
    transient translation failure is retried on the next lookup instead of
    being served forever.

    Returns:
        The translation, or ``"[Translation Error]"`` when translation failed.
    """
    global conn
    error_marker = "[Translation Error]"
    c = conn.cursor()
    c.execute('''
        SELECT translation FROM results
        WHERE words = ?
    ''', (phrase,))
    result = c.fetchone()
    if result and result[0] and result[0] != error_marker:
        return result[0]

    translation = translate_and_store(phrase)
    # Persist real translations only; see the docstring for why.
    if translation and translation != error_marker:
        c.execute('''
            UPDATE results
            SET translation = ?
            WHERE words = ?
        ''', (translation, phrase))
        conn.commit()
    return translation
def translate_and_store(phrase):
    """Translate ``phrase`` with the module-level translator.

    Returns the translated text. When the translator raises one of the
    handled ``deep_translator`` exceptions, the error is logged and the
    literal ``"[Translation Error]"`` is returned instead. Despite the name,
    this function stores nothing itself.
    """
    global translator
    try:
        translated = translator.translate(phrase)
    except (exceptions.TranslationNotFound, exceptions.NotValidPayload,
            exceptions.ServerException, exceptions.RequestError) as err:
        logging.error(f"Error translating phrase '{phrase}': {err}")
        return "[Translation Error]"
    logging.debug(f"Translated phrase: {translated}")
    return translated
def search_gematria_in_db(gematria_sum):
    """Fetch every stored phrase whose gematria value equals ``gematria_sum``.

    Returns:
        A list of ``(words, book, chapter, verse)`` tuples read through the
        module-level ``conn``; empty when nothing matches.
    """
    global conn
    query = '''
        SELECT words, book, chapter, verse FROM results WHERE gematria_sum = ?
    '''
    rows = conn.cursor().execute(query, (gematria_sum,)).fetchall()
    logging.debug(f"Found {len(rows)} matching phrases for Gematria: {gematria_sum}")
    return rows
def gematria_search_interface(phrase):
    """Handle one search request from the Gradio interface.

    Computes the gematria value of ``phrase`` (spaces removed), looks up all
    stored phrases with the same value, and formats them with their
    translation and a Bible Gateway link.

    Args:
        phrase: text entered by the user; ``None``, empty, or
            whitespace-only input is rejected with a prompt.

    Returns:
        A string of result entries, or one of the messages
        ``"Please enter a phrase."`` / ``"No matching phrases found."``.

    Side effects:
        Opens a new connection to ``gematria.db``, publishes it as the
        module-level ``conn`` (the helper functions read that global), and
        closes it before returning, on every path including errors.
    """
    global conn, book_names
    if not phrase or not phrase.strip():
        return "Please enter a phrase."

    # Create database connection inside the function.
    # NOTE(review): presumably because the handler runs on a different thread
    # than the one that opened the start-up connection; confirm.
    request_conn = sqlite3.connect('gematria.db')
    conn = request_conn
    try:
        phrase_gematria = calculate_gematria(phrase.replace(" ", ""))
        logging.info(f"Searching for phrases with Gematria: {phrase_gematria}")
        matching_phrases = search_gematria_in_db(phrase_gematria)
        if not matching_phrases:
            return "No matching phrases found."

        # Format results for display
        results = []
        for words, book, chapter, verse in matching_phrases:
            translation = get_translation(words)
            book_name_english = book_names.get(book, 'Unknown')  # Get book name
            link = f"https://www.biblegateway.com/passage/?search={quote_plus(book_name_english)}+{chapter}%3A{verse}"
            results.append(
                f"Book: {book_name_english}\nChapter: {chapter}, Verse: {verse}\nPhrase: {words}\nTranslation: {translation}\n<a href='{link}' target='_blank'>[See on Bible Gateway]</a>\n\n")
        return "\n".join(results)
    finally:
        # Close on every exit path; previously the "no matches" return and
        # any exception leaked the connection.
        request_conn.close()
def run_app():
    """Prepare the database and translator, then launch the Gradio UI."""
    initialize_database()
    initialize_translator()

    # Pre-populate the database. process_json_files is called once with
    # (1, 1) and once with (27, 27), in that order.
    # NOTE(review): the arguments are presumably a first/last book index, in
    # which case only books 1 and 27 are loaded, not all books; confirm
    # against util.process_json_files.
    for book_index in (1, 27):
        tanach_texts = process_json_files(book_index, book_index)
        populate_database(tanach_texts, max_phrase_length=4)

    interface_options = dict(
        fn=gematria_search_interface,
        inputs=gr.Textbox(label="Enter phrase"),
        outputs=gr.HTML(label="Results"),
        title="Gematria Search in Tanach",
        description="Search for phrases in the Tanach that have the same Gematria value.",
        live=False,
        allow_flagging="never",
    )
    gr.Interface(**interface_options).launch()


if __name__ == "__main__":
    run_app()