|
from __future__ import annotations |
|
|
|
import collections |
|
import functools |
|
import logging |
|
import math |
|
import os |
|
import threading |
|
import warnings |
|
from concurrent.futures import Future, ThreadPoolExecutor |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Any, |
|
Callable, |
|
ClassVar, |
|
Generic, |
|
NamedTuple, |
|
Optional, |
|
OrderedDict, |
|
TypeVar, |
|
) |
|
|
|
# Generic return-type variable used by the helpers in this module.
T = TypeVar("T")

if TYPE_CHECKING:
    # These imports are only evaluated by static type checkers.
    import mmap

    from typing_extensions import ParamSpec

    P = ParamSpec("P")
else:
    # Runtime stand-in: typing_extensions is only imported for type checking.
    P = TypeVar("P")

# Signature every fetcher must satisfy: ``f(start, end) -> bytes``.
Fetcher = Callable[[int, int], bytes]

logger = logging.getLogger("fsspec")
|
|
|
|
|
class BaseCache:
    """Pass-through cache: keeps nothing and calls the fetcher every time.

    Acts as the base class for the other cache implementations.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    """

    name: ClassVar[str] = "none"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        self.blocksize = blocksize
        # Stays 0 here; block-oriented subclasses assign the real count.
        self.nblocks = 0
        self.fetcher = fetcher
        self.size = size
        # Usage statistics, reported by ``_log_stats`` and ``__repr__``.
        self.hit_count = 0
        self.miss_count = 0
        self.total_requested_bytes = 0

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Return bytes ``start:stop`` straight from the fetcher.

        ``None`` for ``start`` means the beginning of the file and ``None``
        for ``stop`` means ``self.size``. Empty or out-of-file ranges yield
        ``b""`` without calling the fetcher.
        """
        lo = 0 if start is None else start
        hi = self.size if stop is None else stop
        if lo >= self.size or lo >= hi:
            return b""
        return self.fetcher(lo, hi)

    def _reset_stats(self) -> None:
        """Reset hit and miss counts for a more granular report e.g. by file."""
        self.hit_count = self.miss_count = self.total_requested_bytes = 0

    def _log_stats(self) -> str:
        """Return a formatted string of the cache statistics.

        The string is empty when the cache has recorded neither hits nor
        misses.
        """
        untouched = self.hit_count == 0 and self.miss_count == 0
        if untouched:
            return ""
        template = " , %s: %d hits, %d misses, %d total requested bytes"
        values = (
            self.name,
            self.hit_count,
            self.miss_count,
            self.total_requested_bytes,
        )
        return template % values

    def __repr__(self) -> str:
        """Multi-line summary of the cache geometry and statistics."""
        return f"""
        <{self.__class__.__name__}:
            block size : {self.blocksize}
            block count : {self.nblocks}
            file size : {self.size}
            cache hits : {self.hit_count}
            cache misses: {self.miss_count}
            total requested bytes: {self.total_requested_bytes}>
        """
|
|
|
|
|
class MMapCache(BaseCache):
    """memory-mapped sparse file cache

    Opens temporary file, which is filled blocks-wise when data is requested.
    Ensure there is enough disc space in the temporary location.

    This cache method might only work on posix

    Parameters
    ----------
    blocksize: int
        Size in bytes of the blocks the file is fetched in.
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    location: str, optional
        Path of the backing file. When ``None`` an anonymous temporary file
        is used; when the path already exists it is reopened and reused.
    blocks: set of int, optional
        Indices of blocks treated as already present in the backing file.
    """

    name = "mmap"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        location: str | None = None,
        blocks: set[int] | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.blocks = set() if blocks is None else blocks
        self.location = location
        self.cache = self._makefile()

    def _makefile(self) -> mmap.mmap | bytearray:
        """Create or reopen the backing file and return a mapping of it.

        An empty file cannot be memory-mapped, so a plain ``bytearray`` is
        returned when ``self.size`` is 0.
        """
        import mmap
        import tempfile

        if self.size == 0:
            return bytearray()

        # posix version
        if self.location is None or not os.path.exists(self.location):
            if self.location is None:
                fd = tempfile.TemporaryFile()
                # A brand-new anonymous file holds no data yet, so any block
                # set supplied by the caller is discarded.
                self.blocks = set()
            else:
                fd = open(self.location, "wb+")
            # Extend the file to its full size by writing one byte at the end.
            fd.seek(self.size - 1)
            fd.write(b"1")
            fd.flush()
        else:
            fd = open(self.location, "r+b")

        return mmap.mmap(fd.fileno(), self.size)

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end``, fetching any blocks not yet mapped.

        ``None`` bounds mean start/end of file. Requests reaching past the
        end of the file are truncated to the file size.
        """
        logger.debug(f"MMap cache fetching {start}-{end}")
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # Never address data beyond the file: without this clamp a request
        # past EOF produced block indices that do not exist, negative byte
        # accounting and fetcher calls with start > end.
        end = min(end, self.size)
        start_block = start // self.blocksize
        # Index of the last block that actually contains data (size > 0 here
        # because the guard above returned for start >= size).
        last_block = (self.size - 1) // self.blocksize
        end_block = min(end // self.blocksize, last_block)

        block_range = range(start_block, end_block + 1)
        need = [i for i in block_range if i not in self.blocks]
        self.miss_count += len(need)
        self.hit_count += len(block_range) - len(need)

        for i in need:
            sstart = i * self.blocksize
            # The final block may be shorter than ``blocksize``.
            send = min(sstart + self.blocksize, self.size)
            self.total_requested_bytes += send - sstart
            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
            self.cache[sstart:send] = self.fetcher(sstart, send)
            self.blocks.add(i)

        return self.cache[start:end]

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance state without the memory map itself."""
        state = self.__dict__.copy()
        # The mapping is rebuilt from ``location`` in ``__setstate__``.
        del state["cache"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore the attributes and re-create the memory map."""
        self.__dict__.update(state)
        self.cache = self._makefile()
|
|
|
|
|
class ReadAheadCache(BaseCache):
    """Cache which reads only when we get beyond a block of data

    This is a much simpler version of BytesCache, and does not attempt to
    fill holes in the cache or keep fragments alive. It is best suited to
    many small reads in a sequential order (e.g., reading lines from a file).
    """

    name = "readahead"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        super().__init__(blocksize, fetcher, size)
        # ``cache`` holds the bytes of the file range [self.start, self.end).
        self.cache = b""
        self.start = 0
        self.end = 0

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end``, reading ahead by ``blocksize`` on a miss.

        ``None`` bounds mean start/end of file; ``end`` is truncated to the
        file size.
        """
        begin = 0 if start is None else start
        finish = self.size if end is None or end > self.size else end
        if begin >= self.size or begin >= finish:
            return b""

        # Whole request already buffered.
        if self.start <= begin and finish <= self.end:
            self.hit_count += 1
            return self.cache[begin - self.start : finish - self.start]

        self.miss_count += 1
        if self.start <= begin < self.end:
            # Partial hit: reuse the buffered tail and fetch from where the
            # buffer stops.
            head = self.cache[begin - self.start :]
            fetch_from = self.end
        else:
            # Complete miss.
            head = b""
            fetch_from = begin
        remaining = (finish - begin) - len(head)

        fetch_to = min(self.size, finish + self.blocksize)
        self.total_requested_bytes += fetch_to - fetch_from
        self.cache = self.fetcher(fetch_from, fetch_to)
        self.start = fetch_from
        self.end = fetch_from + len(self.cache)
        return head + self.cache[:remaining]
|
|
|
|
|
class FirstChunkCache(BaseCache):
    """Caches the first block of a file only

    This may be useful for file types where the metadata is stored in the header,
    but is randomly accessed.
    """

    name = "first"

    def __init__(self, blocksize: int, fetcher: Fetcher, size: int) -> None:
        if blocksize > size:
            # The cached chunk can never be larger than the file itself.
            blocksize = size
        super().__init__(blocksize, fetcher, size)
        # First ``blocksize`` bytes of the file, filled on first use.
        self.cache: bytes | None = None

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end``, serving the first block from memory.

        ``None`` for ``start`` means the beginning of the file and ``None``
        for ``end`` means the end of the file; ``end`` is truncated to the
        file size. Anything beyond the first block is always fetched anew.
        """
        start = start or 0
        if start > self.size:
            logger.debug("FirstChunkCache: requested start > file size")
            return b""

        # ``min(None, size)`` raises TypeError, so resolve a missing end
        # explicitly, as every other cache in this module does.
        end = self.size if end is None else min(end, self.size)

        if start < self.blocksize:
            if self.cache is None:
                self.miss_count += 1
                if end > self.blocksize:
                    # One request covers both the header and the remainder.
                    self.total_requested_bytes += end
                    data = self.fetcher(0, end)
                    self.cache = data[: self.blocksize]
                    return data[start:]
                self.cache = self.fetcher(0, self.blocksize)
                self.total_requested_bytes += self.blocksize
            part = self.cache[start:end]
            if end > self.blocksize:
                # The part past the first block is not cached.
                self.total_requested_bytes += end - self.blocksize
                part += self.fetcher(self.blocksize, end)
            self.hit_count += 1
            return part
        else:
            # Entirely outside the cached first block.
            self.miss_count += 1
            self.total_requested_bytes += end - start
            return self.fetcher(start, end)
|
|
|
|
|
class BlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name = "blockcache"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        # Per-instance LRU wrapped around the bound ``_fetch_block`` method.
        self._fetch_block_cached = functools.lru_cache(maxblocks)(self._fetch_block)

    def cache_info(self):
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance state without the LRU wrapper.

        A copy of ``__dict__`` is used so that pickling does not strip
        ``_fetch_block_cached`` from the live instance.
        """
        state = self.__dict__.copy()
        del state["_fetch_block_cached"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore the attributes and build a fresh, empty LRU wrapper."""
        self.__dict__.update(state)
        self._fetch_block_cached = functools.lru_cache(state["maxblocks"])(
            self._fetch_block
        )

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end``, loading the covering blocks as needed.

        ``None`` bounds mean start/end of file. Requests reaching past the
        end of the file are truncated to the file size.
        """
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # Clamp so that a read past EOF cannot ask for a block number beyond
        # ``nblocks`` (which ``_fetch_block`` rejects with ValueError).
        end = min(end, self.size)

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        # These calls are cached, so repeating them for the same block is cheap.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int) -> bytes:
        """
        Fetch the block of data for `block_number`.

        Raises
        ------
        ValueError
            If ``block_number`` is greater than ``self.nblocks``.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        self.total_requested_bytes += end - start
        self.miss_count += 1
        logger.info("BlockCache fetching block %d", block_number)
        # BaseCache._fetch returns b"" for a block starting at/after EOF.
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        # Offsets of the request inside its first and last block.
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        self.hit_count += 1
        if start_block_number == end_block_number:
            block: bytes = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # Tail of the first block ...
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # ... every block strictly in between, taken whole ...
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # ... and the head of the last block.
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
|
|
|
|
|
class BytesCache(BaseCache):
    """Cache which holds data in a in-memory bytes object

    Implements read-ahead by the block size, for semi-random reads progressing
    through the file.

    Parameters
    ----------
    trim: bool
        As we read more data, whether to discard the start of the buffer when
        we are more than a blocksize ahead of it.
    """

    name: ClassVar[str] = "bytes"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, trim: bool = True
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        # ``cache`` holds the file range [self.start, self.end); both bounds
        # stay None until the first fetch.
        self.cache = b""
        self.start: int | None = None
        self.end: int | None = None
        self.trim = trim

    def _refill(self, lo: int, hi: int) -> None:
        """Throw the buffer away and replace it with file bytes ``lo:hi``."""
        self.total_requested_bytes += hi - lo
        self.cache = self.fetcher(lo, hi)
        self.start = lo

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end``, growing or replacing the buffer on a miss.

        ``None`` bounds mean start/end of file.
        """
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        window_known = self.start is not None and self.end is not None
        if window_known and self.start <= start and end < self.end:
            # Hit: the request lies strictly inside the buffered window.
            self.hit_count += 1
            rel = start - self.start
            return self.cache[rel : rel + end - start]

        # Upper bound of what will be fetched, including the read-ahead.
        bend = min(self.size, end + self.blocksize) if self.blocksize else end

        if bend == start or start > self.size:
            return b""

        self.miss_count += 1
        starts_before = self.start is None or start < self.start
        ends_after = self.end is None or end > self.end
        if starts_before and ends_after:
            # First read, or the request sticks out on both sides.
            self._refill(start, bend)
        else:
            assert self.start is not None
            assert self.end is not None

            if start < self.start:
                if self.end is None or self.end - end > self.blocksize:
                    # Far from the buffered end: start over.
                    self._refill(start, bend)
                else:
                    # Fetch only the missing front part and prepend it.
                    self.total_requested_bytes += self.start - start
                    front = self.fetcher(start, self.start)
                    self.cache = front + self.cache
                    self.start = start
            elif self.end is not None and bend > self.end:
                if self.end > self.size:
                    pass
                elif end - self.end > self.blocksize:
                    # Far beyond the buffered end: start over.
                    self._refill(start, bend)
                else:
                    # Fetch only the missing back part and append it.
                    self.total_requested_bytes += bend - self.end
                    back = self.fetcher(self.end, bend)
                    self.cache = self.cache + back

        self.end = self.start + len(self.cache)
        rel = start - self.start
        out = self.cache[rel : rel + end - start]
        if self.trim:
            num = (self.end - self.start) // (self.blocksize + 1)
            if num > 1:
                # Drop ``num`` whole blocks from the front of the buffer.
                dropped = self.blocksize * num
                self.start += dropped
                self.cache = self.cache[dropped:]
        return out

    def __len__(self) -> int:
        """Number of bytes currently held in the buffer."""
        return len(self.cache)
|
|
|
|
|
class AllBytes(BaseCache):
    """Cache entire contents of the file

    When ``data`` is not supplied, the whole file is fetched once at
    construction time with ``fetcher(0, size)``.
    """

    name: ClassVar[str] = "all"

    def __init__(
        self,
        blocksize: int | None = None,
        fetcher: Fetcher | None = None,
        size: int | None = None,
        data: bytes | None = None,
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        if data is not None:
            # Contents were handed in; nothing to download.
            self.data = data
            return
        self.miss_count += 1
        self.total_requested_bytes += self.size
        self.data = self.fetcher(0, self.size)

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Return ``start:stop`` of the stored contents; always a hit."""
        self.hit_count += 1
        window = slice(start, stop)
        return self.data[window]
|
|
|
|
|
class KnownPartsOfAFile(BaseCache):
    """
    Cache holding known file parts.

    Parameters
    ----------
    blocksize: int
        How far to read ahead in numbers of bytes
    fetcher: func
        Function of the form f(start, end) which gets bytes from remote as
        specified
    size: int
        How big this file is
    data: dict
        A dictionary mapping explicit `(start, stop)` file-offset tuples
        with known bytes. The dictionary is read but not modified.
    strict: bool, default True
        Whether to fetch reads that go beyond a known byte-range boundary.
        If `False`, any read that ends outside a known part will be zero
        padded. Note that zero padding will not be used for reads that
        begin outside a known byte-range.
    """

    name: ClassVar[str] = "parts"

    def __init__(
        self,
        blocksize: int,
        fetcher: Fetcher,
        size: int,
        data: Optional[dict[tuple[int, int], bytes]] = None,
        strict: bool = True,
        **_: Any,
    ):
        super().__init__(blocksize, fetcher, size)
        self.strict = strict

        # Simple consolidation of contiguous parts: walk the ranges in sorted
        # order and glue together any range that starts exactly where the
        # previous one stopped. The caller's dict is only read, never popped.
        offsets: list[tuple[int, int]] = []
        pieces: list[list[bytes]] = []
        if data:
            for start, stop in sorted(data):
                chunk = data[(start, stop)]
                if offsets and offsets[-1][1] == start:
                    offsets[-1] = (offsets[-1][0], stop)
                    pieces[-1].append(chunk)
                else:
                    offsets.append((start, stop))
                    pieces.append([chunk])

        # Join each run once (linear) instead of repeated ``+=``; a run of a
        # single piece keeps the original object untouched.
        self.data: dict[tuple[int, int], bytes] = {
            key: parts[0] if len(parts) == 1 else b"".join(parts)
            for key, parts in zip(offsets, pieces)
        }

    def _fetch(self, start: int | None, stop: int | None) -> bytes:
        """Return bytes ``start:stop`` from the known parts where possible.

        A read that starts inside a known part is served from it. When it
        also runs past that part, the remainder is zero padded if ``strict``
        is False, otherwise it is requested from the fetcher. Reads starting
        outside every known part always go to the fetcher.

        Raises
        ------
        ValueError
            If bytes must be fetched but no fetcher was supplied.
        """
        if start is None:
            start = 0
        if stop is None:
            stop = self.size

        out = b""
        for (loc0, loc1), data in self.data.items():
            # Only a part that contains the start of the read is usable.
            if loc0 <= start < loc1:
                off = start - loc0
                out = data[off : off + stop - start]
                if not self.strict or loc0 <= stop <= loc1:
                    # Either the whole read is inside this part, or we are
                    # allowed to pad the missing tail with zeros.
                    out += b"\x00" * (stop - start - len(out))
                    self.hit_count += 1
                    return out
                else:
                    # Keep the known prefix in ``out`` and fetch only what
                    # lies beyond the end of this part.
                    start = loc1
                    break

        # Reaching this point means (part of) the read is not known locally.
        if self.fetcher is None:
            # Without a fetcher there is no way to obtain the missing bytes.
            raise ValueError(f"Read is outside the known file parts: {(start, stop)}. ")

        warnings.warn(
            f"Read is outside the known file parts: {(start, stop)}. "
            f"IO/caching performance may be poor!"
        )
        logger.debug(f"KnownPartsOfAFile cache fetching {start}-{stop}")
        self.total_requested_bytes += stop - start
        self.miss_count += 1
        return out + super()._fetch(start, stop)
|
|
|
|
|
class UpdatableLRU(Generic[P, T]):
    """
    Custom implementation of LRU cache that allows updating keys

    Used by BackgroundBlockCache
    """

    class CacheInfo(NamedTuple):
        hits: int
        misses: int
        maxsize: int
        currsize: int

    def __init__(self, func: Callable[P, T], max_size: int = 128) -> None:
        # Entries are kept oldest-first; the key is the positional-args tuple.
        self._cache: OrderedDict[Any, T] = collections.OrderedDict()
        self._func = func
        self._max_size = max_size
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()

    def _store(self, key: Any, value: T) -> None:
        """Insert ``value`` and evict the oldest entry if over capacity.

        The caller must already hold ``self._lock``.
        """
        self._cache[key] = value
        if len(self._cache) > self._max_size:
            self._cache.popitem(last=False)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        """Return the cached result for ``args``, computing it on a miss.

        Keyword arguments are rejected with ``TypeError``. The wrapped
        function is called without the lock held.
        """
        if kwargs:
            raise TypeError(f"Got unexpected keyword argument {kwargs.keys()}")

        with self._lock:
            try:
                cached = self._cache[args]
            except KeyError:
                pass
            else:
                # Mark as most recently used.
                self._cache.move_to_end(args)
                self._hits += 1
                return cached

        # ``kwargs`` is known to be empty at this point.
        computed = self._func(*args)

        with self._lock:
            self._store(args, computed)
            self._misses += 1

        return computed

    def is_key_cached(self, *args: Any) -> bool:
        """Tell whether a result for ``args`` is currently stored."""
        with self._lock:
            return args in self._cache

    def add_key(self, result: T, *args: Any) -> None:
        """Store ``result`` for ``args`` without calling the wrapped function."""
        with self._lock:
            self._store(args, result)

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """Return hit/miss counters together with maximum and current size."""
        with self._lock:
            return self.CacheInfo(
                hits=self._hits,
                misses=self._misses,
                maxsize=self._max_size,
                currsize=len(self._cache),
            )
|
|
|
|
|
class BackgroundBlockCache(BaseCache):
    """
    Cache holding memory as a set of blocks with pre-loading of
    the next block in the background.

    Requests are only ever made ``blocksize`` at a time, and are
    stored in an LRU cache. The least recently accessed block is
    discarded when more than ``maxblocks`` are stored. If the
    next block is not in cache, it is loaded in a separate thread
    in non-blocking way.

    Parameters
    ----------
    blocksize : int
        The number of bytes to store in each block.
        Requests are only ever made for ``blocksize``, so this
        should balance the overhead of making a request against
        the granularity of the blocks.
    fetcher : Callable
    size : int
        The total size of the file being cached.
    maxblocks : int
        The maximum number of blocks to cache for. The maximum memory
        use for this cache is then ``blocksize * maxblocks``.
    """

    name: ClassVar[str] = "background"

    def __init__(
        self, blocksize: int, fetcher: Fetcher, size: int, maxblocks: int = 32
    ) -> None:
        super().__init__(blocksize, fetcher, size)
        self.nblocks = math.ceil(size / blocksize)
        self.maxblocks = maxblocks
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, maxblocks)

        # A single worker: at most one block is pre-loaded at any time.
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number: int | None = None
        self._fetch_future: Future[bytes] | None = None
        # Guards the two ``_fetch_future*`` attributes above.
        self._fetch_future_lock = threading.Lock()

    def cache_info(self) -> UpdatableLRU.CacheInfo:
        """
        The statistics on the block cache.

        Returns
        -------
        NamedTuple
            Returned directly from the LRU Cache used internally.
        """
        return self._fetch_block_cached.cache_info()

    def __getstate__(self) -> dict[str, Any]:
        """Return the instance state without the LRU and threading members.

        A copy of ``__dict__`` is used so that pickling does not strip the
        executor, lock, pending future and LRU from the live instance.
        """
        state = self.__dict__.copy()
        for transient in (
            "_fetch_block_cached",
            "_thread_executor",
            "_fetch_future_block_number",
            "_fetch_future",
            "_fetch_future_lock",
        ):
            del state[transient]
        return state

    def __setstate__(self, state) -> None:
        """Restore the attributes and rebuild the members dropped on pickling."""
        self.__dict__.update(state)
        self._fetch_block_cached = UpdatableLRU(self._fetch_block, state["maxblocks"])
        self._thread_executor = ThreadPoolExecutor(max_workers=1)
        self._fetch_future_block_number = None
        self._fetch_future = None
        self._fetch_future_lock = threading.Lock()

    def _fetch(self, start: int | None, end: int | None) -> bytes:
        """Return bytes ``start:end`` and schedule a pre-load of the next block.

        ``None`` bounds mean start/end of file. Requests reaching past the
        end of the file are truncated to the file size.
        """
        if start is None:
            start = 0
        if end is None:
            end = self.size
        if start >= self.size or start >= end:
            return b""

        # Clamp so that a read past EOF cannot ask for a block number beyond
        # ``nblocks`` (which ``_fetch_block`` rejects with ValueError).
        end = min(end, self.size)

        # byte position -> block numbers
        start_block_number = start // self.blocksize
        end_block_number = end // self.blocksize

        fetch_future_block_number = None
        fetch_future = None
        with self._fetch_future_lock:
            # A background fetch exists: see whether we can or must join it.
            if self._fetch_future is not None:
                assert self._fetch_future_block_number is not None
                if self._fetch_future.done():
                    logger.info("BlockCache joined background fetch without waiting.")
                    self._fetch_block_cached.add_key(
                        self._fetch_future.result(), self._fetch_future_block_number
                    )
                    # The pre-loaded block is stored; forget the future.
                    self._fetch_future_block_number = None
                    self._fetch_future = None
                else:
                    # Joining is only required when this read needs the very
                    # block that is still being loaded.
                    must_join = bool(
                        start_block_number
                        <= self._fetch_future_block_number
                        <= end_block_number
                    )
                    if must_join:
                        # Take local copies so the lock can be released
                        # before blocking on the result.
                        fetch_future_block_number = self._fetch_future_block_number
                        fetch_future = self._fetch_future

                        # The local copies now own the pending fetch.
                        self._fetch_future_block_number = None
                        self._fetch_future = None

        # Wait (outside the lock) for the block this read depends on.
        if fetch_future is not None:
            logger.info("BlockCache waiting for background fetch.")
            self._fetch_block_cached.add_key(
                fetch_future.result(), fetch_future_block_number
            )

        # These calls are cached, so repeating them for the same block is cheap.
        for block_number in range(start_block_number, end_block_number + 1):
            self._fetch_block_cached(block_number)

        # Pre-load the following block when nothing is running in the
        # background, the block number is in range and it is not cached yet.
        end_block_plus_1 = end_block_number + 1
        with self._fetch_future_lock:
            if (
                self._fetch_future is None
                and end_block_plus_1 <= self.nblocks
                and not self._fetch_block_cached.is_key_cached(end_block_plus_1)
            ):
                self._fetch_future_block_number = end_block_plus_1
                self._fetch_future = self._thread_executor.submit(
                    self._fetch_block, end_block_plus_1, "async"
                )

        return self._read_cache(
            start,
            end,
            start_block_number=start_block_number,
            end_block_number=end_block_number,
        )

    def _fetch_block(self, block_number: int, log_info: str = "sync") -> bytes:
        """
        Fetch the block of data for `block_number`.

        ``log_info`` only labels the log message ("sync" or "async").

        Raises
        ------
        ValueError
            If ``block_number`` is greater than ``self.nblocks``.
        """
        if block_number > self.nblocks:
            raise ValueError(
                f"'block_number={block_number}' is greater than "
                f"the number of blocks ({self.nblocks})"
            )

        start = block_number * self.blocksize
        end = start + self.blocksize
        logger.info("BlockCache fetching block (%s) %d", log_info, block_number)
        # NOTE: this method is also submitted to the worker thread, and the
        # two counters below are updated without holding a lock.
        self.total_requested_bytes += end - start
        self.miss_count += 1
        block_contents = super()._fetch(start, end)
        return block_contents

    def _read_cache(
        self, start: int, end: int, start_block_number: int, end_block_number: int
    ) -> bytes:
        """
        Read from our block cache.

        Parameters
        ----------
        start, end : int
            The start and end byte positions.
        start_block_number, end_block_number : int
            The start and end block numbers.
        """
        # Offsets of the request inside its first and last block.
        start_pos = start % self.blocksize
        end_pos = end % self.blocksize

        # Counted once per read, whatever the number of blocks involved.
        self.hit_count += 1

        if start_block_number == end_block_number:
            block = self._fetch_block_cached(start_block_number)
            return block[start_pos:end_pos]

        else:
            # Tail of the first block ...
            out = [self._fetch_block_cached(start_block_number)[start_pos:]]

            # ... every block strictly in between, taken whole ...
            out.extend(
                map(
                    self._fetch_block_cached,
                    range(start_block_number + 1, end_block_number),
                )
            )

            # ... and the head of the last block.
            out.append(self._fetch_block_cached(end_block_number)[:end_pos])

            return b"".join(out)
|
|
|
|
|
# Registry of cache implementations, keyed by cache name. The ``None`` key is
# the one special entry and maps to the pass-through BaseCache.
caches: dict[str | None, type[BaseCache]] = {None: BaseCache}
|
|
|
|
|
def register_cache(cls: type[BaseCache], clobber: bool = False) -> None:
    """'Register' cache implementation.

    The class is stored in the module-level ``caches`` mapping under its
    ``name`` attribute.

    Parameters
    ----------
    cls: type
        The cache class to register.
    clobber: bool, optional
        If set to True (default is False) - allow to overwrite existing
        entry.

    Raises
    ------
    ValueError
        If the name is already registered and ``clobber`` is False.
    """
    key = cls.name
    already_known = key in caches
    if already_known and not clobber:
        raise ValueError(f"Cache with name {key!r} is already known: {caches[key]}")
    caches[key] = cls
|
|
|
|
|
# Register every cache implementation defined in this module under its name.
for c in (
    BaseCache,
    MMapCache,
    BytesCache,
    ReadAheadCache,
    BlockCache,
    FirstChunkCache,
    AllBytes,
    KnownPartsOfAFile,
    BackgroundBlockCache,
):
    register_cache(c)
|
|