File size: 13,706 Bytes
e276be2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
import sys
import io
import os
import re
import json
import tarfile
from functools import partial

import webdataset as wds
from webdataset import ResampledShards, DataPipeline, tarfile_to_samples
from webdataset.filters import pipelinefilter
from webdataset.tariterators import url_opener, group_by_keys
from webdataset.handlers import reraise_exception
from webdataset.gopen import gopen_schemes, gopen


def pytorch_worker_info(group=None):  # sourcery skip: use-contextlib-suppress
    """Return node and worker info for PyTorch and some distributed environments."""
    rank = 0
    world_size = 1
    worker = 0
    num_workers = 1
    try:
        import torch.distributed

        if torch.distributed.is_available() and torch.distributed.is_initialized():
            group = group or torch.distributed.group.WORLD
            rank = torch.distributed.get_rank(group=group)
            world_size = torch.distributed.get_world_size(group=group)
    except ModuleNotFoundError:
        pass
    try:
        import torch.utils.data

        worker_info = torch.utils.data.get_worker_info()
        if worker_info is not None:
            worker = worker_info.id
            num_workers = worker_info.num_workers
    except ModuleNotFoundError:
        pass

    return rank, world_size, worker, num_workers


def pytorch_worker_seed(group=None):
    """Compute a distinct, deterministic RNG seed for each worker and node."""
    rank, world_size, worker, num_workers = pytorch_worker_info(group=group)
    return rank * 1000 + worker


def worker_seed_sat(group=None, seed=0):
    return pytorch_worker_seed(group=group) + seed * 23


class ConfiguredResampledShards(ResampledShards):
    def __init__(self, urls, seed, nshards=sys.maxsize, deterministic=True):
        from sat.helpers import print_rank0

        try:
            from megatron.core.parallel_state import get_data_parallel_group

            group = get_data_parallel_group()
            print_rank0("Using megatron data parallel group.")
        except:
            from sat.mpu import get_data_parallel_group

            try:
                group = get_data_parallel_group()
                print_rank0("Using sat data parallel group.")
            except AssertionError:
                group = None
                print_rank0("No data parallel group is specified!")
        worker_seed_sat_this = partial(worker_seed_sat, group=group, seed=seed)
        super().__init__(urls, nshards, worker_seed_sat_this, deterministic)


class SimpleDistributedWebDataset(DataPipeline):
    def __init__(self, path, process_fn, seed, *, shuffle_buffer=1000):
        # set shuffle_buffer = 1 to disable it, model-parallel will be different due to shuffle
        try:
            from sat.mpu import get_model_parallel_world_size

            if get_model_parallel_world_size() > 1:
                shuffle_buffer = 1
        except Exception:
            pass
        super().__init__(
            ConfiguredResampledShards(path, seed),  # Lots of shards are recommended, or not evenly
            tarfile_to_samples(),
            wds.shuffle(shuffle_buffer),
            process_fn,
        )


