|
from __future__ import annotations |
|
|
|
from collections.abc import Callable, Mapping |
|
from io import SEEK_SET, UnsupportedOperation |
|
from os import PathLike |
|
from pathlib import Path |
|
from typing import Any, BinaryIO, cast |
|
|
|
from .. import ( |
|
BrokenResourceError, |
|
ClosedResourceError, |
|
EndOfStream, |
|
TypedAttributeSet, |
|
to_thread, |
|
typed_attribute, |
|
) |
|
from ..abc import ByteReceiveStream, ByteSendStream |
|
|
|
|
|
class FileStreamAttribute(TypedAttributeSet):
    """Typed attributes exposed by the file streams defined in this module."""

    #: the file object wrapped by the stream
    file: BinaryIO = typed_attribute()

    #: the path of the file, derived from the file object's ``name`` attribute,
    #: if available
    path: Path = typed_attribute()

    #: the file descriptor number reported by the file object's ``fileno()``,
    #: if available
    fileno: int = typed_attribute()
|
|
|
|
|
class _BaseFileStream:
    """
    Common base of the file streams: stores the wrapped file object and provides
    closing and typed extra attributes for it.

    :param file: the binary file object to wrap

    """

    def __init__(self, file: BinaryIO):
        self._file = file

    async def aclose(self) -> None:
        """Close the wrapped file object through ``to_thread.run_sync``."""
        await to_thread.run_sync(self._file.close)

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """
        Return the typed attributes that can be provided for the wrapped file.

        :data:`FileStreamAttribute.file` is always present.
        :data:`FileStreamAttribute.path` is only present when the file object's
        ``name`` is a string or an :class:`os.PathLike`, and
        :data:`FileStreamAttribute.fileno` only when ``fileno()`` can be called
        successfully on the file object.

        :return: a mapping of typed attribute keys to zero-argument callables
            producing the attribute values

        """
        attributes: dict[Any, Callable[[], Any]] = {
            FileStreamAttribute.file: lambda: self._file,
        }

        # Files opened from a file descriptor report that integer as their
        # ``name``; passing it to Path() would raise TypeError on lookup. Only
        # advertise a path when the name is something Path() accepts.
        name = getattr(self._file, "name", None)
        if isinstance(name, (str, PathLike)):
            attributes[FileStreamAttribute.path] = lambda: Path(name)

        # Probe fileno() once so the attribute is only advertised when it works.
        # A closed file raises a plain ValueError rather than
        # UnsupportedOperation; treat both as "no file descriptor available".
        try:
            self._file.fileno()
        except (UnsupportedOperation, ValueError):
            pass
        else:
            attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno()

        return attributes
|
|
|
|
|
class FileReadStream(_BaseFileStream, ByteReceiveStream):
    """
    A byte stream that reads from a file in the file system.

    :param file: a file that has been opened for reading in binary mode

    .. versionadded:: 3.0
    """

    @classmethod
    async def from_path(cls, path: str | PathLike[str]) -> FileReadStream:
        """
        Open the file at the given path in ``"rb"`` mode and wrap it in a stream.

        :param path: path of the file to read from
        :return: a stream wrapping the newly opened file

        """
        target = Path(path)
        handle = await to_thread.run_sync(target.open, "rb")
        return cls(cast(BinaryIO, handle))

    async def receive(self, max_bytes: int = 65536) -> bytes:
        """
        Read a chunk of at most ``max_bytes`` bytes from the file.

        :param max_bytes: the size passed to the file object's ``read()``
        :return: the bytes that were read
        :raises ClosedResourceError: if ``read()`` raises :exc:`ValueError`
        :raises BrokenResourceError: if ``read()`` raises :exc:`OSError`
        :raises EndOfStream: if ``read()`` returns no data

        """
        try:
            chunk = await to_thread.run_sync(self._file.read, max_bytes)
        except ValueError:
            raise ClosedResourceError from None
        except OSError as exc:
            raise BrokenResourceError from exc

        # An empty result from read() marks the end of the file
        if not chunk:
            raise EndOfStream

        return chunk

    async def seek(self, position: int, whence: int = SEEK_SET) -> int:
        """
        Move the file position.

        .. seealso:: :meth:`io.IOBase.seek`

        .. note:: Not all file descriptors are seekable.

        :param position: position to seek the file to
        :param whence: controls how ``position`` is interpreted
        :return: the new absolute position
        :raises OSError: if the file is not seekable

        """
        new_position = await to_thread.run_sync(self._file.seek, position, whence)
        return new_position

    async def tell(self) -> int:
        """
        Report the current position in the file.

        .. note:: Not all file descriptors are seekable.

        :return: the current absolute position
        :raises OSError: if the file is not seekable

        """
        current_position = await to_thread.run_sync(self._file.tell)
        return current_position
|
|
|
|
|
class FileWriteStream(_BaseFileStream, ByteSendStream):
    """
    A byte stream that writes to a file in the file system.

    :param file: a file that has been opened for writing in binary mode

    .. versionadded:: 3.0
    """

    @classmethod
    async def from_path(
        cls, path: str | PathLike[str], append: bool = False
    ) -> FileWriteStream:
        """
        Open the file at the given path for binary writing and wrap it in a
        stream.

        :param path: path of the file to write to
        :param append: if ``True``, open the file for appending; if ``False``, any
            existing file at the given path will be truncated
        :return: a stream wrapping the newly opened file

        """
        if append:
            mode = "ab"
        else:
            mode = "wb"

        target = Path(path)
        handle = await to_thread.run_sync(target.open, mode)
        return cls(cast(BinaryIO, handle))

    async def send(self, item: bytes) -> None:
        """
        Write the given bytes to the file.

        :param item: the bytes passed to the file object's ``write()``
        :raises ClosedResourceError: if ``write()`` raises :exc:`ValueError`
        :raises BrokenResourceError: if ``write()`` raises :exc:`OSError`

        """
        try:
            await to_thread.run_sync(self._file.write, item)
        except ValueError:
            # Mapped to ClosedResourceError; original traceback is suppressed
            raise ClosedResourceError from None
        except OSError as exc:
            raise BrokenResourceError from exc
|
|