Spaces:
Sleeping
Sleeping
from pymongo import MongoClient | |
from pydantic import BaseModel, HttpUrl | |
from datetime import datetime | |
from typing import Optional, List, Dict, Union | |
class MongoService:
    """Small convenience wrapper around one MongoDB database.

    The instance keeps the connection URL, the ``MongoClient`` built from it
    and a handle to the selected database; every method takes the collection
    name as its first argument and delegates to the matching PyMongo call.
    """

    def __init__(self, mongo_url: str, database: str):
        """Create a client for ``mongo_url`` and select ``database``.

        Args:
            mongo_url: Connection string handed to ``MongoClient``.
            database: Name of the database all later calls operate on.
        """
        self.mongo_url = mongo_url
        self.mongo_client = MongoClient(self.mongo_url)
        self.mongo_database = self.mongo_client[database]

    def insert(self, collection: str, data: Dict) -> None:
        """Insert the single document ``data`` into ``collection``.

        The driver's insert result is intentionally discarded; the method
        returns ``None``.
        """
        self.mongo_database[collection].insert_one(data)

    def get(
        self,
        collection: str,
        filter: Dict,
        fields_to_retrieve: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Return every document in ``collection`` matching ``filter``.

        Args:
            collection: Name of the collection to query.
            filter: Query document passed to ``find``. (The name shadows the
                builtin but is kept because callers may pass it by keyword.)
            fields_to_retrieve: Field names to include in each result. When
                omitted or empty, no projection is applied.

        Returns:
            The matching documents, fully materialised into a list.
        """
        # Send None rather than an empty dict when no fields were requested:
        # PyMongo releases before 4.0 rewrite an empty projection to
        # {"_id": 1}, which would strip every other field from the results.
        projection = (
            {field: 1 for field in fields_to_retrieve}
            if fields_to_retrieve
            else None
        )
        cursor = self.mongo_database[collection].find(filter, projection)
        return list(cursor)

    def update(
        self,
        collection: str,
        filter: Dict,
        update_value: Dict,
        many=False,
    ) -> None:
        """Apply ``update_value`` to documents in ``collection`` matching ``filter``.

        Args:
            collection: Name of the collection to modify.
            filter: Query document selecting what to update,
                e.g. ``{"address": {"$regex": "^S"}}``.
            update_value: Update document,
                e.g. ``{"$set": {"name": "Minnie"}}``.
            many: When truthy use ``update_many``; otherwise ``update_one``.

        The driver's update result is intentionally discarded; the method
        returns ``None``.
        """
        target = self.mongo_database[collection]
        if many:
            target.update_many(filter, update_value)
        else:
            target.update_one(filter, update_value)