|
import pandas as pd |
|
import os |
|
import json |
|
|
|
import logging |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class DataExistsError(Exception): |
|
pass |
|
|
|
|
|
class DataNotFoundError(Exception): |
|
pass |
|
|
|
|
|
|
|
class BaseManager: |
|
def __init__(self, csv_file): |
|
self.csv_file = csv_file |
|
self.columns = ["id", "name", "desc", "params"] |
|
if not os.path.exists(csv_file): |
|
df = pd.DataFrame(columns=self.columns) |
|
df.to_csv(self.csv_file, index=False) |
|
|
|
def _load_data(self): |
|
return pd.read_csv(self.csv_file) |
|
|
|
def _save_data(self, df): |
|
df.to_csv(self.csv_file, index=False) |
|
|
|
def add_item(self, item_id, name, desc, params): |
|
df = self._load_data() |
|
if item_id in df["id"].values: |
|
raise DataExistsError(f"Item ID {item_id} already exists.") |
|
new_row = pd.DataFrame( |
|
[ |
|
{ |
|
"id": item_id, |
|
"name": name, |
|
"desc": desc, |
|
"params": json.dumps(params, ensure_ascii=False), |
|
} |
|
] |
|
) |
|
df = pd.concat([df, new_row], ignore_index=True) |
|
self._save_data(df) |
|
|
|
def delete_item(self, item_id): |
|
df = self._load_data() |
|
if item_id not in df["id"].values: |
|
raise DataNotFoundError(f"Item ID {item_id} not found.") |
|
df = df[df["id"] != item_id] |
|
self._save_data(df) |
|
|
|
def update_item(self, item_id, name=None, desc=None, params=None): |
|
df = self._load_data() |
|
if item_id not in df["id"].values: |
|
raise DataNotFoundError(f"Item ID {item_id} not found.") |
|
if name: |
|
df.loc[df["id"] == item_id, "name"] = name |
|
if desc: |
|
df.loc[df["id"] == item_id, "desc"] = desc |
|
if params: |
|
df.loc[df["id"] == item_id, "params"] = params |
|
self._save_data(df) |
|
|
|
def get_item(self, item_id): |
|
df = self._load_data() |
|
if item_id not in df["id"].values: |
|
raise DataNotFoundError(f"Item ID {item_id} not found.") |
|
item = df[df["id"] == item_id].to_dict("records")[0] |
|
item["params"] = json.loads(item["params"]) |
|
return item |
|
|
|
def list_items(self): |
|
df = self._load_data() |
|
items = df.to_dict("records") |
|
for item in items: |
|
item["params"] = json.loads(item["params"]) |
|
return items |
|
|
|
def find_item_by_name(self, name): |
|
df = self._load_data() |
|
if name not in df["name"].values: |
|
raise DataNotFoundError(f"Name {name} not found.") |
|
item = df[df["name"] == name].to_dict("records")[0] |
|
item["params"] = json.loads(item["params"]) |
|
return item |
|
|
|
def find_params_by_name(self, name): |
|
try: |
|
return self.find_item_by_name(name)["params"] |
|
except Exception as e: |
|
logger.error(e) |
|
return {} |
|
|
|
def find_params_by_id(self, id): |
|
try: |
|
return self.get_item(id)["params"] |
|
except Exception as e: |
|
logger.error(e) |
|
return {} |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
class SpeakerManager(BaseManager): |
|
def __init__(self, csv_file): |
|
super().__init__(csv_file) |
|
|
|
manager = SpeakerManager("speakers.test.csv") |
|
|
|
try: |
|
|
|
manager.add_item( |
|
1, "Speaker1", "Description for speaker 1", '{"param1": "value1"}' |
|
) |
|
except DataExistsError as e: |
|
print(e) |
|
|
|
|
|
speakers = manager.list_items() |
|
print(speakers) |
|
|
|
try: |
|
|
|
speaker = manager.get_item(1) |
|
print(speaker) |
|
except DataNotFoundError as e: |
|
print(e) |
|
|
|
try: |
|
|
|
manager.update_item( |
|
1, name="Updated Speaker1", desc="Updated description for speaker 1" |
|
) |
|
except DataNotFoundError as e: |
|
print(e) |
|
|
|
try: |
|
|
|
manager.delete_item(1) |
|
except DataNotFoundError as e: |
|
print(e) |
|
|
|
try: |
|
|
|
speaker_by_name = manager.find_item_by_name("Updated Speaker1") |
|
print(speaker_by_name) |
|
except DataNotFoundError as e: |
|
print(e) |
|
|
|
|
|
speakers_by_params = manager.find_items_by_params('{"param1": "value1"}') |
|
print(speakers_by_params) |
|
|