File size: 2,488 Bytes
82c9012
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import ast
import asyncio
import json
import os
import os.path as osp
import sys
from concurrent.futures import ProcessPoolExecutor
from random import random

import fire
import pandas as pd
import yt_dlp


def ytb_download(uid, url, json_info, output_dir="ytb_videos/"):
    """Download one YouTube video as mp4 and write its metadata JSON beside it.

    Args:
        uid: Unique id used as the file stem for both the video and the json.
        url: Video URL handed directly to yt-dlp.
        json_info: JSON-serializable metadata dumped to ``{uid}.json``.
        output_dir: Destination directory; created if missing.

    Returns:
        0 on success (or if both files already exist), -1 on any failure.
    """
    os.makedirs(output_dir, exist_ok=True)
    yt_opts = {
        "format": "best",  # Download the best quality available
        "outtmpl": osp.join(output_dir, f"{uid}.%(ext)s"),  # Set the output template
        "postprocessors": [
            {
                "key": "FFmpegVideoConvertor",
                # NOTE: "preferedformat" (one r) is yt-dlp's actual option key.
                "preferedformat": "mp4",  # Convert video to mp4 format
            }
        ],
    }

    video_path = osp.join(output_dir, f"{uid}.mp4")
    meta_path = osp.join(output_dir, f"{uid}.json")
    # Skip work that was already completed on a previous run (idempotent resume).
    if osp.exists(video_path) and osp.exists(meta_path):
        print(f"{uid} already labeled.")
        return 0

    try:
        with yt_dlp.YoutubeDL(yt_opts) as ydl:
            ydl.download([url])
        # Reuse meta_path instead of rebuilding the same join expression.
        with open(meta_path, "w") as fp:
            json.dump(json_info, fp, indent=2)
        return 0
    except Exception as exc:
        # The original bare `except:` also caught SystemExit/KeyboardInterrupt
        # and silently discarded the failure reason. Keep the best-effort
        # contract (return -1) but log what went wrong.
        print(f"download failed for {uid}: {exc}", file=sys.stderr)
        return -1


async def main(csv_path, max_workers=256, shards=0, total=-1, limit=False):
    """Fan out downloads for every row of the CSV across a process pool.

    Args:
        csv_path: CSV with columns ``videoID``, ``url``, ``timestamp``,
            ``caption``, ``matching_score`` (the last three are stored as
            Python literal strings).
        max_workers: Size of the ProcessPoolExecutor.
        shards: Index of the shard this invocation handles (used with `total`).
        total: Number of shards; values <= 0 disable sharding.
        limit: When True, stop after ~21 rows (debugging aid).
    """
    # get_event_loop() is deprecated inside a running coroutine; this is the
    # supported way to reach the loop asyncio.run() created.
    loop = asyncio.get_running_loop()

    df = pd.read_csv(csv_path)
    # splitext strips only the extension; the old `split(".")[0]` truncated
    # the whole path when a directory name contained a dot.
    output_dir = osp.splitext(csv_path)[0]

    data_list = list(df.iterrows())

    if total > 0:
        # Evenly split rows into `total` shards; the last shard absorbs the
        # remainder so no row is dropped.
        chunk = len(data_list) // total
        begin_idx = shards * chunk
        end_idx = (shards + 1) * chunk
        if shards == total - 1:
            end_idx = len(data_list)
        data_list = data_list[begin_idx:end_idx]
    print(f"download total {len(data_list)} videos")

    # Context manager guarantees the pool is shut down even if gather raises;
    # the original leaked the executor.
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        tasks = []
        for idx, (_, row) in enumerate(data_list):
            uid = row["videoID"]
            url = row["url"]

            json_info = {
                # literal_eval parses the stored list/number literals without
                # the arbitrary-code-execution risk eval() posed on an
                # untrusted CSV.
                "timestamp": ast.literal_eval(row["timestamp"]),
                "caption": ast.literal_eval(row["caption"]),
                "matching_score": ast.literal_eval(row["matching_score"]),
            }

            tasks.append(
                loop.run_in_executor(pool, ytb_download, uid, url, json_info, output_dir)
            )
            if idx >= 20 and limit:
                break
        res = await asyncio.gather(*tasks)

    # ytb_download returns 0 on success / -1 on failure, so sum(res) is the
    # negated failure count.
    print(f"[{sum(res)} / {len(res)}]")


def entry(csv="panda70m_testing.csv", shards=0, total=-1, limit=False):
    """CLI entry point: drive the async downloader to completion for one CSV."""
    coroutine = main(csv, shards=shards, total=total, limit=limit)
    asyncio.run(coroutine)


if __name__ == "__main__":
    # python-fire exposes entry()'s keyword arguments as command-line flags.
    fire.Fire(entry)