File size: 4,470 Bytes
01e655b c5458aa 01e655b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import pandas as pd
import os
import json
import logging
logger = logging.getLogger(__name__)
class DataExistsError(Exception):
    """Raised when an item is added under an ID that is already stored."""
class DataNotFoundError(Exception):
    """Raised when a lookup by ID or name matches no stored item."""
# FIXME: this implementation is rather clunky; ideally replace it with a proper CSV library...
class BaseManager:
    """CSV-backed store of items with ``id``, ``name``, ``desc`` and ``params``.

    ``params`` is written to the CSV as a JSON string and decoded back into a
    Python object whenever an item is returned. Every operation re-reads the
    whole file, and every mutation rewrites it.
    """

    def __init__(self, csv_file):
        """Bind the manager to ``csv_file``; create a header-only file if absent."""
        self.csv_file = csv_file
        self.columns = ["id", "name", "desc", "params"]
        if not os.path.exists(csv_file):
            df = pd.DataFrame(columns=self.columns)
            df.to_csv(self.csv_file, index=False)

    def _load_data(self):
        """Read the CSV file into a DataFrame, keeping text columns as text."""
        return pd.read_csv(
            self.csv_file,
            # Without an explicit dtype a name such as "007" is parsed as the
            # number 7 and can no longer be found by its original string.
            dtype={"name": str, "desc": str, "params": str},
            # Without this, values like "NA", "null" or an empty description
            # are read back as NaN instead of the text that was stored.
            keep_default_na=False,
        )

    def _save_data(self, df):
        """Overwrite the CSV file with ``df`` (row index is not written)."""
        df.to_csv(self.csv_file, index=False)

    @staticmethod
    def _decode_params(item):
        """Replace the JSON string under ``item["params"]`` with its decoded value."""
        item["params"] = json.loads(item["params"])
        return item

    @staticmethod
    def _rows_matching(df, column, value, not_found_message):
        """Return a boolean row mask for ``df[column] == value``.

        Raises:
            DataNotFoundError: with ``not_found_message`` if no row matches.
        """
        if value not in df[column].values:
            raise DataNotFoundError(not_found_message)
        return df[column] == value

    def add_item(self, item_id, name, desc, params):
        """Append a new item; ``params`` is stored JSON-encoded.

        Raises:
            DataExistsError: if ``item_id`` is already present.
        """
        df = self._load_data()
        if item_id in df["id"].values:
            raise DataExistsError(f"Item ID {item_id} already exists.")
        new_row = pd.DataFrame(
            [
                {
                    "id": item_id,
                    "name": name,
                    "desc": desc,
                    "params": json.dumps(params, ensure_ascii=False),
                }
            ]
        )
        # Concatenating with an empty frame is deprecated in recent pandas,
        # so the first row simply becomes the table.
        if df.empty:
            df = new_row
        else:
            df = pd.concat([df, new_row], ignore_index=True)
        self._save_data(df)

    def delete_item(self, item_id):
        """Remove the item with ``item_id``.

        Raises:
            DataNotFoundError: if ``item_id`` is not present.
        """
        df = self._load_data()
        rows = self._rows_matching(
            df, "id", item_id, f"Item ID {item_id} not found."
        )
        self._save_data(df[~rows])

    def update_item(self, item_id, name=None, desc=None, params=None):
        """Update the given fields of an existing item.

        A field left as ``None`` is not touched. Any other value, including an
        empty string or empty dict, replaces the stored one. ``params`` is
        JSON-encoded exactly as in ``add_item`` so it can be decoded on read.

        Raises:
            DataNotFoundError: if ``item_id`` is not present.
        """
        df = self._load_data()
        rows = self._rows_matching(
            df, "id", item_id, f"Item ID {item_id} not found."
        )
        if name is not None:
            df.loc[rows, "name"] = name
        if desc is not None:
            df.loc[rows, "desc"] = desc
        if params is not None:
            df.loc[rows, "params"] = json.dumps(params, ensure_ascii=False)
        self._save_data(df)

    def get_item(self, item_id):
        """Return the item with ``item_id`` as a dict with decoded ``params``.

        Raises:
            DataNotFoundError: if ``item_id`` is not present.
        """
        df = self._load_data()
        rows = self._rows_matching(
            df, "id", item_id, f"Item ID {item_id} not found."
        )
        return self._decode_params(df[rows].to_dict("records")[0])

    def list_items(self):
        """Return every item as a dict with decoded ``params``."""
        df = self._load_data()
        return [self._decode_params(item) for item in df.to_dict("records")]

    def find_item_by_name(self, name):
        """Return the first item whose name equals ``name``.

        Raises:
            DataNotFoundError: if no item has that name.
        """
        df = self._load_data()
        rows = self._rows_matching(df, "name", name, f"Name {name} not found.")
        return self._decode_params(df[rows].to_dict("records")[0])

    def find_params_by_name(self, name):
        """Return the decoded ``params`` of the item named ``name``.

        Best-effort: any failure is logged and an empty dict is returned.
        """
        try:
            return self.find_item_by_name(name)["params"]
        except Exception as e:
            logger.error(e)
            return {}

    def find_params_by_id(self, id):
        """Return the decoded ``params`` of the item with the given ``id``.

        Best-effort: any failure is logged and an empty dict is returned.
        """
        try:
            return self.get_item(id)["params"]
        except Exception as e:
            logger.error(e)
            return {}
# Usage example
if __name__ == "__main__":

    class SpeakerManager(BaseManager):
        """Example subclass; it inherits all behavior from BaseManager."""

    manager = SpeakerManager("speakers.test.csv")

    try:
        # Add speaker. params is passed as a plain object because add_item
        # JSON-encodes it; passing a JSON string would encode it twice.
        manager.add_item(
            1, "Speaker1", "Description for speaker 1", {"param1": "value1"}
        )
    except DataExistsError as e:
        print(e)

    # List all speakers
    speakers = manager.list_items()
    print(speakers)

    try:
        # Get specific speaker
        speaker = manager.get_item(1)
        print(speaker)
    except DataNotFoundError as e:
        print(e)

    try:
        # Update speaker
        manager.update_item(
            1, name="Updated Speaker1", desc="Updated description for speaker 1"
        )
    except DataNotFoundError as e:
        print(e)

    try:
        # Find speaker by name (done before the delete so the lookup can hit)
        speaker_by_name = manager.find_item_by_name("Updated Speaker1")
        print(speaker_by_name)
    except DataNotFoundError as e:
        print(e)

    # Find a speaker's params by name; returns {} if the name is unknown
    params_by_name = manager.find_params_by_name("Updated Speaker1")
    print(params_by_name)

    try:
        # Delete speaker
        manager.delete_item(1)
    except DataNotFoundError as e:
        print(e)
|