# ICIJ_SQL_agent / icij_utils.py — Didier Guillevic
# Commit 8507fc0: "Attempting to give more information on the table columns"
"""icij_utils.py
Building an SQL agent over the ICIJ financial data leaks files.
:author: Didier Guillevic
:date: 2025-01-12
"""
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
import pandas as pd
import sqlite3
import os
from pathlib import Path
from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
class ICIJDataLoader:
    """Load the ICIJ offshore-leaks CSV files into a SQLite database.

    The loader recognizes the standard ICIJ node/relationship CSV file
    names, infers a SQLite column type per CSV column, streams the data
    in chunks (so large dumps need not fit in memory), and creates a few
    indexes to speed up common lookups.
    """

    def __init__(self, db_path='icij_data.db'):
        """Initialize the data loader with the target database path."""
        self.db_path = db_path
        # Use logging (consistent with the module-level logging setup)
        # instead of print() for status and error reporting.
        self._logger = logging.getLogger(__name__)
        # Map of expected ICIJ CSV file names to their target table names.
        self.table_mappings = {
            'nodes-addresses.csv': 'addresses',
            'nodes-entities.csv': 'entities',
            'nodes-intermediaries.csv': 'intermediaries',
            'nodes-officers.csv': 'officers',
            'nodes-others.csv': 'others',
            'relationships.csv': 'relationships'
        }

    def create_connection(self):
        """Create and return a sqlite3 connection, or None on failure."""
        try:
            return sqlite3.connect(self.db_path)
        except sqlite3.Error as e:
            self._logger.error("Error connecting to database: %s", e)
            return None

    def create_table_from_csv(self, csv_path, table_name, conn):
        """Create a table matching the CSV's structure and load its rows.

        Column types are inferred from the first few rows only; anything
        that is not clearly integer or float becomes TEXT. Data is
        appended in chunks of 10,000 rows.

        :param csv_path: path to the CSV file to load
        :param table_name: name of the destination SQLite table
        :param conn: an open sqlite3 connection
        :return: True on success, False on any failure
        """
        try:
            # Sample a handful of rows to infer column names and types.
            sample = pd.read_csv(csv_path, nrows=5)

            # Map each pandas dtype onto a SQLite storage class.
            columns = []
            for col in sample.columns:
                dtype = str(sample[col].dtype)
                if 'int' in dtype:
                    sql_type = 'INTEGER'
                elif 'float' in dtype:
                    sql_type = 'REAL'
                else:
                    sql_type = 'TEXT'
                # Quote the column name: ICIJ headers may contain
                # characters that are not valid bare identifiers.
                columns.append(f'"{col}" {sql_type}')

            create_table_sql = f'''CREATE TABLE IF NOT EXISTS {table_name}
                ({', '.join(columns)})'''
            conn.execute(create_table_sql)

            # Stream the file in chunks so very large dumps are handled.
            chunksize = 10000
            for chunk in pd.read_csv(csv_path, chunksize=chunksize):
                chunk.to_sql(table_name, conn, if_exists='append', index=False)

            self._logger.info("Successfully loaded %s", table_name)
            return True
        except Exception as e:
            # Best-effort per file: report and let the caller continue
            # with the remaining files.
            self._logger.error("Error processing %s: %s", csv_path, e)
            return False

    def load_all_files(self, data_directory):
        """Load all recognized CSV files from a directory into SQLite.

        Files missing from the directory are silently skipped.

        :param data_directory: directory containing the ICIJ CSV files
        :return: True if the run completed (even with 0 files), False on
            connection failure or an unexpected error
        """
        conn = self.create_connection()
        if not conn:
            return False
        try:
            data_path = Path(data_directory)
            files_processed = 0
            for csv_file, table_name in self.table_mappings.items():
                file_path = data_path / csv_file
                if file_path.exists():
                    self._logger.info("Processing %s...", csv_file)
                    if self.create_table_from_csv(file_path, table_name, conn):
                        files_processed += 1
            # Create indexes for better query performance.
            self.create_indexes(conn)
            conn.commit()
            self._logger.info("Successfully processed %d files", files_processed)
            return True
        except Exception as e:
            self._logger.error("Error during data loading: %s", e)
            return False
        finally:
            conn.close()

    def create_indexes(self, conn):
        """Create indexes on commonly queried columns.

        Index creation on a missing table is reported and skipped, so a
        partial load (some CSVs absent) does not abort the run.
        """
        index_definitions = [
            'CREATE INDEX IF NOT EXISTS idx_entities_name ON entities(name)',
            'CREATE INDEX IF NOT EXISTS idx_officers_name ON officers(name)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_from ON relationships(node_id_start)',
            'CREATE INDEX IF NOT EXISTS idx_relationships_to ON relationships(node_id_end)'
        ]
        for index_sql in index_definitions:
            try:
                conn.execute(index_sql)
            except sqlite3.Error as e:
                self._logger.error("Error creating index: %s", e)
