sparrow-data-itn / routers /data_utils.py
ITNovaML's picture
Duplicate from katanaml-org/sparrow-data
f25b29f
import secrets
import string
from bson import ObjectId
from pydantic import BaseModel, Field, ValidationError
from typing import List
import datetime
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from base64 import b64encode, b64decode
import base64
from pymongo.errors import DuplicateKeyError
from pymongo.errors import PyMongoError
import json
from config import settings
# Base64-encoded AES key taken from the application settings. Call sites in this
# module pass it through base64.b64decode() before handing it to encrypt()/decrypt();
# the decoded key must be of length 16, 24, or 32 bytes.
secure_key = settings.secure_key
def encrypt(plain_text: str, key: bytes) -> str:
    """Encrypt ``plain_text`` with AES in CBC mode and return it base64-encoded.

    No IV is supplied to ``AES.new``; the IV exposed by the cipher object is
    prepended to the ciphertext so that ``decrypt`` can recover it.

    Args:
        plain_text: Text to encrypt; encoded with ``str.encode()`` defaults.
        key: Raw AES key bytes.

    Returns:
        Base64 text of ``iv + ciphertext``.
    """
    cbc = AES.new(key, AES.MODE_CBC)
    # Pad the encoded text up to a whole number of AES blocks before encrypting.
    padded = pad(plain_text.encode(), AES.block_size)
    payload = cbc.iv + cbc.encrypt(padded)
    return b64encode(payload).decode()
def decrypt(encrypted_text: str, key: bytes) -> str:
    """Reverse ``encrypt``: base64-decode, split off the IV, decrypt and unpad.

    Args:
        encrypted_text: Base64 text whose decoded form is a 16-byte IV
            followed by the AES-CBC ciphertext.
        key: Raw AES key bytes.

    Returns:
        The decrypted text, decoded with ``bytes.decode()`` defaults.
    """
    raw = b64decode(encrypted_text)
    # The first 16 bytes are the IV; everything after is ciphertext.
    init_vector, body = raw[:16], raw[16:]
    cbc = AES.new(key, AES.MODE_CBC, iv=init_vector)
    plain_bytes = unpad(cbc.decrypt(body), AES.block_size)
    return plain_bytes.decode()
