Spaces:
Running
Running
import asyncio | |
from functools import partial, singledispatch | |
from io import BufferedRandom, BufferedReader, BufferedWriter, FileIO, TextIOBase | |
from tempfile import NamedTemporaryFile as syncNamedTemporaryFile | |
from tempfile import SpooledTemporaryFile as syncSpooledTemporaryFile | |
from tempfile import TemporaryDirectory as syncTemporaryDirectory | |
from tempfile import TemporaryFile as syncTemporaryFile | |
from tempfile import _TemporaryFileWrapper as syncTemporaryFileWrapper | |
from ..base import AiofilesContextManager | |
from ..threadpool.binary import AsyncBufferedIOBase, AsyncBufferedReader, AsyncFileIO | |
from ..threadpool.text import AsyncTextIOWrapper | |
from .temptypes import AsyncSpooledTemporaryFile, AsyncTemporaryDirectory | |
import sys | |
__all__ = [ | |
"NamedTemporaryFile", | |
"TemporaryFile", | |
"SpooledTemporaryFile", | |
"TemporaryDirectory", | |
] | |
# ================================================================ | |
# Public methods for async open and return of temp file/directory | |
# objects with async interface | |
# ================================================================ | |
if sys.version_info >= (3, 12): | |
def NamedTemporaryFile( | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
delete=True, | |
delete_on_close=True, | |
loop=None, | |
executor=None, | |
): | |
"""Async open a named temporary file""" | |
return AiofilesContextManager( | |
_temporary_file( | |
named=True, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
delete=delete, | |
delete_on_close=delete_on_close, | |
loop=loop, | |
executor=executor, | |
) | |
) | |
else: | |
def NamedTemporaryFile( | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
delete=True, | |
loop=None, | |
executor=None, | |
): | |
"""Async open a named temporary file""" | |
return AiofilesContextManager( | |
_temporary_file( | |
named=True, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
delete=delete, | |
loop=loop, | |
executor=executor, | |
) | |
) | |
def TemporaryFile( | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
loop=None, | |
executor=None, | |
): | |
"""Async open an unnamed temporary file""" | |
return AiofilesContextManager( | |
_temporary_file( | |
named=False, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
loop=loop, | |
executor=executor, | |
) | |
) | |
def SpooledTemporaryFile( | |
max_size=0, | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
loop=None, | |
executor=None, | |
): | |
"""Async open a spooled temporary file""" | |
return AiofilesContextManager( | |
_spooled_temporary_file( | |
max_size=max_size, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
loop=loop, | |
executor=executor, | |
) | |
) | |
def TemporaryDirectory(suffix=None, prefix=None, dir=None, loop=None, executor=None): | |
"""Async open a temporary directory""" | |
return AiofilesContextManagerTempDir( | |
_temporary_directory( | |
suffix=suffix, prefix=prefix, dir=dir, loop=loop, executor=executor | |
) | |
) | |
# ========================================================= | |
# Internal coroutines to open new temp files/directories | |
# ========================================================= | |
if sys.version_info >= (3, 12): | |
async def _temporary_file( | |
named=True, | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
delete=True, | |
delete_on_close=True, | |
loop=None, | |
executor=None, | |
max_size=0, | |
): | |
"""Async method to open a temporary file with async interface""" | |
if loop is None: | |
loop = asyncio.get_running_loop() | |
if named: | |
cb = partial( | |
syncNamedTemporaryFile, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
delete=delete, | |
delete_on_close=delete_on_close, | |
) | |
else: | |
cb = partial( | |
syncTemporaryFile, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
) | |
f = await loop.run_in_executor(executor, cb) | |
# Wrap based on type of underlying IO object | |
if type(f) is syncTemporaryFileWrapper: | |
# _TemporaryFileWrapper was used (named files) | |
result = wrap(f.file, f, loop=loop, executor=executor) | |
result._closer = f._closer | |
return result | |
else: | |
# IO object was returned directly without wrapper | |
return wrap(f, f, loop=loop, executor=executor) | |
else: | |
async def _temporary_file( | |
named=True, | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
delete=True, | |
loop=None, | |
executor=None, | |
max_size=0, | |
): | |
"""Async method to open a temporary file with async interface""" | |
if loop is None: | |
loop = asyncio.get_running_loop() | |
if named: | |
cb = partial( | |
syncNamedTemporaryFile, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
delete=delete, | |
) | |
else: | |
cb = partial( | |
syncTemporaryFile, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
) | |
f = await loop.run_in_executor(executor, cb) | |
# Wrap based on type of underlying IO object | |
if type(f) is syncTemporaryFileWrapper: | |
# _TemporaryFileWrapper was used (named files) | |
result = wrap(f.file, f, loop=loop, executor=executor) | |
# add delete property | |
result.delete = f.delete | |
return result | |
else: | |
# IO object was returned directly without wrapper | |
return wrap(f, f, loop=loop, executor=executor) | |
async def _spooled_temporary_file( | |
max_size=0, | |
mode="w+b", | |
buffering=-1, | |
encoding=None, | |
newline=None, | |
suffix=None, | |
prefix=None, | |
dir=None, | |
loop=None, | |
executor=None, | |
): | |
"""Open a spooled temporary file with async interface""" | |
if loop is None: | |
loop = asyncio.get_running_loop() | |
cb = partial( | |
syncSpooledTemporaryFile, | |
max_size=max_size, | |
mode=mode, | |
buffering=buffering, | |
encoding=encoding, | |
newline=newline, | |
suffix=suffix, | |
prefix=prefix, | |
dir=dir, | |
) | |
f = await loop.run_in_executor(executor, cb) | |
# Single interface provided by SpooledTemporaryFile for all modes | |
return AsyncSpooledTemporaryFile(f, loop=loop, executor=executor) | |
async def _temporary_directory( | |
suffix=None, prefix=None, dir=None, loop=None, executor=None | |
): | |
"""Async method to open a temporary directory with async interface""" | |
if loop is None: | |
loop = asyncio.get_running_loop() | |
cb = partial(syncTemporaryDirectory, suffix, prefix, dir) | |
f = await loop.run_in_executor(executor, cb) | |
return AsyncTemporaryDirectory(f, loop=loop, executor=executor) | |
class AiofilesContextManagerTempDir(AiofilesContextManager): | |
"""With returns the directory location, not the object (matching sync lib)""" | |
async def __aenter__(self): | |
self._obj = await self._coro | |
return self._obj.name | |
def wrap(base_io_obj, file, *, loop=None, executor=None): | |
"""Wrap the object with interface based on type of underlying IO""" | |
raise TypeError("Unsupported IO type: {}".format(base_io_obj)) | |
def _(base_io_obj, file, *, loop=None, executor=None): | |
return AsyncTextIOWrapper(file, loop=loop, executor=executor) | |
def _(base_io_obj, file, *, loop=None, executor=None): | |
return AsyncBufferedIOBase(file, loop=loop, executor=executor) | |
def _(base_io_obj, file, *, loop=None, executor=None): | |
return AsyncBufferedReader(file, loop=loop, executor=executor) | |
def _(base_io_obj, file, *, loop=None, executor=None): | |
return AsyncFileIO(file, loop=loop, executor=executor) | |