class ICIJDatabaseConnector:
    """Thin SQLAlchemy wrapper around the ICIJ SQLite database.

    Reflects the existing tables at construction time so that schema
    information (tables, columns, types) is available without writing
    ORM models by hand.
    """

    def __init__(self, db_path='icij_leaks.db'):
        """Create the engine, session factory, and reflected metadata."""
        # Create the SQLAlchemy engine
        self.engine = create_engine(f'sqlite:///{db_path}', echo=False)
        # Create declarative base
        self.Base = declarative_base()
        # Create session factory
        self.Session = sessionmaker(bind=self.engine)
        # Initialize metadata and reflect the tables already present in
        # the database file so get_table()/list_tables() work immediately.
        self.metadata = MetaData()
        self.metadata.reflect(bind=self.engine)

    def get_engine(self):
        """Return the SQLAlchemy engine."""
        return self.engine

    def get_session(self):
        """Create and return a new session."""
        return self.Session()

    def get_table(self, table_name):
        """Return the reflected Table for table_name, or None if absent."""
        return self.metadata.tables.get(table_name)

    def list_tables(self):
        """List all available table names in the database."""
        return list(self.metadata.tables.keys())

    def get_table_schema(self, table_name):
        """Return {column_name: type_string} for a table ({} if unknown)."""
        table = self.get_table(table_name)
        if table is not None:
            return {column.name: str(column.type) for column in table.columns}
        return {}

    def get_full_database_schema(self):
        """Return the schema of every table, keyed by table name."""
        schema = {}
        for table_name in self.list_tables():
            schema[table_name] = self.get_table_schema(table_name)
        return schema

    def get_table_columns(self, table_name):
        """Return the column names for a table ([] if unknown)."""
        table = self.get_table(table_name)
        if table is not None:
            return [column.name for column in table.columns]
        return []

    def query_table(self, table_name, limit=1):
        """Return up to `limit` rows of a table as a list of dicts.

        :param table_name: name of a reflected table
        :param limit: maximum number of rows to fetch (default 1)
        :return: list of {column: value} dicts; [] if the table is unknown
        """
        # Imported locally: `select` was never imported at module level,
        # so calling this method previously raised NameError.
        from sqlalchemy import select

        table = self.get_table(table_name)
        if table is None:
            return []
        stmt = select(table).limit(limit)
        with self.engine.connect() as connection:
            result = connection.execute(stmt)
            # Row objects are not directly dict()-convertible under
            # SQLAlchemy 1.4+/2.x; go through the mappings() view.
            return [dict(row) for row in result.mappings()]