class PyObjectId(ObjectId):
    """``ObjectId`` subclass exposing pydantic validator and schema hooks.

    ``__get_validators__`` yields ``validate`` and ``__modify_schema__`` marks
    the field as a string in the generated schema.
    """

    @classmethod
    def __get_validators__(cls):
        """Yield the validators to run for fields of this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``; raise ``ValueError`` if ``v`` is not a valid id."""
        if ObjectId.is_valid(v):
            return ObjectId(v)
        raise ValueError("Invalid objectid")

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Describe the field as a string in the generated schema."""
        field_schema["type"] = "string"
class ReceiptModel(BaseModel):
    """Uploaded receipt: an object id, a receipt key and the receipt entries."""

    id: PyObjectId = Field(alias="_id", default_factory=PyObjectId)
    receipt_key: str = Field(
        ...,
        description="The unique key for the receipt.",
    )
    content: List[List[str]] = Field(
        ...,
        description="An array of single-element arrays, each containing receipt entry.",
    )

    class Config:
        # Allow populating ``id`` by its field name as well as by the "_id" alias.
        allow_population_by_field_name = True
        # Needed because the id field uses a non-pydantic type.
        arbitrary_types_allowed = True
        # Serialize ObjectId values as plain strings in JSON output.
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "receipt_key": "RzSZ0BTnuG",
                "content": [
                    ["YOUR GUEST NUMBER IS"],
                    ["43"],
                    ["IN-N-OUT BURGER LINQ"],
                ],
            },
            "title": "ReceiptModel",
            "description": "A model representing a receipt with a key and its contents.",
        }
class ReceiptDBModel(BaseModel):
    """Per-user receipt record: object id, owning user, receipt key and content string."""

    id: PyObjectId = Field(alias="_id", default_factory=PyObjectId)
    user: str = Field(
        ...,
        description="The user who uploaded the receipt.",
    )
    receipt_key: str = Field(
        ...,
        description="The unique key for the receipt.",
    )
    content: str = Field(
        ...,
        description="A string representing DB receipt data.",
    )

    class Config:
        # Allow populating ``id`` by its field name as well as by the "_id" alias.
        allow_population_by_field_name = True
        # Needed because the id field uses a non-pydantic type.
        arbitrary_types_allowed = True
        # Serialize ObjectId values as plain strings in JSON output.
        json_encoders = {ObjectId: str}
        # NOTE(review): the schema title below is "ReceiptProcessedModel", not the
        # class name -- confirm this is intended before changing it.
        schema_extra = {
            "example": {
                "user": "user1",
                "receipt_key": "RzSZ0BTnuG",
                "content": '{"store": "CVS Pharmacy", "location": "3300 S LAS VEGAS BLVD, LAS VEGAS, NV, 89109"}',
            },
            "title": "ReceiptProcessedModel",
            "description": "A model representing a receipt DB contents.",
        }
def merge_data(values):
    """Wrap ``entry[1][0]`` of every entry in ``values`` in its own one-element list.

    Args:
        values: Iterable of entries where ``entry[1][0]`` is the item to keep.
            Presumably OCR results shaped like ``(box, (text, score))`` --
            verify against the caller.

    Returns:
        A list with one single-element list per entry, in input order; an
        empty list when ``values`` is empty.
    """
    return [[entry[1][0]] for entry in values]
async def store_data(data, db):
    """Validate, encrypt and insert an uploaded receipt into the ``uploads`` collection.

    Args:
        data: Receipt content; it must validate as ``ReceiptModel.content``.
        db: Database handle indexable by collection name whose ``"uploads"``
            collection provides an awaitable ``insert_one``.

    Returns:
        The generated receipt key on success, or ``None`` when ``data`` fails
        model validation (the validation error is printed, not raised).

    Raises:
        DuplicateKeyError: Propagated unchanged from ``insert_one``, as are
            other driver errors.
    """
    print("Storing data...")
    key = generate_key()

    try:
        receipt = ReceiptModel(receipt_key=key, content=data)
    except ValidationError as e:
        print(f"An error occurred: {e}")
        return None

    # Convert the Pydantic model instance into a dictionary.
    # NOTE(review): dict() is called without by_alias, so the generated ObjectId
    # is stored under "id" rather than "_id" -- confirm this is intended.
    receipt_dict = receipt.dict()
    # The content is stored as the encrypted str() form of the nested list.
    receipt_dict["content"] = encrypt(str(receipt_dict["content"]), base64.b64decode(secure_key))
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and naive.
    receipt_dict["created_at"] = datetime.datetime.now(datetime.timezone.utc)

    # Insert the dictionary into MongoDB; driver errors propagate to the caller.
    result = await db["uploads"].insert_one(receipt_dict)
    print(f"Inserted document with id: {result.inserted_id}")
    return key
async def get_receipt_data(key, db):
    """Fetch, remove and decrypt the uploaded receipt stored under ``key``.

    The document is read and deleted in a single ``find_one_and_delete`` call
    so that two concurrent callers cannot both receive the same one-time
    payload, which a separate find followed by a delete would allow.

    Args:
        key: Value of the ``receipt_key`` field to look up.
        db: Database handle indexable by collection name whose ``"uploads"``
            collection provides an awaitable ``find_one_and_delete``.

    Returns:
        The decrypted content string, or ``None`` when no document matches.
    """
    print(f"Getting receipt data for key: {key}")
    receipt = await db["uploads"].find_one_and_delete({"receipt_key": key})
    if receipt is None:
        return None
    return decrypt(receipt['content'], base64.b64decode(secure_key))
async def store_receipt_db_data(chatgpt_user, receipt_id, receipt_content, db):
    """Validate, encrypt and upsert a user's receipt into the ``receipts`` collection.

    Args:
        chatgpt_user: User that owns the receipt; part of the upsert filter.
        receipt_id: Receipt key; part of the upsert filter.
        receipt_content: Receipt content; must validate as ``ReceiptDBModel.content``.
        db: Database handle indexable by collection name whose ``"receipts"``
            collection provides an awaitable ``update_one``.

    Returns:
        The value returned by ``update_one`` on success, or ``None`` when the
        arguments fail model validation (the validation error is printed,
        not raised).

    Raises:
        PyMongoError: Propagated unchanged from ``update_one``.
    """
    print("Storing receipt data...")

    try:
        receipt = ReceiptDBModel(user=chatgpt_user, receipt_key=receipt_id, content=receipt_content)
    except ValidationError as e:
        print(f"An error occurred: {e}")
        return None

    # Only the validated content is needed; it is already a str on the model.
    encrypted_content = encrypt(receipt.content, base64.b64decode(secure_key))

    # Upsert: update the matching (user, receipt_key) document or create it.
    query = {"user": chatgpt_user, "receipt_key": receipt_id}
    new_data = {"$set": {"content": encrypted_content}}
    result = await db["receipts"].update_one(query, new_data, upsert=True)

    print(f"Inserted document with id: {result}")
    return result
async def get_receipt_db_data(chatgpt_user, receipt_id, db):
    """Return the decrypted content of one receipt belonging to a user.

    Args:
        chatgpt_user: Value matched against the ``user`` field.
        receipt_id: Value matched against the ``receipt_key`` field.
        db: Database handle indexable by collection name whose ``"receipts"``
            collection provides an awaitable ``find_one``.

    Returns:
        The decrypted content string, or ``None`` when no document matches.
    """
    print(f"Getting receipt data for key: {receipt_id}")
    lookup = {"user": chatgpt_user, "receipt_key": receipt_id}
    document = await db["receipts"].find_one(lookup)
    if document is None:
        return None
    return decrypt(document['content'], base64.b64decode(secure_key))
async def get_user_receipt_db_ids(chatgpt_user, db):
    """List the ``receipt_key`` values of a user's stored receipts.

    At most 100 documents are read from the ``receipts`` collection.

    Args:
        chatgpt_user: Value matched against the ``user`` field.
        db: Database handle indexable by collection name whose ``"receipts"``
            collection provides ``find(...).to_list(length=...)``.

    Returns:
        A list of receipt keys; empty when nothing was found.
    """
    print(f"Getting user receipts ids for user: {chatgpt_user}")
    cursor = db["receipts"].find({"user": chatgpt_user})
    found = await cursor.to_list(length=100)
    if found is None:
        return []
    return [document['receipt_key'] for document in found]
async def delete_receipt_db_data(chatgpt_user, receipt_id, db):
    """Delete one receipt of a user from the ``receipts`` collection.

    Args:
        chatgpt_user: Value matched against the ``user`` field.
        receipt_id: Value matched against the ``receipt_key`` field.
        db: Database handle indexable by collection name whose ``"receipts"``
            collection provides an awaitable ``delete_one``.

    Returns:
        The value returned by ``delete_one``; its ``deleted_count`` is 0 when
        no document matched.
    """
    print(f"Deleting receipt data for key: {receipt_id}")
    criteria = {"user": chatgpt_user, "receipt_key": receipt_id}
    result = await db["receipts"].delete_one(criteria)

    nothing_removed = result.deleted_count == 0
    message = (
        f"Receipt with id: {receipt_id} not found"
        if nothing_removed
        else f"Deleted document with id: {result}"
    )
    print(message)
    return result
async def get_user_receipt_content_db(chatgpt_user, db):
    """Return the decrypted, JSON-parsed content of a user's stored receipts.

    At most 100 documents are read from the ``receipts`` collection. Each
    document's ``content`` is decrypted and then parsed with ``json.loads``.

    Args:
        chatgpt_user: Value matched against the ``user`` field.
        db: Database handle indexable by collection name whose ``"receipts"``
            collection provides ``find(...).to_list(length=...)``.

    Returns:
        A list of parsed receipt contents; empty when nothing was found.
    """
    print(f"Getting user receipts fields for user: {chatgpt_user}")
    cursor = db["receipts"].find({"user": chatgpt_user})
    stored = await cursor.to_list(length=100)
    if stored is None:
        return []
    return [
        json.loads(decrypt(document['content'], base64.b64decode(secure_key)))
        for document in stored
    ]
def generate_key(length=10):
    """Return a random key of ``length`` ASCII letters and digits.

    Characters are drawn with ``secrets.choice``.

    Args:
        length: Number of characters in the key (default 10).

    Returns:
        The generated key string.
    """
    charset = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(charset))
    return ''.join(picked)