File size: 2,283 Bytes
ee21b96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import shutil
import tempfile
import unittest
from typing import Optional


class TestFileChunker(unittest.TestCase):
    _tmpdir: Optional[str] = None
    _tmpfile: Optional[str] = None
    _line_content = "Hello, World\n"
    _num_bytes = None
    _num_lines = 200
    _num_splits = 20

    @classmethod
    def setUpClass(cls) -> None:
        cls._num_bytes = len(cls._line_content.encode("utf-8"))
        cls._tmpdir = tempfile.mkdtemp()
        with open(os.path.join(cls._tmpdir, "test.txt"), "w") as f:
            cls._tmpfile = f.name
            for _i in range(cls._num_lines):
                f.write(cls._line_content)
            f.flush()

    @classmethod
    def tearDownClass(cls) -> None:
        # Cleanup temp working dir.
        if cls._tmpdir is not None:
            shutil.rmtree(cls._tmpdir)  # type: ignore

    def test_find_offsets(self):
        from fairseq.file_chunker_utils import find_offsets

        offsets = find_offsets(self._tmpfile, self._num_splits)
        self.assertEqual(len(offsets), self._num_splits + 1)
        (zero, *real_offsets, last) = offsets
        self.assertEqual(zero, 0)
        for i, o in enumerate(real_offsets):
            self.assertEqual(
                o,
                self._num_bytes
                + ((i + 1) * self._num_bytes * self._num_lines / self._num_splits),
            )
        self.assertEqual(last, self._num_bytes * self._num_lines)

    def test_readchunks(self):
        from fairseq.file_chunker_utils import Chunker, find_offsets

        offsets = find_offsets(self._tmpfile, self._num_splits)
        for start, end in zip(offsets, offsets[1:]):
            with Chunker(self._tmpfile, start, end) as lines:
                all_lines = list(lines)
                num_lines = self._num_lines / self._num_splits
                self.assertAlmostEqual(
                    len(all_lines), num_lines, delta=1
                )  # because we split on the bites, we might end up with one more/less line in a chunk
                self.assertListEqual(
                    all_lines, [self._line_content for _ in range(len(all_lines))]
                )