def tar_file_iterator_with_meta(
    fileobj, meta_names, skip_meta=r"__[^/]*__($|/)", suffix=None, handler=reraise_exception, meta_stream=None
):
    """Iterate over tar file, yielding filename, content pairs for the given tar stream.

    :param fileobj: byte stream suitable for tarfile
    :param meta_names: key of different items in meta file
    :param skip_meta: regexp for keys that are skipped entirely (Default value = r"__[^/]*__($|/)")

    """
    stream = tarfile.open(fileobj=fileobj, mode="r|*")
    data_dir, filename = fileobj.name.rsplit("/", 1)
    meta_data = {}  # {id: {meta_name: meta_value, meta_name2: meta_value2, ...}}

    if meta_stream is None:
        meta_file_name = filename.split(".")[0] + ".meta.jsonl"
        meta_path = os.path.join(data_dir, meta_file_name)
        if os.path.exists(meta_path):
            meta_stream = open(meta_path, "r")
    else:
        meta_file_name = meta_stream.name

    if meta_stream is not None:
        for lineno, line in enumerate(meta_stream):
            meta_list = []
            try:
                meta_list.append(json.loads(line))
            except Exception as exn:
                from sat.helpers import print_rank0

                print_rank0(f"Error in loading jsonl {meta_file_name}, lineno {lineno}: {line}", level="DEBUG")
                continue
            for item in meta_list:
                if not item["key"] in meta_data:
                    meta_data[item["key"]] = {}
                for meta_name in meta_names:
                    if meta_name in item:
                        meta_data[item["key"]][meta_name] = item[meta_name]
        meta_stream.close()

    try:
        for tarinfo in stream:
            fname = tarinfo.name
            try:
                if not tarinfo.isreg():
                    continue
                if fname is None:
                    continue
                if "/" not in fname and fname.startswith("__") and fname.endswith("__"):
                    # skipping metadata for now
                    continue
                if skip_meta is not None and re.match(skip_meta, fname):
                    continue
                if fname.endswith(".txt") and suffix is not None:
                    data = (stream.extractfile(tarinfo).read().decode() + suffix).encode()
                else:
                    data = stream.extractfile(tarinfo).read()
                result = dict(fname=fname, data=data)
                yield result

                if fname.endswith(".id"):
                    fid = fname.split(".")[0]
                    if "-$#%@&" in fid:
                        sfid = fid.split("-$#%@&")[0]
                    else:
                        sfid = fid
                    meta_data_fid = meta_data.get(sfid, {})
                    for meta_name in meta_names:
                        meta_fname = fid + "." + meta_name
                        meta = meta_data_fid.get(meta_name, None)
                        yield dict(fname=meta_fname, data=meta)
                stream.members = []
            except Exception as exn:
                if hasattr(exn, "args") and len(exn.args) > 0:
                    exn.args = (exn.args[0] + " @ " + str(fileobj),) + exn.args[1:]
                if handler(exn):
                    continue
                else:
                    break
    except Exception as exn:
        print(exn)
    del stream


def tar_file_expander_with_meta(data, meta_names, handler=reraise_exception):
    """Expand a stream of open tar files into a stream of tar file contents.

    This returns an iterator over (filename, file_contents).
    """
    for source in data:
        url = source["url"]
        try:
            assert isinstance(source, dict)
            assert "stream" in source
            for sample in tar_file_iterator_with_meta(source["stream"], meta_names, meta_stream=source["meta_stream"]):
                assert isinstance(sample, dict) and "data" in sample and "fname" in sample
                sample["__url__"] = url
                yield sample
        except Exception as exn:
            exn.args = exn.args + (source.get("stream"), source.get("url"))
            if handler(exn):
                continue
            else:
                break


def url_opener(
    data,
    handler,
    **kw,
):
    """Open URLs and yield a stream of url+stream pairs.

    Args:
        data: iterator over dict(url=...)
        handler: exception handler.
        kw: keyword arguments for gopen.gopen.

    Yields:
        a stream of url+stream pairs.
    """
    for sample in data:
        assert isinstance(sample, dict), sample
        assert "url" in sample
        url = sample["url"]
        try:
            stream = gopen(url, **kw)
            if hasattr(stream, "meta_stream"):
                meta_stream = stream.meta_stream
                del stream.meta_stream
            else:
                meta_stream = None
            sample.update(stream=stream, meta_stream=meta_stream)
            yield sample
        except Exception as exn:
            exn.args = exn.args + (url,)
            if handler(exn):
                continue
            else:
                break


def tarfile_samples_with_meta(src, meta_names, handler=reraise_exception):
    streams = url_opener(src, handler=handler)
    files = tar_file_expander_with_meta(streams, meta_names, handler)
    samples = group_by_keys(files, handler=handler)
    return samples