class ICIJDatabaseMetadata:
    """Holds detailed documentation about the ICIJ database structure.

    Pure data container: three class-level dictionaries that an SQL agent
    can feed to a language model as schema/context documentation.
    No methods and no instance state.
    """

    # Comprehensive table documentation: table name -> description.
    TABLE_DOCS = {
        'entities': (
            "Contains information about companies, trusts, and other entities mentioned in the leaks. "
            "These are typically offshore entities created in tax havens."
        ),
        'officers': (
            "Contains information about people or organizations connected to offshore entities. "
            "Officers can be directors, shareholders, beneficiaries, or have other roles."
        ),
        'intermediaries': (
            "Contains information about professional firms that help create and manage offshore entities. "
            "These are typically law firms, banks, or corporate service providers."
        ),
        'addresses': (
            "Contains physical address information connected to entities, officers, or intermediaries. "
            "Addresses can be shared between multiple parties."
        ),
        'others': (
            "Contains information about miscellaneous parties that don't fit into other categories. "
            "This includes vessel names, legal cases, events, and other related parties mentioned "
            "in the leaks that aren't classified as entities, officers, or intermediaries."
        ),
        'relationships': (
            "Defines connections between different nodes (entities, officers, intermediaries) in the database. "
            "Shows how different parties are connected to each other."
        )
    }

    # Detailed column documentation for each table:
    # table name -> {column name -> description}.
    COLUMN_DOCS = {
        'entities': {
            'name': "Legal name of the offshore entity",
            'original_name': "Name in original language/character set",
            'former_name': "Previous names of the entity",
            # NOTE(review): 'jurisdiction' was deliberately left commented
            # out here, while the 'others' table below documents it —
            # confirm which entity columns actually exist in the CSV.
            #'jurisdiction': "Country/region where the entity is registered",
            'jurisdiction_description': "Detailed description of the jurisdiction",
            'company_type': "Legal structure of the entity (e.g., corporation, trust)",
            'address': "Primary registered address",
            'internal_id': "Unique identifier within the leak data",
            'incorporation_date': "Date when the entity was created",
            'inactivation_date': "Date when the entity became inactive",
            'struck_off_date': "Date when entity was struck from register",
            'dorm_date': "Date when entity became dormant",
            'status': "Current status of the entity",
            'service_provider': "Firm that provided offshore services",
            'country_codes': '3 letter abbreviations of country names',
            'countries': 'name of country',
            'sourceID': "Identifier for the leak source"
        },
        'others': {
            'name': "Name of the miscellaneous party or item",
            'type': "Type of the other party (e.g., vessel, legal case)",
            'incorporation_date': "Date of incorporation or creation if applicable",
            'jurisdiction': "2 letter code of the Jurisdiction associated with the party",
            # NOTE(review): this key is hyphenated, unlike the underscore
            # form 'jurisdiction_description' used for 'entities' —
            # verify against the actual CSV header before relying on it.
            'jurisdiction-description': 'full name of the jurisdiction',
            'countries': "Countries associated with the party",
            'status': "Current status",
            'internal_id': "Unique identifier within the leak data",
            'address': "Associated address if available",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid"
        },
        'officers': {
            'name': "Name of the individual or organization",
            'countries': 'full name of the country connected to the officer',
            'country_codes': "3 letter code of the countries connected to the officer",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which the information is valid",
        },
        'intermediaries': {
            'name': "Name of the professional firm",
            'internal_id': "Unique identifier within the leak data",
            'address': "Business address",
            'status': "Current status",
            'countries': "Countries where intermediary operates",
            'country_codes': "3 letter abbreviations of the countries where intermediary operates",
            'sourceID': "Identifier for the leak source"
        },
        'addresses': {
            'address': "Full address text",
            'name': "Name associated with address",
            'country_codes': "3 letter country codes for the address",
            'countries': "Full country names",
            'sourceID': "Identifier for the leak source",
            'valid_until': "Date until which address is valid",
        },
        'relationships': {
            # Start/end IDs reference the internal_id of other node tables.
            'node_id_start': "Internal ID of the source node",
            'node_id_end': "Internal ID of the target node",
            'rel_type': "Type of relationship (e.g., shareholder, director)",
            'link': "Additional details about the relationship",
            'start_date': "When the relationship began",
            'end_date': "When the relationship ended",
            'sourceID': "Identifier for the leak source",
            'status': "Current status of the relationship"
        }
    }

    # Source documentation: known sourceID values -> leak description.
    SOURCE_IDS = {
        "PANAMA_PAPERS": "Data from Panama Papers leak (2016)",
        "PARADISE_PAPERS": "Data from Paradise Papers leak (2017)",
        "BAHAMAS_LEAKS": "Data from Bahamas Leaks (2016)",
        "OFFSHORE_LEAKS": "Data from Offshore Leaks (2013)",
        "PANDORA_PAPERS": "Data from Pandora Papers leak (2021)"
    }