davanstrien HF staff commited on
Commit
e901392
·
1 Parent(s): 943dac2

chore: Add data_loader.py for dataset loading and processing

Browse files
Files changed (1) hide show
  1. data_loader.py +146 -0
data_loader.py ADDED
@@ -0,0 +1,146 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from datetime import datetime
3
+
4
+ from dotenv import load_dotenv
5
+ from httpx import Client, AsyncClient
6
+ from huggingface_hub import HfApi
7
+ from huggingface_hub.utils import logging
8
+ from tqdm.auto import tqdm
9
+ from typing import Any, Dict, List
10
+ import pandas as pd
11
+
12
+ load_dotenv()
13
+
14
+ HF_TOKEN = os.getenv("HF_TOKEN")
15
+ assert HF_TOKEN is not None, "You need to set HF_TOKEN in your environment variables"
16
+ USER_AGENT = os.getenv("USER_AGENT")
17
+ assert (
18
+ USER_AGENT is not None
19
+ ), "You need to set USER_AGENT in your environment variables"
20
+
21
+ logger = logging.get_logger(__name__)
22
+ headers = {
23
+ "authorization": f"Bearer ${HF_TOKEN}",
24
+ "user-agent": USER_AGENT,
25
+ }
26
+ client = Client(headers=headers)
27
+ async_client = AsyncClient(headers=headers)
28
+ api = HfApi(token=HF_TOKEN)
29
+
30
+
31
+ def has_card_data(dataset):
32
+ return hasattr(dataset, "card_data")
33
+
34
+
35
+ def check_dataset_has_dataset_info(dataset):
36
+ return bool(
37
+ has_card_data(dataset)
38
+ and hasattr(dataset.card_data, "dataset_info")
39
+ and dataset.card_data.dataset_info is not None
40
+ )
41
+
42
+
43
+ def parse_single_config_dataset(data):
44
+ config_name = data.get("config_name", "default")
45
+ features = data.get("features", [])
46
+ column_names = [feature.get("name") for feature in features]
47
+ return {
48
+ "config_name": config_name,
49
+ "column_names": column_names,
50
+ "features": features,
51
+ }
52
+
53
+
54
+ def parse_multiple_config_dataset(data: List[Dict[str, Any]]):
55
+ return [parse_single_config_dataset(d) for d in data]
56
+
57
+
58
+ def parse_dataset(dataset):
59
+ hub_id = dataset.id
60
+ likes = dataset.likes
61
+ downloads = dataset.downloads
62
+ tags = dataset.tags
63
+ created_at = dataset.created_at
64
+ last_modified = dataset.last_modified
65
+ license = dataset.card_data.license
66
+ language = dataset.card_data.language
67
+ return {
68
+ "hub_id": hub_id,
69
+ "likes": likes,
70
+ "downloads": downloads,
71
+ "tags": tags,
72
+ "created_at": created_at,
73
+ "last_modified": last_modified,
74
+ "license": license,
75
+ "language": language,
76
+ }
77
+
78
+
79
+ def parsed_column_info(dataset_info):
80
+ if isinstance(dataset_info, dict):
81
+ return [parse_single_config_dataset(dataset_info)]
82
+ elif isinstance(dataset_info, list):
83
+ return parse_multiple_config_dataset(dataset_info)
84
+ return None
85
+
86
+
87
+ def ensure_list_of_strings(value):
88
+ if value is None:
89
+ return []
90
+ if isinstance(value, list):
91
+ return [str(item) for item in value]
92
+ return [str(value)]
93
+
94
+
95
+ def refresh_data() -> List[Dict[str, Any]]:
96
+ # current date as string
97
+ now = datetime.now()
98
+ # check if a file for the current date exists
99
+ if os.path.exists(f"datasets_{now.strftime('%Y-%m-%d')}.parquet"):
100
+ df = pd.read_parquet(f"datasets_{now.strftime('%Y-%m-%d')}.parquet")
101
+ return df.to_dict(orient="records")
102
+
103
+ # List all datasets
104
+ datasets = list(api.list_datasets(limit=None, full=True))
105
+
106
+ # Filter datasets with dataset info
107
+ datasets = [
108
+ dataset for dataset in tqdm(datasets) if check_dataset_has_dataset_info(dataset)
109
+ ]
110
+
111
+ parsed_datasets = []
112
+ for dataset in tqdm(datasets):
113
+ try:
114
+ datasetinfo = parse_dataset(dataset)
115
+ column_info = parsed_column_info(dataset.card_data.dataset_info)
116
+ parsed_datasets.extend({**datasetinfo, **info} for info in column_info)
117
+ except Exception as e:
118
+ print(f"Error processing dataset {dataset.id}: {e}")
119
+ continue
120
+
121
+ # Convert to DataFrame
122
+ df = pd.DataFrame(parsed_datasets)
123
+
124
+ # Ensure 'license', 'tags', and 'language' are lists of strings
125
+ df["license"] = df["license"].apply(ensure_list_of_strings)
126
+ df["tags"] = df["tags"].apply(ensure_list_of_strings)
127
+ df["language"] = df["language"].apply(ensure_list_of_strings)
128
+
129
+ # Convert 'features' column to string
130
+ df["features"] = df["features"].apply(lambda x: str(x) if x is not None else None)
131
+ df = df.astype({"hub_id": "string", "config_name": "string"})
132
+
133
+ # save to parquet file with current date
134
+ df.to_parquet(f"datasets_{now.strftime('%Y-%m-%d')}.parquet")
135
+
136
+ # save to JSON file with current date
137
+ df.to_json(
138
+ f"datasets_{now.strftime('%Y-%m-%d')}.json", orient="records", lines=True
139
+ )
140
+
141
+ # return a list of dictionaries
142
+ return df.to_dict(orient="records")
143
+
144
+
145
+ if __name__ == "__main__":
146
+ refresh_data()