sql_chat_bot / aux_functions /db_functions.py
RajMl's picture
Upload 23 files
1295ae5 verified
raw
history blame
3.55 kB
import pandas as pd
import csv
import os
def read_query_from_file(proj_dir=r"C:\Users\pcraj\OneDrive\Desktop\projects\new_test"):
file_path = 'aux_data/query.txt'
file_name=os.path.join(proj_dir,file_path)
try:
with open(file_name, 'r') as file:
CreateTable = file.read()
return CreateTable
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
def file_name_from_file(proj_dir=r"C:\Users\pcraj\OneDrive\Desktop\projects\new_test"):
file_path = 'aux_data/file_name.txt'
file_name=os.path.join(proj_dir,file_path)
try:
with open(file_name, 'r') as file:
return file.read().strip() # Remove leading/trailing whitespaces
except FileNotFoundError:
print(f"File {file_path} not found.")
return None
import sqlite3
def drop_sqlite_database():
"""
Deletes the SQLite database file at the specified path.
Parameters:
db_path (str): Path to the SQLite database file.
Returns:
None
"""
db_path = 'db.db'
try:
if os.path.exists(db_path):
os.remove(db_path)
print(f"Database at {db_path} has been deleted.")
else:
print(f"No database file found at {db_path}.")
except Exception as e:
print(f"An error occurred while deleting the database: {e}")
def setup_database():
import sqlite3
create_table_query=read_query_from_file()
# create_table_query_2 =create_table_querry
# print(create_table_query_2)
connection = sqlite3.connect('db.db')
cursor = connection.cursor()
cursor.execute(str(create_table_query))
connection.commit()
connection.close()
import sqlite3
import csv
def import_csv(proj_dir=r"C:\Users\pcraj\OneDrive\Desktop\projects\new_test"):
try:
connection = sqlite3.connect('db.db')
c = connection.cursor()
file = file_name_from_file(proj_dir) # Assuming this function is defined elsewhere
print("File to be imported:", file)
folder_name=os.path.join(proj_dir,"data")
with open(f"{folder_name}/{file}.csv", 'r', encoding='utf-8') as f:
reader = csv.reader(f)
print("Opened CSV file")
next(reader) # Skip the header row if present
# Dynamically determine the number of columns in the CSV
first_row = next(reader)
num_columns = len(first_row)
placeholders = ','.join(['?'] * num_columns)
# Re-insert the first row into the reader
reader = csv.reader(f)
next(reader) # Skip the header row again
for row in reader:
print("Inserting row:", row)
c.execute(f"INSERT INTO {file} VALUES ({placeholders})", row)
connection.commit()
print("Data imported successfully")
except sqlite3.Error as e:
print("SQLite error:", e)
except Exception as e:
print("Error:", e)
finally:
if connection:
connection.close()
print("Database connection closed")
if __name__ == "__main__":
print("h1")
print(file_name_from_file())
print("h2")
drop_sqlite_database()
print("h3")
setup_database()
print("h4")
import_csv()