class MetaDistributedWebDataset(DataPipeline):
    """WebDataset with meta information files
    Extra Format:
        in webdataset (tar), for each sample there is a '.id';
        for each tar file, there is a '.meta.jsonl' file with the same name;
        The '.meta.jsonl' file contains lines of json objects, each with a 'key' field to match '.id'.
    """

    def __init__(
        self, path, process_fn, seed, *, meta_names=[], nshards=sys.maxsize, shuffle_buffer=1000, include_dirs=None
    ):
        # os.environ['WDS_SHOW_SEED'] = '1'
        import torch

        if torch.distributed.get_rank() == 0:
            if include_dirs is not None:  # /webdatasets/A,/webdatasets/C
                other_paths = []
                include_dirs = include_dirs.split(",")
                for include_dir in include_dirs:
                    if "*" in include_dir:
                        include_dir, n = include_dir.split("*")
                        n = int(n)
                    else:
                        n = 1
                    for cur_dir, dirs, files in os.walk(include_dir):
                        for f in files:
                            if f.endswith("tar") and os.path.getsize(os.path.join(cur_dir, f)) > 0:
                                # other_paths.append(os.path.join(cur_dir,f))
                                other_paths.extend([os.path.join(cur_dir, f)] * n)
                # print(f'Adding dataset paths {",".join(other_paths)}')
                from braceexpand import braceexpand

                if len(path) > 0:  # not ""
                    path = list(braceexpand(path)) + other_paths
                else:
                    path = other_paths
            path = [path]
        else:
            path = [
                None,
            ]
        torch.distributed.broadcast_object_list(path, src=0)
        path = path[0]

        tarfile_samples = partial(tarfile_samples_with_meta, meta_names=meta_names)
        tarfile_to_samples = pipelinefilter(tarfile_samples)

        # if model parallel, shuffle_buffer should be 1 to disable shuffling
        try:
            from sat.mpu import get_model_parallel_world_size

            if get_model_parallel_world_size() > 1:
                shuffle_buffer = 1
        except Exception:
            pass

        super().__init__(
            ConfiguredResampledShards(path, seed, nshards=nshards),
            tarfile_to_samples(),
            wds.shuffle(shuffle_buffer),
            process_fn,
        )


# rclone support
from webdataset.gopen import Pipe


def gopen_rclone(url, mode="rb", bufsize=1024 * 1024 * 32):
    """Open a URL with `curl`.

    :param url: rclone url, e.g. data:bucket1/foo.tar. data should be configured.
    :param mode: file mode
    :param bufsize: buffer size
    """
    url = url.replace("rclone://", "")
    if mode[0] == "r":
        cmd = f"rclone cat '{url}'"
        return Pipe(
            cmd,
            mode=mode,
            shell=True,
            bufsize=bufsize,
            ignore_status=[141, 23],
        )  # skipcq: BAN-B604
    elif mode[0] == "w":
        cmd = f"rclone cp - '{url}'"
        return Pipe(
            cmd,
            mode=mode,
            shell=True,
            bufsize=bufsize,
            ignore_status=[141, 26],
        )  # skipcq: BAN-B604
    else:
        raise ValueError(f"{mode}: unknown mode")


def gopen_boto3(url, mode="rb", bufsize=8192 * 2):
    """Open a URL with boto3 API.

    :param url: boto3 url, e.g. boto3://bucket1/foo.tar. data should be configured.
    :param mode: file mode
    :param bufsize: buffer size
    """
    import boto3

    # boto3.set_stream_logger('botocore', level='DEBUG')
    if url.startswith("boto3://"):
        url = url.replace("boto3://", "")
        need_meta = False
    else:
        url = url.replace("metaboto3://", "")
        need_meta = True
    endpoint_url = os.environ.get("S3_ENDPOINT_URL", None)
    access_key = os.environ.get("S3_ACCESS_KEY_ID", None)
    secret_key = os.environ.get("S3_SECRET_ACCESS_KEY", None)

    if mode[0] == "r":
        s3_client = boto3.client(
            "s3", endpoint_url=endpoint_url, aws_access_key_id=access_key, aws_secret_access_key=secret_key
        )
        bucket, key = url.split("/", 1)

        if need_meta:
            # download a meta json
            meta_file_key = key.split(".")[0] + ".meta.jsonl"
            meta_stream = io.BytesIO()
            s3_client.download_fileobj(bucket, meta_file_key, meta_stream)
            meta_stream.seek(0)
            meta_stream.name = meta_file_key
        else:
            meta_stream = None

        # data tar stream
        response = s3_client.get_object(Bucket=bucket, Key=key)  # Range optional
        response["Body"].name = key  # actually not used
        response["Body"].meta_stream = meta_stream
        return response["Body"]
    else:
        raise ValueError(f"{mode}: unknown mode")


gopen_schemes["rclone"] = gopen_rclone
gopen_schemes["boto3"] = gopen_boto3
gopen_schemes["metaboto3"] = gopen_boto3