File size: 4,470 Bytes
01e655b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5458aa
01e655b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import pandas as pd
import os
import json

import logging

logger = logging.getLogger(__name__)


class DataExistsError(Exception):
    """Raised when an item is added under an ID that is already stored."""


class DataNotFoundError(Exception):
    """Raised when a lookup by ID or name matches no stored item."""


# FIXME: this implementation is rather crude; ideally replace it with a proper CSV library.
class BaseManager:
    """CSV-backed store of records with ``id``, ``name``, ``desc``, ``params``.

    Every operation re-reads the whole CSV file, and every mutation rewrites
    it. ``params`` is kept in the file as a JSON string: it is encoded on
    write (``add_item`` / ``update_item``) and decoded on read.

    NOTE: the ``id`` column is still type-inferred by pandas when the file is
    read, so IDs that look numeric come back as numbers.
    """

    def __init__(self, csv_file):
        """Remember *csv_file* and create it with a header row if missing.

        Args:
            csv_file: Path of the CSV file backing this manager.
        """
        self.csv_file = csv_file
        self.columns = ["id", "name", "desc", "params"]
        if not os.path.exists(csv_file):
            df = pd.DataFrame(columns=self.columns)
            df.to_csv(self.csv_file, index=False)

    def _load_data(self):
        """Read the backing CSV file into a DataFrame.

        The text columns are forced to ``str`` and NA detection is disabled.
        Otherwise pandas would turn stored values such as ``null``, ``NA`` or
        an empty field into NaN, and JSON scalars such as ``5`` or ``true``
        into numbers/booleans, which ``json.loads`` cannot decode.
        """
        return pd.read_csv(
            self.csv_file,
            dtype={"name": str, "desc": str, "params": str},
            keep_default_na=False,
        )

    def _save_data(self, df):
        """Overwrite the backing CSV file with *df* (index not written)."""
        df.to_csv(self.csv_file, index=False)

    @staticmethod
    def _decode_item(item):
        """Replace the JSON string under ``item["params"]`` with its value."""
        item["params"] = json.loads(item["params"])
        return item

    def add_item(self, item_id, name, desc, params):
        """Append a new record.

        Args:
            item_id: Identifier of the new record; must not be stored yet.
            name: Value for the ``name`` column.
            desc: Value for the ``desc`` column.
            params: JSON-serializable value; stored as a JSON string.

        Raises:
            DataExistsError: If *item_id* is already present.
        """
        df = self._load_data()
        if item_id in df["id"].values:
            raise DataExistsError(f"Item ID {item_id} already exists.")
        new_row = pd.DataFrame(
            [
                {
                    "id": item_id,
                    "name": name,
                    "desc": desc,
                    "params": json.dumps(params, ensure_ascii=False),
                }
            ]
        )
        df = pd.concat([df, new_row], ignore_index=True)
        self._save_data(df)

    def delete_item(self, item_id):
        """Remove the record(s) whose ``id`` equals *item_id*.

        Raises:
            DataNotFoundError: If *item_id* is not present.
        """
        df = self._load_data()
        if item_id not in df["id"].values:
            raise DataNotFoundError(f"Item ID {item_id} not found.")
        df = df[df["id"] != item_id]
        self._save_data(df)

    def update_item(self, item_id, name=None, desc=None, params=None):
        """Overwrite selected fields of an existing record.

        Falsy arguments (``None``, ``""``, ``{}`` ...) leave the corresponding
        column unchanged.

        Args:
            item_id: Identifier of the record to update.
            name: New ``name`` value, if truthy.
            desc: New ``desc`` value, if truthy.
            params: New JSON-serializable ``params`` value, if truthy.

        Raises:
            DataNotFoundError: If *item_id* is not present.
        """
        df = self._load_data()
        if item_id not in df["id"].values:
            raise DataNotFoundError(f"Item ID {item_id} not found.")
        row_mask = df["id"] == item_id
        if name:
            df.loc[row_mask, "name"] = name
        if desc:
            df.loc[row_mask, "desc"] = desc
        if params:
            # Encode exactly like add_item so the readers can json.loads it.
            df.loc[row_mask, "params"] = json.dumps(params, ensure_ascii=False)
        self._save_data(df)

    def get_item(self, item_id):
        """Return the first record whose ``id`` equals *item_id* as a dict.

        The ``params`` entry of the returned dict is JSON-decoded.

        Raises:
            DataNotFoundError: If *item_id* is not present.
        """
        df = self._load_data()
        if item_id not in df["id"].values:
            raise DataNotFoundError(f"Item ID {item_id} not found.")
        item = df[df["id"] == item_id].to_dict("records")[0]
        return self._decode_item(item)

    def list_items(self):
        """Return every record as a dict, each with ``params`` JSON-decoded."""
        df = self._load_data()
        return [self._decode_item(item) for item in df.to_dict("records")]

    def find_item_by_name(self, name):
        """Return the first record whose ``name`` equals *name* as a dict.

        The ``params`` entry of the returned dict is JSON-decoded.

        Raises:
            DataNotFoundError: If no record has that name.
        """
        df = self._load_data()
        if name not in df["name"].values:
            raise DataNotFoundError(f"Name {name} not found.")
        item = df[df["name"] == name].to_dict("records")[0]
        return self._decode_item(item)

    def find_params_by_name(self, name):
        """Return the decoded ``params`` of the record named *name*.

        Best-effort: any failure is logged and ``{}`` is returned instead.
        """
        try:
            return self.find_item_by_name(name)["params"]
        except Exception as e:
            logger.error(e)
            return {}

    def find_params_by_id(self, id):
        """Return the decoded ``params`` of the record with the given *id*.

        Best-effort: any failure is logged and ``{}`` is returned instead.
        """
        try:
            return self.get_item(id)["params"]
        except Exception as e:
            logger.error(e)
            return {}


# Usage example
if __name__ == "__main__":

    class SpeakerManager(BaseManager):
        """Demo subclass; inherits all behaviour from BaseManager."""

    manager = SpeakerManager("speakers.test.csv")

    try:
        # Add speaker. params is passed as a plain dict because add_item
        # JSON-encodes it itself; a pre-encoded string would be encoded twice.
        manager.add_item(
            1, "Speaker1", "Description for speaker 1", {"param1": "value1"}
        )
    except DataExistsError as e:
        print(e)

    # List all speakers
    speakers = manager.list_items()
    print(speakers)

    try:
        # Get specific speaker
        speaker = manager.get_item(1)
        print(speaker)
    except DataNotFoundError as e:
        print(e)

    try:
        # Update speaker
        manager.update_item(
            1, name="Updated Speaker1", desc="Updated description for speaker 1"
        )
    except DataNotFoundError as e:
        print(e)

    try:
        # Delete speaker
        manager.delete_item(1)
    except DataNotFoundError as e:
        print(e)

    try:
        # Find speaker by name (reports "not found" once it has been deleted)
        speaker_by_name = manager.find_item_by_name("Updated Speaker1")
        print(speaker_by_name)
    except DataNotFoundError as e:
        print(e)

    # Find params by name: the best-effort variant logs the failure and
    # returns {} instead of raising when the speaker does not exist.
    params_by_name = manager.find_params_by_name("Updated Speaker1")
    print(params_by_name)