File size: 4,969 Bytes
cf4f63b
e190970
cf4f63b
 
 
e190970
 
 
cf4f63b
 
 
 
e190970
c0ec2ff
cf4f63b
 
 
 
 
 
 
e190970
c0ec2ff
cf4f63b
 
c0ec2ff
e190970
c0ec2ff
 
 
 
 
 
 
 
 
 
 
e190970
c0ec2ff
 
 
 
cf4f63b
c0ec2ff
 
cf4f63b
c0ec2ff
 
 
 
 
e190970
c0ec2ff
cf4f63b
0e74637
cf4f63b
 
 
 
 
 
 
 
0e74637
cf4f63b
e190970
cf4f63b
 
 
 
 
 
 
 
 
 
 
 
 
e190970
cf4f63b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e190970
c0ec2ff
cf4f63b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import datetime
from concurrent.futures import as_completed
from urllib import parse

import wandb
from requests_futures.sessions import FuturesSession

from dashboard_utils.time_tracker import _log, simple_time_tracker

URL_QUICKSEARCH = "https://huggingface.co/api/quicksearch?"
WANDB_REPO = "learning-at-home/Worker_logs"


@simple_time_tracker(_log)
def get_new_bubble_data():
    serialized_data_points, latest_timestamp = get_serialized_data_points()
    serialized_data = get_serialized_data(serialized_data_points, latest_timestamp)
    profiles = get_profiles(serialized_data_points)

    return serialized_data, profiles


@simple_time_tracker(_log)
def get_profiles(serialized_data_points):
    profiles = []
    with FuturesSession() as session:
        futures = []
        for username in serialized_data_points.keys():
            future = session.get(URL_QUICKSEARCH + parse.urlencode({"type": "user", "q": username}))
            future.username = username
            futures.append(future)
        for future in as_completed(futures):
            resp = future.result()
            username = future.username
            response = resp.json()
            avatarUrl = None
            if response["users"]:
                for user_candidate in response["users"]:
                    if user_candidate["user"] == username:
                        avatarUrl = response["users"][0]["avatarUrl"]
                        break
            if not avatarUrl:
                avatarUrl = "/avatars/57584cb934354663ac65baa04e6829bf.svg"

            if avatarUrl.startswith("/avatars/"):
                avatarUrl = f"https://huggingface.co{avatarUrl}"

            profiles.append(
                {"id": username, "name": username, "src": avatarUrl, "url": f"https://huggingface.co/{username}"}
            )
    return profiles


@simple_time_tracker(_log)
def get_serialized_data_points():

    api = wandb.Api()
    runs = api.runs(WANDB_REPO)

    serialized_data_points = {}
    latest_timestamp = None
    for run in runs:
        run_summary = run.summary._json_dict
        run_name = run.name

        if run_name in serialized_data_points:
            if "_timestamp" in run_summary and "_step" in run_summary:
                timestamp = run_summary["_timestamp"]
                serialized_data_points[run_name]["Runs"].append(
                    {
                        "batches": run_summary["_step"],
                        "runtime": run_summary["_runtime"],
                        "loss": run_summary["train/loss"],
                        "velocity": run_summary["_step"] / run_summary["_runtime"],
                        "date": datetime.datetime.utcfromtimestamp(timestamp),
                    }
                )
                if not latest_timestamp or timestamp > latest_timestamp:
                    latest_timestamp = timestamp
        else:
            if "_timestamp" in run_summary and "_step" in run_summary:
                timestamp = run_summary["_timestamp"]
                serialized_data_points[run_name] = {
                    "profileId": run_name,
                    "Runs": [
                        {
                            "batches": run_summary["_step"],
                            "runtime": run_summary["_runtime"],
                            "loss": run_summary["train/loss"],
                            "velocity": run_summary["_step"] / run_summary["_runtime"],
                            "date": datetime.datetime.utcfromtimestamp(timestamp),
                        }
                    ],
                }
                if not latest_timestamp or timestamp > latest_timestamp:
                    latest_timestamp = timestamp
    latest_timestamp = datetime.datetime.utcfromtimestamp(latest_timestamp)
    return serialized_data_points, latest_timestamp


@simple_time_tracker(_log)
def get_serialized_data(serialized_data_points, latest_timestamp):
    serialized_data_points_v2 = []
    max_velocity = 1
    for run_name, serialized_data_point in serialized_data_points.items():
        activeRuns = []
        loss = 0
        runtime = 0
        batches = 0
        velocity = 0
        for run in serialized_data_point["Runs"]:
            if run["date"] == latest_timestamp:
                run["date"] = run["date"].isoformat()
                activeRuns.append(run)
                loss += run["loss"]
                velocity += run["velocity"]
            loss = loss / len(activeRuns) if activeRuns else 0
            runtime += run["runtime"]
            batches += run["batches"]
        new_item = {
            "date": latest_timestamp.isoformat(),
            "profileId": run_name,
            "batches": batches,
            "runtime": runtime,
            "activeRuns": activeRuns,
        }
        serialized_data_points_v2.append(new_item)
    serialized_data = {"points": [serialized_data_points_v2], "maxVelocity": max_velocity}
    return serialized_data