"""Convert tabular records between JSON, JSON Lines and CSV files.

``encoder_*`` functions read a file into a list of records and ``decoder_*``
functions write a list of records back out in the target format.
"""

import os
import csv
import json
import shutil

TMP_DIR = "./__pycache__"
TAB_CONFIG = ["jsonl ⇆ csv", "json ⇆ csv", "json ⇆ jsonl"]
# Currently selected conversion direction; updated in place by change_mode().
MODE = {"from": "jsonl", "to": "csv"}


def _ensure_parent_dir(file_path: str) -> None:
    """Create the directory that will contain *file_path* if it is missing."""
    parent = os.path.dirname(file_path)
    # A bare file name has no parent component; nothing to create then.
    if parent:
        os.makedirs(parent, exist_ok=True)


def clean_cache(dir_path=TMP_DIR) -> None:
    """Remove *dir_path* with everything in it and recreate it empty."""
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)
    os.makedirs(dir_path, exist_ok=True)


def encoder_json(file_path: str) -> list:
    """Load a JSON file and return its content as a list of records.

    A top-level JSON object is treated as a single record and wrapped in a
    list; any other top-level value is converted with ``list()``.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    # list(dict) would yield only the keys, which is never a usable record
    # list, so a lone object becomes a one-element list instead.
    if isinstance(data, dict):
        return [data]
    return list(data)


def encoder_jsonl(file_path: str) -> list:
    """Load a JSON Lines file, one JSON value per non-blank line."""
    data_list = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            stripped = line.strip()
            # Blank lines (typically a trailing newline) carry no record and
            # would otherwise make json.loads raise.
            if not stripped:
                continue
            data_list.append(json.loads(stripped))
    return data_list


def encoder_csv(file_path: str) -> list:
    """Load a CSV file with a header row as a list of ``{column: value}``."""
    # newline="" lets the csv module handle line endings itself, so newlines
    # embedded in quoted fields are read back correctly.
    with open(file_path, "r", newline="", encoding="utf-8") as file:
        return [dict(row) for row in csv.DictReader(file)]


def decoder_json(data_list: list, file_path=f"{TMP_DIR}/output.json") -> str:
    """Write *data_list* to *file_path* as one indented JSON array.

    Nothing is written when *data_list* is empty. Returns *file_path*.
    """
    if data_list:
        _ensure_parent_dir(file_path)
        with open(file_path, "w", encoding="utf-8") as file:
            # Serialize the whole list as a single JSON document.
            json.dump(data_list, file, ensure_ascii=False, indent=4)
    return file_path


def decoder_csv(data_list: list, file_path=f"{TMP_DIR}/output.csv") -> str:
    """Write a list of dict records to *file_path* as CSV.

    The header is the union of all record keys in first-seen order; a record
    lacking a column gets an empty cell. Nothing is written when *data_list*
    is empty. Returns *file_path*.
    """
    if data_list:
        # dict preserves insertion order, so this de-duplicates keys while
        # keeping the order in which they first appear.
        columns = {}
        for item in data_list:
            for key in item:
                columns.setdefault(key, None)
        header = list(columns)

        _ensure_parent_dir(file_path)
        with open(file_path, "w", newline="", encoding="utf-8") as file:
            csv_writer = csv.DictWriter(file, fieldnames=header, restval="")
            csv_writer.writeheader()
            csv_writer.writerows(data_list)
    return file_path


def decoder_jsonl(data_list: list, file_path=f"{TMP_DIR}/output.jsonl") -> str:
    """Write *data_list* to *file_path* as JSON Lines, one value per line.

    Nothing is written when *data_list* is empty. Returns *file_path*.
    """
    if data_list:
        _ensure_parent_dir(file_path)
        with open(file_path, "w", encoding="utf-8") as file:
            file.writelines(
                json.dumps(data, ensure_ascii=False) + "\n" for data in data_list
            )
    return file_path


def change_mode(input: str) -> None:
    """Set ``MODE`` from a label of the form ``"<left> <arrow> <right>"``.

    With the arrow ``"→"`` the conversion goes from left to right; any other
    arrow token makes it go from right to left. Raises ``IndexError`` when
    the label has fewer than three space-separated parts.
    """
    affix = input.split(" ")
    # Resolve both ends before touching MODE so a malformed label cannot
    # leave it half-updated.
    if affix[1] == "→":
        source, target = affix[0], affix[2]
    else:
        source, target = affix[2], affix[0]
    MODE["from"] = source
    MODE["to"] = target