import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

import gradio as gr

import torah
import bible
import quran
import hindu
import tripitaka

from utils import number_to_ordinal_word, custom_normalize, date_to_words, translate_date_to_words
from gematria import calculate_gematria, strip_diacritics

import pandas as pd
from deep_translator import GoogleTranslator
from gradio_calendar import Calendar
from datetime import datetime, timedelta
import math
import json
import re
import sqlite3
from collections import defaultdict
from typing import List, Tuple
import rich
from fuzzywuzzy import fuzz
import calendar
import translation_utils
import hashlib
|

translation_utils.create_translation_table()

# This translator instance is only used to enumerate the supported languages;
# 'auto' is not a meaningful translation *target*, so default the target to English.
translator = GoogleTranslator(source='auto', target='en')
LANGUAGES_SUPPORTED = translator.get_supported_languages(as_dict=True)

# Maps long language names (e.g. "english") to their short codes (e.g. "en").
LANGUAGE_CODE_MAP = LANGUAGES_SUPPORTED

DATABASE_FILE = 'gematria.db'
MAX_PHRASE_LENGTH_LIMIT = 20

ELS_CACHE_DB = "els_cache.db"
DATABASE_TIMEOUT = 60
|


def initialize_database():
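    """Create the gematria results database (tables and index) if it does not exist."""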
|
    global conn
    conn = sqlite3.connect(DATABASE_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            gematria_sum INTEGER,
            words TEXT,
            translation TEXT,
            book TEXT,
            chapter INTEGER,
            verse INTEGER,
            phrase_length INTEGER,
            word_position TEXT,
            PRIMARY KEY (gematria_sum, words, book, chapter, verse, word_position)
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_results_gematria
        ON results (gematria_sum)
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS processed_books (
            book TEXT PRIMARY KEY,
            max_phrase_length INTEGER
        )
    ''')
    conn.commit()


initialize_database()
|


def create_els_cache_table():
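    """Create the ELS cache table if it does not exist."""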
|
    with sqlite3.connect(ELS_CACHE_DB) as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS els_cache (
                query_hash TEXT PRIMARY KEY,
                results TEXT
            )
        ''')


# This function was defined but never invoked, so the cache table may not
# exist when the first lookup runs; create it once at import time.
create_els_cache_table()


|
def get_query_hash(func, *args, **kwargs):
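    """Build a stable SHA-256 cache key from a function's name and arguments.

    Assumes all arguments are JSON-serializable, which holds for the primitive
    search parameters used in this app.
    """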
|
    key = (func.__name__, args, tuple(sorted(kwargs.items())))
    return hashlib.sha256(json.dumps(key).encode()).hexdigest()


|
def cached_process_json_files(func, *args, **kwargs):
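    """Memoize func(*args, **kwargs) in the SQLite ELS cache.

    On a cache hit the stored JSON is returned; on a miss the function is
    executed and its (JSON-serializable) result is stored for next time.
    """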
|
    query_hash = get_query_hash(func, *args, **kwargs)

    try:
        with sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT results FROM els_cache WHERE query_hash = ?", (query_hash,))
            result = cursor.fetchone()
            if result:
                logger.info(f"Cache hit for query: {query_hash}")
                return json.loads(result[0])
    except sqlite3.Error as e:
        logger.error(f"Database error checking cache: {e}")

    logger.info(f"Cache miss for query: {query_hash}")
    results = func(*args, **kwargs)

    try:
        with sqlite3.connect(ELS_CACHE_DB, timeout=DATABASE_TIMEOUT) as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT INTO els_cache (query_hash, results) VALUES (?, ?)",
                           (query_hash, json.dumps(results)))
            conn.commit()
    except sqlite3.Error as e:
        logger.error(f"Database error caching results: {e}")

    return results


|
def flatten_text(text: List) -> str:
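    """Recursively join a (possibly nested) list of strings into one spaced string."""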
|
    if isinstance(text, list):
        return " ".join(flatten_text(item) if isinstance(item, list) else item for item in text)
    return text


|
def search_gematria_in_db(gematria_sum: int, max_words: int) -> List[Tuple[str, str, int, int, int, str]]:
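    """Return all phrases with the given gematria sum and at most max_words words.

    Each row is (words, book, chapter, verse, phrase_length, word_position).
    """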
|
    # Use a fresh local connection per query; the original 'global conn'
    # statement rebound the module-level connection for no benefit.
    with sqlite3.connect(DATABASE_FILE) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT words, book, chapter, verse, phrase_length, word_position
            FROM results
            WHERE gematria_sum = ? AND phrase_length <= ?
        ''', (gematria_sum, max_words))
        results = cursor.fetchall()
        return results


|
def get_most_frequent_phrase(results):
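    """Return the phrase that occurs most often in the result rows, or None."""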
|
    phrase_counts = defaultdict(int)
    for words, book, chapter, verse, phrase_length, word_position in results:
        phrase_counts[words] += 1
    most_frequent_phrase = max(phrase_counts, key=phrase_counts.get) if phrase_counts else None
    return most_frequent_phrase


|
def create_language_dropdown(label, default_value='english', show_label=True):
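    """Build a language dropdown whose choices are the lowercase long names
    (e.g. 'english') that deep-translator reports as supported; the default
    is lowercase to match those keys."""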
|
    return gr.Dropdown(
        choices=list(LANGUAGE_CODE_MAP.keys()),
        label=label,
        value=default_value,
        show_label=show_label
    )


|
def calculate_gematria_sum(text, date_words):
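    """Sum the gematria of the combined text and date words.

    Digits are summed as plain numbers; the remaining text is stripped of
    diacritics and valued via calculate_gematria. Returns None for empty input.
    """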
|
    if text or date_words:
        combined_input = f"{text} {date_words}"
        logger.info(f"searching for input: {combined_input}")
        numbers = re.findall(r'\d+', combined_input)
        text_without_numbers = re.sub(r'\d+', '', combined_input)
        number_sum = sum(int(number) for number in numbers)
        text_gematria = calculate_gematria(strip_diacritics(text_without_numbers))
        total_sum = text_gematria + number_sum
        return total_sum
    else:
        return None


|
def perform_els_search(step, rounds_combination, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk, include_torah, include_bible, include_quran, include_hindu, include_tripitaka):
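    """Run the ELS search over each selected corpus and return a dict of results,
    keyed by corpus name; returns None for a zero step or "0,0" rounds.

    The numeric ranges passed to each corpus module (1-39, 40-66, 1-114, 1-10,
    1-52) are, presumably, the spans of source files those modules expect.
    """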
|
    if step == 0 or rounds_combination == "0,0":
        return None

    results = {}
    length = 0

    # Translate the long language name into its short code, e.g. "english" -> "en".
    selected_language_long = tlang
    tlang = LANGUAGES_SUPPORTED.get(selected_language_long)
    if tlang is None:
        tlang = "en"
        logger.warning(f"Unsupported language selected: {selected_language_long}. Defaulting to English (en).")

    if include_torah:
        logger.debug(f"Arguments for Torah: {(1, 39, step, rounds_combination, length, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)}")
        results["Torah"] = cached_process_json_files(torah.process_json_files, 1, 39, step, rounds_combination, length, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
    else:
        results["Torah"] = []

    if include_bible:
        results["Bible"] = cached_process_json_files(bible.process_json_files, 40, 66, step, rounds_combination, length, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
    else:
        results["Bible"] = []

    if include_quran:
        results["Quran"] = cached_process_json_files(quran.process_json_files, 1, 114, step, rounds_combination, length, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
    else:
        results["Quran"] = []

    if include_hindu:
        # Note: spaces are never stripped for the Rig Veda (hard-coded False).
        results["Rig Veda"] = cached_process_json_files(hindu.process_json_files, 1, 10, step, rounds_combination, length, tlang, False, strip_in_braces, strip_diacritics_chk)
    else:
        results["Rig Veda"] = []

    if include_tripitaka:
        results["Tripitaka"] = cached_process_json_files(tripitaka.process_json_files, 1, 52, step, rounds_combination, length, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk)
    else:
        results["Tripitaka"] = []

    return results
|


def add_24h_projection(results_dict):
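    """Spread each book's results evenly across a 24-hour dial and attach
    the resulting 'HH:MM-HH:MM' range to every result."""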
|
    for book_name, results in results_dict.items():
        num_results = len(results)
        if num_results > 0:
            time_interval = timedelta(minutes=24 * 60 / num_results)
            current_time = datetime.min.time()
            for i in range(num_results):
                next_time = (datetime.combine(datetime.min, current_time) + time_interval).time()
                time_range_str = f"{current_time.strftime('%H:%M')}-{next_time.strftime('%H:%M')}"
                results[i]['24h Projection'] = time_range_str
                current_time = next_time
    return results_dict
|


def add_monthly_projection(results_dict, selected_date):
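    """Spread each book's results evenly across the selected month and attach
    a 'Mon DD - Mon DD' range to every result."""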
|
    if selected_date is None:
        return results_dict

    for book_name, results in results_dict.items():
        num_results = len(results)
        if num_results > 0:
            days_in_month = calendar.monthrange(selected_date.year, selected_date.month)[1]
            total_seconds = (days_in_month - 1) * 24 * 3600
            seconds_interval = total_seconds / num_results
            start_datetime = datetime(selected_date.year, selected_date.month, 1)
            current_datetime = start_datetime

            for i in range(num_results):
                next_datetime = current_datetime + timedelta(seconds=seconds_interval)
                current_date = current_datetime.date()
                next_date = next_datetime.date()
                # '%b' is the portable abbreviated-month directive; the original
                # '%h' is a non-standard alias that fails on some platforms.
                date_range_str = f"{current_date.strftime('%b %d')} - {next_date.strftime('%b %d')}"
                results[i]['Monthly Projection'] = date_range_str
                current_datetime = next_datetime
    return results_dict
|


def add_yearly_projection(results_dict, selected_date):
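    """Spread each book's results evenly across the selected year and attach
    a 'Mon DD - Mon DD' range to every result."""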
|
    if selected_date is None:
        return results_dict

    for book_name, results in results_dict.items():
        num_results = len(results)
        if num_results > 0:
            days_in_year = 366 if calendar.isleap(selected_date.year) else 365
            total_seconds = (days_in_year - 1) * 24 * 3600
            seconds_interval = total_seconds / num_results
            start_datetime = datetime(selected_date.year, 1, 1)
            current_datetime = start_datetime

            for i in range(num_results):
                next_datetime = current_datetime + timedelta(seconds=seconds_interval)
                current_date = current_datetime.date()
                next_date = next_datetime.date()
                date_range_str = f"{current_date.strftime('%b %d')} - {next_date.strftime('%b %d')}"
                results[i]['Yearly Projection'] = date_range_str
                current_datetime = next_datetime

    return results_dict
|


def sort_results(results):
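    """Sort results by the start and end of their 24h projection; results
    without a parseable projection sort last."""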
|
    def parse_time(time_str):
        try:
            hours, minutes = map(int, time_str.split(':'))
            return hours * 60 + minutes
        except ValueError:
            return 24 * 60  # unparseable times sort last

    # The fallback must contain a '-' so that split('-')[1] cannot raise an
    # IndexError for results that lack a 24h projection.
    return sorted(results, key=lambda x: (
        parse_time(x.get('24h Projection', '23:59-23:59').split('-')[0]),
        parse_time(x.get('24h Projection', '23:59-23:59').split('-')[1])
    ))
|


with gr.Blocks() as app:
    with gr.Column():
        with gr.Row():
            tlang = create_language_dropdown("Target Language for Result Translation", default_value='english')
            selected_date = Calendar(type="datetime", label="Date to investigate (optional)", info="Pick a date from the calendar")
            use_day = gr.Checkbox(label="Use Day", info="Check to include day in search", value=True)
            use_month = gr.Checkbox(label="Use Month", info="Check to include month in search", value=True)
            use_year = gr.Checkbox(label="Use Year", info="Check to include year in search", value=True)
            date_language_input = create_language_dropdown("Language of the person/topic (optional) (Date Word Language)", default_value='english')
        with gr.Row():
            gematria_text = gr.Textbox(label="Name and/or Topic (required)", value="Hans Albert Einstein Mileva Marity-Einstein")
            date_words_output = gr.Textbox(label="Date in Words Translated (optional)")
            gematria_result = gr.Number(label="Journal Sum")

        with gr.Row():
            step = gr.Number(label="Jump Width (Steps) for ELS")
            float_step = gr.Number(visible=False, value=1)
            half_step_btn = gr.Button("Steps / 2")
            double_step_btn = gr.Button("Steps * 2")

        with gr.Column():
            round_x = gr.Number(label="Round (1)", value=1)
            round_y = gr.Number(label="Round (2)", value=-1)

        rounds_combination = gr.Textbox(label="Combined Rounds", value="1,-1")

        with gr.Row():
            include_torah_chk = gr.Checkbox(label="Include Torah", value=True)
            include_bible_chk = gr.Checkbox(label="Include Bible", value=True)
            include_quran_chk = gr.Checkbox(label="Include Quran", value=True)
            include_hindu_chk = gr.Checkbox(label="Include Rigveda", value=True)
            include_tripitaka_chk = gr.Checkbox(label="Include Tripitaka", value=True)
            merge_results_chk = gr.Checkbox(label="Merge Results (Torah-Bible-Quran)", value=True)

        strip_spaces = gr.Checkbox(label="Strip Spaces from Books", value=True)
        strip_in_braces = gr.Checkbox(label="Strip Text in Braces from Books", value=True)
        strip_diacritics_chk = gr.Checkbox(label="Strip Diacritics from Books", value=True)

        translate_btn = gr.Button("Search with ELS")

    markdown_output = gr.Dataframe(label="ELS Results")
    most_frequent_phrase_output = gr.Textbox(label="Most Frequent Phrase in Network Search")
    json_output = gr.JSON(label="JSON Output")

|
    def update_date_words(selected_date, date_language_input, use_day, use_month, use_year):
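        """Convert the selected date to words (honoring the day/month/year
        checkboxes), translate them into the chosen language and normalize."""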
|
        if selected_date is None:
            return ""

        # With every checkbox off, fall back to the full-date helper.
        if not use_year and not use_month and not use_day:
            return translate_date_to_words(selected_date, date_language_input)

        year = selected_date.year if use_year else None
        month = selected_date.month if use_month else None
        day = selected_date.day if use_day else None

        if year is not None and month is not None and day is not None:
            date_obj = selected_date
        elif year is not None and month is not None:
            date_obj = f"{year}-{month}"
        elif year is not None:
            date_obj = f"{year}"
        else:
            return ""

        date_in_words = date_to_words(date_obj)

        # deep-translator accepts long language names as translation targets.
        translator = GoogleTranslator(source='auto', target=date_language_input)
        translated_date_words = translator.translate(date_in_words)
        return custom_normalize(translated_date_words)
|

    def update_journal_sum(gematria_text, date_words_output):
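        """Recompute the journal sum and seed the ELS step fields with it."""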
|
        sum_value = calculate_gematria_sum(gematria_text, date_words_output)
        # One value each for the journal-sum display, the ELS step, and the float step.
        return sum_value, sum_value, sum_value
|

    def update_rounds_combination(round_x, round_y):
        return f"{int(round_x)},{int(round_y)}"

    def update_step_half(float_step):
        # Keep the exact float alongside the rounded step so repeated
        # halving/doubling does not accumulate rounding error.
        new_step = math.ceil(float_step / 2)
        return new_step, float_step / 2

    def update_step_double(float_step):
        new_step = math.ceil(float_step * 2)
        return new_step, float_step * 2
|

    def find_closest_phrase(target_phrase, phrases):
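        """Return the DB phrase most similar to target_phrase, scored by
        fuzzy ratio minus the difference in word counts."""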
|
        best_match = None
        best_score = 0

        logger.debug(f"Target phrase for similarity search: {target_phrase}")

        for phrase, _, _, _, _, _ in phrases:
            word_length_diff = abs(len(target_phrase.split()) - len(phrase.split()))
            similarity_score = fuzz.ratio(target_phrase, phrase)
            # Penalize candidates whose word count differs from the target's.
            combined_score = similarity_score - word_length_diff

            logger.debug(f"Comparing with phrase: {phrase}")
            logger.debug(
                f"Word Length Difference: {word_length_diff}, Similarity Score: {similarity_score}, Combined Score: {combined_score}")

            if combined_score > best_score:
                best_score = combined_score
                best_match = phrase

        logger.debug(f"Closest phrase found: {best_match} with score: {best_score}")
        return best_match
|

    def perform_search(step, rounds_combination, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk, include_torah, include_bible, include_quran, include_hindu, include_tripitaka, gematria_text, date_words_output, selected_date):
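        """Run the full pipeline: ELS search, phrase lookup, translation and
        time projections; returns the results table, the most frequent
        phrases per book, and the JSON payload."""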
|
        els_results = perform_els_search(step, rounds_combination, tlang, strip_spaces, strip_in_braces,
                                         strip_diacritics_chk, include_torah, include_bible, include_quran,
                                         include_hindu, include_tripitaka)

        # perform_els_search returns None for a zero step or a "0,0" rounds
        # combination; bail out with empty outputs instead of crashing below.
        if not els_results:
            return pd.DataFrame(), "", {}

        most_frequent_phrases = {}
        combined_and_sorted_results = []

        for book_name, book_results in els_results.items():
            if book_results:
                most_frequent_phrases[book_name] = ""

                for result in book_results:
                    try:
                        gematria_sum = calculate_gematria(result['result_text'])
                        max_words = len(result['result_text'].split())
                        matching_phrases = search_gematria_in_db(gematria_sum, max_words)
                        # Widen the phrase-length window until something matches.
                        while not matching_phrases and max_words < MAX_PHRASE_LENGTH_LIMIT:
                            max_words += 1
                            matching_phrases = search_gematria_in_db(gematria_sum, max_words)

                        if matching_phrases:
                            most_frequent_phrase = get_most_frequent_phrase(matching_phrases)
                            most_frequent_phrases[book_name] = most_frequent_phrase
                        else:
                            # No exact gematria match: fall back to the fuzziest
                            # candidate within the widest phrase-length window.
                            closest_phrase = find_closest_phrase(
                                result['result_text'],
                                search_gematria_in_db(gematria_sum, MAX_PHRASE_LENGTH_LIMIT))
                            most_frequent_phrases[book_name] = closest_phrase or ""

                        result['Most Frequent Phrase'] = most_frequent_phrases[book_name]
                        if 'book' in result:
                            if isinstance(result['book'], int):
                                result['book'] = f"{book_name} {result['book']}."
                        combined_and_sorted_results.append(result)

                    except KeyError as e:
                        logger.warning(f"KeyError - key '{e.args[0]}' not found in result; skipping this result.")
                        continue

        selected_language_long = tlang
        tlang_short = LANGUAGES_SUPPORTED.get(selected_language_long)
        if tlang_short is None:
            tlang_short = "en"
            logger.warning(f"Unsupported language selected: {selected_language_long}. Defaulting to English (en).")

        # Collect all texts up front so they can be translated in two batch calls.
        phrases_to_translate = []
        phrases_source_langs = []
        results_to_translate = []
        results_source_langs = []
        for result in combined_and_sorted_results:
            phrases_to_translate.append(result.get('Most Frequent Phrase', ''))
            phrases_source_langs.append(result.get("source_language", "auto"))
            results_to_translate.append(result.get('result_text', ''))
            results_source_langs.append(result.get("source_language", "auto"))

        translated_phrases = translation_utils.batch_translate(phrases_to_translate, tlang_short, phrases_source_langs)
        translated_result_texts = translation_utils.batch_translate(results_to_translate, tlang_short, results_source_langs)

        for i, result in enumerate(combined_and_sorted_results):
            result['translated_text'] = translated_result_texts.get(results_to_translate[i], None)
            result['Translated Most Frequent Phrase'] = translated_phrases.get(phrases_to_translate[i], None)

        updated_els_results = add_24h_projection(els_results)
        updated_els_results = add_monthly_projection(updated_els_results, selected_date)
        updated_els_results = add_yearly_projection(updated_els_results, selected_date)

        combined_and_sorted_results = []
        for book_results in updated_els_results.values():
            combined_and_sorted_results.extend(book_results)
        combined_and_sorted_results = sort_results(combined_and_sorted_results)

        df = pd.DataFrame(combined_and_sorted_results)
        df.index = range(1, len(df) + 1)
        df.reset_index(inplace=True)
        df.rename(columns={'index': 'Result Number'}, inplace=True)

        # Mirror the row numbering into the dicts used for the JSON output.
        for i, result in enumerate(combined_and_sorted_results):
            result['Result Number'] = i + 1

        search_config = {
            "step": step,
            "rounds_combination": rounds_combination,
            "target_language": tlang,
            "strip_spaces": strip_spaces,
            "strip_in_braces": strip_in_braces,
            "strip_diacritics": strip_diacritics_chk,
            "include_torah": include_torah,
            "include_bible": include_bible,
            "include_quran": include_quran,
            "include_hindu": include_hindu,
            "include_tripitaka": include_tripitaka,
            "gematria_text": gematria_text,
            "date_words": date_words_output
        }

        output_data = {
            "search_configuration": search_config,
            "results": combined_and_sorted_results
        }

        json_data = output_data

        combined_most_frequent = "\n".join(
            f"{book}: {phrase}" for book, phrase in most_frequent_phrases.items())
        return df, combined_most_frequent, json_data
|

    round_x.change(update_rounds_combination, inputs=[round_x, round_y], outputs=rounds_combination)
    round_y.change(update_rounds_combination, inputs=[round_x, round_y], outputs=rounds_combination)

    selected_date.change(update_date_words, inputs=[selected_date, date_language_input, use_day, use_month, use_year], outputs=[date_words_output])
    date_language_input.change(update_date_words, inputs=[selected_date, date_language_input, use_day, use_month, use_year], outputs=[date_words_output])

    gematria_text.change(update_journal_sum, inputs=[gematria_text, date_words_output], outputs=[gematria_result, step, float_step])
    date_words_output.change(update_journal_sum, inputs=[gematria_text, date_words_output], outputs=[gematria_result, step, float_step])

    half_step_btn.click(update_step_half, inputs=[float_step], outputs=[step, float_step])
    double_step_btn.click(update_step_double, inputs=[float_step], outputs=[step, float_step])

    translate_btn.click(
        perform_search,
        inputs=[step, rounds_combination, tlang, strip_spaces, strip_in_braces, strip_diacritics_chk, include_torah_chk, include_bible_chk, include_quran_chk, include_hindu_chk, include_tripitaka_chk, gematria_text, date_words_output, selected_date],
        outputs=[markdown_output, most_frequent_phrase_output, json_output]
    )

    app.load(
        update_date_words,
        inputs=[selected_date, date_language_input, use_day, use_month, use_year],
        outputs=[date_words_output]
    )

    use_day.change(
        update_date_words,
        inputs=[selected_date, date_language_input, use_day, use_month, use_year],
        outputs=[date_words_output]
    )
    use_month.change(
        update_date_words,
        inputs=[selected_date, date_language_input, use_day, use_month, use_year],
        outputs=[date_words_output]
    )
    use_year.change(
        update_date_words,
        inputs=[selected_date, date_language_input, use_day, use_month, use_year],
        outputs=[date_words_output]
    )

|
    def checkbox_behavior(use_day_value, use_month_value):
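        """Keep the date checkboxes consistent: using the day forces month
        and year on, and the year is always kept on."""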
|
        if use_day_value:
            # Searching by day implies month and year as well.
            return True, True

        return use_month_value, True

    use_day.change(checkbox_behavior, inputs=[use_day, use_month], outputs=[use_month, use_year])
    use_month.change(checkbox_behavior, inputs=[use_day, use_month], outputs=[use_month, use_year])
|


if __name__ == "__main__":
    app.launch(share=False)