# (removed web-page scrape residue: "Spaces: Sleeping")
"""icij_utils.py | |
Building an SQL agent over the ICIJ financial data leaks files. | |
:author: Didier Guillevic | |
:date: 2025-01-12 | |
""" | |
import logging
import os
import sqlite3
from pathlib import Path

import pandas as pd
from sqlalchemy import (
    Column,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    select,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class ICIJDataLoader:
    """Load the ICIJ offshore-leaks CSV files into a SQLite database.

    Maps each known CSV file name to a destination table, creates each
    table from the CSV structure, bulk-loads the rows in chunks, and adds
    a few indexes for the common name/relationship lookups.
    """

    # Class-level logger: the module configures logging at import time but
    # the original code printed diagnostics to stdout; route them here.
    _logger = logging.getLogger(__name__)

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with database path."""
        self.db_path = db_path
        # Known ICIJ CSV file names -> destination SQLite table names.
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships'
        }

    def create_connection(self):
        """Create and return a sqlite3 connection, or None on failure."""
        try:
            return sqlite3.connect(self.db_path)
        except sqlite3.Error as e:
            self._logger.error("Error connecting to database: %s", e)
            return None

    def create_table_from_csv(self, csv_path, table_name, conn):
        """Create ``table_name`` based on the CSV structure and load its data.

        Column types are inferred from the first few rows only; columns that
        are not clearly int/float become TEXT.  Rows are appended, so
        re-running against an existing table duplicates data.

        Returns:
            True on success, False on any error (logged, not raised).
        """
        try:
            # Sample a handful of rows to get column names and rough dtypes.
            df = pd.read_csv(csv_path, nrows=5)

            columns = []
            for col in df.columns:
                dtype = str(df[col].dtype)
                if 'int' in dtype:
                    sql_type = 'INTEGER'
                elif 'float' in dtype:
                    sql_type = 'REAL'
                else:
                    sql_type = 'TEXT'
                columns.append(f'"{col}" {sql_type}')

            # Quote the identifier: table names come from the internal
            # mapping, but quoting keeps this safe for any legal name.
            create_table_sql = (
                f'CREATE TABLE IF NOT EXISTS "{table_name}" '
                f'({", ".join(columns)})'
            )
            conn.execute(create_table_sql)

            # Stream in chunks so the very large leak files fit in memory.
            chunksize = 10000
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)

            self._logger.info("Successfully loaded %s", table_name)
            return True
        except Exception as e:
            self._logger.error("Error processing %s: %s", csv_path, e)
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from ``data_directory`` into SQLite.

        Missing files are silently skipped; only files listed in
        ``self.table_mappings`` are considered.

        Returns:
            True when the run completed, False when the connection could
            not be opened or an unexpected error occurred.
        """
        conn = self.create_connection()
        if not conn:
            return False
        try:
            data_path = Path(data_directory)
            files_processed = 0
            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if file_path.exists():
                    self._logger.info("Processing %s...", csv_file)
                    if self.create_table_from_csv(file_path, table_name, conn):
                        files_processed += 1
            # Indexes speed up the name/relationship lookups used downstream.
            self.create_indexes(conn)
            conn.commit()
            self._logger.info("Successfully processed %d files", files_processed)
            return True
        except Exception as e:
            self._logger.error("Error during data loading: %s", e)
            return False
        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create indexes for better query performance (best effort).

        Failures (e.g. a table that was never loaded) are logged and
        ignored; the data stays queryable without the index.
        """
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)'
        ]
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                self._logger.error("Error creating index: %s", e)
class ICIJDatabaseConnector:
    """Thin SQLAlchemy wrapper around the SQLite database of ICIJ leaks.

    Reflects the existing tables at construction time so that tables
    created outside SQLAlchemy (e.g. by ICIJDataLoader) are visible.
    """

    def __init__(self, db_path='icij_leaks.db'):
        """Create the engine, session factory, and reflected metadata."""
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        self.Base = declarative_base()
        self.Session = sessionmaker(bind=self.engine)
        self.metadata = MetaData()
        # Reflect so existing tables appear in self.metadata.tables.
        self.metadata.reflect(bind=self.engine)

    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine

    def get_session(self):
        """Create and return a new session."""
        return self.Session()

    def get_table(self, table_name):
        """Get a table by name from the metadata (None if unknown)."""
        return self.metadata.tables.get(table_name)

    def list_tables(self):
        """List all available tables in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Get column names and their types for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return {column.name: str(column.type) for column in table.columns}
        return {}

    def get_full_database_schema(self):
        """Get the schema for all tables in the database."""
        schema = {}
        for table_name in self.list_tables():
            schema[table_name] = self.get_table_schema(table_name)
        return schema

    def get_table_columns(self, table_name):
        """Get column names for a specific table."""
        table = self.get_table(table_name)
        if table is not None:
            return [column.name for column in table.columns]
        return []

    def query_table(self, table_name, limit=1):
        """Return up to ``limit`` rows of ``table_name`` as a list of dicts.

        Returns an empty list when the table is unknown.
        """
        table = self.get_table(table_name)
        if table is None:
            return []
        stmt = select(table).limit(limit)
        with self.engine.connect() as connection:
            result = connection.execute(stmt)
            # Row objects are not directly dict()-able on SQLAlchemy >= 1.4;
            # go through the ._mapping accessor instead.
            return [dict(row._mapping) for row in result]
class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure.

    Pure data: plain-English descriptions of the tables, columns, and
    leak sources, intended to be fed to an SQL agent as schema context.
    """

    # Comprehensive table documentation: table name -> description.
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        )
    }

    # Detailed column documentation: table name -> {column -> description}.
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            #'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when entity was struck from register",
            'dorm_date': "Date when entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': '3 letter abbreviations of country names',
            'countries': 'name of country',
            'sourceID': "Identifier for the leak source"
        },
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation if applicable",
            'jurisdiction': "2 letter code of the Jurisdiction associated with the party",
            'jurisdiction-description': 'full name of the jurisdiction',
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        'officers': {
            'name': "Name of the individual or organization",
            'countries': 'full name of the country connected to the officer',
            'country_codes': "3 letter code of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where intermediary operates",
            'country_codes': "3 letter abbreviations of the countries where intermediary operates",
            'sourceID': "Identifier for the leak source"
        },
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with address",
            'country_codes': "3 letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which address is valid",
        },
        'relationships': {
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship"
        }
    }

    # Source documentation: sourceID value -> leak description.
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)"
    }