|
import streamlit as st |
|
from utils import set_algorithm_name, get_pdf_iframe, to_csv_file |
|
from menu import display_pages_menu |
|
from country_by_country.utils.constants import JURIDICTIONS |
|
from Levenshtein import distance |
|
import sys |
|
import logging |
|
import pandas as pd |
|
import numpy as np |
|
import re |
|
|
|
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") |
|
|
|
|
|
def check_last_cell_sum(column): |
|
last_cell = column.iloc[-2] |
|
result = [""] * (len(column.tolist()) - 2) |
|
try: |
|
sum_except_last = column.iloc[ |
|
:-2 |
|
].sum() |
|
result.append( |
|
"background-color: red" |
|
if float(last_cell) != sum_except_last |
|
else "background-color: green" |
|
) |
|
result.append("") |
|
return result |
|
except Exception: |
|
result.append("background-color: red") |
|
result.append("") |
|
return result |
|
|
|
|
|
def column_sum(column): |
|
try: |
|
return column.iloc[:-1].sum() |
|
except Exception: |
|
return None |
|
|
|
|
|
def style_negative(v, props=""): |
|
try: |
|
return props if float(v) < 0 else None |
|
except Exception: |
|
return None |
|
|
|
|
|
def convert_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame: |
|
for column_name in dataframe.columns: |
|
try: |
|
dataframe[column_name] = dataframe[column_name].astype(float) |
|
except Exception as e: |
|
pass |
|
return dataframe |
|
|
|
|
|
special_characters = "#&()[]@©€$'R¹³²" |
|
|
|
|
|
def style_symbol(v, props=""): |
|
try: |
|
return props if any(c in special_characters for c in v) else None |
|
except Exception: |
|
return None |
|
|
|
|
|
def style_specific_cells(dataframe: pd.DataFrame, index_list: list): |
|
|
|
color = "background-color: lightgreen" |
|
df1 = pd.DataFrame("", index=dataframe.index, columns=dataframe.columns) |
|
for index in index_list: |
|
df1.iloc[index, 0] = color |
|
return df1 |
|
|
|
|
|
def most_similar_string(input_string: str) -> str: |
|
def update_min(string, min_distance, most_similar, input_string=input_string): |
|
dist = distance(input_string, string) |
|
if dist < min_distance: |
|
return dist, string |
|
else: |
|
return min_distance, most_similar |
|
|
|
if input_string == None: |
|
return "None" |
|
min_distance = float("inf") |
|
most_similar = None |
|
for string in JURIDICTIONS.keys(): |
|
|
|
min_distance, most_similar = update_min(string, min_distance, most_similar) |
|
|
|
min_distance, most_similar = update_min( |
|
JURIDICTIONS[string]["Alpha-2 code"], min_distance, most_similar |
|
) |
|
|
|
min_distance, most_similar = update_min( |
|
JURIDICTIONS[string]["Alpha-3 code"], min_distance, most_similar |
|
) |
|
return most_similar |
|
|
|
|
|
def validate(data: pd.DataFrame) -> None: |
|
st.session_state.tables[st.session_state["algorithm_name"]] = data |
|
|
|
|
|
def update_df_csv_to_save() -> None: |
|
for idx, change in st.session_state.changes["edited_rows"].items(): |
|
for label, value in change.items(): |
|
st.session_state.tables[st.session_state["algorithm_name"]].loc[ |
|
idx, label |
|
] = value |
|
|
|
st.session_state["df_csv_to_save"] = to_csv_file( |
|
st.session_state.tables[st.session_state["algorithm_name"]], |
|
) |
|
|
|
|
|
st.set_page_config(layout="wide", page_title="Tables customization") |
|
st.title("Country by Country Tax Reporting analysis : Tables") |
|
st.subheader( |
|
"This page will allow you to clean the extracted tables", |
|
) |
|
display_pages_menu() |
|
|
|
if ( |
|
st.session_state.get("validate_selected_pages", False) |
|
and "pdf_after_page_validation" in st.session_state |
|
): |
|
|
|
col3, col4 = st.columns(2) |
|
with col3: |
|
st.markdown( |
|
get_pdf_iframe(st.session_state["pdf_after_page_validation"]), |
|
unsafe_allow_html=True, |
|
) |
|
|
|
with col4: |
|
index = ( |
|
list(st.session_state.tables.keys()).index( |
|
st.session_state["algorithm_name"], |
|
) |
|
if "algorithm_name" in st.session_state |
|
else 0 |
|
) |
|
|
|
st.session_state["algorithm_name"] = st.selectbox( |
|
"Choose the extracted table you want to see", |
|
list(st.session_state.tables.keys()), |
|
index=index, |
|
on_change=set_algorithm_name, |
|
args=("selectbox2",), |
|
key="selectbox2", |
|
) |
|
|
|
if "algorithm_name" in st.session_state: |
|
st.session_state["df_csv_to_save"] = to_csv_file( |
|
st.session_state.tables[st.session_state["algorithm_name"]] |
|
) |
|
st.download_button( |
|
label="📥 Download Current Table", |
|
data=( |
|
st.session_state["df_csv_to_save"] |
|
if "df_csv_to_save" in st.session_state |
|
else None |
|
), |
|
disabled="df_csv_to_save" not in st.session_state, |
|
file_name=( |
|
f"{st.session_state['original_pdf_name']}.csv" |
|
if "original_pdf_name" in st.session_state |
|
else "table.csv" |
|
), |
|
) |
|
|
|
st.session_state.tables[st.session_state["algorithm_name"]] = st.data_editor( |
|
st.session_state.tables[st.session_state["algorithm_name"]], |
|
num_rows="dynamic", |
|
on_change=update_df_csv_to_save, |
|
key="changes", |
|
width=800, |
|
height=900, |
|
) |
|
|
|
st.subheader( |
|
"Filters : ", |
|
) |
|
|
|
col7, col8, col9 = st.columns([1, 1, 1]) |
|
with col7: |
|
total = st.checkbox( |
|
"Calculate the Total of each columns, excluding the last row", value=True |
|
) |
|
country = st.checkbox("Activate the country filter", value=True) |
|
decimal_cleanup = st.checkbox("Apply decimal cleanup") |
|
|
|
with col8: |
|
negativ = st.checkbox( |
|
"Show the negative numbers, for each columns detected as a numerical type" |
|
) |
|
|
|
with st.container(border=True): |
|
cleanup_rules = st.checkbox( |
|
"Apply clean up rules : (number) mean a negative number, o-> 0, homogenization NA, ect ect " |
|
) |
|
if cleanup_rules: |
|
cleanup_excluded = st.multiselect( |
|
"exclude from filtering", |
|
st.session_state.tables[st.session_state["algorithm_name"]].columns, |
|
key="cleanup", |
|
) |
|
|
|
with col9: |
|
with st.container(border=True): |
|
symbol = st.checkbox( |
|
"Show the cells that contain a special symbol : " + special_characters, |
|
value=True, |
|
) |
|
remove_symbols = st.checkbox( |
|
"Remove the special symbols on numeric columns" |
|
) |
|
if remove_symbols: |
|
rm_symbol_excluded = st.multiselect( |
|
"exclude from filtering", |
|
st.session_state.tables[st.session_state["algorithm_name"]].columns, |
|
key="rm_symbol", |
|
) |
|
|
|
dataframe = st.session_state.tables[st.session_state["algorithm_name"]].copy() |
|
dataframe = convert_dataframe(dataframe) |
|
|
|
if country: |
|
dataframe.iloc[:-2, 0] = dataframe.iloc[:-2, 0].apply( |
|
lambda x: most_similar_string(x) |
|
) |
|
|
|
if remove_symbols: |
|
pattern = "[" + re.escape(special_characters) + "]" |
|
for column, dtype in dataframe.dtypes.items(): |
|
if column not in rm_symbol_excluded: |
|
dataframe[column] = dataframe[column].apply( |
|
lambda x: re.sub(pattern, "", str(x)) |
|
) |
|
dataframe = convert_dataframe(dataframe) |
|
|
|
if cleanup_rules: |
|
for column, dtype in dataframe.dtypes.items(): |
|
if column not in cleanup_excluded: |
|
|
|
dataframe[column] = dataframe[column].replace( |
|
{"^-$|^$|^ $|^N/I$|^- -$|^N/A$|^n\\.a\\.$": None}, regex=True |
|
) |
|
dataframe[column] = dataframe[column].replace( |
|
{"^o$|^O$|^\\(o\\)$|^\\(O\\)$|^\\(0\\)$": "0"}, regex=True |
|
) |
|
|
|
if dtype == object: |
|
dataframe[column] = dataframe[column].str.replace( |
|
"(\\(.*\\))[:alnum:]+", "\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"\\([:alnum:]+$|\\)[:alnum:]+$", "", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"\\([:alpha:]+\\)", "", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"(.+)\\(.+\\)$", "\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"^\\(-(.*)\\)", "-\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"^\\((.*)\\)", "-\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"\\(.*\\)| |\\*|^-$|\\[.*\\]|^-€$", "", regex=True |
|
) |
|
dataframe = convert_dataframe(dataframe) |
|
if decimal_cleanup: |
|
decimal_separator = ( |
|
st.session_state["metadata"]["separator"] |
|
if st.session_state["metadata"]["separator"] |
|
else "," |
|
) |
|
for column, dtype in dataframe.dtypes.items(): |
|
if dtype == object: |
|
if decimal_separator == ",": |
|
dataframe[column] = dataframe[column].str.replace( |
|
"\\.", "", regex=False |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
",", ".", regex=False |
|
) |
|
else: |
|
dataframe[column] = dataframe[column].str.replace( |
|
",(.{1,2})$", ".\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
"\\.([0-9]{3})", ",\\1", regex=True |
|
) |
|
dataframe[column] = dataframe[column].str.replace( |
|
",", "", regex=False |
|
) |
|
|
|
if total: |
|
dataframe = convert_dataframe(dataframe) |
|
new_row = dataframe.apply(column_sum, axis=0) |
|
new_row.iloc[0] = "Total Calculated" |
|
dataframe.loc[-1] = new_row.transpose() |
|
|
|
dataframe_styler = dataframe.style |
|
|
|
if total: |
|
dataframe_styler = dataframe_styler.apply( |
|
check_last_cell_sum, |
|
subset=pd.IndexSlice[:, dataframe.columns[1:]], |
|
axis=0, |
|
) |
|
|
|
if negativ: |
|
dataframe_styler = dataframe_styler.map( |
|
style_negative, |
|
props="color:red;", |
|
) |
|
if symbol: |
|
dataframe_styler = dataframe_styler.map( |
|
style_symbol, |
|
props="color:red;", |
|
) |
|
|
|
if country: |
|
index_list = [] |
|
for index, (val1, val2) in enumerate( |
|
zip( |
|
dataframe.iloc[:-1, 0], |
|
st.session_state.tables[st.session_state["algorithm_name"]].iloc[ |
|
:-1, 0 |
|
], |
|
) |
|
): |
|
if val1 != val2: |
|
index_list.append(index) |
|
dataframe_styler = dataframe_styler.apply( |
|
lambda x: style_specific_cells(x, index_list), axis=None |
|
) |
|
|
|
st.dataframe(dataframe_styler, use_container_width=True, height=1000) |
|
|
|
st.button( |
|
"Save the table above", |
|
on_click=validate, |
|
args=(dataframe_styler.data,), |
|
) |
|
|